PRIV ProtocolPRIV Docs
Contracts

DataPipeline

Recurring fresh data pipelines for AI model retraining with prepaid batch delivery, oracle consensus, and priority staking.

Overview

DataPipeline enables businesses to create recurring data delivery pipelines on the PRIV Protocol. Unlike BountyEscrow (one-time dataset collection), DataPipeline supports ongoing batch deliveries for AI model retraining and continuous data needs. Subscribers prepay PRIV for a set number of batches, and an oracle delivers each batch by distributing payouts to contributing data providers.

Optionally, subscribers can stake PRIV for priority queue placement, ensuring their pipeline is filled before non-priority pipelines. This priority stake is non-yielding (no ve-PRIV rewards) -- it is purely functional utility for queue ordering.

PropertyValue
ContractDataPipeline
Address (Base Sepolia)0xE45124f073d8447b96fe59669a81d2bd166C740C
Protocol Fee3% (300 BPS)
Min Delivery Interval1 day
Max Delivery Interval90 days
Min Records per Batch10
Max Records per Batch100,000
Min Prepaid Batches1
Min Price per Record0.001 PRIV
Priority Stake Lock30 days
Priority Unstake Cooldown7 days
Oracle Staleness1 hour
NetworkBase (Chain ID: 8453)

How It Works

Subscriber creates pipeline  -->  PRIV deposited to escrow  -->  Oracle delivers batches
         |                              |                              |
         v                              v                              v
  Define specs:                 (batchCost + fee)              Contributors receive
  - price per record            x batchesPrepaid               PRIV for their data
  - records per batch                                                  |
  - delivery interval                                                  v
  - batch count                                             Protocol fee sent to
                                                            FeeManagerV2 (40% burn /
                                                            35% stakers / 25% treasury)
         |
         v
  Pipeline runs until depleted, paused, or cancelled
  Subscriber can top up to extend

Pipeline Lifecycle

A pipeline progresses through four possible statuses:

StatusDescription
ActiveAccepting deliveries at the configured interval
PausedDeliveries halted; funds remain in escrow. Subscriber can resume.
DepletedRemaining funds are insufficient for another batch. Auto-tops up to Active on refund.
CancelledPermanently stopped. Remaining undelivered funds refunded to subscriber.

Cost Calculation

Each batch costs:

batchCost    = pricePerRecord * recordsPerBatch
feePerBatch  = batchCost * 300 / 10000        (3% protocol fee)
costPerBatch = batchCost + feePerBatch
totalDeposit = costPerBatch * batchesPrepaid

The protocol fee is routed through FeeManagerV2 on each delivery: 40% burned, 35% to stakers, 25% to treasury.


Pipeline Creation

Creating a Pipeline

Subscribers define their data requirements and prepay for a number of batches:

function createPipeline(
    uint256 pricePerRecord,
    uint256 recordsPerBatch,
    uint256 deliveryInterval,
    uint256 batchesPrepaid,
    bytes32 metadataHash
) external nonReentrant whenNotPaused returns (uint256 pipelineId)
ParameterTypeDescription
pricePerRecorduint256PRIV per data record (min 0.001 PRIV)
recordsPerBatchuint256Records per delivery (10 to 100,000)
deliveryIntervaluint256Seconds between deliveries (1 day to 90 days)
batchesPrepaiduint256Number of batches to prepay (min 1)
metadataHashbytes32IPFS hash of pipeline specifications

Requirements:

  • All parameters must be within their valid ranges
  • Caller must have approved the contract to spend the total deposit amount
  • Contract must not be paused

Topping Up a Pipeline

Extend an existing pipeline by prepaying additional batches:

function topUpPipeline(uint256 pipelineId, uint256 additionalBatches) external nonReentrant whenNotPaused

If the pipeline is in Depleted status, topping up automatically resumes it to Active.


Batch Delivery

Oracle Delivery

Authorized oracles deliver data batches by specifying which contributors provided data and how many records each supplied:

function deliverBatch(
    uint256 pipelineId,
    address[] calldata contributors,
    uint256[] calldata recordCounts,
    bytes32 batchHash
) external nonReentrant whenNotPaused
ParameterTypeDescription
pipelineIduint256The pipeline receiving the delivery
contributorsaddress[]Array of contributor addresses
recordCountsuint256[]Array of record counts per contributor
batchHashbytes32Off-chain reference to the delivered batch

