DataPipeline
Recurring fresh data pipelines for AI model retraining with prepaid batch delivery, oracle consensus, and priority staking.
Overview
DataPipeline enables businesses to create recurring data delivery pipelines on the PRIV Protocol. Unlike BountyEscrow (one-time dataset collection), DataPipeline supports ongoing batch deliveries for AI model retraining and continuous data needs. Subscribers prepay PRIV for a set number of batches, and an oracle delivers each batch by distributing payouts to contributing data providers.
Optionally, subscribers can stake PRIV for priority queue placement, ensuring their pipeline is filled before non-priority pipelines. This priority stake is non-yielding (no ve-PRIV rewards) -- it is purely functional utility for queue ordering.
| Property | Value |
|---|---|
| Contract | DataPipeline |
| Address (Base Sepolia) | 0xE45124f073d8447b96fe59669a81d2bd166C740C |
| Protocol Fee | 3% (300 BPS) |
| Min Delivery Interval | 1 day |
| Max Delivery Interval | 90 days |
| Min Records per Batch | 10 |
| Max Records per Batch | 100,000 |
| Min Prepaid Batches | 1 |
| Min Price per Record | 0.001 PRIV |
| Priority Stake Lock | 30 days |
| Priority Unstake Cooldown | 7 days |
| Oracle Staleness | 1 hour |
| Network | Base (Chain ID: 8453) |
How It Works
Subscriber creates pipeline --> PRIV deposited to escrow --> Oracle delivers batches
             |                              |                           |
             v                              v                           v
      Define specs:                 (batchCost + fee)         Contributors receive
      - price per record            x batchesPrepaid          PRIV for their data
      - records per batch                                               |
      - delivery interval                                               v
      - batch count                                           Protocol fee sent to
                                                              FeeManagerV2 (40% burn /
                                                              35% stakers / 25% treasury)
                                                                        |
                                                                        v
                                               Pipeline runs until depleted, paused, or cancelled
                                               Subscriber can top up to extend

Pipeline Lifecycle
A pipeline progresses through four possible statuses:
| Status | Description |
|---|---|
| Active | Accepting deliveries at the configured interval |
| Paused | Deliveries halted; funds remain in escrow. Subscriber can resume. |
| Depleted | Remaining funds are insufficient for another batch. Automatically returns to Active when the subscriber tops up. |
| Cancelled | Permanently stopped. Remaining undelivered funds refunded to subscriber. |
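The status table implies a small state machine. The following is a minimal TypeScript sketch of the transitions described on this page. The type and function names are illustrative and not part of the contract, and the set of statuses from which a pipeline can be cancelled is an assumption (the sketch allows every status except Cancelled).

```typescript
type PipelineStatus = "Active" | "Paused" | "Depleted" | "Cancelled";
type PipelineAction = "pause" | "resume" | "deplete" | "topUp" | "cancel";

// Status-changing transitions described in this document.
// "deplete" stands for a delivery that leaves too little for another batch.
// Assumption: cancel is allowed from every status except Cancelled.
const TRANSITIONS: Record<PipelineStatus, Partial<Record<PipelineAction, PipelineStatus>>> = {
  Active: { pause: "Paused", deplete: "Depleted", cancel: "Cancelled" },
  Paused: { resume: "Active", cancel: "Cancelled" },
  Depleted: { topUp: "Active", cancel: "Cancelled" },
  Cancelled: {},
};

// Returns the new status, or null when this page describes no status
// change for the given status/action pair.
function nextStatus(current: PipelineStatus, action: PipelineAction): PipelineStatus | null {
  return TRANSITIONS[current][action] ?? null;
}
```

For example, `nextStatus("Depleted", "topUp")` yields `"Active"`, while any action on a `Cancelled` pipeline yields `null`.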
Cost Calculation
Each batch costs:
batchCost = pricePerRecord * recordsPerBatch
feePerBatch = batchCost * 300 / 10000 (3% protocol fee)
costPerBatch = batchCost + feePerBatch
totalDeposit = costPerBatch * batchesPrepaid

The protocol fee is routed through FeeManagerV2 on each delivery: 40% burned, 35% to stakers, 25% to treasury.
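As a worked example of the formulas above, the sketch below quotes a pipeline off-chain using integer (wei) arithmetic. It assumes PRIV has 18 decimals, consistent with the 0.001 PRIV minimum being expressed as 10^15 in the contract constants. The `quotePipeline` helper is a hypothetical name, not a contract function.

```typescript
const PROTOCOL_FEE_BPS = 300n;
const BPS_DENOMINATOR = 10_000n;
const WEI_PER_PRIV = 1_000_000_000_000_000_000n; // assumes 18 decimals

interface PipelineQuote {
  batchCost: bigint;
  feePerBatch: bigint;
  costPerBatch: bigint;
  totalDeposit: bigint;
}

// Mirrors the cost formulas in this section using integer division,
// as Solidity would.
function quotePipeline(
  pricePerRecord: bigint,
  recordsPerBatch: bigint,
  batchesPrepaid: bigint,
): PipelineQuote {
  const batchCost = pricePerRecord * recordsPerBatch;
  const feePerBatch = (batchCost * PROTOCOL_FEE_BPS) / BPS_DENOMINATOR;
  const costPerBatch = batchCost + feePerBatch;
  return { batchCost, feePerBatch, costPerBatch, totalDeposit: costPerBatch * batchesPrepaid };
}

// 0.01 PRIV per record, 1,000 records per batch, 12 batches prepaid
const quote = quotePipeline(WEI_PER_PRIV / 100n, 1_000n, 12n);
// batchCost    = 10 PRIV
// feePerBatch  = 0.3 PRIV
// costPerBatch = 10.3 PRIV
// totalDeposit = 123.6 PRIV
```

The on-chain view function `getPipelineBatchCost` returns the same three quantities and is the authoritative source when the contract is reachable.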
Pipeline Creation
Creating a Pipeline
Subscribers define their data requirements and prepay for a number of batches:
function createPipeline(
    uint256 pricePerRecord,
    uint256 recordsPerBatch,
    uint256 deliveryInterval,
    uint256 batchesPrepaid,
    bytes32 metadataHash
) external nonReentrant whenNotPaused returns (uint256 pipelineId)

| Parameter | Type | Description |
|---|---|---|
| pricePerRecord | uint256 | PRIV per data record (min 0.001 PRIV) |
| recordsPerBatch | uint256 | Records per delivery (10 to 100,000) |
| deliveryInterval | uint256 | Seconds between deliveries (1 day to 90 days) |
| batchesPrepaid | uint256 | Number of batches to prepay (min 1) |
| metadataHash | bytes32 | IPFS hash of pipeline specifications |
Requirements:
- All parameters must be within their valid ranges
- Caller must have approved the contract to spend the total deposit amount
- Contract must not be paused
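A client can check the parameter ranges before sending a transaction and avoid a wasted revert. The sketch below mirrors the limits listed in the properties table. The `validatePipelineParams` helper and its messages are hypothetical, and whether each bound is inclusive on-chain is an assumption.

```typescript
const DAY = 86_400n; // seconds

// Limits copied from the contract constants documented on this page.
const LIMITS = {
  minDeliveryInterval: 1n * DAY,
  maxDeliveryInterval: 90n * DAY,
  minRecordsPerBatch: 10n,
  maxRecordsPerBatch: 100_000n,
  minPrepaidBatches: 1n,
  minPricePerRecord: 1_000_000_000_000_000n, // 0.001 PRIV
};

interface PipelineParams {
  pricePerRecord: bigint;   // wei
  recordsPerBatch: bigint;
  deliveryInterval: bigint; // seconds
  batchesPrepaid: bigint;
}

// Returns a list of human-readable problems; an empty list means the
// parameters are within the documented ranges (bounds assumed inclusive).
function validatePipelineParams(p: PipelineParams): string[] {
  const problems: string[] = [];
  if (p.pricePerRecord < LIMITS.minPricePerRecord) {
    problems.push("pricePerRecord is below 0.001 PRIV");
  }
  if (p.recordsPerBatch < LIMITS.minRecordsPerBatch || p.recordsPerBatch > LIMITS.maxRecordsPerBatch) {
    problems.push("recordsPerBatch must be between 10 and 100,000");
  }
  if (p.deliveryInterval < LIMITS.minDeliveryInterval || p.deliveryInterval > LIMITS.maxDeliveryInterval) {
    problems.push("deliveryInterval must be between 1 and 90 days");
  }
  if (p.batchesPrepaid < LIMITS.minPrepaidBatches) {
    problems.push("batchesPrepaid must be at least 1");
  }
  return problems;
}
```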
Topping Up a Pipeline
Extend an existing pipeline by prepaying additional batches:
function topUpPipeline(uint256 pipelineId, uint256 additionalBatches) external nonReentrant whenNotPaused

If the pipeline is in Depleted status, topping up automatically resumes it to Active.
Batch Delivery
Oracle Delivery
Authorized oracles deliver data batches by specifying which contributors provided data and how many records each supplied:
function deliverBatch(
    uint256 pipelineId,
    address[] calldata contributors,
    uint256[] calldata recordCounts,
    bytes32 batchHash
) external nonReentrant whenNotPaused

| Parameter | Type | Description |
|---|---|---|
| pipelineId | uint256 | The pipeline receiving the delivery |
| contributors | address[] | Array of contributor addresses |
| recordCounts | uint256[] | Array of record counts per contributor |
| batchHash | bytes32 | Off-chain reference to the delivered batch |
Requirements:
- Caller must be an authorized oracle with a fresh heartbeat (within 1 hour)
- Pipeline must be Active
- Delivery interval must have elapsed since the last batch
- Total records in the batch must not exceed recordsPerBatch
- Pipeline must have sufficient remaining funds
- Oracle confirmation threshold must be met (default: 1 for MVP)
Effects:
- Each contributor receives pricePerRecord * recordCount PRIV
- Protocol fee is sent to FeeManagerV2
- Pipeline state is updated (batches delivered, total paid, last delivery timestamp)
- If remaining funds are insufficient for another batch, status changes to Depleted
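The requirements and effects above can be sketched as an off-chain simulation of one delivery. This is not the contract code: `settleBatch` and its types are hypothetical, and the sketch assumes the 3% fee is charged on the amount actually paid out in the batch, which matters when a batch carries fewer than `recordsPerBatch` records. Oracle authorization, heartbeat, and interval checks are left out.

```typescript
interface PipelineTerms {
  pricePerRecord: bigint;
  recordsPerBatch: bigint;
  remainingFunds: bigint; // totalDeposit - totalPaidOut - totalFeesPaid
}

interface Settlement {
  payouts: Map<string, bigint>; // contributor address -> PRIV (wei)
  totalPayout: bigint;
  protocolFee: bigint;
  remainingAfter: bigint;
  depleted: boolean;
}

function settleBatch(terms: PipelineTerms, contributors: string[], recordCounts: bigint[]): Settlement {
  if (contributors.length !== recordCounts.length) throw new Error("BatchRecordMismatch");
  const totalRecords = recordCounts.reduce((sum, n) => sum + n, 0n);
  if (totalRecords > terms.recordsPerBatch) throw new Error("BatchExceedsRecordLimit");

  // Each contributor receives pricePerRecord * recordCount
  const payouts = new Map<string, bigint>();
  contributors.forEach((addr, i) => {
    const previous = payouts.get(addr) ?? 0n;
    payouts.set(addr, previous + terms.pricePerRecord * recordCounts[i]);
  });

  const totalPayout = terms.pricePerRecord * totalRecords;
  // Assumption: fee is 3% of what this batch actually pays out
  const protocolFee = (totalPayout * 300n) / 10_000n;
  if (totalPayout + protocolFee > terms.remainingFunds) throw new Error("InsufficientPipelineFunds");

  const remainingAfter = terms.remainingFunds - totalPayout - protocolFee;
  // Depleted once a further full batch (plus fee) can no longer be funded
  const fullBatch = terms.pricePerRecord * terms.recordsPerBatch;
  const fullBatchCost = fullBatch + (fullBatch * 300n) / 10_000n;
  return { payouts, totalPayout, protocolFee, remainingAfter, depleted: remainingAfter < fullBatchCost };
}
```

With a price of 0.001 PRIV per record and two contributors supplying 60 and 40 records, the contributors receive 0.06 and 0.04 PRIV and the protocol fee is 0.003 PRIV.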
Oracle Heartbeat
Oracles must maintain freshness by periodically pinging:
function oracleHeartbeatPing() external

An oracle whose last heartbeat is older than 1 hour cannot deliver batches. This prevents stale or compromised oracles from executing deliveries.
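An oracle operator may want to check freshness locally before attempting a delivery. The sketch below shows the staleness comparison with hypothetical helper names. It assumes timestamps are Unix seconds and that a heartbeat exactly one hour old still counts as fresh; the contract's boundary behavior is not stated on this page.

```typescript
const ORACLE_STALENESS_SECONDS = 3_600n; // 1 hour

// True while the last heartbeat is within the staleness window.
// Assumption: the boundary (exactly 1 hour old) is treated as fresh.
function isOracleFresh(lastHeartbeat: bigint, now: bigint): boolean {
  return now - lastHeartbeat <= ORACLE_STALENESS_SECONDS;
}

// Seconds left before the oracle must ping again; 0 if the window has passed.
function secondsUntilStale(lastHeartbeat: bigint, now: bigint): bigint {
  const deadline = lastHeartbeat + ORACLE_STALENESS_SECONDS;
  return deadline > now ? deadline - now : 0n;
}
```

Scheduling pings well inside the window (for example every 30 minutes) leaves headroom for delayed transactions.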
Multi-Oracle Consensus
The contract supports configurable multi-oracle confirmations. Multiple oracles can confirm the same delivery, and execution only proceeds once the minimum confirmation threshold is met:
uint256 public minOracleConfirmations; // Default: 1 (MVP)

Pause and Resume
Pausing a Pipeline
Subscribers can pause an active pipeline to temporarily halt deliveries. Funds remain safely in escrow.
function pausePipeline(uint256 pipelineId) external

Resuming a Pipeline
Resume a paused pipeline to re-enable deliveries:
function resumePipeline(uint256 pipelineId) external whenNotPaused

Requirements:
- Pipeline must be in Paused status
- Pipeline must have remaining funds for at least one more batch
Cancelling a Pipeline
Permanently cancel a pipeline and receive a refund of any undelivered funds:
function cancelPipeline(uint256 pipelineId) external nonReentrant

The refund equals totalDeposit - totalPaidOut - totalFeesPaid.
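To make the refund formula concrete, the sketch below computes it from the three accounting fields of the `Pipeline` struct. The helper names are hypothetical, and deriving the number of remaining batches by integer division is an assumption about how `getBatchesRemaining` behaves.

```typescript
interface PipelineAccounting {
  totalDeposit: bigint;
  totalPaidOut: bigint;
  totalFeesPaid: bigint;
}

// Funds still held in escrow for this pipeline; also the cancellation refund.
function remainingFunds(p: PipelineAccounting): bigint {
  return p.totalDeposit - p.totalPaidOut - p.totalFeesPaid;
}

// Assumption: whole batches that the remaining funds can still cover.
function batchesRemaining(p: PipelineAccounting, costPerBatch: bigint): bigint {
  return remainingFunds(p) / costPerBatch;
}

const TENTH_PRIV = 100_000_000_000_000_000n; // 0.1 PRIV in wei, assuming 18 decimals

// 123.6 PRIV deposited for 12 batches at 10.3 PRIV each; 5 batches delivered,
// so 50 PRIV went to contributors and 1.5 PRIV to protocol fees.
const accounting: PipelineAccounting = {
  totalDeposit: 1236n * TENTH_PRIV,
  totalPaidOut: 500n * TENTH_PRIV,
  totalFeesPaid: 15n * TENTH_PRIV,
};
const refund = remainingFunds(accounting);                    // 72.1 PRIV
const left = batchesRemaining(accounting, 103n * TENTH_PRIV); // 7 batches
```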
Priority Staking
Subscribers can stake PRIV to receive priority queue placement, ensuring their pipeline's data requests are filled before non-priority pipelines. This stake is non-yielding -- it does not generate ve-PRIV rewards or any other returns. It is purely a functional mechanism for queue ordering.
Staking for Priority
function stakePriority(uint256 pipelineId, uint256 amount) external nonReentrant whenNotPaused

| Parameter | Type | Description |
|---|---|---|
| pipelineId | uint256 | The pipeline to add priority to |
| amount | uint256 | Amount of PRIV to stake |
Requirements:
- Caller must be the pipeline subscriber
- Amount must be greater than 0
The priority stake is locked for 30 days from the time of staking.
Unstaking Priority
Unstaking is a two-step process to prevent flash-unstake gaming:
Step 1: Request unstake (after the 30-day lock has elapsed)
function requestPriorityUnstake(uint256 pipelineId) external

Priority access is revoked immediately when the unstake is requested. The tokens enter a 7-day cooldown period.
Step 2: Execute unstake (after the 7-day cooldown)
function executePriorityUnstake(uint256 pipelineId) external nonReentrant

Returns the full priority stake to the subscriber.
Checking Priority Status
function hasPriority(uint256 pipelineId) external view returns (bool)

Returns true if the pipeline has an active priority stake and no pending unstake request.
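The lock and cooldown rules can be modeled as three time checks. This is a sketch under stated assumptions: the `PriorityPosition` fields are hypothetical (this page only documents `priorityStake` on the `Pipeline` struct), the time bounds are treated as inclusive, and staking additional PRIV is not modeled.

```typescript
const DAY = 86_400n; // seconds
const PRIORITY_STAKE_LOCK = 30n * DAY;
const PRIORITY_UNSTAKE_COOLDOWN = 7n * DAY;

// Hypothetical off-chain view of a pipeline's priority stake.
interface PriorityPosition {
  amount: bigint;
  stakedAt: bigint;                  // Unix seconds
  unstakeRequestedAt: bigint | null; // null while no unstake is pending
}

// Priority holds only with a non-zero stake and no pending unstake request.
function hasPriority(p: PriorityPosition): boolean {
  return p.amount > 0n && p.unstakeRequestedAt === null;
}

// Step 1 becomes available once the 30-day lock has elapsed.
function canRequestUnstake(p: PriorityPosition, now: bigint): boolean {
  return hasPriority(p) && now >= p.stakedAt + PRIORITY_STAKE_LOCK;
}

// Step 2 becomes available once the 7-day cooldown has elapsed.
function canExecuteUnstake(p: PriorityPosition, now: bigint): boolean {
  return p.unstakeRequestedAt !== null && now >= p.unstakeRequestedAt + PRIORITY_UNSTAKE_COOLDOWN;
}

const staked: PriorityPosition = { amount: 100n, stakedAt: 0n, unstakeRequestedAt: null };
const requested: PriorityPosition = { ...staked, unstakeRequestedAt: 30n * DAY };
```

Under these rules the earliest a stake can be withdrawn is 37 days after staking, and priority is lost for the final 7 of those days.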
Contract Interface
Constants
uint256 public constant PROTOCOL_FEE_BPS = 300; // 3%
uint256 public constant BPS_DENOMINATOR = 10_000;
uint256 public constant MIN_DELIVERY_INTERVAL = 1 days;
uint256 public constant MAX_DELIVERY_INTERVAL = 90 days;
uint256 public constant MIN_RECORDS_PER_BATCH = 10;
uint256 public constant MAX_RECORDS_PER_BATCH = 100_000;
uint256 public constant MIN_PREPAID_BATCHES = 1;
uint256 public constant MIN_PRICE_PER_RECORD = 1_000_000_000_000_000; // 0.001 PRIV
uint256 public constant PRIORITY_STAKE_LOCK = 30 days;
uint256 public constant PRIORITY_UNSTAKE_COOLDOWN = 7 days;
uint256 public constant ORACLE_STALENESS = 1 hours;

Structs
Pipeline
struct Pipeline {
    address subscriber;
    uint256 pricePerRecord;    // PRIV per data record
    uint256 recordsPerBatch;   // Records in each delivery
    uint256 deliveryInterval;  // Seconds between deliveries
    uint256 totalDeposit;      // Total PRIV deposited (lifetime)
    uint256 totalPaidOut;      // Total PRIV paid to contributors
    uint256 totalFeesPaid;     // Total protocol fees collected
    uint256 batchesDelivered;  // Batches delivered so far
    uint256 batchesPrepaid;    // Total batches prepaid (lifetime)
    uint256 lastDeliveryAt;    // Timestamp of last batch delivery
    uint256 createdAt;
    bytes32 metadataHash;      // IPFS hash of pipeline specs
    PipelineStatus status;
    uint256 priorityStake;     // PRIV staked for priority
}

BatchDelivery
struct BatchDelivery {
    uint256 pipelineId;
    uint256 recordCount;
    uint256 totalPayout;
    uint256 protocolFee;
    uint256 deliveredAt;
    bytes32 batchHash;         // Off-chain batch reference
}

PipelineStatus
enum PipelineStatus {
    Active,
    Paused,
    Depleted,
    Cancelled
}

View Functions
function getPipelineRemaining(uint256 pipelineId) external view returns (uint256)
function getBatchesRemaining(uint256 pipelineId) external view returns (uint256)
function getPipelineBatchCost(
    uint256 pricePerRecord,
    uint256 recordsPerBatch,
    uint256 batchCount
) external pure returns (uint256 totalCost, uint256 costPerBatch, uint256 feePerBatch)
function getSubscriberPipelineCount(address subscriber) external view returns (uint256)
function getSubscriberPipelines(address subscriber) external view returns (uint256[] memory)
function hasPriority(uint256 pipelineId) external view returns (bool)
function getNextDeliveryTime(uint256 pipelineId) external view returns (uint256)

Admin Functions
function addOracle(address oracle) external onlyOwner
function removeOracle(address oracle) external onlyOwner
function setMinOracleConfirmations(uint256 _min) external onlyOwner
function setFeeManager(address _feeManager) external onlyOwner
function pause() external onlyOwner
function unpause() external onlyOwner

Events
event PipelineCreated(
    uint256 indexed pipelineId,
    address indexed subscriber,
    uint256 pricePerRecord,
    uint256 recordsPerBatch,
    uint256 deliveryInterval,
    uint256 batchesPrepaid,
    uint256 totalDeposit,
    bytes32 metadataHash
);
event BatchDelivered(
    uint256 indexed pipelineId,
    uint256 indexed deliveryId,
    uint256 recordCount,
    uint256 totalPayout,
    uint256 protocolFee,
    bytes32 batchHash
);
event PipelineToppedUp(uint256 indexed pipelineId, uint256 additionalBatches, uint256 additionalDeposit);
event PipelinePaused(uint256 indexed pipelineId);
event PipelineResumed(uint256 indexed pipelineId);
event PipelineCancelled(uint256 indexed pipelineId, uint256 refunded);
event PipelineDepleted(uint256 indexed pipelineId);
event PriorityStaked(uint256 indexed pipelineId, uint256 amount);
event PriorityUnstakeRequested(uint256 indexed pipelineId, uint256 amount, uint256 availableAt);
event PriorityUnstaked(uint256 indexed pipelineId, uint256 amount);
event OracleAdded(address indexed oracle);
event OracleRemoved(address indexed oracle);
event FeeManagerUpdated(address indexed newFeeManager);

Errors
error ZeroAddress();
error ZeroAmount();
error InvalidPipelineId();
error PipelineNotActive();
error PipelineNotActiveOrPaused();
error NotPipelineSubscriber();
error NotOracle();
error StaleOracle();
error DeliveryTooEarly();
error BatchExceedsRecordLimit();
error InsufficientPipelineFunds();
error InsufficientPriorityStake();
error PriorityStakeLocked();
error UnstakeCooldownActive();
error InvalidDeliveryInterval();
error InvalidBatchSize();
error InvalidPrepaidBatches();
error NothingToWithdraw();
error AlreadyActive();
error BatchRecordMismatch();

Usage Examples
Create a Pipeline
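import { usePublicClient, useWriteContract } from 'wagmi'
import { parseEther } from 'viem'

// PRIV_TOKEN_ADDRESS, DATA_PIPELINE_ADDRESS, privTokenAbi, and dataPipelineAbi
// are assumed to come from your app's contract configuration.
function useCreatePipeline() {
  // writeContractAsync resolves to the transaction hash, so it can be awaited;
  // writeContract returns nothing and cannot be used to sequence transactions.
  const { writeContractAsync } = useWriteContract()
  const publicClient = usePublicClient()

  return async (params: {
    pricePerRecord: string
    recordsPerBatch: number
    deliveryIntervalDays: number
    batchesPrepaid: number
    metadataHash: `0x${string}`
  }) => {
    if (!publicClient) throw new Error('No public client available')

    const priceWei = parseEther(params.pricePerRecord)
    const interval = BigInt(params.deliveryIntervalDays * 86400)

    // Calculate total deposit for approval
    const batchCost = priceWei * BigInt(params.recordsPerBatch)
    const fee = batchCost * 300n / 10000n
    const totalDeposit = (batchCost + fee) * BigInt(params.batchesPrepaid)

    // 1. Approve the DataPipeline contract and wait for the approval to be mined
    const approveHash = await writeContractAsync({
      address: PRIV_TOKEN_ADDRESS,
      abi: privTokenAbi,
      functionName: 'approve',
      args: [DATA_PIPELINE_ADDRESS, totalDeposit],
    })
    await publicClient.waitForTransactionReceipt({ hash: approveHash })

    // 2. Create the pipeline; resolves to the transaction hash
    return writeContractAsync({
      address: DATA_PIPELINE_ADDRESS,
      abi: dataPipelineAbi,
      functionName: 'createPipeline',
      args: [
        priceWei,
        BigInt(params.recordsPerBatch),
        interval,
        BigInt(params.batchesPrepaid),
        params.metadataHash,
      ],
    })
  }
}

Check Pipeline Status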
import { useReadContract } from 'wagmi'

function usePipelineStatus(pipelineId: number) {
  const { data: remaining } = useReadContract({
    address: DATA_PIPELINE_ADDRESS,
    abi: dataPipelineAbi,
    functionName: 'getBatchesRemaining',
    args: [BigInt(pipelineId)],
  })
  const { data: nextDelivery } = useReadContract({
    address: DATA_PIPELINE_ADDRESS,
    abi: dataPipelineAbi,
    functionName: 'getNextDeliveryTime',
    args: [BigInt(pipelineId)],
  })
  const { data: hasPriorityAccess } = useReadContract({
    address: DATA_PIPELINE_ADDRESS,
    abi: dataPipelineAbi,
    functionName: 'hasPriority',
    args: [BigInt(pipelineId)],
  })
  return { remaining, nextDelivery, hasPriorityAccess }
}

Stake for Priority
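import { usePublicClient, useWriteContract } from 'wagmi'
import { parseEther } from 'viem'

function useStakePriority() {
  const { writeContractAsync } = useWriteContract()
  const publicClient = usePublicClient()

  return async (pipelineId: number, amount: string) => {
    if (!publicClient) throw new Error('No public client available')
    const amountWei = parseEther(amount)

    // 1. Approve and wait for the approval to be mined
    const approveHash = await writeContractAsync({
      address: PRIV_TOKEN_ADDRESS,
      abi: privTokenAbi,
      functionName: 'approve',
      args: [DATA_PIPELINE_ADDRESS, amountWei],
    })
    await publicClient.waitForTransactionReceipt({ hash: approveHash })

    // 2. Stake for priority
    return writeContractAsync({
      address: DATA_PIPELINE_ADDRESS,
      abi: dataPipelineAbi,
      functionName: 'stakePriority',
      args: [BigInt(pipelineId), amountWei],
    })
  }
}

Top Up a Pipeline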
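import { usePublicClient, useWriteContract } from 'wagmi'

function useTopUpPipeline() {
  const { writeContractAsync } = useWriteContract()
  const publicClient = usePublicClient()

  // costPerBatch includes the 3% fee (see getPipelineBatchCost)
  return async (pipelineId: number, additionalBatches: number, costPerBatch: bigint) => {
    if (!publicClient) throw new Error('No public client available')
    const additionalDeposit = costPerBatch * BigInt(additionalBatches)

    // 1. Approve and wait for the approval to be mined
    const approveHash = await writeContractAsync({
      address: PRIV_TOKEN_ADDRESS,
      abi: privTokenAbi,
      functionName: 'approve',
      args: [DATA_PIPELINE_ADDRESS, additionalDeposit],
    })
    await publicClient.waitForTransactionReceipt({ hash: approveHash })

    // 2. Top up
    return writeContractAsync({
      address: DATA_PIPELINE_ADDRESS,
      abi: dataPipelineAbi,
      functionName: 'topUpPipeline',
      args: [BigInt(pipelineId), BigInt(additionalBatches)],
    })
  }
}

Security Notes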
- Reentrancy Protection: All functions involving token transfers use the nonReentrant modifier.
- Oracle Heartbeat: Oracles must ping within the 1-hour staleness window before delivering batches. A stale oracle cannot execute deliveries, protecting against compromised or abandoned oracle keys.
- Delivery Interval Enforcement: The contract enforces the configured delivery interval between batches, preventing rapid-fire batch spam that could drain pipeline funds.
- Batch Record Limit: Each delivery's total record count cannot exceed the pipeline's recordsPerBatch parameter, preventing oracle overcharging.
- Two-Step Unstake: Priority unstaking requires a request followed by a 7-day cooldown, preventing flash-unstake attacks where a subscriber could claim priority and immediately withdraw.
- Fund Isolation: Each pipeline's funds are tracked independently. Remaining funds are calculated as totalDeposit - totalPaidOut - totalFeesPaid.
- Emergency Pause: The owner can pause the contract in emergencies, halting all pipeline creation, delivery, and priority staking operations.