@@ -32,6 +32,7 @@ struct fd_rpc_txn {
32
32
ulong slot ;
33
33
ulong file_offset ;
34
34
ulong file_size ;
35
+ struct fd_rpc_txn * next_lru ;
35
36
};
36
37
typedef struct fd_rpc_txn fd_rpc_txn_t ;
37
38
@@ -62,8 +63,8 @@ struct fd_rpc_acct_map_elem {
62
63
fd_pubkey_t key ;
63
64
ulong next ;
64
65
ulong slot ;
65
- ulong age ;
66
66
fd_rpc_txn_key_t sig ; /* Transaction signature */
67
+ struct fd_rpc_acct_map_elem * next_lru ;
67
68
};
68
69
typedef struct fd_rpc_acct_map_elem fd_rpc_acct_map_elem_t ;
69
70
#define MAP_NAME fd_rpc_acct_map
@@ -94,8 +95,12 @@ struct fd_rpc_history {
94
95
fd_rpc_block_t * block_map ;
95
96
ulong block_cnt ;
96
97
fd_rpc_txn_t * txn_map ;
98
+ fd_rpc_txn_t * txn_oldest_lru ;
99
+ fd_rpc_txn_t * txn_newest_lru ;
97
100
fd_rpc_acct_map_t * acct_map ;
98
101
fd_rpc_acct_map_elem_t * acct_pool ;
102
+ fd_rpc_acct_map_elem_t * acct_oldest_lru ;
103
+ fd_rpc_acct_map_elem_t * acct_newest_lru ;
99
104
fd_rpc_reasm_map_t * reasm_map ;
100
105
ulong first_slot ;
101
106
ulong latest_slot ;
@@ -179,19 +184,22 @@ fd_rpc_history_save_info(fd_rpc_history_t * hist, fd_replay_notif_msg_t * info)
179
184
blk -> info = * info ;
180
185
}
181
186
182
- static void
187
+ static ulong
183
188
fd_rpc_history_scan_block (fd_rpc_history_t * hist , ulong slot , ulong file_offset , uchar * blk_data , ulong blk_sz ) {
184
189
ulong blockoff = 0 ;
190
+ ulong ret = 0 ;
185
191
while (blockoff < blk_sz ) {
186
192
if ( blockoff + sizeof (ulong ) > blk_sz )
187
- return ;
193
+ return ret ;
188
194
ulong mcount = * (const ulong * )(blk_data + blockoff );
189
195
blockoff += sizeof (ulong );
190
196
191
197
/* Loop across microblocks */
192
198
for (ulong mblk = 0 ; mblk < mcount ; ++ mblk ) {
193
- if ( blockoff + sizeof (fd_microblock_hdr_t ) > blk_sz )
199
+ if ( blockoff + sizeof (fd_microblock_hdr_t ) > blk_sz ) {
194
200
FD_LOG_ERR (("premature end of block" ));
201
+ return ret ;
202
+ }
195
203
fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t * )((const uchar * )blk_data + blockoff );
196
204
blockoff += sizeof (fd_microblock_hdr_t );
197
205
@@ -202,21 +210,38 @@ fd_rpc_history_scan_block(fd_rpc_history_t * hist, ulong slot, ulong file_offset
202
210
const uchar * raw = (const uchar * )blk_data + blockoff ;
203
211
ulong txn_sz = fd_txn_parse_core (raw , fd_ulong_min (blk_sz - blockoff , FD_TXN_MTU ), txn_out , NULL , & pay_sz );
204
212
if ( txn_sz == 0 || txn_sz > FD_TXN_MAX_SZ ) {
205
- FD_LOG_WARNING ( ( "failed to parse transaction %lu in microblock %lu at offset %lu" , txn_idx , mblk , blockoff ) );
206
- return ;
213
+ FD_LOG_ERR ( ( "failed to parse transaction %lu in microblock %lu (%lu) at offset %lu" , txn_idx , mblk , ret , blockoff ) );
214
+ return ret ;
207
215
}
208
216
fd_txn_t * txn = (fd_txn_t * )txn_out ;
209
217
210
218
/* Loop across signatures */
211
219
fd_ed25519_sig_t const * sigs = (fd_ed25519_sig_t const * )(raw + txn -> signature_off );
212
220
for ( uchar j = 0 ; j < txn -> signature_cnt ; j ++ ) {
213
- if ( fd_rpc_txn_map_is_full ( hist -> txn_map ) ) break ; /* Out of space */
221
+ while ( fd_rpc_txn_map_is_full ( hist -> txn_map ) ) {
222
+ /* Remove the oldest entry from the map */
223
+ fd_rpc_txn_t * ent = hist -> txn_oldest_lru ;
224
+ hist -> txn_oldest_lru = ent -> next_lru ;
225
+ fd_rpc_txn_map_remove ( hist -> txn_map , & ent -> sig );
226
+ }
227
+
228
+ /* Insert the new entry into the map */
214
229
fd_rpc_txn_key_t key ;
215
230
memcpy (& key , (const uchar * )& sigs [j ], sizeof (key ));
216
231
fd_rpc_txn_t * ent = fd_rpc_txn_map_insert ( hist -> txn_map , & key );
217
232
ent -> file_offset = file_offset + blockoff ;
218
233
ent -> file_size = pay_sz ;
219
234
ent -> slot = slot ;
235
+
236
+ /* Update the LRU chain*/
237
+ ent -> next_lru = NULL ;
238
+ if ( hist -> txn_newest_lru ) {
239
+ hist -> txn_newest_lru -> next_lru = ent ;
240
+ hist -> txn_newest_lru = ent ;
241
+ } else {
242
+ hist -> txn_newest_lru = ent ;
243
+ hist -> txn_oldest_lru = ent ;
244
+ }
220
245
}
221
246
222
247
/* Loop across accounts */
@@ -225,66 +250,102 @@ fd_rpc_history_scan_block(fd_rpc_history_t * hist, ulong slot, ulong file_offset
225
250
fd_pubkey_t * accs = (fd_pubkey_t * )((uchar * )raw + txn -> acct_addr_off );
226
251
for ( ulong i = 0UL ; i < txn -> acct_addr_cnt ; i ++ ) {
227
252
if ( !memcmp (& accs [i ], fd_solana_vote_program_id .key , sizeof (fd_pubkey_t )) ) continue ; /* Ignore votes */
228
- if ( !fd_rpc_acct_map_pool_free ( hist -> acct_pool ) ) break ;
253
+
254
+ while ( !fd_rpc_acct_map_pool_free ( hist -> acct_pool ) ) {
255
+ /* Remove the oldest entry from the map */
256
+ fd_rpc_acct_map_elem_t * ent = hist -> acct_oldest_lru ;
257
+ hist -> acct_oldest_lru = ent -> next_lru ;
258
+ ent = fd_rpc_acct_map_ele_remove ( hist -> acct_map , & ent -> key , NULL , hist -> acct_pool );
259
+ if ( ent ) fd_rpc_acct_map_pool_ele_release ( hist -> acct_pool , ent );
260
+ }
261
+
262
+ /* Insert the new entry into the map */
229
263
fd_rpc_acct_map_elem_t * ele = fd_rpc_acct_map_pool_ele_acquire ( hist -> acct_pool );
230
264
ele -> key = accs [i ];
231
265
ele -> slot = slot ;
232
266
ele -> sig = sig0 ;
233
267
fd_rpc_acct_map_ele_insert ( hist -> acct_map , ele , hist -> acct_pool );
268
+
269
+ /* Update the LRU chain */
270
+ ele -> next_lru = NULL ;
271
+ if ( hist -> acct_newest_lru ) {
272
+ hist -> acct_newest_lru -> next_lru = ele ;
273
+ hist -> acct_newest_lru = ele ;
274
+ } else {
275
+ hist -> acct_newest_lru = ele ;
276
+ hist -> acct_oldest_lru = ele ;
277
+ }
234
278
}
235
279
236
280
blockoff += pay_sz ;
237
281
}
238
282
}
283
+ ret = blockoff ;
239
284
}
240
- if ( blockoff != blk_sz )
241
- FD_LOG_ERR (("garbage at end of block" ));
285
+ return ret ;
242
286
}
243
287
244
288
void
245
289
fd_rpc_history_process_column (fd_rpc_history_t * hist , struct fd_rpc_reasm_map_column * col , fd_store_t * store , fd_reasm_fec_t * fec ) {
246
- FD_SPAD_FRAME_BEGIN ( hist -> spad ) {
247
-
248
- FD_LOG_NOTICE (( "assembling slot %lu block" , fec -> slot ));
290
+ ulong slot = fec -> slot ;
291
+ FD_LOG_NOTICE (( "assembling slot %lu block" , slot ));
249
292
250
- /* Assemble the block */
251
- fd_store_fec_t * list [FD_REASM_MAP_COL_HEIGHT ];
252
- ulong slot = fec -> slot ;
253
- ulong blk_sz = 0 ;
254
- for ( ulong i = 0 ; i < col -> ele_cnt ; i ++ ) {
293
+ /* Get a block from the map */
294
+ fd_rpc_block_t * blk = fd_rpc_history_alloc_block ( hist , slot );
295
+ if ( blk == NULL ) return ;
296
+ ulong file_offset = blk -> file_offset = hist -> file_totsz ;
297
+ blk -> file_size = 0 ;
298
+
299
+ /* Look up all the store_fec_t elements for this column */
300
+ fd_store_fec_t * list [FD_REASM_MAP_COL_HEIGHT ];
301
+ for ( ulong idx = 0 ; idx < col -> ele_cnt ; ) {
302
+ ulong end_idx = ULONG_MAX ;
303
+ /* Query the next batch */
304
+ fd_store_shacq ( store );
305
+ ulong batch_sz = 0 ;
306
+ for ( ulong i = idx ; i < col -> ele_cnt ; i ++ ) {
255
307
fd_reasm_fec_t * ele = & col -> ele [i ];
256
- fd_store_fec_t * fec_p = list [i ] = fd_store_query ( store , & ele -> key );
308
+ fd_store_fec_t * fec_p = list [i - idx ] = fd_store_query ( store , & ele -> key );
257
309
if ( !fec_p ) {
258
- FD_LOG_WARNING (( "missing fec" ));
310
+ FD_LOG_ERR (( "missing fec" ));
311
+ fd_store_shrel ( store );
259
312
return ;
260
313
}
261
- blk_sz += fec_p -> data_sz ;
262
- }
263
- uchar * blk_data = fd_spad_alloc ( hist -> spad , alignof(ulong ), blk_sz );
264
- ulong blk_off = 0 ;
265
- for ( ulong i = 0 ; i < col -> ele_cnt ; i ++ ) {
266
- fd_store_fec_t * fec_p = list [i ];
267
- fd_memcpy ( blk_data + blk_off , fec_p -> data , fec_p -> data_sz );
268
- blk_off += fec_p -> data_sz ;
314
+ batch_sz += fec_p -> data_sz ;
315
+ if ( col -> ele [i ].data_complete ) {
316
+ end_idx = i ;
317
+ break ;
318
+ }
269
319
}
270
- FD_TEST ( blk_off == blk_sz );
271
-
272
- /* Get a block from the map */
273
- fd_rpc_block_t * blk = fd_rpc_history_alloc_block ( hist , slot );
274
- if ( blk == NULL ) return ;
275
-
276
- /* Write the block to the file */
277
- if ( pwrite ( hist -> file_fd , blk_data , blk_sz , (long )hist -> file_totsz ) != (ssize_t )blk_sz ) {
278
- FD_LOG_ERR (( "unable to write to rpc history file" ));
320
+ if ( end_idx == ULONG_MAX ) {
321
+ FD_LOG_ERR (( "missing data complete flag" ));
322
+ fd_store_shrel ( store );
323
+ return ;
279
324
}
280
- ulong file_offset = blk -> file_offset = hist -> file_totsz ;
281
- blk -> file_size = blk_sz ;
282
- hist -> file_totsz += blk_sz ;
283
-
284
- /* Scan the block */
285
- fd_rpc_history_scan_block ( hist , slot , file_offset , blk_data , blk_sz );
286
-
287
- } FD_SPAD_FRAME_END ;
325
+ FD_SPAD_FRAME_BEGIN ( hist -> spad ) {
326
+ uchar * blk_data = fd_spad_alloc ( hist -> spad , alignof(ulong ), batch_sz );
327
+ ulong batch_off = 0 ;
328
+ for ( ulong i = idx ; i <= end_idx ; i ++ ) {
329
+ fd_store_fec_t * fec_p = list [i - idx ];
330
+ fd_memcpy ( blk_data + batch_off , fec_p -> data , fec_p -> data_sz );
331
+ batch_off += fec_p -> data_sz ;
332
+ }
333
+ FD_TEST ( batch_off == batch_sz );
334
+ /* Scan the block. Trim the padding. */
335
+ batch_sz = fd_rpc_history_scan_block ( hist , slot , file_offset , blk_data , batch_sz );
336
+ /* Write the trimmed batch to the file */
337
+ if ( pwrite ( hist -> file_fd , blk_data , batch_sz , (long )file_offset ) != (ssize_t )batch_sz ) {
338
+ FD_LOG_ERR (( "unable to write to rpc history file" ));
339
+ fd_store_shrel ( store );
340
+ return ;
341
+ }
342
+ file_offset += batch_sz ;
343
+ blk -> file_size += batch_sz ;
344
+ hist -> file_totsz = file_offset ;
345
+ } FD_SPAD_FRAME_END ;
346
+ fd_store_shrel ( store );
347
+ idx = end_idx + 1 ;
348
+ }
288
349
}
289
350
290
351
static void
0 commit comments