diff --git a/src/discof/replay/fd_replay_tile.c b/src/discof/replay/fd_replay_tile.c index cfaa6f1d089..e69c1ce843e 100644 --- a/src/discof/replay/fd_replay_tile.c +++ b/src/discof/replay/fd_replay_tile.c @@ -160,6 +160,8 @@ struct fd_replay_tile { fd_txncache_t * txncache; fd_store_t * store; fd_banks_t banks[1]; + ulong frontier_indices[ FD_BANKS_MAX_BANKS ]; + ulong frontier_cnt; /* This flag is 1 If we have seen a vote signature that our node has sent out get rooted at least one time. The value is 0 otherwise. @@ -1916,7 +1918,8 @@ can_process_fec( fd_replay_tile_t * ctx, return 1; } -static void +/* Returns 0 on successful FEC ingestion, 1 if the block got marked dead. */ +static int insert_fec_set( fd_replay_tile_t * ctx, fd_stem_context_t * stem, fd_reasm_fec_t * reasm_fec ) { @@ -1967,7 +1970,7 @@ insert_fec_set( fd_replay_tile_t * ctx, fd_block_id_ele_t * block_id_ele = &ctx->block_id_arr[ reasm_fec->bank_idx ]; if( FD_UNLIKELY( block_id_ele->latest_fec_idx>=reasm_fec->fec_set_idx ) ) { FD_LOG_WARNING(( "dropping FEC set (slot=%lu, fec_set_idx=%u) because it is at least as old as the latest FEC set (slot=%lu, fec_set_idx=%u)", reasm_fec->slot, reasm_fec->fec_set_idx, block_id_ele->slot, block_id_ele->latest_fec_idx )); - return; + return 0; } block_id_ele->latest_fec_idx = reasm_fec->fec_set_idx; block_id_ele->latest_mr = reasm_fec->key; @@ -1982,7 +1985,7 @@ insert_fec_set( fd_replay_tile_t * ctx, } /* If we are the leader, we don't need to process the FEC set. */ - if( FD_UNLIKELY( reasm_fec->is_leader ) ) return; + if( FD_UNLIKELY( reasm_fec->is_leader ) ) return 0; /* Forks form a partial ordering over FEC sets. The Repair tile delivers FEC sets in-order per fork, but FEC set ordering across @@ -2044,16 +2047,18 @@ insert_fec_set( fd_replay_tile_t * ctx, ctx->metrics.store_query_missing_mr = reasm_fec->key.ul[0]; FD_BASE58_ENCODE_32_BYTES( reasm_fec->key.key, key_b58 ); FD_LOG_WARNING(( "store fec for slot: %lu is on minority fork already pruned by publish. abandoning slice. root: %lu. pruned merkle: %s", reasm_fec->slot, ctx->consensus_root_slot, key_b58 )); - return; + return 0; } sched_fec->fec = store_fec; if( FD_UNLIKELY( !fd_sched_fec_ingest( ctx->sched, sched_fec ) ) ) { /* FIXME this critical section is unnecessarily complex. should refactor to just be held for the memcpy and block_offs. */ mark_bank_dead( ctx, stem, sched_fec->bank_idx ); + return 1; } } FD_STORE_SLOCK_END; ctx->metrics.store_query_release++; fd_histf_sample( ctx->metrics.store_query_work, (ulong)fd_log_wallclock() - work ); + return 0; } static void @@ -2107,7 +2112,8 @@ process_fec_set( fd_replay_tile_t * ctx, path[ path_cnt++ ] = curr; } - for( ulong i=path_cnt; i>0UL; i-- ) { + int dead = 0; + for( ulong i=path_cnt; i>0UL && !dead; i-- ) { fd_reasm_fec_t * leaf = path[ i-1 ]; /* If there's not capacity in the sched or banks, return early and @@ -2127,7 +2133,7 @@ process_fec_set( fd_replay_tile_t * ctx, FD_LOG_NOTICE(( "backfilling FEC sets for slot %lu from fec_set_idx %u to fec_set_idx %u", leaf->slot, leaf->fec_set_idx, curr->fec_set_idx )); for( ulong j=0UL; j<=leaf->fec_set_idx/FD_FEC_SHRED_CNT; j++ ) { - insert_fec_set( ctx, stem, slot_fecs[ j ] ); + if( FD_UNLIKELY( dead=insert_fec_set( ctx, stem, slot_fecs[ j ] ) ) ) break; } } } @@ -2283,6 +2289,32 @@ after_credit( fd_replay_tile_t * ctx, return; } + /* Mark a frontier eviction victim bank as dead. As refcnts on said + banks are drained, they will be pruned away. */ + if( FD_UNLIKELY( ctx->frontier_cnt ) ) { + *charge_busy = 1; + *opt_poll_in = 0; + bank_idx = ctx->frontier_indices[ --ctx->frontier_cnt ]; + fd_bank_t bank[1]; + FD_TEST( fd_banks_bank_query( bank, ctx->banks, bank_idx ) ); + if( FD_UNLIKELY( ctx->is_leader && bank_idx==ctx->leader_bank->data->idx ) ) return; + mark_bank_dead( ctx, stem, bank->data->idx ); + fd_sched_block_abandon( ctx->sched, bank->data->idx ); + + /* evict it from reasm */ + + fd_block_id_ele_t * block_id_ele = &ctx->block_id_arr[ bank->data->idx ]; + fd_reasm_fec_t * fec = fd_reasm_query( ctx->reasm, &block_id_ele->latest_mr ); + FD_TEST( fec ); + fd_reasm_fec_t * evicted_head = fd_reasm_remove( ctx->reasm, fec, ctx->store ); + if( FD_UNLIKELY( ctx->reasm_evicted ) ) { + /* already have a chain we are evicting. Prepend the new chain to the existing chain */ + fec->child = fd_reasm_pool_idx( ctx->reasm, ctx->reasm_evicted ); + } + ctx->reasm_evicted = evicted_head; + return; + } + /* If the reassembler has a fec that is ready, we should process it and pass it to the scheduler. */ int evict_banks = 0; @@ -2296,31 +2328,10 @@ after_credit( fd_replay_tile_t * ctx, if( FD_UNLIKELY( evict_banks ) ) { FD_LOG_WARNING(( "banks are full and partially executed frontier banks are being evicted" )); - ulong frontier_cnt = 0UL; - ulong frontier_indices[ FD_BANKS_MAX_BANKS ]; - fd_banks_get_frontier( ctx->banks, frontier_indices, &frontier_cnt ); - - /* Mark all frontier banks as dead. As refcnts on said banks are - drained, they will be pruned away. */ - for( ulong i=0UL; ibanks, frontier_indices[i] ) ); - if( FD_UNLIKELY( ctx->is_leader && frontier_indices[i]==ctx->leader_bank->data->idx ) ) continue; - mark_bank_dead( ctx, stem, bank->data->idx ); - fd_sched_block_abandon( ctx->sched, bank->data->idx ); - - /* evict it from reasm */ - - fd_block_id_ele_t * block_id_ele = &ctx->block_id_arr[ bank->data->idx ]; - fd_reasm_fec_t * fec = fd_reasm_query( ctx->reasm, &block_id_ele->latest_mr ); - FD_TEST( fec ); - fd_reasm_fec_t * evicted_head = fd_reasm_remove( ctx->reasm, fec, ctx->store ); - if( FD_UNLIKELY( ctx->reasm_evicted ) ) { - /* already have a chain we are evicting. Prepend the new chain to the existing chain */ - fec->child = fd_reasm_pool_idx( ctx->reasm, ctx->reasm_evicted ); - } - ctx->reasm_evicted = evicted_head; - } + fd_banks_get_frontier( ctx->banks, ctx->frontier_indices, &ctx->frontier_cnt ); + *charge_busy = 1; + *opt_poll_in = 0; + return; } *charge_busy = replay( ctx, stem ); @@ -2899,6 +2910,8 @@ unprivileged_init( fd_topo_t * topo, fd_bank_data_t * bank_pool = fd_banks_get_bank_pool( ctx->banks->data ); FD_MGAUGE_SET( REPLAY, MAX_LIVE_BANKS, fd_banks_pool_max( bank_pool ) ); + ctx->frontier_cnt = 0UL; + fd_bank_t bank[1]; FD_TEST( fd_banks_init_bank( bank, ctx->banks ) ); fd_bank_slot_set( bank, 0UL );