Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 44 additions & 31 deletions src/discof/replay/fd_replay_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ struct fd_replay_tile {
fd_txncache_t * txncache;
fd_store_t * store;
fd_banks_t banks[1];
ulong frontier_indices[ FD_BANKS_MAX_BANKS ];
ulong frontier_cnt;

/* This flag is 1 If we have seen a vote signature that our node has
sent out get rooted at least one time. The value is 0 otherwise.
Expand Down Expand Up @@ -1916,7 +1918,8 @@ can_process_fec( fd_replay_tile_t * ctx,
return 1;
}

static void
/* Returns 0 on successful FEC ingestion, 1 if the block got marked dead. */
static int
insert_fec_set( fd_replay_tile_t * ctx,
fd_stem_context_t * stem,
fd_reasm_fec_t * reasm_fec ) {
Expand Down Expand Up @@ -1967,7 +1970,7 @@ insert_fec_set( fd_replay_tile_t * ctx,
fd_block_id_ele_t * block_id_ele = &ctx->block_id_arr[ reasm_fec->bank_idx ];
if( FD_UNLIKELY( block_id_ele->latest_fec_idx>=reasm_fec->fec_set_idx ) ) {
FD_LOG_WARNING(( "dropping FEC set (slot=%lu, fec_set_idx=%u) because it is at least as old as the latest FEC set (slot=%lu, fec_set_idx=%u)", reasm_fec->slot, reasm_fec->fec_set_idx, block_id_ele->slot, block_id_ele->latest_fec_idx ));
return;
return 0;
}
block_id_ele->latest_fec_idx = reasm_fec->fec_set_idx;
block_id_ele->latest_mr = reasm_fec->key;
Expand All @@ -1982,7 +1985,7 @@ insert_fec_set( fd_replay_tile_t * ctx,
}

/* If we are the leader, we don't need to process the FEC set. */
if( FD_UNLIKELY( reasm_fec->is_leader ) ) return;
if( FD_UNLIKELY( reasm_fec->is_leader ) ) return 0;

/* Forks form a partial ordering over FEC sets. The Repair tile
delivers FEC sets in-order per fork, but FEC set ordering across
Expand Down Expand Up @@ -2044,16 +2047,18 @@ insert_fec_set( fd_replay_tile_t * ctx,
ctx->metrics.store_query_missing_mr = reasm_fec->key.ul[0];
FD_BASE58_ENCODE_32_BYTES( reasm_fec->key.key, key_b58 );
FD_LOG_WARNING(( "store fec for slot: %lu is on minority fork already pruned by publish. abandoning slice. root: %lu. pruned merkle: %s", reasm_fec->slot, ctx->consensus_root_slot, key_b58 ));
return;
return 0;
}
sched_fec->fec = store_fec;
if( FD_UNLIKELY( !fd_sched_fec_ingest( ctx->sched, sched_fec ) ) ) { /* FIXME this critical section is unnecessarily complex. should refactor to just be held for the memcpy and block_offs. */
mark_bank_dead( ctx, stem, sched_fec->bank_idx );
return 1;
}
} FD_STORE_SLOCK_END;

ctx->metrics.store_query_release++;
fd_histf_sample( ctx->metrics.store_query_work, (ulong)fd_log_wallclock() - work );
return 0;
}

static void
Expand Down Expand Up @@ -2107,7 +2112,8 @@ process_fec_set( fd_replay_tile_t * ctx,
path[ path_cnt++ ] = curr;
}

