@@ -10,11 +10,10 @@ use bity_ic_icrc3_archive_api::types::{
1010use candid:: Principal ;
1111use serde:: { Deserialize , Deserializer , Serialize , Serializer } ;
1212use std:: sync:: { Arc , RwLock } ;
13+ use std:: time:: Duration ;
1314
14- /// The threshold at which blocks are automatically archived
15- const ARCHIVE_THRESHOLD : usize = 50 ;
1615/// The maximum number of transactions to keep in local storage
17- const MAX_LOCAL_TRANSACTIONS : usize = 100 ;
16+ const MAX_LOCAL_TRANSACTIONS : usize = 1000 ;
1817
1918/// The core blockchain implementation for ICRC3.
2019///
@@ -33,9 +32,28 @@ pub struct Blockchain {
3332 /// Number of blocks that have been archived
3433 pub chain_length : u64 ,
3534 /// Local transactions waiting to be archived
36- pub local_transactions : VecDeque < EncodedBlock > ,
37- /// Threshold for triggering archiving
38- pub archive_threshold : usize ,
35+ pub local_transactions : VecDeque < ( u128 , EncodedBlock ) > , // (timestamp, block)
36+ /// Time to live for non-archived transactions
37+ pub ttl_for_non_archived_transactions : Duration ,
38+ }
39+
40+ impl Blockchain {
41+ pub fn new (
42+ archive_canister_manager : ArchiveCanisterManager ,
43+ last_hash : Option < HashOf < EncodedBlock > > ,
44+ last_timestamp : u128 ,
45+ chain_length : u64 ,
46+ ttl_for_non_archived_transactions : Duration ,
47+ ) -> Self {
48+ Self {
49+ archive_canister_manager : Arc :: new ( RwLock :: new ( archive_canister_manager) ) ,
50+ last_hash,
51+ last_timestamp,
52+ chain_length,
53+ local_transactions : VecDeque :: new ( ) ,
54+ ttl_for_non_archived_transactions,
55+ }
56+ }
3957}
4058
4159impl Default for Blockchain {
@@ -47,7 +65,7 @@ impl Default for Blockchain {
4765 last_timestamp : 0 ,
4866 chain_length : 0 ,
4967 local_transactions : VecDeque :: new ( ) ,
50- archive_threshold : ARCHIVE_THRESHOLD ,
68+ ttl_for_non_archived_transactions : Duration :: from_secs ( 120 ) ,
5169 }
5270 }
5371}
@@ -68,7 +86,7 @@ impl Serialize for Blockchain {
6886 & self . last_timestamp ,
6987 & self . chain_length ,
7088 & self . local_transactions ,
71- & self . archive_threshold ,
89+ & self . ttl_for_non_archived_transactions ,
7290 )
7391 . serialize ( serializer)
7492 }
@@ -85,14 +103,14 @@ impl<'de> Deserialize<'de> for Blockchain {
85103 last_timestamp,
86104 chain_length,
87105 local_transactions,
88- archive_threshold ,
106+ ttl_for_non_archived_transactions ,
89107 ) = <(
90108 ArchiveCanisterManager ,
91109 Option < HashOf < EncodedBlock > > ,
92110 u128 ,
93111 u64 ,
94- VecDeque < EncodedBlock > ,
95- usize ,
112+ VecDeque < ( u128 , EncodedBlock ) > ,
113+ Duration ,
96114 ) >:: deserialize ( deserializer) ?;
97115
98116 Ok ( Blockchain {
@@ -101,46 +119,12 @@ impl<'de> Deserialize<'de> for Blockchain {
101119 last_timestamp,
102120 chain_length,
103121 local_transactions,
104- archive_threshold ,
122+ ttl_for_non_archived_transactions ,
105123 } )
106124 }
107125}
108126
109127impl Blockchain {
110- /// Archives blocks from local storage to archive canisters.
111- ///
112- /// This method:
113- /// 1. Takes blocks from the front of the local transactions queue
114- /// 2. Attempts to insert them into archive canisters
115- /// 3. Updates the chain length on successful insertion
116- ///
117- /// # Returns
118- ///
119- /// * `Ok(())` if all blocks were successfully archived
120- /// * `Err(String)` if archiving failed
121- async fn archive_blocks ( & mut self ) -> Result < ( ) , String > {
122- while let Some ( block) = self . local_transactions . pop_front ( ) {
123- let block_clone = block. clone ( ) ;
124-
125- match self
126- . archive_canister_manager
127- . write ( )
128- . map_err ( |e| format ! ( "Failed to acquire write lock: {}" , e) ) ?
129- . insert_block ( self . chain_length ( ) , block_clone)
130- . await
131- {
132- Ok ( _) => {
133- self . chain_length += 1 ;
134- }
135- Err ( e) => {
136- self . local_transactions . push_front ( block) ;
137- return Err ( e) ;
138- }
139- }
140- }
141- Ok ( ( ) )
142- }
143-
144128 /// Adds a new block to the blockchain.
145129 ///
146130 /// This method:
@@ -157,7 +141,7 @@ impl Blockchain {
157141 ///
158142 /// * `Ok(BlockIndex)` containing the new block's index
159143 /// * `Err(String)` if the block could not be added
160- pub async fn add_block < B > ( & mut self , block : B ) -> Result < BlockIndex , String >
144+ pub fn add_block < B > ( & mut self , block : B ) -> Result < BlockIndex , String >
161145 where
162146 B : Block + Clone ,
163147 {
@@ -180,28 +164,110 @@ impl Blockchain {
180164 }
181165
182166 if self . local_transactions . len ( ) >= MAX_LOCAL_TRANSACTIONS {
183- if let Err ( e) = self . archive_blocks ( ) . await {
184- trace ( & format ! (
185- "add_block error: Maximum local transactions reached and archiving failed: {}" ,
186- e
187- ) ) ;
188- return Err ( e) ;
189- }
167+ return Err ( format ! (
168+ "Local transactions limit reached: {}" ,
169+ MAX_LOCAL_TRANSACTIONS
170+ ) ) ;
190171 }
191172
192173 self . last_timestamp = block_clone. timestamp ( ) ;
193- let encoded_block: EncodedBlock = block_clone. encode ( ) ;
174+ let encoded_block: EncodedBlock = block_clone. clone ( ) . encode ( ) ;
194175 self . last_hash = Some ( B :: block_hash ( & encoded_block) ) ;
195176
196- self . local_transactions . push_back ( encoded_block. clone ( ) ) ;
177+ self . local_transactions
178+ . push_back ( ( block_clone. timestamp ( ) , encoded_block. clone ( ) ) ) ;
179+
180+ trace ( & format ! (
181+ "Added block to local transactions: {}" ,
182+ self . local_transactions. len( )
183+ ) ) ;
184+
185+ Ok ( self . chain_length ( ) + 1 )
186+ }
187+
188+ pub async fn archive_blocks_jobs ( & mut self ) -> Result < u128 , String > {
189+ trace ( & format ! ( "archive_blocks_jobs" ) ) ;
190+
191+ trace ( & format ! (
192+ "archive_blocks_jobs: local_transactions: {}" ,
193+ self . local_transactions. len( )
194+ ) ) ;
195+
196+ if self . local_transactions . is_empty ( ) {
197+ return Ok ( 0 ) ;
198+ }
199+
200+ let mut archived_count = 0 ;
201+ let current_time = ic_cdk:: api:: time ( ) as u128 ;
202+
203+ while !self . local_transactions . is_empty ( ) {
204+ if let Some ( ( oldest_timestamp, oldest_block) ) = self . local_transactions . front ( ) . cloned ( )
205+ {
206+ trace ( & format ! (
207+ "oldest_timestamp: {}, current_time: {}" ,
208+ oldest_timestamp, current_time
209+ ) ) ;
197210
198- if self . local_transactions . len ( ) >= self . archive_threshold {
199- if let Err ( e) = self . archive_blocks ( ) . await {
200- trace ( & format ! ( "add_block error: {}" , e) ) ;
211+ if oldest_timestamp + self . ttl_for_non_archived_transactions . as_nanos ( )
212+ < current_time
213+ {
214+ trace ( & format ! (
215+ "oldest_timestamp + ttl_for_non_archived_transactions: {}" ,
216+ oldest_timestamp + self . ttl_for_non_archived_transactions. as_nanos( )
217+ ) ) ;
218+
219+ if let Some ( tx) = self . local_transactions . pop_front ( ) {
220+ trace ( & format ! (
221+ "Archiving transaction from timestamp {}" ,
222+ oldest_timestamp
223+ ) ) ;
224+
225+ match self
226+ . archive_canister_manager
227+ . write ( )
228+ . map_err ( |e| format ! ( "Failed to acquire write lock: {}" , e) )
229+ {
230+ Ok ( mut archive_manager) => {
231+ match archive_manager
232+ . insert_block ( self . chain_length ( ) , oldest_block. clone ( ) )
233+ . await
234+ {
235+ Ok ( _) => {
236+ archived_count += 1 ;
237+ }
238+ Err ( e) => {
239+ self . local_transactions . push_front ( tx) ;
240+ return Err ( e) ;
241+ }
242+ }
243+ }
244+ Err ( e) => {
245+ self . local_transactions . push_front ( tx) ;
246+ return Err ( e) ;
247+ }
248+ }
249+ } else {
250+ trace ( & format ! (
251+ "oldest_timestamp + ttl_for_non_archived_transactions: {}" ,
252+ oldest_timestamp + self . ttl_for_non_archived_transactions. as_nanos( )
253+ ) ) ;
254+ break ;
255+ }
256+ } else {
257+ trace ( & format ! (
258+ "oldest_timestamp + ttl_for_non_archived_transactions: {}" ,
259+ oldest_timestamp + self . ttl_for_non_archived_transactions. as_nanos( )
260+ ) ) ;
261+ break ;
262+ }
263+ } else {
264+ trace ( & format ! ( "archive_blocks_jobs: local_transactions is empty" ) ) ;
265+ break ;
201266 }
202267 }
203268
204- Ok ( self . chain_length ( ) + 1 )
269+ trace ( & format ! ( "Archived {} transaction(s)" , archived_count) ) ;
270+ Ok ( archived_count)
205271 }
206272
207273 /// Retrieves a block by its index.
@@ -220,10 +286,10 @@ impl Blockchain {
220286 }
221287
222288 if block_id > self . chain_length ( ) {
223- return Some (
224- self . local_transactions [ block_id as usize - self . chain_length ( ) as usize - 1 ]
225- . clone ( ) ,
226- ) ;
289+ return self
290+ . local_transactions
291+ . get ( block_id as usize - self . chain_length ( ) as usize - 1 )
292+ . map ( | ( _ , block ) | block . clone ( ) ) ;
227293 } else {
228294 return None ;
229295 }
@@ -239,7 +305,7 @@ impl Blockchain {
239305 ///
240306 /// * `Ok(Principal)` containing the canister ID
241307 /// * `Err(String)` if the operation failed
242- pub async fn get_block_canister_id ( & self , block_id : BlockIndex ) -> Result < Principal , String > {
308+ pub fn get_block_canister_id ( & self , block_id : BlockIndex ) -> Result < Principal , String > {
243309 self . archive_canister_manager
244310 . read ( )
245311 . map_err ( |_| "Failed to read archive_canister_manager" ) ?
0 commit comments