diff --git a/src/app/firedancer/topology.c b/src/app/firedancer/topology.c index a2872c991a2..f6d035ea58a 100644 --- a/src/app/firedancer/topology.c +++ b/src/app/firedancer/topology.c @@ -204,9 +204,8 @@ fd_topo_initialize( config_t * config ) { ulong resolv_tile_cnt = config->layout.resolv_tile_count; ulong sign_tile_cnt = config->firedancer.layout.sign_tile_count; - int enable_rpc = ( config->rpc.port != 0 ); - fd_topo_t * topo = fd_topob_new( &config->topo, config->name ); + topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size ); topo->gigantic_page_threshold = config->hugetlbfs.gigantic_page_threshold_mib << 20; @@ -296,7 +295,6 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "replay_manif" ); fd_topob_wksp( topo, "slot_fseqs" ); /* fseqs for marked slots eg. turbine slot */ - if( enable_rpc ) fd_topob_wksp( topo, "rpcsrv" ); #define FOR(cnt) for( ulong i=0UL; itile_cnt ], 0, 0 ); FOR(writer_tile_cnt) fd_topob_tile( topo, "writer", "writer", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); - fd_topo_tile_t * rpcserv_tile = NULL; - if( enable_rpc ) rpcserv_tile = fd_topob_tile( topo, "rpcsrv", "rpcsrv", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); - fd_topo_tile_t * snaprd_tile = fd_topob_tile( topo, "snaprd", "snaprd", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); snaprd_tile->allow_shutdown = 1; fd_topo_tile_t * snapdc_tile = fd_topob_tile( topo, "snapdc", "snapdc", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); @@ -456,7 +451,6 @@ fd_topo_initialize( config_t * config ) { FOR(exec_tile_cnt) fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "exec", i ) ], funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); /* */ fd_topob_tile_uses( topo, replay_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); - if(rpcserv_tile) fd_topob_tile_uses( topo, rpcserv_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); FOR(writer_tile_cnt) fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "writer", i ) ], funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); /* Setup a shared wksp object for banks. */ @@ -789,9 +783,15 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_link( topo, "replay_notif", "replay_notif", FD_REPLAY_NOTIF_DEPTH, FD_REPLAY_NOTIF_MTU, 1UL )->permit_no_consumers = 1; /**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_notif", 0UL ); + int enable_rpc = ( config->rpc.port != 0 ); if( enable_rpc ) { - fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "replay_notif", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); - fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); + fd_topob_wksp( topo, "rpcsrv" ); + fd_topo_tile_t * rpcserv_tile = fd_topob_tile( topo, "rpcsrv", "rpcsrv", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); + fd_topob_tile_uses( topo, rpcserv_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); + fd_topob_tile_uses( topo, rpcserv_tile, store_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); + fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "replay_notif", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); + fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); + fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "repair_repla", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); } /* For now the only plugin consumer is the GUI */ @@ -1015,6 +1015,7 @@ fd_topo_configure_tile( fd_topo_tile_t * tile, } else if( FD_UNLIKELY( !strcmp( tile->name, "rpcsrv" ) ) ) { strncpy( tile->replay.blockstore_file, config->firedancer.blockstore.file, sizeof(tile->replay.blockstore_file) ); tile->rpcserv.funk_obj_id = fd_pod_query_ulong( config->topo.props, "funk", ULONG_MAX ); + tile->rpcserv.store_obj_id = fd_pod_query_ulong( config->topo.props, "store", ULONG_MAX ); tile->rpcserv.rpc_port = config->rpc.port; tile->rpcserv.tpu_port = config->tiles.quic.regular_transaction_listen_port; tile->rpcserv.tpu_ip_addr = config->net.ip_addr; diff --git a/src/app/rpcserver/main.c b/src/app/rpcserver/main.c index 195f52f8fa5..13c1f92a93d 100644 --- a/src/app/rpcserver/main.c +++ b/src/app/rpcserver/main.c @@ -14,12 +14,10 @@ #include "../../funk/fd_funk.h" #define SHAM_LINK_CONTEXT fd_rpc_ctx_t -#define SHAM_LINK_STATE fd_replay_notif_msg_t #define SHAM_LINK_NAME replay_sham_link #include "sham_link.h" #define SHAM_LINK_CONTEXT fd_rpc_ctx_t -#define SHAM_LINK_STATE fd_multi_epoch_leaders_t #define SHAM_LINK_NAME stake_sham_link #include "sham_link.h" @@ -55,7 +53,6 @@ init_args( int * argc, char *** argv, fd_rpcserver_args_t * args ) { } fd_wksp_mprotect( wksp, 1 ); - args->leaders = fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( aligned_alloc( fd_multi_epoch_leaders_align(), fd_multi_epoch_leaders_footprint() ) ) ); args->port = (ushort)fd_env_strip_cmdline_ulong( argc, argv, "--port", NULL, 8899 ); args->params.max_connection_cnt = fd_env_strip_cmdline_ulong( argc, argv, "--max-connection-cnt", NULL, 30 ); @@ -206,10 +203,9 @@ int main( int argc, char ** argv ) { replay_sham_link_start( rep_notify ); stake_sham_link_start( stake_notify ); while( !stopflag ) { - fd_replay_notif_msg_t msg; - replay_sham_link_poll( rep_notify, ctx, &msg ); + replay_sham_link_poll( rep_notify, ctx ); - stake_sham_link_poll( stake_notify, ctx, args.leaders ); + stake_sham_link_poll( stake_notify, ctx ); fd_rpc_ws_poll( ctx ); } @@ -219,21 +215,21 @@ int main( int argc, char ** argv ) { } static void -replay_sham_link_during_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * state, void const * msg, int sz) { - fd_rpc_replay_during_frag( ctx, state, msg, sz ); +replay_sham_link_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz) { + fd_rpc_replay_during_frag( ctx, msg, sz ); } static void -replay_sham_link_after_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * msg) { - fd_rpc_replay_after_frag( ctx, msg ); +replay_sham_link_after_frag(fd_rpc_ctx_t * ctx) { + fd_rpc_replay_after_frag( ctx ); } static void -stake_sham_link_during_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state, void const * msg, int sz) { - fd_rpc_stake_during_frag( ctx, state, msg, sz ); +stake_sham_link_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz) { + fd_rpc_stake_during_frag( ctx, msg, sz ); } static void -stake_sham_link_after_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state) { - fd_rpc_stake_after_frag( ctx, state ); +stake_sham_link_after_frag(fd_rpc_ctx_t * ctx) { + fd_rpc_stake_after_frag( ctx ); } diff --git a/src/app/rpcserver/sham_link.h b/src/app/rpcserver/sham_link.h index c827aabe440..321ac26720e 100644 --- a/src/app/rpcserver/sham_link.h +++ b/src/app/rpcserver/sham_link.h @@ -39,13 +39,13 @@ SHAM_LINK_(start)( SHAM_LINK_(t) * self ) { } static void -SHAM_LINK_(during_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state, void const * msg, int sz ); +SHAM_LINK_(during_frag)( SHAM_LINK_CONTEXT * ctx, void const * msg, int sz ); static void -SHAM_LINK_(after_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state ); +SHAM_LINK_(after_frag)( SHAM_LINK_CONTEXT * ctx ); static inline void -SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state ) { +SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx ) { while (1) { fd_frag_meta_t const * mline = self->mcache + fd_mcache_line_idx( self->seq_expect, self->depth ); @@ -64,7 +64,7 @@ SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE ulong chunk = mline->chunk; /* TODO: sanity check chunk,sz */ - SHAM_LINK_(during_frag)( ctx, state, fd_chunk_to_laddr( self->wksp, chunk ), mline->sz ); + SHAM_LINK_(during_frag)( ctx, fd_chunk_to_laddr( self->wksp, chunk ), mline->sz ); seq_found = fd_frag_meta_seq_query( mline ); diff = fd_seq_diff( seq_found, self->seq_expect ); @@ -74,7 +74,7 @@ SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE continue; } - SHAM_LINK_(after_frag)( ctx, state ); + SHAM_LINK_(after_frag)( ctx ); self->seq_expect++; } diff --git a/src/disco/topo/fd_topo.h b/src/disco/topo/fd_topo.h index d6c446f83e2..26a0096b737 100644 --- a/src/disco/topo/fd_topo.h +++ b/src/disco/topo/fd_topo.h @@ -431,6 +431,7 @@ struct fd_topo_tile { struct { ulong funk_obj_id; + ulong store_obj_id; ushort rpc_port; ushort tpu_port; uint tpu_ip_addr; diff --git a/src/discof/rpcserver/fd_rpc_history.c b/src/discof/rpcserver/fd_rpc_history.c index 0d18f19a5ce..d2f20bd7b25 100644 --- a/src/discof/rpcserver/fd_rpc_history.c +++ b/src/discof/rpcserver/fd_rpc_history.c @@ -6,6 +6,8 @@ #include "../../ballet/shred/fd_shred.h" #include "../../flamenco/runtime/fd_system_ids.h" +#include + struct fd_rpc_block { ulong slot; ulong next; @@ -75,6 +77,18 @@ typedef struct fd_rpc_acct_map_elem fd_rpc_acct_map_elem_t; #define POOL_T fd_rpc_acct_map_elem_t #include "../../util/tmpl/fd_pool.c" +#define FD_REASM_MAP_COL_CNT (1UL<<10) +#define FD_REASM_MAP_COL_HEIGHT (128UL) +struct fd_rpc_reasm_map { + struct fd_rpc_reasm_map_column { + ulong ele_cnt; /* The number of shreds received in this column */ uchar end_found; /* Whether the last slice of the slot has been found */ + fd_reasm_fec_t ele[FD_REASM_MAP_COL_HEIGHT]; + } cols[FD_REASM_MAP_COL_CNT]; + ulong head; /* Next open column */ + ulong tail; /* Oldest column */ +}; +typedef struct fd_rpc_reasm_map fd_rpc_reasm_map_t; + struct fd_rpc_history { fd_spad_t * spad; fd_rpc_block_t * block_map; @@ -82,6 +96,7 @@ struct fd_rpc_history { fd_rpc_txn_t * txn_map; fd_rpc_acct_map_t * acct_map; fd_rpc_acct_map_elem_t * acct_pool; + fd_rpc_reasm_map_t * reasm_map; ulong first_slot; ulong latest_slot; int file_fd; @@ -107,6 +122,10 @@ fd_rpc_history_create(fd_rpcserver_args_t * args) { mem = fd_spad_alloc( spad, fd_rpc_acct_map_pool_align(), fd_rpc_acct_map_pool_footprint( args->acct_index_max ) ); hist->acct_pool = fd_rpc_acct_map_pool_join( fd_rpc_acct_map_pool_new( mem, args->acct_index_max ) ); + mem = fd_spad_alloc( spad, alignof(fd_rpc_reasm_map_t), sizeof(fd_rpc_reasm_map_t) ); + memset(mem, 0, sizeof(fd_rpc_reasm_map_t)); + hist->reasm_map = (fd_rpc_reasm_map_t *)mem; + hist->file_fd = open( args->history_file, O_CREAT | O_RDWR | O_TRUNC, 0644 ); if( hist->file_fd == -1 ) FD_LOG_ERR(( "unable to open rpc history file: %s", args->history_file )); hist->file_totsz = 0; @@ -114,100 +133,210 @@ fd_rpc_history_create(fd_rpcserver_args_t * args) { return hist; } +static fd_rpc_block_t * +fd_rpc_history_alloc_block(fd_rpc_history_t * hist, ulong slot) { + fd_rpc_block_t * blk = fd_rpc_block_map_query(hist->block_map, &slot, NULL); + if( blk ) return blk; + if( fd_rpc_block_map_is_full( hist->block_map ) ) return NULL; /* Out of space */ + blk = fd_rpc_block_map_insert( hist->block_map, &slot ); + if( blk == NULL ) { + FD_LOG_ERR(( "unable to save slot %lu block", slot )); + return NULL; + } + blk->slot = slot; + blk->file_offset = 0UL; + blk->file_size = 0UL; + memset( &blk->info, 0, sizeof(fd_replay_notif_msg_t) ); + blk->info.slot_exec.slot = slot; + if( hist->first_slot == ULONG_MAX ) { + hist->first_slot = hist->latest_slot = slot; + } else { + if( slot < hist->first_slot ) hist->first_slot = slot; + else if( slot > hist->latest_slot ) hist->latest_slot = slot; + } + hist->block_cnt++; + return blk; +} + void -fd_rpc_history_save(fd_rpc_history_t * hist, fd_replay_notif_msg_t * info) { - FD_SPAD_FRAME_BEGIN( hist->spad ) { - if( fd_rpc_block_map_is_full( hist->block_map ) ) return; /* Out of space */ +fd_rpc_history_debug(fd_rpc_history_t * hist) { + fd_rpc_reasm_map_t * reasm_map = hist->reasm_map; + ulong tot_cnt = 0; + for( ulong slot = reasm_map->tail; slot < reasm_map->head; slot++ ) { + ulong col_idx = slot & (FD_REASM_MAP_COL_CNT - 1); + struct fd_rpc_reasm_map_column * col = &reasm_map->cols[col_idx]; + FD_LOG_NOTICE(( "slot %lu: %lu fecs", slot, col->ele_cnt )); + tot_cnt += col->ele_cnt; + } + FD_LOG_NOTICE(( "%lu head, %lu tail, %lu total fecs, %lu total blocks", + reasm_map->head, reasm_map->tail, tot_cnt, reasm_map->head - reasm_map->tail )); +} - ulong blk_max = info->slot_exec.shred_cnt * FD_SHRED_MAX_SZ; - uchar * blk_data = fd_spad_alloc( hist->spad, 1, blk_max ); - ulong blk_sz = 0; - // if( fd_blockstore_slice_query( blockstore, info->slot_exec.slot, 0, (uint)(info->slot_exec.shred_cnt-1), blk_max, blk_data, &blk_sz) ) { - // FD_LOG_WARNING(( "unable to read slot %lu block", info->slot_exec.slot )); - // return; - // } +void +fd_rpc_history_save_info(fd_rpc_history_t * hist, fd_replay_notif_msg_t * info) { + fd_rpc_block_t * blk = fd_rpc_history_alloc_block( hist, info->slot_exec.slot ); + if( blk == NULL ) return; + blk->info = *info; +} - FD_LOG_NOTICE(( "saving slot %lu block", info->slot_exec.slot )); +static void +fd_rpc_history_scan_block(fd_rpc_history_t * hist, ulong slot, ulong file_offset, uchar * blk_data, ulong blk_sz) { + ulong blockoff = 0; + while (blockoff < blk_sz) { + if ( blockoff + sizeof(ulong) > blk_sz ) + return; + ulong mcount = *(const ulong *)(blk_data + blockoff); + blockoff += sizeof(ulong); + + /* Loop across microblocks */ + for (ulong mblk = 0; mblk < mcount; ++mblk) { + if ( blockoff + sizeof(fd_microblock_hdr_t) > blk_sz ) + FD_LOG_ERR(("premature end of block")); + fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)((const uchar *)blk_data + blockoff); + blockoff += sizeof(fd_microblock_hdr_t); + + /* Loop across transactions */ + for ( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) { + uchar txn_out[FD_TXN_MAX_SZ]; + ulong pay_sz = 0; + const uchar* raw = (const uchar *)blk_data + blockoff; + ulong txn_sz = fd_txn_parse_core(raw, fd_ulong_min(blk_sz - blockoff, FD_TXN_MTU), txn_out, NULL, &pay_sz); + if ( txn_sz == 0 || txn_sz > FD_TXN_MAX_SZ ) { + FD_LOG_WARNING( ( "failed to parse transaction %lu in microblock %lu at offset %lu", txn_idx, mblk, blockoff ) ); + return; + } + fd_txn_t * txn = (fd_txn_t *)txn_out; + + /* Loop across signatures */ + fd_ed25519_sig_t const * sigs = (fd_ed25519_sig_t const *)(raw + txn->signature_off); + for ( uchar j = 0; j < txn->signature_cnt; j++ ) { + if( fd_rpc_txn_map_is_full( hist->txn_map ) ) break; /* Out of space */ + fd_rpc_txn_key_t key; + memcpy(&key, (const uchar*)&sigs[j], sizeof(key)); + fd_rpc_txn_t * ent = fd_rpc_txn_map_insert( hist->txn_map, &key ); + ent->file_offset = file_offset + blockoff; + ent->file_size = pay_sz; + ent->slot = slot; + } - if( hist->first_slot == ULONG_MAX ) hist->first_slot = info->slot_exec.slot; - hist->latest_slot = info->slot_exec.slot; + /* Loop across accounts */ + fd_rpc_txn_key_t sig0; + memcpy(&sig0, (const uchar*)sigs, sizeof(sig0)); + fd_pubkey_t * accs = (fd_pubkey_t *)((uchar *)raw + txn->acct_addr_off); + for( ulong i = 0UL; i < txn->acct_addr_cnt; i++ ) { + if( !memcmp(&accs[i], fd_solana_vote_program_id.key, sizeof(fd_pubkey_t)) ) continue; /* Ignore votes */ + if( !fd_rpc_acct_map_pool_free( hist->acct_pool ) ) break; + fd_rpc_acct_map_elem_t * ele = fd_rpc_acct_map_pool_ele_acquire( hist->acct_pool ); + ele->key = accs[i]; + ele->slot = slot; + ele->sig = sig0; + fd_rpc_acct_map_ele_insert( hist->acct_map, ele, hist->acct_pool ); + } - fd_rpc_block_t * blk = fd_rpc_block_map_insert( hist->block_map, &info->slot_exec.slot ); - if( blk == NULL ) { - FD_LOG_ERR(( "unable to save slot %lu block", info->slot_exec.slot )); - return; + blockoff += pay_sz; + } } - blk->info = *info; + } + if ( blockoff != blk_sz ) + FD_LOG_ERR(("garbage at end of block")); +} +void +fd_rpc_history_process_column(fd_rpc_history_t * hist, struct fd_rpc_reasm_map_column * col, fd_store_t * store, fd_reasm_fec_t * fec) { + FD_SPAD_FRAME_BEGIN( hist->spad ) { + + FD_LOG_NOTICE(( "assembling slot %lu block", fec->slot )); + + /* Assemble the block */ + fd_store_fec_t * list[FD_REASM_MAP_COL_HEIGHT]; + ulong slot = fec->slot; + ulong blk_sz = 0; + for( ulong i = 0; i < col->ele_cnt; i++ ) { + fd_reasm_fec_t * ele = &col->ele[i]; + fd_store_fec_t * fec_p = list[i] = fd_store_query( store, &ele->key ); + if( !fec_p ) { + FD_LOG_WARNING(( "missing fec" )); + return; + } + blk_sz += fec_p->data_sz; + } + uchar * blk_data = fd_spad_alloc( hist->spad, alignof(ulong), blk_sz ); + ulong blk_off = 0; + for( ulong i = 0; i < col->ele_cnt; i++ ) { + fd_store_fec_t * fec_p = list[i]; + fd_memcpy( blk_data + blk_off, fec_p->data, fec_p->data_sz ); + blk_off += fec_p->data_sz; + } + FD_TEST( blk_off == blk_sz ); + + /* Get a block from the map */ + fd_rpc_block_t * blk = fd_rpc_history_alloc_block( hist, slot ); + if( blk == NULL ) return; + + /* Write the block to the file */ if( pwrite( hist->file_fd, blk_data, blk_sz, (long)hist->file_totsz ) != (ssize_t)blk_sz ) { FD_LOG_ERR(( "unable to write to rpc history file" )); } - ulong base_offset = blk->file_offset = hist->file_totsz; + ulong file_offset = blk->file_offset = hist->file_totsz; blk->file_size = blk_sz; hist->file_totsz += blk_sz; - hist->block_cnt ++; - ulong blockoff = 0; - while (blockoff < blk_sz) { - if ( blockoff + sizeof(ulong) > blk_sz ) - return; - ulong mcount = *(const ulong *)(blk_data + blockoff); - blockoff += sizeof(ulong); - - /* Loop across microblocks */ - for (ulong mblk = 0; mblk < mcount; ++mblk) { - if ( blockoff + sizeof(fd_microblock_hdr_t) > blk_sz ) - FD_LOG_ERR(("premature end of block")); - fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)((const uchar *)blk_data + blockoff); - blockoff += sizeof(fd_microblock_hdr_t); - - /* Loop across transactions */ - for ( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) { - uchar txn_out[FD_TXN_MAX_SZ]; - ulong pay_sz = 0; - const uchar* raw = (const uchar *)blk_data + blockoff; - ulong txn_sz = fd_txn_parse_core(raw, fd_ulong_min(blk_sz - blockoff, FD_TXN_MTU), txn_out, NULL, &pay_sz); - if ( txn_sz == 0 || txn_sz > FD_TXN_MAX_SZ ) { - FD_LOG_ERR( ( "failed to parse transaction %lu in microblock %lu", txn_idx, mblk ) ); - } - fd_txn_t * txn = (fd_txn_t *)txn_out; - - /* Loop across signatures */ - fd_ed25519_sig_t const * sigs = (fd_ed25519_sig_t const *)(raw + txn->signature_off); - for ( uchar j = 0; j < txn->signature_cnt; j++ ) { - if( fd_rpc_txn_map_is_full( hist->txn_map ) ) break; /* Out of space */ - fd_rpc_txn_key_t key; - memcpy(&key, (const uchar*)&sigs[j], sizeof(key)); - fd_rpc_txn_t * ent = fd_rpc_txn_map_insert( hist->txn_map, &key ); - ent->file_offset = base_offset + blockoff; - ent->file_size = pay_sz; - ent->slot = info->slot_exec.slot; - } - - /* Loop across accoounts */ - fd_rpc_txn_key_t sig0; - memcpy(&sig0, (const uchar*)sigs, sizeof(sig0)); - fd_pubkey_t * accs = (fd_pubkey_t *)((uchar *)raw + txn->acct_addr_off); - for( ulong i = 0UL; i < txn->acct_addr_cnt; i++ ) { - if( !memcmp(&accs[i], fd_solana_vote_program_id.key, sizeof(fd_pubkey_t)) ) continue; /* Ignore votes */ - if( !fd_rpc_acct_map_pool_free( hist->acct_pool ) ) break; - fd_rpc_acct_map_elem_t * ele = fd_rpc_acct_map_pool_ele_acquire( hist->acct_pool ); - ele->key = accs[i]; - ele->slot = info->slot_exec.slot; - ele->sig = sig0; - fd_rpc_acct_map_ele_insert( hist->acct_map, ele, hist->acct_pool ); - } - - blockoff += pay_sz; - } - } - } - if ( blockoff != blk_sz ) - FD_LOG_ERR(("garbage at end of block")); + /* Scan the block */ + fd_rpc_history_scan_block( hist, slot, file_offset, blk_data, blk_sz ); } FD_SPAD_FRAME_END; } +static void +fd_rpc_history_discard_column(fd_rpc_reasm_map_t * reasm_map, ulong slot) { + ulong col_idx = slot & (FD_REASM_MAP_COL_CNT - 1); + struct fd_rpc_reasm_map_column * col = &reasm_map->cols[col_idx]; + col->ele_cnt = 0; +} + +void +fd_rpc_history_save_fec(fd_rpc_history_t * hist, fd_store_t * store, fd_reasm_fec_t * fec_msg ) { + fd_store_fec_t * fec_p = fd_store_query( store, &fec_msg->key ); + if( !fec_p ) return; + + fd_rpc_reasm_map_t * reasm_map = hist->reasm_map; + + if( reasm_map->head == 0UL ) { + reasm_map->head = fec_msg->slot+1; + reasm_map->tail = fec_msg->slot; + } + if( fec_msg->slot < reasm_map->tail ) return; /* Do not go backwards */ + while( fec_msg->slot >= reasm_map->tail + FD_REASM_MAP_COL_CNT ) { + FD_TEST( reasm_map->tail < reasm_map->head ); + fd_rpc_history_discard_column( reasm_map, reasm_map->tail++ ); + } + while( fec_msg->slot >= reasm_map->head ) { + ulong col_idx = (reasm_map->head++) & (FD_REASM_MAP_COL_CNT - 1); + struct fd_rpc_reasm_map_column * col = &reasm_map->cols[col_idx]; + col->ele_cnt = 0; + } + FD_TEST( fec_msg->slot >= reasm_map->tail && fec_msg->slot < reasm_map->head && reasm_map->head - reasm_map->tail <= FD_REASM_MAP_COL_CNT ); + + ulong col_idx = fec_msg->slot & (FD_REASM_MAP_COL_CNT - 1); + struct fd_rpc_reasm_map_column * col = &reasm_map->cols[col_idx]; + + if( col->ele_cnt == 0 ) { + FD_TEST( fec_msg->fec_set_idx == 0 ); + } else { + FD_TEST( fec_msg->fec_set_idx > col->ele[col->ele_cnt-1].fec_set_idx ); + } + + FD_TEST( col->ele_cnt < FD_REASM_MAP_COL_HEIGHT ); + + col->ele[col->ele_cnt++] = *fec_msg; + + if( fec_msg->slot_complete ) { + /* We've received all the shreds for this slot. Process it. */ + fd_rpc_history_process_column( hist, col, store, fec_msg ); + fd_rpc_history_discard_column( reasm_map, fec_msg->slot ); + } +} + ulong fd_rpc_history_first_slot(fd_rpc_history_t * hist) { return hist->first_slot; diff --git a/src/discof/rpcserver/fd_rpc_history.h b/src/discof/rpcserver/fd_rpc_history.h index a3a9d1e784b..2bede3db601 100644 --- a/src/discof/rpcserver/fd_rpc_history.h +++ b/src/discof/rpcserver/fd_rpc_history.h @@ -2,6 +2,7 @@ #define HEADER_fd_src_discof_rpcserver_fd_rpc_history_h #include "fd_rpc_service.h" +#include "../../ballet/shred/fd_shred.h" struct fd_rpc_history; typedef struct fd_rpc_history fd_rpc_history_t; @@ -13,7 +14,9 @@ typedef struct fd_rpc_txn_key fd_rpc_txn_key_t; fd_rpc_history_t * fd_rpc_history_create(fd_rpcserver_args_t * args); -void fd_rpc_history_save(fd_rpc_history_t * hist, fd_replay_notif_msg_t * msg); +void fd_rpc_history_save_info(fd_rpc_history_t * hist, fd_replay_notif_msg_t * msg); + +void fd_rpc_history_save_fec(fd_rpc_history_t * hist, fd_store_t * store, fd_reasm_fec_t * fec); ulong fd_rpc_history_first_slot(fd_rpc_history_t * hist); diff --git a/src/discof/rpcserver/fd_rpc_service.c b/src/discof/rpcserver/fd_rpc_service.c index 12158375410..981b9ea7106 100644 --- a/src/discof/rpcserver/fd_rpc_service.c +++ b/src/discof/rpcserver/fd_rpc_service.c @@ -6,6 +6,8 @@ #include "../../flamenco/runtime/fd_acc_mgr.h" #include "../../ballet/base58/fd_base58.h" #include "../../ballet/base64/fd_base64.h" +#include "../../ballet/shred/fd_shred.h" +#include "../reasm/fd_reasm.h" #include "fd_rpc_history.h" #include "fd_block_to_json.h" #include "keywords.h" @@ -83,6 +85,7 @@ struct fd_rpc_global_ctx { fd_spad_t * spad; fd_webserver_t ws; fd_funk_t * funk; + fd_store_t * store; struct fd_ws_subscription sub_list[FD_WS_MAX_SUBS]; ulong sub_cnt; ulong last_subsc_id; @@ -92,6 +95,7 @@ struct fd_rpc_global_ctx { fd_perf_sample_t perf_sample_snapshot; long perf_sample_ts; fd_multi_epoch_leaders_t * leaders; + uchar buffer[sizeof(fd_reasm_fec_t) > sizeof(fd_replay_notif_msg_t) ? sizeof(fd_reasm_fec_t) : sizeof(fd_replay_notif_msg_t)]; ulong acct_age; fd_rpc_history_t * history; fd_pubkey_t const * identity_key; /* nullable */ @@ -2299,7 +2303,9 @@ fd_rpc_create_ctx(fd_rpcserver_args_t * args, fd_rpc_ctx_t ** ctx_p) { ctx->global = gctx; gctx->spad = args->spad; - gctx->leaders = args->leaders; + + uchar * mleaders_mem = (uchar *)fd_spad_alloc( args->spad, FD_MULTI_EPOCH_LEADERS_ALIGN, FD_MULTI_EPOCH_LEADERS_FOOTPRINT ); + gctx->leaders = fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( mleaders_mem) ); if( !args->offline ) { gctx->tpu_socket = socket(AF_INET, SOCK_DGRAM, 0); @@ -2335,8 +2341,8 @@ fd_rpc_create_ctx(fd_rpcserver_args_t * args, fd_rpc_ctx_t ** ctx_p) { void fd_rpc_start_service(fd_rpcserver_args_t * args, fd_rpc_ctx_t * ctx) { fd_rpc_global_ctx_t * gctx = ctx->global; - gctx->funk = args->funk; + gctx->store = args->store; } int @@ -2362,15 +2368,15 @@ fd_webserver_ws_closed(ulong conn_id, void * cb_arg) { } void -fd_rpc_replay_during_frag( fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * state, void const * msg, int sz ) { - (void)ctx; +fd_rpc_replay_during_frag( fd_rpc_ctx_t * ctx, void const * msg, int sz ) { FD_TEST( sz == (int)sizeof(fd_replay_notif_msg_t) ); - fd_memcpy(state, msg, sizeof(fd_replay_notif_msg_t)); + fd_memcpy(ctx->global->buffer, msg, sizeof(fd_replay_notif_msg_t)); } void -fd_rpc_replay_after_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * msg) { +fd_rpc_replay_after_frag(fd_rpc_ctx_t * ctx) { fd_rpc_global_ctx_t * subs = ctx->global; + fd_replay_notif_msg_t * msg = (fd_replay_notif_msg_t *)subs->buffer; if( msg->type == FD_REPLAY_SLOT_TYPE ) { long ts = fd_log_wallclock() / (long)1e9; @@ -2406,7 +2412,7 @@ fd_rpc_replay_after_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * msg) { if( msg->slot_exec.shred_cnt == 0 ) return; - fd_rpc_history_save( subs->history, msg ); + fd_rpc_history_save_info( subs->history, msg ); for( ulong j = 0; j < subs->sub_cnt; ++j ) { struct fd_ws_subscription * sub = &subs->sub_list[ j ]; @@ -2423,13 +2429,25 @@ fd_rpc_replay_after_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * msg) { } void -fd_rpc_stake_during_frag( fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state, void const * msg, int sz ) { - (void)ctx; (void)sz; - fd_multi_epoch_leaders_stake_msg_init( state, msg ); +fd_rpc_stake_during_frag( fd_rpc_ctx_t * ctx, void const * msg, int sz ) { + (void)sz; + fd_multi_epoch_leaders_stake_msg_init( ctx->global->leaders, msg ); } void -fd_rpc_stake_after_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state) { - (void)ctx; - fd_multi_epoch_leaders_stake_msg_fini( state ); +fd_rpc_stake_after_frag(fd_rpc_ctx_t * ctx) { + fd_multi_epoch_leaders_stake_msg_fini( ctx->global->leaders ); +} + +void +fd_rpc_repair_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz) { + FD_TEST( sz==(int)sizeof(fd_reasm_fec_t) ); + fd_memcpy(ctx->global->buffer, msg, sizeof(fd_reasm_fec_t)); +} + +void +fd_rpc_repair_after_frag(fd_rpc_ctx_t * ctx) { + fd_rpc_global_ctx_t * subs = ctx->global; + fd_reasm_fec_t * fec_p = (fd_reasm_fec_t *)subs->buffer; + fd_rpc_history_save_fec( subs->history, subs->store, fec_p ); } diff --git a/src/discof/rpcserver/fd_rpc_service.h b/src/discof/rpcserver/fd_rpc_service.h index a9442d4cc8a..ea793649114 100644 --- a/src/discof/rpcserver/fd_rpc_service.h +++ b/src/discof/rpcserver/fd_rpc_service.h @@ -2,10 +2,10 @@ #define HEADER_fd_src_discof_rpcserver_fd_rpc_service_h #include "../replay/fd_replay_notif.h" - -#include "../../disco/store/fd_store.h" #include "../../flamenco/leaders/fd_multi_epoch_leaders.h" #include "../../waltz/http/fd_http_server.h" +#include "../../disco/store/fd_store.h" +#include "../../discof/reasm/fd_reasm.h" #include @@ -14,7 +14,7 @@ typedef struct fd_rpc_ctx fd_rpc_ctx_t; struct fd_rpcserver_args { int offline; fd_funk_t funk[1]; - fd_multi_epoch_leaders_t * leaders; + fd_store_t * store; ushort port; fd_http_server_params_t params; struct sockaddr_in tpu_addr; @@ -26,7 +26,6 @@ struct fd_rpcserver_args { /* Bump allocator */ fd_spad_t * spad; - fd_store_t * store; }; typedef struct fd_rpcserver_args fd_rpcserver_args_t; @@ -38,12 +37,16 @@ int fd_rpc_ws_poll(fd_rpc_ctx_t * ctx); int fd_rpc_ws_fd(fd_rpc_ctx_t * ctx); -void fd_rpc_replay_during_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * state, void const * msg, int sz); +void fd_rpc_replay_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz); + +void fd_rpc_replay_after_frag(fd_rpc_ctx_t * ctx); + +void fd_rpc_stake_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz); -void fd_rpc_replay_after_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * msg); +void fd_rpc_stake_after_frag(fd_rpc_ctx_t * ctx); -void fd_rpc_stake_during_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state, void const * msg, int sz); +void fd_rpc_repair_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz); -void fd_rpc_stake_after_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state); +void fd_rpc_repair_after_frag(fd_rpc_ctx_t * ctx); #endif /* HEADER_fd_src_discof_rpcserver_fd_rpc_service_h */ diff --git a/src/discof/rpcserver/fd_rpcserv_tile.c b/src/discof/rpcserver/fd_rpcserv_tile.c index a369ae5bcac..9cc665677c5 100644 --- a/src/discof/rpcserver/fd_rpcserv_tile.c +++ b/src/discof/rpcserver/fd_rpcserv_tile.c @@ -1,44 +1,45 @@ /* Repair tile runs the repair protocol for a Firedancer node. */ #define _GNU_SOURCE -#include "../../disco/topo/fd_topo.h" #include +#include "../../disco/topo/fd_topo.h" #include "generated/fd_rpcserv_tile_seccomp.h" - #include "../rpcserver/fd_rpc_service.h" - #include "../../flamenco/leaders/fd_multi_epoch_leaders.h" -#include "../../util/pod/fd_pod_format.h" #include "../../disco/keyguard/fd_keyload.h" #include "../../disco/keyguard/fd_keyswitch.h" +#include "../../ballet/shred/fd_shred.h" -#include #include #include -#define REPLAY_NOTIF_IDX 0 -#define STAKE_IN_IDX 1 +#define IN_KIND_REPLAY_NOTIF 0 +#define IN_KIND_STAKE_OUT 1 +#define IN_KIND_REPAIR_REPLAY 2 + +#define MAX_IN_LINKS (16) + +typedef union { + struct { + fd_wksp_t * mem; + ulong chunk0; + ulong wmark; + ulong mtu; + ulong last_seq; + }; +} fd_rpcserv_in_ctx_t; struct fd_rpcserv_tile_ctx { fd_rpcserver_args_t args; fd_rpc_ctx_t * ctx; - fd_store_t * store; fd_pubkey_t identity_key; fd_keyswitch_t * keyswitch; - fd_wksp_t * replay_notif_in_mem; - ulong replay_notif_in_chunk0; - ulong replay_notif_in_wmark; - fd_replay_notif_msg_t replay_notif_in_state; - - fd_wksp_t * stake_in_mem; - ulong stake_in_chunk0; - ulong stake_in_wmark; - - uchar __attribute__((aligned(FD_MULTI_EPOCH_LEADERS_ALIGN))) mleaders_mem[ FD_MULTI_EPOCH_LEADERS_FOOTPRINT ]; + uchar in_kind[ MAX_IN_LINKS ]; + fd_rpcserv_in_ctx_t in_links[ MAX_IN_LINKS ]; }; typedef struct fd_rpcserv_tile_ctx fd_rpcserv_tile_ctx_t; @@ -95,21 +96,17 @@ during_frag( fd_rpcserv_tile_ctx_t * ctx, ulong chunk, ulong sz, ulong ctl FD_PARAM_UNUSED ) { - - if( FD_UNLIKELY( in_idx==REPLAY_NOTIF_IDX ) ) { - if( FD_UNLIKELY( chunkreplay_notif_in_chunk0 || chunk>ctx->replay_notif_in_wmark ) ) { - FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, - ctx->replay_notif_in_chunk0, ctx->replay_notif_in_wmark )); - } - fd_rpc_replay_during_frag( ctx->ctx, &ctx->replay_notif_in_state, fd_chunk_to_laddr_const( ctx->replay_notif_in_mem, chunk ), (int)sz ); - - } else if( FD_UNLIKELY( in_idx==STAKE_IN_IDX ) ) { - if( FD_UNLIKELY( chunkstake_in_chunk0 || chunk>ctx->stake_in_wmark ) ) { - FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, - ctx->stake_in_chunk0, ctx->stake_in_wmark )); - } - fd_rpc_stake_during_frag( ctx->ctx, ctx->args.leaders, fd_chunk_to_laddr_const( ctx->stake_in_mem, chunk ), (int)sz ); - + if( FD_UNLIKELY( chunkin_links[in_idx].chunk0 || chunk>ctx->in_links[in_idx].wmark ) ) { + FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, + ctx->in_links[in_idx].chunk0, ctx->in_links[in_idx].wmark )); + } + uchar kind = ctx->in_kind[ in_idx ]; + if( FD_UNLIKELY( kind==IN_KIND_REPLAY_NOTIF ) ) { + fd_rpc_replay_during_frag( ctx->ctx, fd_chunk_to_laddr_const( ctx->in_links[in_idx].mem, chunk ), (int)sz ); + } else if( FD_UNLIKELY( kind == IN_KIND_STAKE_OUT ) ) { + fd_rpc_stake_during_frag( ctx->ctx, fd_chunk_to_laddr_const( ctx->in_links[in_idx].mem, chunk ), (int)sz ); + } else if( FD_UNLIKELY( kind == IN_KIND_REPAIR_REPLAY ) ) { + fd_rpc_repair_during_frag( ctx->ctx, fd_chunk_to_laddr_const( ctx->in_links[in_idx].mem, chunk ), (int)sz ); } else { FD_LOG_ERR(("Unknown in_idx %lu for rpc", in_idx)); } @@ -124,17 +121,25 @@ after_frag( fd_rpcserv_tile_ctx_t * ctx, ulong tsorig, ulong tspub, fd_stem_context_t * stem ) { - (void)seq; (void)sig; (void)sz; (void)tsorig; (void)tspub; (void)stem; - if( FD_LIKELY( in_idx==REPLAY_NOTIF_IDX ) ) { - fd_rpc_replay_after_frag( ctx->ctx, &ctx->replay_notif_in_state ); - } else if( FD_UNLIKELY( in_idx==STAKE_IN_IDX ) ) { - fd_rpc_stake_after_frag( ctx->ctx, ctx->args.leaders ); + ulong * last_seq = &ctx->in_links[ in_idx ].last_seq; + if( FD_UNLIKELY( seq != *last_seq + 1 && seq != 0 ) ) { + FD_LOG_NOTICE(( "seq jump from %lu to %lu on input %lu", *last_seq, seq, in_idx )); + } + *last_seq = seq; + + uchar kind = ctx->in_kind[ in_idx ]; + if( kind==IN_KIND_REPLAY_NOTIF ) { + fd_rpc_replay_after_frag( ctx->ctx ); + } else if( kind == IN_KIND_STAKE_OUT ) { + fd_rpc_stake_after_frag( ctx->ctx ); + } else if( kind == IN_KIND_REPAIR_REPLAY ) { + fd_rpc_repair_after_frag( ctx->ctx ); } else { FD_LOG_ERR(("Unknown in_idx %lu for rpc", in_idx)); } @@ -166,17 +171,9 @@ privileged_init( fd_topo_t * topo, args->tpu_addr.sin_addr.s_addr = tile->rpcserv.tpu_ip_addr; args->tpu_addr.sin_port = htons( (ushort)tile->rpcserv.tpu_port ); - args->leaders = fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( ctx->mleaders_mem) ); - uchar * spad_mem_cur = spad_mem; args->spad = fd_spad_join( fd_spad_new( spad_mem_cur, FD_RPC_SCRATCH_MAX ) ); - /* Blockstore setup */ - ulong store_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "store" ); - FD_TEST( store_obj_id!=ULONG_MAX ); - args->store = fd_store_join( ctx->store ); - FD_TEST( args->store!=NULL ); - args->block_index_max = tile->rpcserv.block_index_max; args->txn_index_max = tile->rpcserv.txn_index_max; args->acct_index_max = tile->rpcserv.acct_index_max; @@ -195,13 +192,6 @@ unprivileged_init( fd_topo_t * topo, fd_topo_tile_t * tile ) { void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id ); - if( FD_UNLIKELY( tile->in_cnt != 2 || - strcmp( topo->links[ tile->in_link_id[ REPLAY_NOTIF_IDX ] ].name, "replay_notif") || - strcmp( topo->links[ tile->in_link_id[ STAKE_IN_IDX ] ].name, "stake_out" ) ) ) { - FD_LOG_ERR(( "repair tile has none or unexpected input links %lu %s %s", - tile->in_cnt, topo->links[ tile->in_link_id[ 0 ] ].name, topo->links[ tile->in_link_id[ 1 ] ].name )); - } - if( FD_UNLIKELY( tile->out_cnt != 0 ) ) { FD_LOG_ERR(( "repair tile has none or unexpected output links %lu %s %s", tile->out_cnt, topo->links[ tile->out_link_id[ 0 ] ].name, topo->links[ tile->out_link_id[ 1 ] ].name )); @@ -216,15 +206,25 @@ unprivileged_init( fd_topo_t * topo, if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) ) FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) )); - fd_topo_link_t * replay_notif_in_link = &topo->links[ tile->in_link_id[ REPLAY_NOTIF_IDX ] ]; - ctx->replay_notif_in_mem = topo->workspaces[ topo->objs[ replay_notif_in_link->dcache_obj_id ].wksp_id ].wksp; - ctx->replay_notif_in_chunk0 = fd_dcache_compact_chunk0( ctx->replay_notif_in_mem, replay_notif_in_link->dcache ); - ctx->replay_notif_in_wmark = fd_dcache_compact_wmark ( ctx->replay_notif_in_mem, replay_notif_in_link->dcache, replay_notif_in_link->mtu ); + for( uint in_idx=0U; in_idx<(tile->in_cnt); in_idx++ ) { + fd_topo_link_t * link = &topo->links[ tile->in_link_id[ in_idx ] ]; + if( 0==strcmp( link->name, "replay_notif" ) ) { + ctx->in_kind[ in_idx ] = IN_KIND_REPLAY_NOTIF; + } else if( 0==strcmp( link->name, "stake_out" ) ) { + ctx->in_kind[ in_idx ] = IN_KIND_STAKE_OUT; + } else if( 0==strcmp( link->name, "repair_repla" ) ) { + ctx->in_kind[ in_idx ] = IN_KIND_REPAIR_REPLAY; + } else { + FD_LOG_ERR(( "rpcserv tile has unexpected input link %s", link->name )); + } - fd_topo_link_t * stake_in_link = &topo->links[ tile->in_link_id[ STAKE_IN_IDX ] ]; - ctx->stake_in_mem = topo->workspaces[ topo->objs[ stake_in_link->dcache_obj_id ].wksp_id ].wksp; - ctx->stake_in_chunk0 = fd_dcache_compact_chunk0( ctx->stake_in_mem, stake_in_link->dcache ); - ctx->stake_in_wmark = fd_dcache_compact_wmark ( ctx->stake_in_mem, stake_in_link->dcache, stake_in_link->mtu ); + ctx->in_links[ in_idx ].mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp; + ctx->in_links[ in_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ in_idx ].mem, link->dcache ); + ctx->in_links[ in_idx ].wmark = fd_dcache_compact_wmark ( ctx->in_links[ in_idx ].mem, link->dcache, link->mtu ); + ctx->in_links[ in_idx ].mtu = link->mtu; + ctx->in_links[ in_idx ].last_seq = 0; + FD_TEST( fd_dcache_compact_is_safe( ctx->in_links[in_idx].mem, link->dcache, link->mtu, link->depth ) ); + } ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->keyswitch_obj_id ) ); FD_TEST( ctx->keyswitch ); @@ -233,6 +233,10 @@ unprivileged_init( fd_topo_t * topo, if( FD_UNLIKELY( !fd_funk_join( args->funk, fd_topo_obj_laddr( topo, tile->rpcserv.funk_obj_id ) ) ) ) { FD_LOG_ERR(( "Failed to join database cache" )); } + + args->store = fd_store_join( fd_topo_obj_laddr( topo, tile->rpcserv.store_obj_id ) ); + FD_TEST( args->store->magic == FD_STORE_MAGIC ); + fd_rpc_start_service( args, ctx->ctx ); }