Skip to content

Commit c6888e0

Browse files
asiegel-jtjumpsiegel
authored andcommitted
rpcserver: fixed most of the code rot in rpc service
1 parent 1d64948 commit c6888e0

File tree

9 files changed

+343
-188
lines changed

9 files changed

+343
-188
lines changed

src/app/firedancer/topology.c

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,8 @@ fd_topo_initialize( config_t * config ) {
204204
ulong resolv_tile_cnt = config->layout.resolv_tile_count;
205205
ulong sign_tile_cnt = config->firedancer.layout.sign_tile_count;
206206

207-
int enable_rpc = ( config->rpc.port != 0 );
208-
209207
fd_topo_t * topo = fd_topob_new( &config->topo, config->name );
208+
210209
topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );
211210
topo->gigantic_page_threshold = config->hugetlbfs.gigantic_page_threshold_mib << 20;
212211

@@ -296,7 +295,6 @@ fd_topo_initialize( config_t * config ) {
296295
fd_topob_wksp( topo, "replay_manif" );
297296

298297
fd_topob_wksp( topo, "slot_fseqs" ); /* fseqs for marked slots eg. turbine slot */
299-
if( enable_rpc ) fd_topob_wksp( topo, "rpcsrv" );
300298

301299
#define FOR(cnt) for( ulong i=0UL; i<cnt; i++ )
302300

@@ -436,9 +434,6 @@ fd_topo_initialize( config_t * config ) {
436434
/**/ fd_topob_tile( topo, "tower", "tower", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
437435
FOR(writer_tile_cnt) fd_topob_tile( topo, "writer", "writer", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
438436

439-
fd_topo_tile_t * rpcserv_tile = NULL;
440-
if( enable_rpc ) rpcserv_tile = fd_topob_tile( topo, "rpcsrv", "rpcsrv", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 );
441-
442437
fd_topo_tile_t * snaprd_tile = fd_topob_tile( topo, "snaprd", "snaprd", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
443438
snaprd_tile->allow_shutdown = 1;
444439
fd_topo_tile_t * snapdc_tile = fd_topob_tile( topo, "snapdc", "snapdc", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
@@ -456,7 +451,6 @@ fd_topo_initialize( config_t * config ) {
456451

457452
FOR(exec_tile_cnt) fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "exec", i ) ], funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
458453
/* */ fd_topob_tile_uses( topo, replay_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
459-
if(rpcserv_tile) fd_topob_tile_uses( topo, rpcserv_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
460454
FOR(writer_tile_cnt) fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "writer", i ) ], funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
461455

462456
/* Setup a shared wksp object for banks. */
@@ -789,9 +783,15 @@ fd_topo_initialize( config_t * config ) {
789783
/**/ fd_topob_link( topo, "replay_notif", "replay_notif", FD_REPLAY_NOTIF_DEPTH, FD_REPLAY_NOTIF_MTU, 1UL )->permit_no_consumers = 1;
790784
/**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_notif", 0UL );
791785

786+
int enable_rpc = ( config->rpc.port != 0 );
792787
if( enable_rpc ) {
793-
fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "replay_notif", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
794-
fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
788+
fd_topob_wksp( topo, "rpcsrv" );
789+
fd_topo_tile_t * rpcserv_tile = fd_topob_tile( topo, "rpcsrv", "rpcsrv", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 );
790+
fd_topob_tile_uses( topo, rpcserv_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
791+
fd_topob_tile_uses( topo, rpcserv_tile, store_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
792+
fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "replay_notif", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
793+
fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
794+
fd_topob_tile_in( topo, "rpcsrv", 0UL, "metric_in", "repair_repla", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
795795
}
796796

797797
/* For now the only plugin consumer is the GUI */
@@ -1015,6 +1015,7 @@ fd_topo_configure_tile( fd_topo_tile_t * tile,
10151015
} else if( FD_UNLIKELY( !strcmp( tile->name, "rpcsrv" ) ) ) {
10161016
strncpy( tile->replay.blockstore_file, config->firedancer.blockstore.file, sizeof(tile->replay.blockstore_file) );
10171017
tile->rpcserv.funk_obj_id = fd_pod_query_ulong( config->topo.props, "funk", ULONG_MAX );
1018+
tile->rpcserv.store_obj_id = fd_pod_query_ulong( config->topo.props, "store", ULONG_MAX );
10181019
tile->rpcserv.rpc_port = config->rpc.port;
10191020
tile->rpcserv.tpu_port = config->tiles.quic.regular_transaction_listen_port;
10201021
tile->rpcserv.tpu_ip_addr = config->net.ip_addr;

src/app/rpcserver/main.c

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,10 @@
1414
#include "../../funk/fd_funk.h"
1515

1616
#define SHAM_LINK_CONTEXT fd_rpc_ctx_t
17-
#define SHAM_LINK_STATE fd_replay_notif_msg_t
1817
#define SHAM_LINK_NAME replay_sham_link
1918
#include "sham_link.h"
2019

2120
#define SHAM_LINK_CONTEXT fd_rpc_ctx_t
22-
#define SHAM_LINK_STATE fd_multi_epoch_leaders_t
2321
#define SHAM_LINK_NAME stake_sham_link
2422
#include "sham_link.h"
2523

@@ -55,7 +53,6 @@ init_args( int * argc, char *** argv, fd_rpcserver_args_t * args ) {
5553
}
5654
fd_wksp_mprotect( wksp, 1 );
5755

58-
args->leaders = fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( aligned_alloc( fd_multi_epoch_leaders_align(), fd_multi_epoch_leaders_footprint() ) ) );
5956
args->port = (ushort)fd_env_strip_cmdline_ulong( argc, argv, "--port", NULL, 8899 );
6057

6158
args->params.max_connection_cnt = fd_env_strip_cmdline_ulong( argc, argv, "--max-connection-cnt", NULL, 30 );
@@ -206,10 +203,9 @@ int main( int argc, char ** argv ) {
206203
replay_sham_link_start( rep_notify );
207204
stake_sham_link_start( stake_notify );
208205
while( !stopflag ) {
209-
fd_replay_notif_msg_t msg;
210-
replay_sham_link_poll( rep_notify, ctx, &msg );
206+
replay_sham_link_poll( rep_notify, ctx );
211207

212-
stake_sham_link_poll( stake_notify, ctx, args.leaders );
208+
stake_sham_link_poll( stake_notify, ctx );
213209

214210
fd_rpc_ws_poll( ctx );
215211
}
@@ -219,21 +215,21 @@ int main( int argc, char ** argv ) {
219215
}
220216

221217
static void
222-
replay_sham_link_during_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * state, void const * msg, int sz) {
223-
fd_rpc_replay_during_frag( ctx, state, msg, sz );
218+
replay_sham_link_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz) {
219+
fd_rpc_replay_during_frag( ctx, msg, sz );
224220
}
225221

226222
static void
227-
replay_sham_link_after_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * msg) {
228-
fd_rpc_replay_after_frag( ctx, msg );
223+
replay_sham_link_after_frag(fd_rpc_ctx_t * ctx) {
224+
fd_rpc_replay_after_frag( ctx );
229225
}
230226

231227
static void
232-
stake_sham_link_during_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state, void const * msg, int sz) {
233-
fd_rpc_stake_during_frag( ctx, state, msg, sz );
228+
stake_sham_link_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz) {
229+
fd_rpc_stake_during_frag( ctx, msg, sz );
234230
}
235231

236232
static void
237-
stake_sham_link_after_frag(fd_rpc_ctx_t * ctx, fd_multi_epoch_leaders_t * state) {
238-
fd_rpc_stake_after_frag( ctx, state );
233+
stake_sham_link_after_frag(fd_rpc_ctx_t * ctx) {
234+
fd_rpc_stake_after_frag( ctx );
239235
}

src/app/rpcserver/sham_link.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ SHAM_LINK_(start)( SHAM_LINK_(t) * self ) {
3939
}
4040

4141
static void
42-
SHAM_LINK_(during_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state, void const * msg, int sz );
42+
SHAM_LINK_(during_frag)( SHAM_LINK_CONTEXT * ctx, void const * msg, int sz );
4343

4444
static void
45-
SHAM_LINK_(after_frag)( SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state );
45+
SHAM_LINK_(after_frag)( SHAM_LINK_CONTEXT * ctx );
4646

4747
static inline void
48-
SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE * state ) {
48+
SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx ) {
4949
while (1) {
5050
fd_frag_meta_t const * mline = self->mcache + fd_mcache_line_idx( self->seq_expect, self->depth );
5151

@@ -64,7 +64,7 @@ SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE
6464

6565
ulong chunk = mline->chunk;
6666
/* TODO: sanity check chunk,sz */
67-
SHAM_LINK_(during_frag)( ctx, state, fd_chunk_to_laddr( self->wksp, chunk ), mline->sz );
67+
SHAM_LINK_(during_frag)( ctx, fd_chunk_to_laddr( self->wksp, chunk ), mline->sz );
6868

6969
seq_found = fd_frag_meta_seq_query( mline );
7070
diff = fd_seq_diff( seq_found, self->seq_expect );
@@ -74,7 +74,7 @@ SHAM_LINK_(poll)( SHAM_LINK_(t) * self, SHAM_LINK_CONTEXT * ctx, SHAM_LINK_STATE
7474
continue;
7575
}
7676

77-
SHAM_LINK_(after_frag)( ctx, state );
77+
SHAM_LINK_(after_frag)( ctx );
7878

7979
self->seq_expect++;
8080
}

src/disco/topo/fd_topo.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ struct fd_topo_tile {
431431

432432
struct {
433433
ulong funk_obj_id;
434+
ulong store_obj_id;
434435
ushort rpc_port;
435436
ushort tpu_port;
436437
uint tpu_ip_addr;

0 commit comments

Comments
 (0)