@@ -102,6 +102,13 @@ typedef struct {
102
102
fd_store_t * store ;
103
103
fd_tower_t * tower ;
104
104
fd_shred_t const * curr ;
105
+
106
+ /* is_ready is used to determine if the backtest tile should send
107
+ more fec sets to the replay tile. is_ready==1 if more fec sets
108
+ should be sent; it gets set to 0 while waiting for an end of slot
109
+ notification from the replay tile. When a slot notification is
110
+ received, is_ready is set to 1 again. */
111
+ int is_ready ;
105
112
} ctx_t ;
106
113
107
114
FD_FN_CONST static inline ulong
@@ -181,9 +188,18 @@ notify_tower_root( ctx_t * ctx,
181
188
ulong tsorig ,
182
189
ulong tspub ) {
183
190
ulong replayed_slot = ctx -> replay_notification .slot_exec .slot ;
184
- ulong root = fd_tower_vote ( ctx -> tower , replayed_slot );
185
- if ( FD_LIKELY ( root != FD_SLOT_NULL ) ) {
186
- fd_stem_publish ( stem , ctx -> tower_replay_out_idx , root , 0UL , 0UL , 0UL , tsorig , tspub );
191
+ if ( ctx -> ingest_mode == FD_BACKTEST_ROCKSDB_INGEST ) {
192
+ /* We want to publish the previous last_replayed_slot, when we have
193
+ finished replaying the current slot. Then we can update the
194
+ last_replayed_slot to the newly executed slot. */
195
+ fd_stem_publish ( stem , ctx -> tower_replay_out_idx , ctx -> replay_notification .slot_exec .slot , 0UL , 0UL , 0UL , tsorig , tspub );
196
+ ctx -> is_ready = 1 ;
197
+
198
+ } else if ( ctx -> ingest_mode == FD_BACKTEST_SHREDCAP_INGEST ) {
199
+ ulong root = fd_tower_vote ( ctx -> tower , replayed_slot );
200
+ if ( FD_LIKELY ( root != FD_SLOT_NULL ) ) {
201
+ fd_stem_publish ( stem , ctx -> tower_replay_out_idx , root , 0UL , 0UL , 0UL , tsorig , tspub );
202
+ }
187
203
}
188
204
}
189
205
@@ -307,6 +323,10 @@ unprivileged_init( fd_topo_t * topo,
307
323
ctx -> slot_cnt = 0UL ;
308
324
309
325
ctx -> curr = NULL ;
326
+
327
+ ctx -> is_ready = 1 ;
328
+
329
+ FD_LOG_NOTICE (( "Finished unprivileged init" ));
310
330
}
311
331
312
332
fd_hash_t
@@ -340,6 +360,14 @@ after_credit_rocksdb( ctx_t * ctx,
340
360
ctx -> rocksdb_iter = rocksdb_create_iterator_cf (ctx -> rocksdb .db , ctx -> rocksdb .ro , ctx -> rocksdb .cf_handles [FD_ROCKSDB_CFIDX_DATA_SHRED ]);
341
361
}
342
362
363
+ /* If the slot we just sent to the replay tile has not finished
364
+ replaying, then we will block until it's done replaying and a
365
+ notification is received from the replay tile. */
366
+
367
+ if ( ctx -> is_ready == 0 ) {
368
+ return ;
369
+ }
370
+
343
371
ulong sz ;
344
372
fd_shred_t const * prev = NULL ;
345
373
fd_shred_t const * curr = ctx -> curr ? ctx -> curr : rocksdb_get_shred ( ctx , & sz );
@@ -380,6 +408,11 @@ after_credit_rocksdb( ctx_t * ctx,
380
408
ctx -> replay_out_chunk = fd_dcache_compact_next ( ctx -> replay_out_chunk , sizeof (fd_fec_out_t ), ctx -> replay_out_chunk0 , ctx -> replay_out_wmark );
381
409
ctx -> curr = curr ;
382
410
* charge_busy = 1 ;
411
+
412
+ if ( out .slot_complete ) {
413
+ ctx -> is_ready = 0 ;
414
+ }
415
+
383
416
return ; /* yield otherwise it will overrun */
384
417
}
385
418
0 commit comments