for( ulong i=path_cnt; i>0UL; i-- ) {
int dead = 0;
for( ulong i=path_cnt; i>0UL && !dead; i-- ) {
fd_reasm_fec_t * leaf = path[ i-1 ];

/* If there's not capacity in the sched or banks, return early and
Expand All @@ -2127,7 +2133,7 @@ process_fec_set( fd_replay_tile_t * ctx,
FD_LOG_NOTICE(( "backfilling FEC sets for slot %lu from fec_set_idx %u to fec_set_idx %u", leaf->slot, leaf->fec_set_idx, curr->fec_set_idx ));

for( ulong j=0UL; j<=leaf->fec_set_idx/FD_FEC_SHRED_CNT; j++ ) {
insert_fec_set( ctx, stem, slot_fecs[ j ] );
if( FD_UNLIKELY( dead=insert_fec_set( ctx, stem, slot_fecs[ j ] ) ) ) break;
}
}
}
Expand Down Expand Up @@ -2283,6 +2289,32 @@ after_credit( fd_replay_tile_t * ctx,
return;
}

/* Mark a frontier eviction victim bank as dead. As refcnts on said
banks are drained, they will be pruned away. */
if( FD_UNLIKELY( ctx->frontier_cnt ) ) {
*charge_busy = 1;
*opt_poll_in = 0;
bank_idx = ctx->frontier_indices[ --ctx->frontier_cnt ];
fd_bank_t bank[1];
FD_TEST( fd_banks_bank_query( bank, ctx->banks, bank_idx ) );
if( FD_UNLIKELY( ctx->is_leader && bank_idx==ctx->leader_bank->data->idx ) ) return;
mark_bank_dead( ctx, stem, bank->data->idx );
fd_sched_block_abandon( ctx->sched, bank->data->idx );

/* evict it from reasm */

fd_block_id_ele_t * block_id_ele = &ctx->block_id_arr[ bank->data->idx ];
fd_reasm_fec_t * fec = fd_reasm_query( ctx->reasm, &block_id_ele->latest_mr );
FD_TEST( fec );
fd_reasm_fec_t * evicted_head = fd_reasm_remove( ctx->reasm, fec, ctx->store );
if( FD_UNLIKELY( ctx->reasm_evicted ) ) {
/* already have a chain we are evicting. Prepend the new chain to the existing chain */
fec->child = fd_reasm_pool_idx( ctx->reasm, ctx->reasm_evicted );
}
ctx->reasm_evicted = evicted_head;
return;
}

/* If the reassembler has a fec that is ready, we should process it
and pass it to the scheduler. */
int evict_banks = 0;
Expand All @@ -2296,31 +2328,10 @@ after_credit( fd_replay_tile_t * ctx,

if( FD_UNLIKELY( evict_banks ) ) {
FD_LOG_WARNING(( "banks are full and partially executed frontier banks are being evicted" ));
ulong frontier_cnt = 0UL;
ulong frontier_indices[ FD_BANKS_MAX_BANKS ];
fd_banks_get_frontier( ctx->banks, frontier_indices, &frontier_cnt );

/* Mark all frontier banks as dead. As refcnts on said banks are
drained, they will be pruned away. */
for( ulong i=0UL; i<frontier_cnt; i++ ) {
fd_bank_t bank[1];
FD_TEST( fd_banks_bank_query( bank, ctx->banks, frontier_indices[i] ) );
if( FD_UNLIKELY( ctx->is_leader && frontier_indices[i]==ctx->leader_bank->data->idx ) ) continue;
mark_bank_dead( ctx, stem, bank->data->idx );
fd_sched_block_abandon( ctx->sched, bank->data->idx );

/* evict it from reasm */

fd_block_id_ele_t * block_id_ele = &ctx->block_id_arr[ bank->data->idx ];
fd_reasm_fec_t * fec = fd_reasm_query( ctx->reasm, &block_id_ele->latest_mr );
FD_TEST( fec );
fd_reasm_fec_t * evicted_head = fd_reasm_remove( ctx->reasm, fec, ctx->store );
if( FD_UNLIKELY( ctx->reasm_evicted ) ) {
/* already have a chain we are evicting. Prepend the new chain to the existing chain */
fec->child = fd_reasm_pool_idx( ctx->reasm, ctx->reasm_evicted );
}
ctx->reasm_evicted = evicted_head;
}
fd_banks_get_frontier( ctx->banks, ctx->frontier_indices, &ctx->frontier_cnt );
*charge_busy = 1;
*opt_poll_in = 0;
return;
}

*charge_busy = replay( ctx, stem );
Expand Down Expand Up @@ -2899,6 +2910,8 @@ unprivileged_init( fd_topo_t * topo,
fd_bank_data_t * bank_pool = fd_banks_get_bank_pool( ctx->banks->data );
FD_MGAUGE_SET( REPLAY, MAX_LIVE_BANKS, fd_banks_pool_max( bank_pool ) );

ctx->frontier_cnt = 0UL;

fd_bank_t bank[1];
FD_TEST( fd_banks_init_bank( bank, ctx->banks ) );
fd_bank_slot_set( bank, 0UL );
Expand Down
Loading