Requirements:

  • Caller must be an authorized oracle with a fresh heartbeat (within 1 hour)
  • Pipeline must be Active
  • Delivery interval must have elapsed since the last batch
  • Total records in the batch must not exceed recordsPerBatch
  • Pipeline must have sufficient remaining funds
  • Oracle confirmation threshold must be met (default: 1 for MVP)

Effects:

  • Each contributor receives pricePerRecord * recordCount PRIV
  • Protocol fee is sent to FeeManagerV2
  • Pipeline state is updated (batches delivered, total paid, last delivery timestamp)
  • If remaining funds are insufficient for another batch, status changes to Depleted

Oracle Heartbeat

Oracles must maintain freshness by periodically pinging:

function oracleHeartbeatPing() external

An oracle whose last heartbeat is older than 1 hour cannot deliver batches. This prevents stale or compromised oracles from executing deliveries.

Multi-Oracle Consensus

The contract supports configurable multi-oracle confirmations. Multiple oracles can confirm the same delivery, and execution only proceeds once the minimum confirmation threshold is met:

uint256 public minOracleConfirmations;  // Default: 1 (MVP)

Pause and Resume

Pausing a Pipeline

Subscribers can pause an active pipeline to temporarily halt deliveries. Funds remain safely in escrow.

function pausePipeline(uint256 pipelineId) external

Resuming a Pipeline

Resume a paused pipeline to re-enable deliveries:

function resumePipeline(uint256 pipelineId) external whenNotPaused

Requirements:

  • Pipeline must be in Paused status
  • Pipeline must have remaining funds for at least one more batch

Cancelling a Pipeline

Permanently cancel a pipeline and receive a refund of any undelivered funds:

function cancelPipeline(uint256 pipelineId) external nonReentrant

The refund equals totalDeposit - totalPaidOut - totalFeesPaid.


Priority Staking

Subscribers can stake PRIV to receive priority queue placement, ensuring their pipeline's data requests are filled before non-priority pipelines. This stake is non-yielding -- it does not generate ve-PRIV rewards or any other returns. It is purely a functional mechanism for queue ordering.

Staking for Priority

function stakePriority(uint256 pipelineId, uint256 amount) external nonReentrant whenNotPaused
ParameterTypeDescription
pipelineIduint256The pipeline to add priority to
amountuint256Amount of PRIV to stake

Requirements:

  • Caller must be the pipeline subscriber
  • Amount must be greater than 0

The priority stake is locked for 30 days from the time of staking.

Unstaking Priority

Unstaking is a two-step process to prevent flash-unstake gaming:

Step 1: Request unstake (after the 30-day lock has elapsed)

function requestPriorityUnstake(uint256 pipelineId) external

Priority access is revoked immediately when the unstake is requested. The tokens enter a 7-day cooldown period.

Step 2: Execute unstake (after the 7-day cooldown)

function executePriorityUnstake(uint256 pipelineId) external nonReentrant

Returns the full priority stake to the subscriber.

Checking Priority Status

function hasPriority(uint256 pipelineId) external view returns (bool)

Returns true if the pipeline has an active priority stake and no pending unstake request.


Contract Interface

Constants

uint256 public constant PROTOCOL_FEE_BPS = 300;             // 3%
uint256 public constant BPS_DENOMINATOR = 10_000;
uint256 public constant MIN_DELIVERY_INTERVAL = 1 days;
uint256 public constant MAX_DELIVERY_INTERVAL = 90 days;
uint256 public constant MIN_RECORDS_PER_BATCH = 10;
uint256 public constant MAX_RECORDS_PER_BATCH = 100_000;
uint256 public constant MIN_PREPAID_BATCHES = 1;
uint256 public constant MIN_PRICE_PER_RECORD = 1_000_000_000_000_000;  // 0.001 PRIV
uint256 public constant PRIORITY_STAKE_LOCK = 30 days;
uint256 public constant PRIORITY_UNSTAKE_COOLDOWN = 7 days;
uint256 public constant ORACLE_STALENESS = 1 hours;

Structs

Pipeline

