diff --git a/src/app/firedancer-dev/commands/backtest.c b/src/app/firedancer-dev/commands/backtest.c index 59d29aded1..214eb53202 100644 --- a/src/app/firedancer-dev/commands/backtest.c +++ b/src/app/firedancer-dev/commands/backtest.c @@ -20,6 +20,7 @@ #include "../../shared/fd_config.h" /* config_t */ #include "../../../disco/tiles.h" #include "../../../disco/topo/fd_topob.h" +#include "../../../disco/topo/fd_topob_vinyl.h" #include "../../../util/pod/fd_pod_format.h" #include "../../../discof/replay/fd_replay_tile.h" #include "../../../discof/restore/utils/fd_ssctrl.h" @@ -50,7 +51,7 @@ backtest_topo( config_t * config ) { ulong lta_tile_cnt = config->firedancer.layout.snapla_tile_count; int disable_snap_loader = !config->gossip.entrypoints_cnt; - int snap_vinyl = !!config->firedancer.vinyl.enabled; + int vinyl_enabled = !!config->firedancer.vinyl.enabled; int solcap_enabled = strlen( config->capture.solcap_capture )>0; int snapshot_lthash_disabled = config->development.snapshots.disable_lthash_verification; @@ -91,7 +92,7 @@ backtest_topo( config_t * config ) { config->firedancer.runtime.program_cache.heap_size_mib<<20 ); fd_topob_tile_uses( topo, replay_tile, progcache_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); - if( snap_vinyl ) { + if( vinyl_enabled ) { setup_topo_vinyl_meta( topo, &config->firedancer ); } @@ -105,7 +106,6 @@ backtest_topo( config_t * config ) { /**********************************************************************/ /* Add the snapshot tiles to topo */ /**********************************************************************/ - int vinyl_enabled = config->firedancer.vinyl.enabled; fd_topo_tile_t * snapin_tile = NULL; fd_topo_tile_t * snapwr_tile = NULL; if( FD_UNLIKELY( !disable_snap_loader ) ) { @@ -352,7 +352,7 @@ backtest_topo( config_t * config ) { fd_topob_tile_uses( topo, replay_tile, txncache_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); if( FD_LIKELY( !disable_snap_loader ) ) { fd_topob_tile_uses( topo, snapin_tile, txncache_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); - if( snap_vinyl ) { + if( vinyl_enabled ) { ulong vinyl_map_obj_id = fd_pod_query_ulong( topo->props, "vinyl.meta_map", ULONG_MAX ); FD_TEST( vinyl_map_obj_id !=ULONG_MAX ); ulong vinyl_pool_obj_id = fd_pod_query_ulong( topo->props, "vinyl.meta_pool", ULONG_MAX ); FD_TEST( vinyl_pool_obj_id!=ULONG_MAX ); fd_topo_obj_t * vinyl_map_obj = &topo->objs[ vinyl_map_obj_id ]; @@ -376,6 +376,10 @@ backtest_topo( config_t * config ) { fd_topob_tile_uses( topo, snapin_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); } + if( vinyl_enabled ) { + fd_topob_vinyl_rq( topo, "replay", 0UL, "vinyl_replay", "replay", 1UL, 1UL, 1UL ); + } + for( ulong i=0UL; itile_cnt; i++ ) { fd_topo_tile_t * tile = &topo->tiles[ i ]; fd_topo_configure_tile( tile, config ); diff --git a/src/app/firedancer-dev/main.c b/src/app/firedancer-dev/main.c index 56eed7c0ea..52218a970d 100644 --- a/src/app/firedancer-dev/main.c +++ b/src/app/firedancer-dev/main.c @@ -27,6 +27,9 @@ extern fd_topo_obj_callbacks_t fd_obj_cb_funk; extern fd_topo_obj_callbacks_t fd_obj_cb_vinyl_meta; extern fd_topo_obj_callbacks_t fd_obj_cb_vinyl_meta_ele; extern fd_topo_obj_callbacks_t fd_obj_cb_vinyl_data; +extern fd_topo_obj_callbacks_t fd_obj_cb_vinyl_req_pool; +extern fd_topo_obj_callbacks_t fd_obj_cb_vinyl_rq; +extern fd_topo_obj_callbacks_t fd_obj_cb_vinyl_cq; fd_topo_obj_callbacks_t * CALLBACKS[] = { &fd_obj_cb_mcache, @@ -47,6 +50,9 @@ fd_topo_obj_callbacks_t * CALLBACKS[] = { &fd_obj_cb_vinyl_meta, &fd_obj_cb_vinyl_meta_ele, &fd_obj_cb_vinyl_data, + &fd_obj_cb_vinyl_req_pool, + &fd_obj_cb_vinyl_rq, + &fd_obj_cb_vinyl_cq, NULL, }; diff --git a/src/app/firedancer/callbacks_vinyl.c b/src/app/firedancer/callbacks_vinyl.c index 8b3f3c6d18..adcef15fd8 100644 --- a/src/app/firedancer/callbacks_vinyl.c +++ b/src/app/firedancer/callbacks_vinyl.c @@ -1,5 +1,6 @@ #include "../../vinyl/fd_vinyl.h" #include "../../disco/topo/fd_topo.h" +#include "../../flamenco/accdb/fd_vinyl_req_pool.h" #include "../../util/pod/fd_pod_format.h" #define VAL(name) (__extension__({ \ @@ -111,3 +112,87 @@ fd_topo_obj_callbacks_t fd_obj_cb_vinyl_data = { .align = vinyl_data_align, .new = vinyl_data_new, }; + +/* vinyl_req_pool: request allocator */ + +static ulong +vinyl_req_pool_align( fd_topo_t const * topo, + fd_topo_obj_t const * obj ) { + (void)topo; (void)obj; + return fd_vinyl_req_pool_align(); +} + +static ulong +vinyl_req_pool_footprint( fd_topo_t const * topo, + fd_topo_obj_t const * obj ) { + return fd_vinyl_req_pool_footprint( VAL("batch_max"), VAL("batch_key_max") ); +} + +static void +vinyl_req_pool_new( fd_topo_t const * topo, + fd_topo_obj_t const * obj ) { + FD_TEST( fd_vinyl_req_pool_new( fd_topo_obj_laddr( topo, obj->id ), VAL("batch_max"), VAL("batch_key_max") ) ); +} + +fd_topo_obj_callbacks_t fd_obj_cb_vinyl_req_pool = { + .name = "vinyl_req_pool", + .footprint = vinyl_req_pool_footprint, + .align = vinyl_req_pool_align, + .new = vinyl_req_pool_new, +}; + +/* vinyl_rq: request queue */ + +static ulong +vinyl_rq_align( fd_topo_t const * topo, + fd_topo_obj_t const * obj ) { + (void)topo; (void)obj; + return fd_vinyl_rq_align(); +} + +static ulong +vinyl_rq_footprint( fd_topo_t const * topo, + fd_topo_obj_t const * obj ) { + return fd_vinyl_rq_footprint( VAL("req_cnt") ); +} + +static void +vinyl_rq_new( fd_topo_t const * topo, + fd_topo_obj_t const * obj ) { + FD_TEST( fd_vinyl_rq_new( fd_topo_obj_laddr( topo, obj->id ), VAL("req_cnt") ) ); +} + +fd_topo_obj_callbacks_t fd_obj_cb_vinyl_rq = { + .name = "vinyl_rq", + .footprint = vinyl_rq_footprint, + .align = vinyl_rq_align, + .new = vinyl_rq_new, +}; + +/* vinyl_cq: completion queue */ + +static ulong +vinyl_cq_align( fd_topo_t const * topo, + fd_topo_obj_t const * obj ) { + (void)topo; (void)obj; + return fd_vinyl_cq_align(); +} + +static ulong +vinyl_cq_footprint( fd_topo_t const * topo, + fd_topo_obj_t const * obj ) { + return fd_vinyl_cq_footprint( VAL("comp_cnt") ); +} + +static void +vinyl_cq_new( fd_topo_t const * topo, + fd_topo_obj_t const * obj ) { + FD_TEST( fd_vinyl_cq_new( fd_topo_obj_laddr( topo, obj->id ), VAL("comp_cnt") ) ); +} + +fd_topo_obj_callbacks_t fd_obj_cb_vinyl_cq = { + .name = "vinyl_cq", + .footprint = vinyl_cq_footprint, + .align = vinyl_cq_align, + .new = vinyl_cq_new, +}; diff --git a/src/app/firedancer/topology.c b/src/app/firedancer/topology.c index 0e95578a61..bc0d2c3148 100644 --- a/src/app/firedancer/topology.c +++ b/src/app/firedancer/topology.c @@ -1241,6 +1241,12 @@ fd_topo_configure_tile( fd_topo_tile_t * tile, tile->replay.funk_obj_id = fd_pod_query_ulong( config->topo.props, "funk", ULONG_MAX ); FD_TEST( tile->replay.funk_obj_id !=ULONG_MAX ); tile->replay.progcache_obj_id = fd_pod_query_ulong( config->topo.props, "progcache", ULONG_MAX ); FD_TEST( tile->replay.progcache_obj_id!=ULONG_MAX ); + if( config->firedancer.vinyl.enabled ) { + tile->replay.vinyl_data_wksp_id = fd_pod_query_ulong( config->topo.props, "vinyl.data", ULONG_MAX ); FD_TEST( tile->replay.vinyl_data_wksp_id!=ULONG_MAX ); + } else { + tile->replay.vinyl_data_wksp_id = ULONG_MAX; + } + tile->replay.max_live_slots = config->firedancer.runtime.max_live_slots; tile->replay.expected_shred_version = config->consensus.expected_shred_version; diff --git a/src/disco/topo/fd_topo.h b/src/disco/topo/fd_topo.h index f82be68a62..463982370d 100644 --- a/src/disco/topo/fd_topo.h +++ b/src/disco/topo/fd_topo.h @@ -366,6 +366,8 @@ struct fd_topo_tile { ulong max_vote_accounts; ulong funk_obj_id; + ulong vinyl_data_wksp_id; + ulong txncache_obj_id; ulong progcache_obj_id; @@ -621,6 +623,10 @@ struct fd_topo_tile { int io_type; /* FD_VINYL_IO_TYPE_* */ uint uring_depth; + +# define FD_TOPO_VINYL_LINK_MAX 16UL + ulong rq_cnt; + ulong rq_obj_id[ FD_TOPO_VINYL_LINK_MAX ]; } vinyl; }; }; @@ -629,9 +635,13 @@ typedef struct fd_topo_tile fd_topo_tile_t; typedef struct { ulong id; - char name[ 13UL ]; + char name[ 13UL ]; /* object type */ ulong wksp_id; + /* Optional label for object */ + char label[ 13UL ]; /* object label */ + ulong label_idx; /* index of object for this label (ULONG_MAX if not labelled) */ + ulong offset; ulong footprint; } fd_topo_obj_t; @@ -885,6 +895,49 @@ fd_topo_tile_producer_cnt( fd_topo_t const * topo, return in_cnt; } +FD_FN_PURE FD_FN_UNUSED static ulong +fd_topo_obj_cnt( fd_topo_t const * topo, + char const * obj_type, + char const * label ) { + ulong cnt = 0UL; + for( ulong i=0UL; iobj_cnt; i++ ) { + fd_topo_obj_t const * obj = &topo->objs[ i ]; + if( strncmp( obj->name, obj_type, sizeof(obj->name) ) ) continue; + if( label && + strncmp( obj->label, label, sizeof(obj->label) ) ) continue; + cnt++; + } + return cnt; +} + +FD_FN_PURE FD_FN_UNUSED static fd_topo_obj_t const * +fd_topo_find_obj( fd_topo_t const * topo, + char const * obj_type, + char const * label, + ulong label_idx ) { + for( ulong i=0UL; iobj_cnt; i++ ) { + fd_topo_obj_t const * obj = &topo->objs[ i ]; + if( strncmp( obj->name, obj_type, sizeof(obj->name) ) ) continue; + if( label && + strncmp( obj->label, label, sizeof(obj->label) ) ) continue; + if( label_idx != ULONG_MAX && obj->label_idx != label_idx ) continue; + return obj; + } + return NULL; +} + +FD_FN_PURE FD_FN_UNUSED static fd_topo_obj_t const * +fd_topo_find_tile_obj( fd_topo_t const * topo, + fd_topo_tile_t const * tile, + char const * obj_type ) { + for( ulong i=0UL; i<(tile->uses_obj_cnt); i++ ) { + fd_topo_obj_t const * obj = &topo->objs[ tile->uses_obj_id[ i ] ]; + if( strncmp( obj->name, obj_type, sizeof(obj->name) ) ) continue; + return obj; + } + return NULL; +} + /* Join (map into the process) all shared memory (huge/gigantic pages) needed by the tile, in the given topology. All memory associated with the tile (aka. used by links that the tile either produces to or diff --git a/src/disco/topo/fd_topob.c b/src/disco/topo/fd_topob.c index 956a802c43..8aa1463d6e 100644 --- a/src/disco/topo/fd_topob.c +++ b/src/disco/topo/fd_topob.c @@ -57,14 +57,32 @@ fd_topob_obj( fd_topo_t * topo, if( FD_UNLIKELY( wksp_id==ULONG_MAX ) ) FD_LOG_ERR(( "workspace not found: %s", wksp_name )); fd_topo_obj_t * obj = &topo->objs[ topo->obj_cnt ]; + memset( obj, 0, sizeof(fd_topo_obj_t) ); strncpy( obj->name, obj_name, sizeof(obj->name) ); - obj->id = topo->obj_cnt; - obj->wksp_id = wksp_id; + obj->id = topo->obj_cnt; + obj->wksp_id = wksp_id; + obj->label_idx = ULONG_MAX; topo->obj_cnt++; return obj; } +fd_topo_obj_t * +fd_topob_obj_named( fd_topo_t * topo, + char const * obj_type, + char const * wksp_name, + char const * label ) { + if( FD_UNLIKELY( !label ) ) FD_LOG_ERR(( "NULL args" )); + if( FD_UNLIKELY( strlen( label )>=sizeof(topo->objs[ topo->obj_cnt ].label ) ) ) FD_LOG_ERR(( "obj label too long: %s", label )); + fd_topo_obj_t * obj = fd_topob_obj( topo, obj_type, wksp_name ); + if( FD_UNLIKELY( !obj ) ) return NULL; + + fd_cstr_ncpy( obj->label, label, sizeof(obj->label) ); + obj->label_idx = fd_topo_obj_cnt( topo, obj_type, label ); + + return obj; +} + fd_topo_link_t * fd_topob_link( fd_topo_t * topo, char const * link_name, @@ -106,10 +124,10 @@ fd_topob_link( fd_topo_t * topo, } void -fd_topob_tile_uses( fd_topo_t * topo, - fd_topo_tile_t * tile, - fd_topo_obj_t * obj, - int mode ) { +fd_topob_tile_uses( fd_topo_t * topo, + fd_topo_tile_t * tile, + fd_topo_obj_t const * obj, + int mode ) { (void)topo; if( FD_UNLIKELY( tile->uses_obj_cnt>=FD_TOPO_MAX_TILE_OBJS ) ) FD_LOG_ERR(( "tile `%s` uses too many objects", tile->name )); diff --git a/src/disco/topo/fd_topob.h b/src/disco/topo/fd_topob.h index b5d9941b5b..224f844e5d 100644 --- a/src/disco/topo/fd_topob.h +++ b/src/disco/topo/fd_topob.h @@ -40,7 +40,7 @@ fd_topo_wksp_t * fd_topob_wksp( fd_topo_t * topo, char const * name ); -/* Add an object with the given name to the toplogy. An object is +/* Add an object with the given type to the toplogy. An object is something that takes up space in memory, in a workspace. The workspace must exist and have been added to the topology. @@ -50,9 +50,17 @@ fd_topob_wksp( fd_topo_t * topo, fd_topo_obj_t * fd_topob_obj( fd_topo_t * topo, - char const * obj_name, + char const * obj_type, char const * wksp_name ); +/* Same as fd_topo_obj, but labels the object. */ + +fd_topo_obj_t * +fd_topob_obj_named( fd_topo_t * topo, + char const * obj_type, + char const * wksp_name, + char const * label ); + /* Add a relationship saying that a certain tile uses a given object. This has the effect that when memory mapping required workspaces for a tile, it will map the workspace required for this object in @@ -62,10 +70,10 @@ fd_topob_obj( fd_topo_t * topo, FD_SHMEM_JOIN_MODE_READ_WRITE. */ void -fd_topob_tile_uses( fd_topo_t * topo, - fd_topo_tile_t * tile, - fd_topo_obj_t * obj, - int mode ); +fd_topob_tile_uses( fd_topo_t * topo, + fd_topo_tile_t * tile, + fd_topo_obj_t const * obj, + int mode ); /* Add a link to the toplogy. The link will not have any producer or consumer(s) by default, and those need to be added after. The link diff --git a/src/disco/topo/fd_topob_vinyl.h b/src/disco/topo/fd_topob_vinyl.h new file mode 100644 index 0000000000..8be44b5f86 --- /dev/null +++ b/src/disco/topo/fd_topob_vinyl.h @@ -0,0 +1,68 @@ +#ifndef HEADER_fd_src_disco_topo_fd_topo_vinyl_h +#define HEADER_fd_src_disco_topo_fd_topo_vinyl_h + +/* fd_topob_vinyl.h provides APIs for building topologies with vinyl + servers and clients. */ + +#include "fd_topob.h" +#include "../../util/pod/fd_pod_format.h" + +FD_PROTOTYPES_BEGIN + +/* fd_topob_vinyl_client declares a new vinyl client and attaches it to + a vinyl instance. Creates vinyl_rq and vinyl_req_pool objects, + reserves a link_id, and maps the objects into client and vinyl tiles. */ + +FD_FN_UNUSED static fd_topo_obj_t * +fd_topob_vinyl_rq( fd_topo_t * topo, + char const * tile_name, + ulong tile_kind_id, + char const * wksp_name, + char const * link_name, + ulong req_batch_max, + ulong req_batch_key_max, + ulong quota_max ) { + + /* Assumes there is only one vinyl tile in the topology */ + ulong vinyl_tile_id; + FD_TEST( ( vinyl_tile_id = fd_topo_find_tile( topo, "vinyl", 0UL ) )!=ULONG_MAX ); + fd_topo_tile_t * vinyl_tile = &topo->tiles[ vinyl_tile_id ]; + + ulong client_tile_id; + FD_TEST( ( client_tile_id = fd_topo_find_tile( topo, tile_name, tile_kind_id ) )!=ULONG_MAX ); + if( FD_UNLIKELY( client_tile_id==ULONG_MAX ) ) FD_LOG_ERR(( "tile not found: %s:%lu", tile_name, tile_kind_id )); + fd_topo_tile_t * client_tile = &topo->tiles[ client_tile_id ]; + + fd_topo_obj_t * req_pool_obj = fd_topob_obj_named( topo, "vinyl_req_pool", wksp_name, link_name ); + FD_TEST( fd_pod_insertf_ulong( topo->props, req_batch_max, "obj.%lu.batch_max", req_pool_obj->id ) ); + FD_TEST( fd_pod_insertf_ulong( topo->props, req_batch_key_max, "obj.%lu.batch_key_max", req_pool_obj->id ) ); + + fd_topo_obj_t * rq_obj = fd_topob_obj_named( topo, "vinyl_rq", wksp_name, link_name ); + FD_TEST( fd_pod_insertf_ulong( topo->props, req_batch_max, "obj.%lu.req_cnt", rq_obj->id ) ); + FD_TEST( fd_pod_insertf_ulong( topo->props, rq_obj->id, "obj.%lu.link_id", rq_obj->id ) ); + FD_TEST( fd_pod_insertf_ulong( topo->props, quota_max, "obj.%lu.quota_max", rq_obj->id ) ); + + /* No database client uses the completion queue yet, but one is + required to join a database client to the server. */ + fd_topo_obj_t * cq_obj = fd_topob_obj_named( topo, "vinyl_cq", wksp_name, link_name ); + FD_TEST( fd_pod_insertf_ulong( topo->props, 4UL, "obj.%lu.comp_cnt", cq_obj->id ) ); + + /* Associate req_pool and cq with rq */ + FD_TEST( fd_pod_insertf_ulong( topo->props, req_pool_obj->id, "obj.%lu.req_pool_obj_id", rq_obj->id ) ); + FD_TEST( fd_pod_insertf_ulong( topo->props, cq_obj->id, "obj.%lu.cq_obj_id", rq_obj->id ) ); + + fd_topob_tile_uses( topo, vinyl_tile, req_pool_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); + fd_topob_tile_uses( topo, vinyl_tile, rq_obj, FD_SHMEM_JOIN_MODE_READ_ONLY ); + fd_topob_tile_uses( topo, vinyl_tile, cq_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); + + fd_topob_tile_uses( topo, client_tile, req_pool_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); + fd_topob_tile_uses( topo, client_tile, rq_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); + fd_topob_tile_uses( topo, client_tile, cq_obj, FD_SHMEM_JOIN_MODE_READ_ONLY ); + + FD_TEST( rq_obj->label_idx==req_pool_obj->label_idx ); /* keep rq and req_pool in sync */ + return rq_obj; +} + +FD_PROTOTYPES_END + +#endif /* HEADER_fd_src_disco_topo_fd_topo_vinyl_h */ diff --git a/src/discof/replay/fd_replay_tile.c b/src/discof/replay/fd_replay_tile.c index 4e66bced4d..70fd8a040a 100644 --- a/src/discof/replay/fd_replay_tile.c +++ b/src/discof/replay/fd_replay_tile.c @@ -21,6 +21,8 @@ #include "../../util/pod/fd_pod.h" #include "../../flamenco/accdb/fd_accdb_admin.h" #include "../../flamenco/accdb/fd_accdb_impl_v1.h" +#include "../../flamenco/accdb/fd_accdb_impl_v2.h" +#include "../../flamenco/accdb/fd_vinyl_req_pool.h" #include "../../flamenco/rewards/fd_rewards.h" #include "../../flamenco/leaders/fd_multi_epoch_leaders.h" #include "../../flamenco/progcache/fd_progcache_admin.h" @@ -425,6 +427,7 @@ scratch_footprint( fd_topo_tile_t const * tile ) { l = FD_LAYOUT_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->replay.max_live_slots ) ); l = FD_LAYOUT_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 1 << 20 ) ); l = FD_LAYOUT_APPEND( l, fd_sched_align(), fd_sched_footprint( tile->replay.max_live_slots ) ); + l = FD_LAYOUT_APPEND( l, fd_vinyl_req_pool_align(), fd_vinyl_req_pool_footprint( 1UL, 1UL ) ); l = FD_LAYOUT_APPEND( l, fd_vote_tracker_align(), fd_vote_tracker_footprint() ); l = FD_LAYOUT_APPEND( l, fd_capture_ctx_align(), fd_capture_ctx_footprint() ); @@ -2346,16 +2349,17 @@ unprivileged_init( fd_topo_t * topo, ulong chain_cnt = fd_block_id_map_chain_cnt_est( tile->replay.max_live_slots ); FD_SCRATCH_ALLOC_INIT( l, scratch ); - fd_replay_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_t), sizeof(fd_replay_tile_t) ); - void * block_id_arr_mem = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_block_id_ele_t), sizeof(fd_block_id_ele_t) * tile->replay.max_live_slots ); - void * block_id_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_block_id_map_align(), fd_block_id_map_footprint( chain_cnt ) ); - void * _txncache = FD_SCRATCH_ALLOC_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->replay.max_live_slots ) ); - void * reasm_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 1 << 20 ) ); - void * sched_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_sched_align(), fd_sched_footprint( tile->replay.max_live_slots ) ); - void * vote_tracker_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_vote_tracker_align(), fd_vote_tracker_footprint() ); - void * _capture_ctx = FD_SCRATCH_ALLOC_APPEND( l, fd_capture_ctx_align(), fd_capture_ctx_footprint() ); + fd_replay_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_t), sizeof(fd_replay_tile_t) ); + void * block_id_arr_mem = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_block_id_ele_t), sizeof(fd_block_id_ele_t) * tile->replay.max_live_slots ); + void * block_id_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_block_id_map_align(), fd_block_id_map_footprint( chain_cnt ) ); + void * _txncache = FD_SCRATCH_ALLOC_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->replay.max_live_slots ) ); + void * reasm_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 1 << 20 ) ); + void * sched_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_sched_align(), fd_sched_footprint( tile->replay.max_live_slots ) ); + void * vinyl_req_pool_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_req_pool_align(), fd_vinyl_req_pool_footprint( 1UL, 1UL ) ); + void * vote_tracker_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_vote_tracker_align(), fd_vote_tracker_footprint() ); + void * _capture_ctx = FD_SCRATCH_ALLOC_APPEND( l, fd_capture_ctx_align(), fd_capture_ctx_footprint() ); # if FD_HAS_FLATCC - void * block_dump_ctx = NULL; + void * block_dump_ctx = NULL; if( FD_UNLIKELY( tile->replay.dump_block_to_pb ) ) { block_dump_ctx = FD_SCRATCH_ALLOC_APPEND( l, fd_block_dump_context_align(), fd_block_dump_context_footprint() ); } @@ -2411,9 +2415,21 @@ unprivileged_init( fd_topo_t * topo, fd_features_enable_one_offs( features, one_off_features, (uint)tile->replay.enable_features_cnt, 0UL ); FD_TEST( fd_accdb_admin_join ( ctx->accdb_admin, fd_topo_obj_laddr( topo, tile->replay.funk_obj_id ) ) ); - FD_TEST( fd_accdb_user_v1_init ( ctx->accdb, fd_topo_obj_laddr( topo, tile->replay.funk_obj_id ) ) ); FD_TEST( fd_progcache_admin_join( ctx->progcache_admin, fd_topo_obj_laddr( topo, tile->replay.progcache_obj_id ) ) ); + if( tile->replay.vinyl_data_wksp_id==ULONG_MAX ) { + FD_TEST( fd_accdb_user_v1_init( ctx->accdb, fd_topo_obj_laddr( topo, tile->replay.funk_obj_id ) ) ); + } else { + fd_topo_obj_t const * vinyl_rq = fd_topo_find_tile_obj( topo, tile, "vinyl_rq" ); + fd_topo_obj_t const * vinyl_req_pool = fd_topo_find_tile_obj( topo, tile, "vinyl_req_pool" ); + FD_TEST( fd_accdb_user_v2_init( ctx->accdb, + fd_topo_obj_laddr( topo, tile->replay.funk_obj_id ), + fd_topo_obj_laddr( topo, vinyl_rq->id ), + topo->workspaces[ tile->replay.vinyl_data_wksp_id ].wksp, + fd_topo_obj_laddr( topo, vinyl_req_pool->id ), + vinyl_rq->id ) ); + } + void * _txncache_shmem = fd_topo_obj_laddr( topo, tile->replay.txncache_obj_id ); fd_txncache_shmem_t * txncache_shmem = fd_txncache_shmem_join( _txncache_shmem ); FD_TEST( txncache_shmem ); @@ -2460,6 +2476,8 @@ unprivileged_init( fd_topo_t * topo, ctx->sched = fd_sched_join( fd_sched_new( sched_mem, tile->replay.max_live_slots, ctx->exec_cnt ), tile->replay.max_live_slots ); FD_TEST( ctx->sched ); + FD_TEST( fd_vinyl_req_pool_new( vinyl_req_pool_mem, 1UL, 1UL ) ); + ctx->vote_tracker = fd_vote_tracker_join( fd_vote_tracker_new( vote_tracker_mem, 0UL ) ); FD_TEST( ctx->vote_tracker ); diff --git a/src/discof/vinyl/fd_vinyl_tile.c b/src/discof/vinyl/fd_vinyl_tile.c index 943e1be138..4f0dfec8f3 100644 --- a/src/discof/vinyl/fd_vinyl_tile.c +++ b/src/discof/vinyl/fd_vinyl_tile.c @@ -5,9 +5,11 @@ stem run loop and takes over. */ #include "../../disco/topo/fd_topo.h" +#include "../../disco/metrics/fd_metrics.h" #include "../../discof/restore/utils/fd_ssmsg.h" #include "../../vinyl/fd_vinyl.h" #include "../../vinyl/io/fd_vinyl_io_ur.h" +#include "../../util/pod/fd_pod_format.h" #include #include @@ -17,6 +19,7 @@ #define NAME "vinyl" #define MAX_INS 8 +#define MAX_CLIENTS 8 #define IN_KIND_GENESIS 1 #define IN_KIND_SNAP 2 @@ -42,6 +45,20 @@ struct fd_vinyl_tile_ctx { # if FD_HAS_LIBURING struct io_uring _ring[1]; # endif + + struct { + fd_wksp_t * rq_wksp; + ulong rq_gaddr; + fd_wksp_t * cq_wksp; + ulong cq_gaddr; + fd_wksp_t * req_pool_wksp; + ulong link_id; + ulong burst_max; + ulong quota_max; + } client_param[ MAX_CLIENTS ]; + uint client_active_cnt; + uint client_cnt; + uint client_join_inflight : 1; }; typedef struct fd_vinyl_tile_ctx fd_vinyl_tile_ctx_t; @@ -87,6 +104,84 @@ scratch_footprint( fd_topo_tile_t const * tile ) { return FD_LAYOUT_FINI( l, scratch_align() ); } +/* FIXME Pre-register database clients + + Ideally, we'd pre-register all database clients with the server at + initialization time (since all client data structures are allocated + upfront). But the vinyl_exec run loop can only register new + clients post-initialization. So, for now housekeep will be + responsible for registering clients from within. */ + +static void +register_clients( fd_vinyl_tile_ctx_t * ctx ) { + fd_cnc_t * cnc = ctx->vinyl->cnc; + fd_vinyl_cmd_t * cmd = fd_cnc_app_laddr( cnc ); + + int err = fd_cnc_open( cnc ); + if( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "fd_cnc_open failed (%i-%s)", err, fd_cnc_strerror( err ) )); + + /* See if the vinyl_cnc is ready to accept new requests */ + + if( FD_UNLIKELY( fd_cnc_signal_query( cnc )!=FD_CNC_SIGNAL_RUN ) ) { + fd_cnc_close( cnc ); + return; + } + + /* See if a previous client join request finished */ + + if( ctx->client_join_inflight ) { + int err = cmd->join.err; + if( FD_UNLIKELY( err ) ) { + FD_LOG_ERR(( "Failed to initialize vinyl tile (%i-%s)", err, fd_vinyl_strerror( err ) )); + } + + ctx->client_active_cnt++; + ctx->client_join_inflight = 0; + + fd_cnc_close( cnc ); + return; + } + + /* Join the next client */ + + ulong client_idx = ctx->client_active_cnt; + fd_wksp_t * rq_wksp = ctx->client_param[ client_idx ].rq_wksp; + ulong rq_gaddr = ctx->client_param[ client_idx ].rq_gaddr; + fd_wksp_t * cq_wksp = ctx->client_param[ client_idx ].cq_wksp; + ulong cq_gaddr = ctx->client_param[ client_idx ].cq_gaddr; + fd_wksp_t * req_pool_wksp = ctx->client_param[ client_idx ].req_pool_wksp; + + cmd->join.err = 0; + cmd->join.link_id = ctx->client_param[ client_idx ].link_id; + cmd->join.burst_max = ctx->client_param[ client_idx ].burst_max; + cmd->join.quota_max = ctx->client_param[ client_idx ].quota_max; + fd_cstr_ncpy( cmd->join.wksp, fd_wksp_name( req_pool_wksp ), FD_SHMEM_NAME_MAX ); + fd_wksp_cstr( rq_wksp, rq_gaddr, cmd->join.rq ); + fd_wksp_cstr( cq_wksp, cq_gaddr, cmd->join.cq ); + + ctx->client_join_inflight = 1; + fd_cnc_signal( cnc, FD_VINYL_CNC_SIGNAL_CLIENT_JOIN ); + fd_cnc_close( cnc ); +} + +/* vinyl_housekeep is periodically called by the vinyl_exec run loop. + Does async startup tasks and metric publishing. */ + +static void +vinyl_housekeep( void * ctx_ ) { + fd_vinyl_tile_ctx_t * ctx = ctx_; + + long now = fd_tickcount(); + FD_MGAUGE_SET( TILE, HEARTBEAT, (ulong)now ); + + if( FD_UNLIKELY( ctx->client_active_cnt < ctx->client_cnt ) ) { + register_clients( ctx ); + if( ctx->client_active_cnt==ctx->client_cnt ) { + FD_LOG_INFO(( "Vinyl tile initialization complete" )); + } + } +} + /* vinyl_init_fast is a variation of fd_vinyl_init. Creates tile private data structures and formats shared cache objects. Reuses existing io/bstream/meta. */ @@ -278,10 +373,40 @@ unprivileged_init( fd_topo_t * topo, ctx->cnc_footprint = topo->objs[ tile->vinyl.vinyl_cnc_obj_id ].footprint; } + FD_TEST( tile->in_cnt==1UL ); fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0 ] ]; FD_TEST( in_link && 0==strcmp( in_link->name, "snapin_manif" ) ); if( FD_UNLIKELY( !tile->in_link_reliable[ 0 ] ) ) FD_LOG_ERR(( "tile `" NAME "` in link 0 must be reliable" )); ctx->snapin_manif_fseq = tile->in_link_fseq[ 0 ]; + + /* Discover mapped clients */ + for( ulong i=0UL; i<(tile->uses_obj_cnt); i++ ) { + ulong rq_obj_id = tile->uses_obj_id[ i ]; + fd_topo_obj_t const * rq_obj = &topo->objs[ rq_obj_id ]; + if( strcmp( rq_obj->name, "vinyl_rq" ) ) continue; + FD_TEST( ctx->client_cntprops, ULONG_MAX, "obj.%lu.link_id", rq_obj_id ); + ulong quota_max = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.quota_max", rq_obj_id ); + ulong req_pool_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.req_pool_obj_id", rq_obj_id ); + ulong cq_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.cq_obj_id", rq_obj_id ); + FD_TEST( link_id !=ULONG_MAX ); + FD_TEST( quota_max !=ULONG_MAX ); + FD_TEST( req_pool_obj_id!=ULONG_MAX ); + FD_TEST( cq_obj_id !=ULONG_MAX ); + fd_topo_obj_t const * cq_obj = &topo->objs[ cq_obj_id ]; + fd_topo_obj_t const * req_pool_obj = &topo->objs[ req_pool_obj_id ]; + + ulong client_idx = ctx->client_cnt++; + ctx->client_param[ client_idx ].rq_wksp = topo->workspaces[ rq_obj->wksp_id ].wksp; + ctx->client_param[ client_idx ].rq_gaddr = rq_obj->offset; + ctx->client_param[ client_idx ].cq_wksp = topo->workspaces[ cq_obj->wksp_id ].wksp; + ctx->client_param[ client_idx ].cq_gaddr = cq_obj->offset; + ctx->client_param[ client_idx ].req_pool_wksp = topo->workspaces[ req_pool_obj->wksp_id ].wksp; + ctx->client_param[ client_idx ].link_id = link_id; + ctx->client_param[ client_idx ].burst_max = 1UL; + ctx->client_param[ client_idx ].quota_max = quota_max; + } } __attribute__((noreturn)) static void @@ -316,6 +441,8 @@ enter_vinyl_exec( fd_vinyl_tile_ctx_t * ctx ) { part_thresh, gc_thresh, gc_eager ); + ctx->vinyl->housekeep = vinyl_housekeep; + ctx->vinyl->housekeep_ctx = ctx; fd_vinyl_line_t * line = ctx->vinyl->line; ulong const line_cnt = ctx->vinyl->line_cnt; diff --git a/src/vinyl/fd_vinyl.c b/src/vinyl/fd_vinyl.c index bb5e0f10cd..fb849e6602 100644 --- a/src/vinyl/fd_vinyl.c +++ b/src/vinyl/fd_vinyl.c @@ -10,6 +10,11 @@ fd_vinyl_footprint( void ) { return sizeof(fd_vinyl_t); } +static void +fd_vinyl_housekeep_noop( void * ctx ) { + (void)ctx; +} + fd_vinyl_t * fd_vinyl_init( fd_tpool_t * tpool, ulong t0, ulong t1, int level, void * _vinyl, @@ -99,6 +104,9 @@ fd_vinyl_init( fd_tpool_t * tpool, ulong t0, ulong t1, int level, vinyl->async_min = async_min; vinyl->async_max = async_max; + vinyl->housekeep = fd_vinyl_housekeep_noop; + vinyl->housekeep_ctx = NULL; + vinyl->part_thresh = part_thresh; vinyl->gc_thresh = gc_thresh; vinyl->gc_eager = gc_eager; diff --git a/src/vinyl/fd_vinyl.h b/src/vinyl/fd_vinyl.h index b998fa1371..5c3371d7a7 100644 --- a/src/vinyl/fd_vinyl.h +++ b/src/vinyl/fd_vinyl.h @@ -78,6 +78,11 @@ struct __attribute__((aligned(128))) fd_vinyl_private { ulong async_min; /* Min run loop iterations per async handling, positive */ ulong async_max; /* Max run loop iterations per async handling, >=async_min */ + /* Periodic housekeeping callback */ + + void (* housekeep)( void * ctx ); + void * housekeep_ctx; + /* State */ ulong part_thresh; /* Insert partition blocks roughly every part_thresh bytes for parallel recovery */ diff --git a/src/vinyl/fd_vinyl_exec.c b/src/vinyl/fd_vinyl_exec.c index 852b5862ee..1d0c5852f1 100644 --- a/src/vinyl/fd_vinyl_exec.c +++ b/src/vinyl/fd_vinyl_exec.c @@ -421,6 +421,8 @@ fd_vinyl_exec( fd_vinyl_t * vinyl ) { fd_cnc_signal( cnc, FD_VINYL_CNC_SIGNAL_RUN ); } + + vinyl->housekeep( vinyl->housekeep_ctx ); } /* Receive requests from clients */