Skip to content

Commit 24f3bf2

Browse files
committed
feat(permission0): stream funnels
1 parent c09a91e commit 24f3bf2

File tree

13 files changed

+733
-52
lines changed

13 files changed

+733
-52
lines changed

.helix/languages.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
cargo.extraEnv = { SKIP_WASM_BUILD = "true" }
33
cargo.features = ["runtime-benchmarks"]
44
check.extraEnv = { SKIP_WASM_BUILD = "true" }
5-
check.overrideCommand = ["cargo", "check", "--message-format=json"]
5+
check.overrideCommand = ["cargo", "check", "--tests", "--all", "--message-format=json"]

docs/permission0.md

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -395,14 +395,16 @@ Batch creates multiple namespace permissions with the same properties but differ
395395
pub fn update_stream_permission(
396396
origin: OriginFor<T>,
397397
permission_id: PermissionId,
398-
recipients: Option<Vec<(T::AccountId, u16)>>,
399-
accumulating: Option<bool>,
400-
recipient_manager: Option<T::AccountId>,
401-
weight_setter: Option<T::AccountId>,
398+
new_recipients: Option<BoundedBTreeMap<T::AccountId, u16, T::MaxRecipientsPerPermission>>,
399+
new_streams: Option<BoundedBTreeMap<StreamId, Percent, T::MaxStreamsPerPermission>>,
400+
new_distribution_control: Option<DistributionControl<T>>,
401+
new_recipient_manager: Option<Option<T::AccountId>>,
402+
new_weight_setter: Option<Option<T::AccountId>>,
403+
funnel: Option<bool>,
402404
) -> DispatchResult
403405
```
404406

405-
Allows delegator or authorized accounts to update stream permission properties, including recipients, accumulation state, and management roles.
407+
Allows delegator or authorized accounts to update stream permission properties, including recipients, stream allocations, distribution control, management roles, and funnel state. The `funnel` parameter controls whether distributions output the original stream IDs or a derived funnel stream ID to recipients.
406408

407409
### update_namespace_permission
408410

@@ -528,7 +530,76 @@ When streams are redelegated through the permission system, their IDs are preser
528530

529531
This stream-based model allows for much more granular control over stream delegation, enabling agents to specify different delegation percentages for different types of streams they receive.
530532

531-
## Integration with Stream Distribution
533+
## Funneling Streams
534+
535+
While the stream-based model preserves lineage across delegations, complex multi-level swarms reveal a limitation: when multiple permissions route different portions of the same stream to an agent, all incoming tokens accumulate into a single bucket at `(Agent, StreamId)`. This mixes otherwise distinct delegation paths into one mixed balance, obscuring provenance and preventing agents from treating tokens differently based on their intended purpose or origin.
536+
537+
Consider an agent that performs multiple specialized functions for the same swarm. A research collective with a root stream of 1000 TORS creates two permissions: one allocating 30% for compute services, another allocating 20% for data curation. Both permissions reference the same root stream, so the agent receives 500 TORS total, but cannot distinguish which 300 TORS arrived for compute work versus which 200 TORS came for curation. The agent cannot route compute emissions to GPU infrastructure while directing curation payments to data validators. It cannot apply function-specific fee structures, cannot generate separate invoices for each service, and cannot prove to sub-delegators how much funding flows through each functional role.
538+
539+
This problem compounds when the agent needs to create child permissions. If it wants to forward 50% of compute earnings to a GPU cluster operator and 30% of curation earnings to a dataset validator, the mixed bucket prevents such functional isolation. The semantic separation needed for proper accounting, specialized routing, and role-based policy enforcement is lost.
540+
541+
```mermaid
542+
graph TB
543+
ROOT[Research Collective<br/>RootStream: 1000 TORS]
544+
545+
ROOT -->|Permission 1: 30%<br/>For Compute<br/>StreamID: 0xABC...| A[Multi-Function Agent]
546+
ROOT -->|Permission 2: 20%<br/>For Curation<br/>StreamID: 0xABC...| A
547+
548+
A -->|Total: 500 TORS<br/>Mixed bucket<br/>Cannot distinguish| MIXED[Indistinguishable<br/>300 TORS compute + 200 TORS curation<br/>Origin lost]
549+
```
550+
551+
Stream funnels solve this by transforming input streams into new derived Stream IDs that represent specific delegation paths. When a delegator creates a permission with `enable_funnel: true`, the system generates a unique derived Stream ID by hashing the permission ID. This derived stream becomes the output identifier that recipients see and accumulate, effectively creating a new stream identity that carries the semantic context of that delegation path.
552+
553+
With funnels, the research collective creates two separate funnel permissions: one for compute services outputting `derived_stream_compute`, another for curation outputting `derived_stream_curation`. The agent now receives two isolated buckets, each carrying its own functional context. It can create child permissions on `derived_stream_compute` to route compute tokens to GPU providers, and separate permissions on `derived_stream_curation` for data validators. Each sub-delegator knows precisely which functional allocation their tokens originated from, without needing to query complex delegation graphs or reconstruct paths off-chain.
554+
555+
```mermaid
556+
graph TB
557+
ROOT[Research Collective<br/>RootStream: 1000 TORS]
558+
559+
ROOT -->|Permission 1 with Funnel<br/>Input: RootStream 30%| FC[Funnel: Compute<br/>Outputs: derived_compute]
560+
ROOT -->|Permission 2 with Funnel<br/>Input: RootStream 20%| FD[Funnel: Curation<br/>Outputs: derived_curation]
561+
562+
FC -->|300 TORS<br/>derived_compute| A[Multi-Function Agent]
563+
FD -->|200 TORS<br/>derived_curation| A
564+
565+
A -->|derived_compute<br/>50% sub-delegation| GPU[GPU Clusters<br/>Model training]
566+
A -->|derived_curation<br/>30% sub-delegation| VAL[Data Validators<br/>Dataset verification]
567+
```
568+
569+
Funnels preserve composability throughout the delegation chain. An agent receiving `derived_stream_compute` can create its own permissions on that stream, including another funnel that further specializes the path (for example, separating training workloads from inference services). Revocation works cleanly: revoking a funnel permission stops emissions along that path, but child permissions remain structurally valid and simply stop receiving funds. No cascade deletions occur, no coordination is required between levels.
570+
571+
The funnel structure is minimal, containing only what's necessary for deterministic ID generation:
572+
573+
```rust
574+
pub struct FunnelStream<T: Config> {
575+
pub derived_stream: StreamId,
576+
pub nonce: u64,
577+
pub created_at: BlockNumberFor<T>,
578+
}
579+
```
580+
581+
The derived Stream ID is computed as `hash(permission_id, nonce)` where the nonce is always 1 for the first funnel on a permission. This deterministic generation means any observer can reconstruct the derived stream ID from the permission ID without additional storage lookups. The `created_at` block number provides an audit trail for when the funnel was established.
582+
583+
A stream permission can enable or disable funneling through the `update_stream_permission` extrinsic by passing `funnel: Some(true)` or `funnel: Some(false)`. Accumulation always occurs at the original input stream buckets regardless of funnel state, the funnel only transforms which stream ID recipients see during distribution. Enabling a funnel on a permission that already has accumulated amounts means the next distribution will pass the derived stream ID to recipients instead of the original. Disabling a funnel means recipients will receive the original stream IDs on subsequent distributions.
584+
585+
During distribution, the system reads accumulated amounts from original input stream buckets (where accumulation occurs), but passes the derived Stream ID to recipients through `do_distribute_to_targets`. Recipients accumulate tokens under `(Agent, derived_stream_id, permission_id)` buckets, achieving full separation from other permissions.
586+
587+
Events capture both source and target streams for complete observability:
588+
589+
```rust
590+
Event::StreamDistribution {
591+
permission_id: H256,
592+
source_stream: Some(0xROOT...), // Original input stream
593+
target_stream: Some(0xDERIVED_COMPUTE...), // Funnel's derived output
594+
recipient: AgentAccount,
595+
amount: 300_TORS,
596+
reason: DistributionReason,
597+
}
598+
```
599+
600+
This dual-stream event structure allows indexers to reconstruct the full flow without maintaining complex state. Accumulation events show original streams, distribution events show both source and target, enabling path tracing from root streams through arbitrary delegation depths. Off-chain systems can identify funnel operations by comparing source and target streams, when they differ, a funnel transformation occurred.
601+
602+
## Hooking into emission rewards
532603

533604
Permission0 integrates with the `Emission0` pallet by intercepting the stream distribution process. When the linear rewards mechanism distributes tokens, the `do_accumulate_streams` function is called to divert portions according to active permissions.
534605

pallets/faucet/tests/faucet.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ impl pallet_permission0::Config for Test {
264264
type MaxChildrenPerPermission = ConstU32<0>;
265265
type MaxCuratorSubpermissionsPerPermission = ConstU32<0>;
266266
type MaxBulkOperationsPerCall = ConstU32<20>;
267+
type MaxFunnelsPerPermission = ConstU32<1>;
267268
}
268269

269270
impl pallet_balances::Config for Test {

pallets/permission0/api/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ pub fn generate_root_stream_id<AccountId: Encode>(agent_id: &AccountId) -> Strea
2626
blake2_256(&data).into()
2727
}
2828

29+
/// Static identifier for funneled streams
30+
pub const FUNNEL_STREAM_PREFIX: &[u8] = b"torus:permission:funnel-v1";
31+
32+
/// Generates a funneled stream ID.
33+
pub fn generate_funnel_stream_id(permission_id: &PermissionId, nonce: u32) -> StreamId {
34+
let mut data = FUNNEL_STREAM_PREFIX.to_vec();
35+
data.extend(permission_id.encode());
36+
data.extend(nonce.encode());
37+
blake2_256(&data).into()
38+
}
39+
2940
/// Defines what portion of streams the permission applies to
3041
#[derive(Encode, Decode, TypeInfo, Clone, PartialEq, Eq, Debug)]
3142
pub enum StreamAllocation<Balance> {
@@ -113,6 +124,7 @@ pub trait Permission0StreamApi<AccountId, Origin, BlockNumber, Balance, Negative
113124
enforcement: EnforcementAuthority<AccountId>,
114125
recipient_manager: Option<AccountId>,
115126
weight_setter: Option<AccountId>,
127+
enable_funnel: bool,
116128
) -> Result<PermissionId, DispatchError>;
117129

118130
/// Accumulate streams for an agent with permissions

pallets/permission0/src/ext/stream_impl.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ impl<T: Config>
4343
enforcement: ApiEnforcementAuthority<T::AccountId>,
4444
recipient_manager: Option<T::AccountId>,
4545
weight_setter: Option<T::AccountId>,
46+
enable_funnel: bool,
4647
) -> Result<PermissionId, DispatchError> {
4748
let internal_allocation = match allocation {
4849
ApiStreamAllocation::Streams(streams) => StreamAllocation::Streams(
@@ -81,6 +82,7 @@ impl<T: Config>
8182
enforcement,
8283
recipient_manager,
8384
weight_setter,
85+
enable_funnel,
8486
)
8587
}
8688

@@ -121,6 +123,7 @@ pub(crate) fn delegate_stream_permission_impl<T: Config>(
121123
enforcement: EnforcementAuthority<T>,
122124
recipient_manager: Option<T::AccountId>,
123125
weight_setter: Option<T::AccountId>,
126+
enable_funnel: bool,
124127
) -> Result<PermissionId, DispatchError> {
125128
use polkadot_sdk::frame_support::ensure;
126129

@@ -155,17 +158,22 @@ pub(crate) fn delegate_stream_permission_impl<T: Config>(
155158

156159
let recipients_ids: Vec<_> = recipients.keys().cloned().collect();
157160

158-
let scope = PermissionScope::Stream(StreamScope {
161+
let mut scope = PermissionScope::Stream(StreamScope {
159162
recipients,
160163
allocation: allocation.clone(),
161164
distribution,
162-
accumulating: true, // Start with accumulation enabled by default
165+
accumulating: true,
163166
recipient_managers: validate_stream_managers::<T>(&delegator, recipient_manager)?,
164167
weight_setters: validate_stream_managers::<T>(&delegator, weight_setter)?,
168+
funnels: Default::default(),
165169
});
166170

167171
let permission_id = generate_permission_id::<T>(&delegator, &scope)?;
168172

173+
if enable_funnel && let PermissionScope::Stream(stream) = &mut scope {
174+
stream.enable_funnel(permission_id)?;
175+
}
176+
169177
let contract =
170178
PermissionContract::<T>::new(delegator.clone(), scope, duration, revocation, enforcement);
171179

@@ -316,6 +324,7 @@ pub(crate) fn update_stream_permission<T: Config>(
316324
new_distribution_control: Option<DistributionControl<T>>,
317325
new_recipient_manager: Option<Option<T::AccountId>>,
318326
new_weight_setter: Option<Option<T::AccountId>>,
327+
funnel: Option<bool>,
319328
) -> DispatchResult {
320329
let caller = ensure_signed(origin)?;
321330

@@ -421,6 +430,16 @@ pub(crate) fn update_stream_permission<T: Config>(
421430
validate_stream_managers::<T>(&permission.delegator, new_weight_setter)?;
422431
}
423432

433+
if let Some(funnel) = funnel {
434+
ensure!(allowed_delegator, Error::<T>::NotAuthorizedToEdit);
435+
436+
if funnel {
437+
let _ = scope.enable_funnel(permission_id);
438+
} else {
439+
let _ = scope.disable_funnel();
440+
}
441+
}
442+
424443
permission.scope = PermissionScope::Stream(scope);
425444
permission.last_update = frame_system::Pallet::<T>::block_number();
426445
Permissions::<T>::set(permission_id, Some(permission));

pallets/permission0/src/lib.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub mod pallet {
3939

4040
use super::*;
4141

42-
const STORAGE_VERSION: StorageVersion = StorageVersion::new(7);
42+
const STORAGE_VERSION: StorageVersion = StorageVersion::new(8);
4343

4444
/// Configure the pallet by specifying the parameters and types on which it depends.
4545
#[pallet::config]
@@ -92,6 +92,10 @@ pub mod pallet {
9292
/// Max operations a bulk extrinsic can perform per extrinsic call.
9393
#[pallet::constant]
9494
type MaxBulkOperationsPerCall: Get<u32>;
95+
96+
/// Max number of funnels per stream permission.
97+
#[pallet::constant]
98+
type MaxFunnelsPerPermission: Get<u32>;
9599
}
96100

97101
pub type BalanceOf<T> =
@@ -220,7 +224,11 @@ pub mod pallet {
220224
/// An stream distribution happened
221225
StreamDistribution {
222226
permission_id: PermissionId,
223-
stream_id: Option<StreamId>,
227+
/// The source stream ID from which the tokens were derived.
228+
source_stream: Option<StreamId>,
229+
/// The actual final stream ID to which tokens were accumulated,
230+
/// this is the funnel stream ID if enabled.
231+
target_stream: Option<StreamId>,
224232
recipient: T::AccountId,
225233
amount: BalanceOf<T>,
226234
reason: permission::stream::DistributionReason,
@@ -329,6 +337,8 @@ pub mod pallet {
329337
TooManyCuratorPermissions,
330338
/// Namespace delegation depth exceeded the maximum allowed limit.
331339
DelegationDepthExceeded,
340+
/// Current funnel system is limited to one derived stream per permission.
341+
TooManyStreamFunnels,
332342
}
333343

334344
#[pallet::hooks]
@@ -353,6 +363,7 @@ pub mod pallet {
353363
enforcement: EnforcementAuthority<T>,
354364
recipient_manager: Option<T::AccountId>,
355365
weight_setter: Option<T::AccountId>,
366+
enable_funnel: bool,
356367
) -> DispatchResult {
357368
let delegator = ensure_signed(origin)?;
358369

@@ -366,6 +377,7 @@ pub mod pallet {
366377
enforcement,
367378
recipient_manager,
368379
weight_setter,
380+
enable_funnel,
369381
)?;
370382

371383
Ok(())
@@ -560,6 +572,7 @@ pub mod pallet {
560572
new_distribution_control: Option<DistributionControl<T>>,
561573
new_recipient_manager: Option<Option<T::AccountId>>,
562574
new_weight_setter: Option<Option<T::AccountId>>,
575+
funnel: Option<bool>,
563576
) -> DispatchResult {
564577
ext::stream_impl::update_stream_permission(
565578
origin,
@@ -569,6 +582,7 @@ pub mod pallet {
569582
new_distribution_control,
570583
new_recipient_manager,
571584
new_weight_setter,
585+
funnel,
572586
)?;
573587

574588
Ok(())
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,103 @@
1+
use polkadot_sdk::frame_support::{
2+
migrations::VersionedMigration, pallet_prelude::*, traits::UncheckedOnRuntimeUpgrade,
3+
};
14

5+
use crate::{Config, PermissionId, Permissions};
6+
7+
pub mod v8 {
8+
9+
use super::*;
10+
11+
pub type MigrationToV8<T, W> = VersionedMigration<7, 8, MigrateToV8<T>, crate::Pallet<T>, W>;
12+
13+
pub struct MigrateToV8<T>(core::marker::PhantomData<T>);
14+
15+
mod old {
16+
use codec::{Decode, Encode};
17+
use polkadot_sdk::{
18+
polkadot_sdk_frame::prelude::BlockNumberFor,
19+
sp_runtime::{BoundedBTreeMap, BoundedBTreeSet},
20+
};
21+
22+
use super::*;
23+
use crate::permission::*;
24+
25+
#[derive(Encode, Decode)]
26+
pub struct StreamScope<T: Config> {
27+
pub recipients: BoundedBTreeMap<T::AccountId, u16, T::MaxRecipientsPerPermission>,
28+
pub allocation: StreamAllocation<T>,
29+
pub distribution: DistributionControl<T>,
30+
pub accumulating: bool,
31+
pub recipient_managers: BoundedBTreeSet<T::AccountId, T::MaxControllersPerPermission>,
32+
pub weight_setters: BoundedBTreeSet<T::AccountId, T::MaxControllersPerPermission>,
33+
}
34+
35+
#[derive(Encode, Decode)]
36+
pub enum PermissionScope<T: Config> {
37+
Stream(StreamScope<T>),
38+
Curator(CuratorScope<T>),
39+
Namespace(NamespaceScope<T>),
40+
Wallet(wallet::WalletScope<T>),
41+
}
42+
43+
#[derive(Encode, Decode)]
44+
pub struct PermissionContract<T: Config> {
45+
pub delegator: T::AccountId,
46+
pub scope: PermissionScope<T>,
47+
pub duration: PermissionDuration<T>,
48+
pub revocation: RevocationTerms<T>,
49+
pub enforcement: EnforcementAuthority<T>,
50+
pub last_update: BlockNumberFor<T>,
51+
pub last_execution: Option<BlockNumberFor<T>>,
52+
pub execution_count: u32,
53+
pub created_at: BlockNumberFor<T>,
54+
}
55+
}
56+
57+
impl<T: Config> UncheckedOnRuntimeUpgrade for MigrateToV8<T> {
58+
fn on_runtime_upgrade() -> Weight {
59+
let mut weight = Weight::zero();
60+
let mut translated = 0u64;
61+
62+
Permissions::<T>::translate(|_key: PermissionId, old: old::PermissionContract<T>| {
63+
weight = weight.saturating_add(T::DbWeight::get().reads_writes(1, 1));
64+
translated = translated.saturating_add(1);
65+
66+
let new_scope = match old.scope {
67+
old::PermissionScope::Stream(old_stream) => {
68+
crate::PermissionScope::Stream(crate::StreamScope {
69+
recipients: old_stream.recipients,
70+
allocation: old_stream.allocation,
71+
distribution: old_stream.distribution,
72+
accumulating: old_stream.accumulating,
73+
recipient_managers: old_stream.recipient_managers,
74+
weight_setters: old_stream.weight_setters,
75+
funnels: BoundedVec::default(),
76+
})
77+
}
78+
old::PermissionScope::Curator(curator) => {
79+
crate::PermissionScope::Curator(curator)
80+
}
81+
old::PermissionScope::Namespace(namespace) => {
82+
crate::PermissionScope::Namespace(namespace)
83+
}
84+
old::PermissionScope::Wallet(wallet) => crate::PermissionScope::Wallet(wallet),
85+
};
86+
87+
Some(crate::PermissionContract {
88+
delegator: old.delegator,
89+
scope: new_scope,
90+
duration: old.duration,
91+
revocation: old.revocation,
92+
enforcement: old.enforcement,
93+
last_update: old.last_update,
94+
last_execution: old.last_execution,
95+
execution_count: old.execution_count,
96+
created_at: old.created_at,
97+
})
98+
});
99+
100+
weight
101+
}
102+
}
103+
}

0 commit comments

Comments
 (0)