struct Pipeline {
    address subscriber;
    uint256 pricePerRecord;         // PRIV per data record
    uint256 recordsPerBatch;        // Records in each delivery
    uint256 deliveryInterval;       // Seconds between deliveries
    uint256 totalDeposit;           // Total PRIV deposited (lifetime)
    uint256 totalPaidOut;           // Total PRIV paid to contributors
    uint256 totalFeesPaid;          // Total protocol fees collected
    uint256 batchesDelivered;       // Batches delivered so far
    uint256 batchesPrepaid;         // Total batches prepaid (lifetime)
    uint256 lastDeliveryAt;         // Timestamp of last batch delivery
    uint256 createdAt;
    bytes32 metadataHash;           // IPFS hash of pipeline specs
    PipelineStatus status;
    uint256 priorityStake;          // PRIV staked for priority
}

BatchDelivery

struct BatchDelivery {
    uint256 pipelineId;
    uint256 recordCount;
    uint256 totalPayout;
    uint256 protocolFee;
    uint256 deliveredAt;
    bytes32 batchHash;              // Off-chain batch reference
}

PipelineStatus

enum PipelineStatus {
    Active,
    Paused,
    Depleted,
    Cancelled
}

View Functions

function getPipelineRemaining(uint256 pipelineId) external view returns (uint256)
function getBatchesRemaining(uint256 pipelineId) external view returns (uint256)
function getPipelineBatchCost(
    uint256 pricePerRecord,
    uint256 recordsPerBatch,
    uint256 batchCount
) external pure returns (uint256 totalCost, uint256 costPerBatch, uint256 feePerBatch)
function getSubscriberPipelineCount(address subscriber) external view returns (uint256)
function getSubscriberPipelines(address subscriber) external view returns (uint256[] memory)
function hasPriority(uint256 pipelineId) external view returns (bool)
function getNextDeliveryTime(uint256 pipelineId) external view returns (uint256)

Admin Functions

function addOracle(address oracle) external onlyOwner
function removeOracle(address oracle) external onlyOwner
function setMinOracleConfirmations(uint256 _min) external onlyOwner
function setFeeManager(address _feeManager) external onlyOwner
function pause() external onlyOwner
function unpause() external onlyOwner

Events

event PipelineCreated(
    uint256 indexed pipelineId,
    address indexed subscriber,
    uint256 pricePerRecord,
    uint256 recordsPerBatch,
    uint256 deliveryInterval,
    uint256 batchesPrepaid,
    uint256 totalDeposit,
    bytes32 metadataHash
);

event BatchDelivered(
    uint256 indexed pipelineId,
    uint256 indexed deliveryId,
    uint256 recordCount,
    uint256 totalPayout,
    uint256 protocolFee,
    bytes32 batchHash
);

event PipelineToppedUp(uint256 indexed pipelineId, uint256 additionalBatches, uint256 additionalDeposit);
event PipelinePaused(uint256 indexed pipelineId);
event PipelineResumed(uint256 indexed pipelineId);
event PipelineCancelled(uint256 indexed pipelineId, uint256 refunded);
event PipelineDepleted(uint256 indexed pipelineId);

event PriorityStaked(uint256 indexed pipelineId, uint256 amount);
event PriorityUnstakeRequested(uint256 indexed pipelineId, uint256 amount, uint256 availableAt);
event PriorityUnstaked(uint256 indexed pipelineId, uint256 amount);

event OracleAdded(address indexed oracle);
event OracleRemoved(address indexed oracle);
event FeeManagerUpdated(address indexed newFeeManager);

Errors

error ZeroAddress();
error ZeroAmount();
error InvalidPipelineId();
error PipelineNotActive();
error PipelineNotActiveOrPaused();
error NotPipelineSubscriber();
error NotOracle();
error StaleOracle();
error DeliveryTooEarly();
error BatchExceedsRecordLimit();
error InsufficientPipelineFunds();
error InsufficientPriorityStake();
error PriorityStakeLocked();
error UnstakeCooldownActive();
error InvalidDeliveryInterval();
error InvalidBatchSize();
error InvalidPrepaidBatches();
error NothingToWithdraw();
error AlreadyActive();
error BatchRecordMismatch();

Usage Examples

Create a Pipeline

import { useWriteContract } from 'wagmi'
import { parseEther, keccak256, toHex } from 'viem'

