Skip to content

resurrect rpc service #6028

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions src/app/firedancer/topology.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,8 @@ fd_topo_initialize( config_t * config ) {
ulong resolv_tile_cnt = config->layout.resolv_tile_count;
ulong sign_tile_cnt = config->firedancer.layout.sign_tile_count;

int enable_rpc = ( config->rpc.port != 0 );

fd_topo_t * topo = fd_topob_new( &config->topo, config->name );

topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );
topo->gigantic_page_threshold = config->hugetlbfs.gigantic_page_threshold_mib << 20;

Expand Down Expand Up @@ -296,7 +295,6 @@ fd_topo_initialize( config_t * config ) {
fd_topob_wksp( topo, "replay_manif" );

fd_topob_wksp( topo, "slot_fseqs" ); /* fseqs for marked slots eg. turbine slot */
if( enable_rpc ) fd_topob_wksp( topo, "rpcsrv" );

#define FOR(cnt) for( ulong i=0UL; i<cnt; i++ )

Expand Down Expand Up @@ -436,9 +434,6 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_tile( topo, "tower", "tower", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
FOR(writer_tile_cnt) fd_topob_tile( topo, "writer", "writer", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );

fd_topo_tile_t * rpcserv_tile = NULL;
if( enable_rpc ) rpcserv_tile = fd_topob_tile( topo, "rpcsrv", "rpcsrv", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 );

fd_topo_tile_t * snaprd_tile = fd_topob_tile( topo, "snaprd", "snaprd", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
snaprd_tile->allow_shutdown = 1;
fd_topo_tile_t * snapdc_tile = fd_topob_tile( topo, "snapdc", "snapdc", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
Expand All @@ -456,7 +451,6 @@ fd_topo_initialize( config_t * config ) {

FOR(exec_tile_cnt) fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "exec", i ) ], funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
/* */ fd_topob_tile_uses( topo, replay_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
if(rpcserv_tile) fd_topob_tile_uses( topo, rpcserv_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
FOR(writer_tile_cnt) fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "writer", i ) ], funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );

/* Setup a shared wksp object for banks. */
Expand Down Expand Up @@ -789,9 +783,15 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_link( topo, "replay_notif", "replay_notif", FD_REPLAY_NOTIF_DEPTH, FD_REPLAY_NOTIF_MTU, 1UL )->permit_no_consumers = 1;
/**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_notif", 0UL );

int enable_rpc = ( config->rpc.port != 0 );
if( enable_rpc ) {
fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "replay_notif", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
fd_topob_wksp( topo, "rpcsrv" );
fd_topo_tile_t * rpcserv_tile = fd_topob_tile( topo, "rpcsrv", "rpcsrv", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 );
fd_topob_tile_uses( topo, rpcserv_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
fd_topob_tile_uses( topo, rpcserv_tile, store_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "replay_notif", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "repair_repla", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
}

/* For now the only plugin consumer is the GUI */
Expand Down Expand Up @@ -1015,6 +1015,7 @@ fd_topo_configure_tile( fd_topo_tile_t * tile,
} else if( FD_UNLIKELY( !strcmp( tile->name, "rpcsrv" ) ) ) {
strncpy( tile->replay.blockstore_file, config->firedancer.blockstore.file, sizeof(tile->replay.blockstore_file) );
tile->rpcserv.funk_obj_id = fd_pod_query_ulong( config->topo.props, "funk", ULONG_MAX );
tile->rpcserv.store_obj_id = fd_pod_query_ulong( config->topo.props, "store", ULONG_MAX );
tile->rpcserv.rpc_port = config->rpc.port;
tile->rpcserv.tpu_port = config->tiles.quic.regular_transaction_listen_port;
tile->rpcserv.tpu_ip_addr = config->net.ip_addr;
Expand Down
24 changes: 10 additions & 14 deletions src/app/rpcserver/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
#include "../../funk/fd_funk.h"

#define SHAM_LINK_CONTEXT fd_rpc_ctx_t
#define SHAM_LINK_STATE fd_replay_notif_msg_t
#define SHAM_LINK_NAME replay_sham_link
#include "sham_link.h"

#define SHAM_LINK_CONTEXT fd_rpc_ctx_t
#define SHAM_LINK_STATE fd_multi_epoch_leaders_t
#define SHAM_LINK_NAME stake_sham_link
#include "sham_link.h"

Expand Down Expand Up @@ -55,7 +53,6 @@ init_args( int * argc, char *** argv, fd_rpcserver_args_t * args ) {
}
fd_wksp_mprotect( wksp, 1 );

args->leaders = fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( aligned_alloc( fd_multi_epoch_leaders_align(), fd_multi_epoch_leaders_footprint() ) ) );
args->port = (ushort)fd_env_strip_cmdline_ulong( argc, argv, "--port", NULL, 8899 );

args->params.max_connection_cnt = fd_env_strip_cmdline_ulong( argc, argv, "--max-connection-cnt", NULL, 30 );
Expand Down Expand Up @@ -206,10 +203,9 @@ int main( int argc, char ** argv ) {
replay_sham_link_start( rep_notify );
stake_sham_link_start( stake_notify );
while( !stopflag ) {
fd_replay_notif_msg_t msg;
replay_sham_link_poll( rep_notify, ctx, &msg );
replay_sham_link_poll( rep_notify, ctx );

stake_sham_link_poll( stake_notify, ctx, args.leaders );
stake_sham_link_poll( stake_notify, ctx );

fd_rpc_ws_poll( ctx );
}
Expand All @@ -219,21 +215,21 @@ int main( int argc, char ** argv ) {
}

static void
replay_sham_link_during_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * state, void const * msg, int sz) {
fd_rpc_replay_during_frag( ctx, state, msg, sz );
replay_sham_link_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz) {
fd_rpc_replay_during_frag( ctx, msg, sz );
}

static void
replay_sham_link_after_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * msg) {
fd_rpc_replay_after_frag( ctx, msg );
replay_sham_link_after_frag(fd_rpc_ctx_t * ctx) {
fd_rpc_replay_after_frag( ctx );
}

static void
stake_sham_link_during_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state, void const * msg, int sz) {
fd_rpc_stake_during_frag( ctx, state, msg, sz );
stake_sham_link_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz) {
fd_rpc_stake_during_frag( ctx, msg, sz );
}

static void
stake_sham_link_after_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state) {
fd_rpc_stake_after_frag( ctx, state );
stake_sham_link_after_frag(fd_rpc_ctx_t * ctx) {
fd_rpc_stake_after_frag( ctx );
}
10 changes: 5 additions & 5 deletions src/app/rpcserver/sham_link.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ SHAM_LINK_(start)( SHAM_LINK_(t) * self ) {
}

static void
SHAM_LINK_(during_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state, void const * msg, int sz );
SHAM_LINK_(during_frag)( SHAM_LINK_CONTEXT * ctx, void const * msg, int sz );

static void
SHAM_LINK_(after_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state );
SHAM_LINK_(after_frag)( SHAM_LINK_CONTEXT * ctx );

static inline void
SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state ) {
SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx ) {
while (1) {
fd_frag_meta_t const * mline = self->mcache + fd_mcache_line_idx( self->seq_expect, self->depth );

Expand All @@ -64,7 +64,7 @@ SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE

ulong chunk = mline->chunk;
/* TODO: sanity check chunk,sz */
SHAM_LINK_(during_frag)( ctx, state, fd_chunk_to_laddr( self->wksp, chunk ), mline->sz );
SHAM_LINK_(during_frag)( ctx, fd_chunk_to_laddr( self->wksp, chunk ), mline->sz );

seq_found = fd_frag_meta_seq_query( mline );
diff = fd_seq_diff( seq_found, self->seq_expect );
Expand All @@ -74,7 +74,7 @@ SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE
continue;
}

SHAM_LINK_(after_frag)( ctx, state );
SHAM_LINK_(after_frag)( ctx );

self->seq_expect++;
}
Expand Down
1 change: 1 addition & 0 deletions src/disco/topo/fd_topo.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ struct fd_topo_tile {

struct {
ulong funk_obj_id;
ulong store_obj_id;
ushort rpc_port;
ushort tpu_port;
uint tpu_ip_addr;
Expand Down
Loading
Loading