1- use std:: collections:: HashMap ;
2- use std:: ops:: Add ;
3- use acropolis_common:: types:: AddrKeyhash ;
41use acropolis_common:: { PoolId , StakeAddress } ;
52use anyhow:: Result ;
63use fjall:: { Config , Keyspace , PartitionCreateOptions } ;
4+ use std:: collections:: HashMap ;
75
86const POOL_ID_LENGTH : usize = 28 ;
9- const STAKE_KEY_LEN : usize = 28 ;
7+ const STAKE_ADDRESS_LEN : usize = 29 ; // 1 byte header + 28 bytes hash
108const EPOCH_LEN : usize = 8 ;
11- const TOTAL_KEY_LEN : usize = EPOCH_LEN + POOL_ID_LENGTH + STAKE_KEY_LEN ;
9+ const TOTAL_KEY_LEN : usize = EPOCH_LEN + POOL_ID_LENGTH + STAKE_ADDRESS_LEN ;
1210
13- // Batch size balances commit overhead vs memory usage
14- // ~720KB per batch (72 bytes × 10,000)
15- // ~130 commits for typical epoch (~1.3M delegations)
1611const BATCH_SIZE : usize = 10_000 ;
1712
18- fn encode_key ( epoch : u64 , pool_id : & PoolId , stake_key : & AddrKeyhash ) -> Vec < u8 > {
13+ fn encode_key ( epoch : u64 , pool_id : & PoolId , stake_address : & StakeAddress ) -> Result < Vec < u8 > > {
1914 let mut key = Vec :: with_capacity ( TOTAL_KEY_LEN ) ;
2015 key. extend_from_slice ( & epoch. to_be_bytes ( ) ) ;
2116 key. extend_from_slice ( pool_id. as_ref ( ) ) ;
22- key. extend_from_slice ( stake_key. as_ref ( ) ) ;
23- key
17+ key. extend_from_slice ( & stake_address. to_binary ( ) ) ;
18+
19+ Ok ( key)
2420}
2521
2622fn encode_epoch_pool_prefix ( epoch : u64 , pool_id : & PoolId ) -> Vec < u8 > {
@@ -30,29 +26,32 @@ fn encode_epoch_pool_prefix(epoch: u64, pool_id: &PoolId) -> Vec<u8> {
3026 prefix
3127}
3228
33- fn decode_key ( key : & [ u8 ] ) -> Result < ( u64 , PoolId , AddrKeyhash ) > {
29+ fn decode_key ( key : & [ u8 ] ) -> Result < ( u64 , PoolId , StakeAddress ) > {
30+ if key. len ( ) != TOTAL_KEY_LEN {
31+ anyhow:: bail!(
32+ "Invalid key length: expected {}, got {}" ,
33+ TOTAL_KEY_LEN ,
34+ key. len( )
35+ ) ;
36+ }
37+
3438 let epoch = u64:: from_be_bytes ( key[ ..EPOCH_LEN ] . try_into ( ) ?) ;
3539 let pool_id = key[ EPOCH_LEN ..EPOCH_LEN + POOL_ID_LENGTH ] . try_into ( ) ?;
36- let stake_key = key[ EPOCH_LEN + POOL_ID_LENGTH ..] . try_into ( ) ?;
37- Ok ( ( epoch, pool_id, stake_key) )
40+
41+ let stake_address_bytes = & key[ EPOCH_LEN + POOL_ID_LENGTH ..] ;
42+ let stake_address = StakeAddress :: from_binary ( stake_address_bytes) ?;
43+
44+ Ok ( ( epoch, pool_id, stake_address) )
3845}
3946
40- /// Encode epoch completion marker key
4147fn encode_epoch_marker ( epoch : u64 ) -> Vec < u8 > {
4248 epoch. to_be_bytes ( ) . to_vec ( )
4349}
4450
4551pub struct SPDDStore {
4652 keyspace : Keyspace ,
47- /// Partition for all SPDD data
48- /// Key format: epoch(8 bytes) + pool_id + stake_key
49- /// Value: amount(8 bytes)
5053 spdd : fjall:: PartitionHandle ,
51- /// Partition for epoch completion markers
52- /// Key format: epoch(8 bytes)
53- /// Value: "complete"
5454 epoch_markers : fjall:: PartitionHandle ,
55- /// Maximum number of epochs to retain (None = unlimited)
5655 retention_epochs : u64 ,
5756}
5857
@@ -101,8 +100,8 @@ impl SPDDStore {
101100 pub fn store_spdd (
102101 & mut self ,
103102 epoch : u64 ,
104- spdd_state : HashMap < PoolId , Vec < ( AddrKeyhash , u64 ) > > ,
105- ) -> fjall :: Result < ( ) > {
103+ spdd_state : HashMap < PoolId , Vec < ( StakeAddress , u64 ) > > ,
104+ ) -> Result < ( ) > {
106105 if self . is_epoch_complete ( epoch) ? {
107106 return Ok ( ( ) ) ;
108107 }
@@ -111,8 +110,8 @@ impl SPDDStore {
111110 let mut batch = self . keyspace . batch ( ) ;
112111 let mut count = 0 ;
113112 for ( pool_id, delegations) in spdd_state {
114- for ( stake_key , amount) in delegations {
115- let key = encode_key ( epoch, & pool_id, & stake_key ) ;
113+ for ( stake_address , amount) in delegations {
114+ let key = encode_key ( epoch, & pool_id, & stake_address ) ? ;
116115 let value = amount. to_be_bytes ( ) ;
117116 batch. insert ( & self . spdd , key, value) ;
118117
@@ -128,7 +127,6 @@ impl SPDDStore {
128127 batch. commit ( ) ?;
129128 }
130129
131- // Mark epoch as complete (single key operation)
132130 let marker_key = encode_epoch_marker ( epoch) ;
133131 self . epoch_markers . insert ( marker_key, b"complete" ) ?;
134132
@@ -141,7 +139,6 @@ impl SPDDStore {
141139 }
142140
143141 pub fn remove_epoch_data ( & self , epoch : u64 ) -> fjall:: Result < u64 > {
144- // Remove epoch marker first - if process fails midway, epoch will be marked incomplete
145142 let marker_key = encode_epoch_marker ( epoch) ;
146143 self . epoch_markers . remove ( marker_key) ?;
147144
@@ -183,7 +180,7 @@ impl SPDDStore {
183180 Ok ( deleted_epochs)
184181 }
185182
186- pub fn query_by_epoch ( & self , epoch : u64 ) -> Result < Vec < ( PoolId , AddrKeyhash , u64 ) > > {
183+ pub fn query_by_epoch ( & self , epoch : u64 ) -> Result < Vec < ( PoolId , StakeAddress , u64 ) > > {
187184 if !self . is_epoch_complete ( epoch) ? {
188185 return Err ( anyhow:: anyhow!( "Epoch SPDD Data is not complete" ) ) ;
189186 }
@@ -192,9 +189,9 @@ impl SPDDStore {
192189 let mut result = Vec :: new ( ) ;
193190 for item in self . spdd . prefix ( prefix) {
194191 let ( key, value) = item?;
195- let ( _, pool_id, stake_key ) = decode_key ( & key) ?;
192+ let ( _, pool_id, stake_address ) = decode_key ( & key) ?;
196193 let amount = u64:: from_be_bytes ( value. as_ref ( ) . try_into ( ) ?) ;
197- result. push ( ( pool_id, stake_key , amount) ) ;
194+ result. push ( ( pool_id, stake_address , amount) ) ;
198195 }
199196 Ok ( result)
200197 }
@@ -203,7 +200,7 @@ impl SPDDStore {
203200 & self ,
204201 epoch : u64 ,
205202 pool_id : & PoolId ,
206- ) -> Result < Vec < ( AddrKeyhash , u64 ) > > {
203+ ) -> Result < Vec < ( StakeAddress , u64 ) > > {
207204 if !self . is_epoch_complete ( epoch) ? {
208205 return Err ( anyhow:: anyhow!( "Epoch SPDD Data is not complete" ) ) ;
209206 }
@@ -212,9 +209,9 @@ impl SPDDStore {
212209 let mut result = Vec :: new ( ) ;
213210 for item in self . spdd . prefix ( prefix) {
214211 let ( key, value) = item?;
215- let ( _, _, stake_key ) = decode_key ( & key) ?;
212+ let ( _, _, stake_address ) = decode_key ( & key) ?;
216213 let amount = u64:: from_be_bytes ( value. as_ref ( ) . try_into ( ) ?) ;
217- result. push ( ( stake_key , amount) ) ;
214+ result. push ( ( stake_address , amount) ) ;
218215 }
219216 Ok ( result)
220217 }
@@ -224,32 +221,37 @@ impl SPDDStore {
224221mod tests {
225222 use super :: * ;
226223 use acropolis_common:: crypto:: keyhash_224;
227- use acropolis_common:: types:: AddrKeyhash ;
228- use acropolis_common:: PoolId ;
224+ use acropolis_common:: { NetworkId , PoolId , StakeCredential } ;
229225
230226 const DB_PATH : & str = "spdd_db" ;
231227
232228 fn test_pool_hash ( byte : u8 ) -> PoolId {
233229 keyhash_224 ( & [ byte] ) . into ( )
234230 }
235231
236- fn test_addr_hash ( byte : u8 ) -> AddrKeyhash {
237- keyhash_224 ( & [ byte] )
232+ fn test_stake_address ( byte : u8 , network : NetworkId ) -> StakeAddress {
233+ StakeAddress :: new ( StakeCredential :: AddrKeyHash ( keyhash_224 ( & [ byte] ) ) , network )
238234 }
239235
240236 #[ test]
241237 fn test_store_spdd_state ( ) {
242238 let mut spdd_store =
243239 SPDDStore :: new ( std:: path:: Path :: new ( DB_PATH ) , 1 ) . expect ( "Failed to create SPDD store" ) ;
244- let mut spdd_state: HashMap < PoolId , Vec < ( AddrKeyhash , u64 ) > > = HashMap :: new ( ) ;
240+ let mut spdd_state: HashMap < PoolId , Vec < ( StakeAddress , u64 ) > > = HashMap :: new ( ) ;
245241
246242 spdd_state. insert (
247243 test_pool_hash ( 0x01 ) ,
248- vec ! [ ( test_addr_hash( 0x10 ) , 100 ) , ( test_addr_hash( 0x11 ) , 150 ) ] ,
244+ vec ! [
245+ ( test_stake_address( 0x10 , NetworkId :: Mainnet ) , 100 ) ,
246+ ( test_stake_address( 0x11 , NetworkId :: Mainnet ) , 150 ) ,
247+ ] ,
249248 ) ;
250249 spdd_state. insert (
251250 test_pool_hash ( 0x02 ) ,
252- vec ! [ ( test_addr_hash( 0x20 ) , 200 ) , ( test_addr_hash( 0x21 ) , 250 ) ] ,
251+ vec ! [
252+ ( test_stake_address( 0x20 , NetworkId :: Testnet ) , 200 ) ,
253+ ( test_stake_address( 0x21 , NetworkId :: Testnet ) , 250 ) ,
254+ ] ,
253255 ) ;
254256 assert ! ( spdd_store. store_spdd( 1 , spdd_state) . is_ok( ) ) ;
255257
@@ -268,13 +270,13 @@ mod tests {
268270 . expect ( "Failed to create SPDD store" ) ;
269271
270272 for epoch in 1 ..=3 {
271- let mut spdd_state: HashMap < PoolId , Vec < ( AddrKeyhash , u64 ) > > = HashMap :: new ( ) ;
273+ let mut spdd_state: HashMap < PoolId , Vec < ( StakeAddress , u64 ) > > = HashMap :: new ( ) ;
272274
273275 spdd_state. insert (
274276 test_pool_hash ( epoch as u8 ) ,
275277 vec ! [
276- ( test_addr_hash ( 0x10 ) , epoch * 100 ) ,
277- ( test_addr_hash ( 0x11 ) , epoch * 150 ) ,
278+ ( test_stake_address ( 0x10 , NetworkId :: Mainnet ) , epoch * 100 ) ,
279+ ( test_stake_address ( 0x11 , NetworkId :: Mainnet ) , epoch * 150 ) ,
278280 ] ,
279281 ) ;
280282 spdd_store. store_spdd ( epoch, spdd_state) . expect ( "Failed to store SPDD state" ) ;
0 commit comments