function useCreatePipeline() {
  const { writeContract } = useWriteContract()

  return async (params: {
    pricePerRecord: string
    recordsPerBatch: number
    deliveryIntervalDays: number
    batchesPrepaid: number
    metadataHash: `0x${string}`
  }) => {
    const priceWei = parseEther(params.pricePerRecord)
    const interval = BigInt(params.deliveryIntervalDays * 86400)

    // Calculate total deposit for approval
    const batchCost = priceWei * BigInt(params.recordsPerBatch)
    const fee = batchCost * 300n / 10000n
    const totalDeposit = (batchCost + fee) * BigInt(params.batchesPrepaid)

    // 1. Approve the DataPipeline contract
    await writeContract({
      address: PRIV_TOKEN_ADDRESS,
      abi: privTokenAbi,
      functionName: 'approve',
      args: [DATA_PIPELINE_ADDRESS, totalDeposit],
    })

    // 2. Create the pipeline
    await writeContract({
      address: DATA_PIPELINE_ADDRESS,
      abi: dataPipelineAbi,
      functionName: 'createPipeline',
      args: [
        priceWei,
        BigInt(params.recordsPerBatch),
        interval,
        BigInt(params.batchesPrepaid),
        params.metadataHash,
      ],
    })
  }
}

Check Pipeline Status

import { useReadContract } from 'wagmi'

function usePipelineStatus(pipelineId: number) {
  const { data: remaining } = useReadContract({
    address: DATA_PIPELINE_ADDRESS,
    abi: dataPipelineAbi,
    functionName: 'getBatchesRemaining',
    args: [BigInt(pipelineId)],
  })

  const { data: nextDelivery } = useReadContract({
    address: DATA_PIPELINE_ADDRESS,
    abi: dataPipelineAbi,
    functionName: 'getNextDeliveryTime',
    args: [BigInt(pipelineId)],
  })

  const { data: hasPriorityAccess } = useReadContract({
    address: DATA_PIPELINE_ADDRESS,
    abi: dataPipelineAbi,
    functionName: 'hasPriority',
    args: [BigInt(pipelineId)],
  })

  return { remaining, nextDelivery, hasPriorityAccess }
}

Stake for Priority

function useStakePriority() {
  const { writeContract } = useWriteContract()

  return async (pipelineId: number, amount: string) => {
    const amountWei = parseEther(amount)

    // 1. Approve
    await writeContract({
      address: PRIV_TOKEN_ADDRESS,
      abi: privTokenAbi,
      functionName: 'approve',
      args: [DATA_PIPELINE_ADDRESS, amountWei],
    })

    // 2. Stake for priority
    await writeContract({
      address: DATA_PIPELINE_ADDRESS,
      abi: dataPipelineAbi,
      functionName: 'stakePriority',
      args: [BigInt(pipelineId), amountWei],
    })
  }
}

Top Up a Pipeline

function useTopUpPipeline() {
  const { writeContract } = useWriteContract()

  return async (pipelineId: number, additionalBatches: number, costPerBatch: bigint) => {
    const additionalDeposit = costPerBatch * BigInt(additionalBatches)

    // 1. Approve
    await writeContract({
      address: PRIV_TOKEN_ADDRESS,
      abi: privTokenAbi,
      functionName: 'approve',
      args: [DATA_PIPELINE_ADDRESS, additionalDeposit],
    })

    // 2. Top up
    await writeContract({
      address: DATA_PIPELINE_ADDRESS,
      abi: dataPipelineAbi,
      functionName: 'topUpPipeline',
      args: [BigInt(pipelineId), BigInt(additionalBatches)],
    })
  }
}

Security Notes

  1. Reentrancy Protection: All functions involving token transfers use the nonReentrant modifier.

  2. Oracle Heartbeat: Oracles must ping within the 1-hour staleness window before delivering batches. A stale oracle cannot execute deliveries, protecting against compromised or abandoned oracle keys.

  3. Delivery Interval Enforcement: The contract enforces the configured delivery interval between batches, preventing rapid-fire batch spam that could drain pipeline funds.

  4. Batch Record Limit: Each delivery's total record count cannot exceed the pipeline's recordsPerBatch parameter, preventing oracle overcharging.

  5. Two-Step Unstake: Priority unstaking requires a request followed by a 7-day cooldown, preventing flash-unstake attacks where a subscriber could claim priority and immediately withdraw.

  6. Fund Isolation: Each pipeline's funds are tracked independently. Remaining funds are calculated as totalDeposit - totalPaidOut - totalFeesPaid.

  7. Emergency Pause: The owner can pause the contract in emergencies, halting all pipeline creation, delivery, and priority staking operations.


Source Code

View on GitHub