@@ -77,26 +77,12 @@ typedef struct fd_rpc_acct_map_elem fd_rpc_acct_map_elem_t;
77
77
#define POOL_T fd_rpc_acct_map_elem_t
78
78
#include "../../util/tmpl/fd_pool.c"
79
79
80
- struct fd_rpc_reasm_map_elem {
81
- union {
82
- ulong next ;
83
- struct {
84
- ulong data_sz ; /* TODO fixed-32. sz of the FEC set payload, guaranteed < FD_STORE_DATA_MAX */
85
- uchar data [FD_STORE_DATA_MAX ]; /* FEC set payload = coalesced data shreds (byte array) */
86
- } data ;
87
- };
88
- };
89
- typedef struct fd_rpc_reasm_map_elem fd_rpc_reasm_map_elem_t ;
90
- #define POOL_NAME fd_rpc_reasm_pool
91
- #define POOL_T fd_rpc_reasm_map_elem_t
92
- #include "../../util/tmpl/fd_pool.c"
93
-
94
80
#define FD_REASM_MAP_COL_CNT (1UL<<10)
95
81
#define FD_REASM_MAP_COL_HEIGHT (128UL)
96
82
struct fd_rpc_reasm_map {
97
83
struct fd_rpc_reasm_map_column {
98
84
ulong ele_cnt ; /* The number of shreds received in this column */ uchar end_found ; /* Whether the last slice of the slot has been found */
99
- fd_rpc_reasm_map_elem_t * ele [FD_REASM_MAP_COL_HEIGHT ];
85
+ fd_reasm_fec_t ele [FD_REASM_MAP_COL_HEIGHT ];
100
86
} cols [FD_REASM_MAP_COL_CNT ];
101
87
ulong head ; /* Next open column */
102
88
ulong tail ; /* Oldest column */
@@ -111,7 +97,6 @@ struct fd_rpc_history {
111
97
fd_rpc_acct_map_t * acct_map ;
112
98
fd_rpc_acct_map_elem_t * acct_pool ;
113
99
fd_rpc_reasm_map_t * reasm_map ;
114
- fd_rpc_reasm_map_elem_t * reasm_pool ;
115
100
ulong first_slot ;
116
101
ulong latest_slot ;
117
102
int file_fd ;
@@ -140,9 +125,6 @@ fd_rpc_history_create(fd_rpcserver_args_t * args) {
140
125
mem = fd_spad_alloc ( spad , alignof(fd_rpc_reasm_map_t ), sizeof (fd_rpc_reasm_map_t ) );
141
126
memset (mem , 0 , sizeof (fd_rpc_reasm_map_t ));
142
127
hist -> reasm_map = (fd_rpc_reasm_map_t * )mem ;
143
- static const ulong reasm_pool_sz = 1024UL ;
144
- mem = fd_spad_alloc ( spad , fd_rpc_reasm_pool_align (), fd_rpc_reasm_pool_footprint ( reasm_pool_sz ) );
145
- hist -> reasm_pool = fd_rpc_reasm_pool_join ( fd_rpc_reasm_pool_new ( mem , reasm_pool_sz ) );
146
128
147
129
hist -> file_fd = open ( args -> history_file , O_CREAT | O_RDWR | O_TRUNC , 0644 );
148
130
if ( hist -> file_fd == -1 ) FD_LOG_ERR (( "unable to open rpc history file: %s" , args -> history_file ));
@@ -259,24 +241,30 @@ fd_rpc_history_scan_block(fd_rpc_history_t * hist, ulong slot, ulong file_offset
259
241
}
260
242
261
243
void
262
- fd_rpc_history_process_column (fd_rpc_history_t * hist , struct fd_rpc_reasm_map_column * col , fd_reasm_fec_t * fec ) {
244
+ fd_rpc_history_process_column (fd_rpc_history_t * hist , struct fd_rpc_reasm_map_column * col , fd_store_t * store , fd_reasm_fec_t * fec ) {
263
245
FD_SPAD_FRAME_BEGIN ( hist -> spad ) {
264
246
265
247
FD_LOG_NOTICE (( "assembling slot %lu block" , fec -> slot ));
266
248
267
249
/* Assemble the block */
250
+ fd_store_fec_t * list [FD_REASM_MAP_COL_HEIGHT ];
268
251
ulong slot = fec -> slot ;
269
252
ulong blk_sz = 0 ;
270
253
for ( ulong i = 0 ; i < col -> ele_cnt ; i ++ ) {
271
- fd_rpc_reasm_map_elem_t * ele = col -> ele [i ];
272
- blk_sz += ele -> data .data_sz ;
254
+ fd_reasm_fec_t * ele = & col -> ele [i ];
255
+ fd_store_fec_t * fec_p = list [i ] = fd_store_query ( store , & ele -> key );
256
+ if ( !fec_p ) {
257
+ FD_LOG_WARNING (( "missing fec" ));
258
+ return ;
259
+ }
260
+ blk_sz += fec_p -> data_sz ;
273
261
}
274
262
uchar * blk_data = fd_spad_alloc ( hist -> spad , alignof(ulong ), blk_sz );
275
263
ulong blk_off = 0 ;
276
264
for ( ulong i = 0 ; i < col -> ele_cnt ; i ++ ) {
277
- fd_rpc_reasm_map_elem_t * ele = col -> ele [i ];
278
- fd_memcpy ( blk_data + blk_off , ele -> data . data , ele -> data . data_sz );
279
- blk_off += ele -> data . data_sz ;
265
+ fd_store_fec_t * fec_p = list [i ];
266
+ fd_memcpy ( blk_data + blk_off , fec_p -> data , fec_p -> data_sz );
267
+ blk_off += fec_p -> data_sz ;
280
268
}
281
269
FD_TEST ( blk_off == blk_sz );
282
270
@@ -299,16 +287,9 @@ fd_rpc_history_process_column(fd_rpc_history_t * hist, struct fd_rpc_reasm_map_c
299
287
}
300
288
301
289
static void
302
- fd_rpc_history_discard_column (fd_rpc_reasm_map_t * reasm_map , fd_rpc_reasm_map_elem_t * reasm_pool , ulong slot ) {
290
+ fd_rpc_history_discard_column (fd_rpc_reasm_map_t * reasm_map , ulong slot ) {
303
291
ulong col_idx = slot & (FD_REASM_MAP_COL_CNT - 1 );
304
292
struct fd_rpc_reasm_map_column * col = & reasm_map -> cols [col_idx ];
305
- for ( ulong i = 0 ; i < col -> ele_cnt ; i ++ ) {
306
- fd_rpc_reasm_map_elem_t * ele = col -> ele [i ];
307
- if ( ele ) {
308
- fd_rpc_reasm_pool_ele_release ( reasm_pool , ele );
309
- col -> ele [i ] = NULL ;
310
- }
311
- }
312
293
col -> ele_cnt = 0 ;
313
294
}
314
295
@@ -318,7 +299,6 @@ fd_rpc_history_save_fec(fd_rpc_history_t * hist, fd_store_t * store, fd_reasm_fe
318
299
if ( !fec_p ) return ;
319
300
320
301
fd_rpc_reasm_map_t * reasm_map = hist -> reasm_map ;
321
- fd_rpc_reasm_map_elem_t * reasm_pool = hist -> reasm_pool ;
322
302
323
303
if ( reasm_map -> head == 0UL ) {
324
304
reasm_map -> head = fec_msg -> slot + 1 ;
@@ -327,7 +307,7 @@ fd_rpc_history_save_fec(fd_rpc_history_t * hist, fd_store_t * store, fd_reasm_fe
327
307
if ( fec_msg -> slot < reasm_map -> tail ) return ; /* Do not go backwards */
328
308
while ( fec_msg -> slot >= reasm_map -> tail + FD_REASM_MAP_COL_CNT ) {
329
309
FD_TEST ( reasm_map -> tail < reasm_map -> head );
330
- fd_rpc_history_discard_column ( reasm_map , reasm_pool , reasm_map -> tail ++ );
310
+ fd_rpc_history_discard_column ( reasm_map , reasm_map -> tail ++ );
331
311
}
332
312
while ( fec_msg -> slot >= reasm_map -> head ) {
333
313
ulong col_idx = (reasm_map -> head ++ ) & (FD_REASM_MAP_COL_CNT - 1 );
@@ -339,25 +319,19 @@ fd_rpc_history_save_fec(fd_rpc_history_t * hist, fd_store_t * store, fd_reasm_fe
339
319
ulong col_idx = fec_msg -> slot & (FD_REASM_MAP_COL_CNT - 1 );
340
320
struct fd_rpc_reasm_map_column * col = & reasm_map -> cols [col_idx ];
341
321
342
- FD_TEST ( col -> ele_cnt < FD_REASM_MAP_COL_HEIGHT );
322
+ if ( col -> ele_cnt == 0 && fec_msg -> fec_set_idx != 0 ) {
323
+ FD_LOG_WARNING (( "fec_set_idx %u is not 0 but this is the first fec for slot %lu" , fec_msg -> fec_set_idx , fec_msg -> slot ));
324
+ return ;
325
+ }
343
326
344
- /* Acquire space for the fec data. If we've run out of space, discard the oldest column */
345
- fd_rpc_reasm_map_elem_t * ele = NULL ;
346
- do {
347
- ele = fd_rpc_reasm_pool_ele_acquire ( reasm_pool );
348
- if ( ele != NULL ) break ;
349
- FD_TEST ( reasm_map -> tail < reasm_map -> head );
350
- fd_rpc_history_discard_column ( reasm_map , reasm_pool , reasm_map -> tail ++ );
351
- } while ( 1 );
327
+ FD_TEST ( col -> ele_cnt < FD_REASM_MAP_COL_HEIGHT );
352
328
353
- ulong data_sz = ele -> data .data_sz = fec_p -> data_sz ;
354
- fd_memcpy ( ele -> data .data , fec_p -> data , data_sz );
355
- col -> ele [col -> ele_cnt ++ ] = ele ;
329
+ col -> ele [col -> ele_cnt ++ ] = * fec_msg ;
356
330
357
331
if ( fec_msg -> slot_complete ) {
358
332
/* We've received all the shreds for this slot. Process it. */
359
- fd_rpc_history_process_column ( hist , col , fec_msg );
360
- fd_rpc_history_discard_column ( reasm_map , reasm_pool , fec_msg -> slot );
333
+ fd_rpc_history_process_column ( hist , col , store , fec_msg );
334
+ fd_rpc_history_discard_column ( reasm_map , fec_msg -> slot );
361
335
}
362
336
}
363
337
0 commit comments