Skip to content

Commit 2983cc1

Browse files
committed
work
1 parent 8c94f1a commit 2983cc1

File tree

4 files changed

+84
-80
lines changed

4 files changed

+84
-80
lines changed

src/discof/rpcserver/fd_rpc_history.c

Lines changed: 75 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -77,32 +77,33 @@ typedef struct fd_rpc_acct_map_elem fd_rpc_acct_map_elem_t;
7777
#define POOL_T fd_rpc_acct_map_elem_t
7878
#include "../../util/tmpl/fd_pool.c"
7979

80-
struct fd_rpc_shred_map_elem {
80+
struct fd_rpc_reasm_map_elem {
8181
union {
8282
ulong next;
8383
struct {
84-
ulong payload_sz;
85-
uchar payload[FD_SHRED_DATA_PAYLOAD_MAX];
84+
ulong data_sz; /* TODO fixed-32. sz of the FEC set payload, guaranteed < FD_STORE_DATA_MAX */
85+
uchar data[FD_STORE_DATA_MAX]; /* FEC set payload = coalesced data shreds (byte array) */
8686
} data;
8787
};
8888
};
89-
typedef struct fd_rpc_shred_map_elem fd_rpc_shred_map_elem_t;
90-
#define POOL_NAME fd_rpc_shred_pool
91-
#define POOL_T fd_rpc_shred_map_elem_t
89+
typedef struct fd_rpc_reasm_map_elem fd_rpc_reasm_map_elem_t;
90+
#define POOL_NAME fd_rpc_reasm_pool
91+
#define POOL_T fd_rpc_reasm_map_elem_t
9292
#include "../../util/tmpl/fd_pool.c"
9393

94-
#define FD_SHRED_MAP_COL_CNT (1UL<<10)
95-
struct fd_rpc_shred_map {
96-
struct fd_rpc_shred_map_column {
94+
#define FD_REASM_MAP_COL_CNT (1UL<<10)
95+
#define FD_REASM_MAP_COL_HEIGHT (128UL)
96+
struct fd_rpc_reasm_map {
97+
struct fd_rpc_reasm_map_column {
9798
ulong max_idx; /* The max shred index set in this column + 1 */
9899
ulong used_cnt; /* The number of shreds received in this column */
99-
uchar end_found; /* Whether the last shred of the slot has been found */
100-
fd_rpc_shred_map_elem_t * ele[FD_SHRED_BLK_MAX];
101-
} cols[FD_SHRED_MAP_COL_CNT];
100+
uchar end_found; /* Whether the last slice of the slot has been found */
101+
fd_rpc_reasm_map_elem_t * ele[FD_REASM_MAP_COL_HEIGHT];
102+
} cols[FD_REASM_MAP_COL_CNT];
102103
ulong head; /* Next open column */
103104
ulong tail; /* Oldest column */
104105
};
105-
typedef struct fd_rpc_shred_map fd_rpc_shred_map_t;
106+
typedef struct fd_rpc_reasm_map fd_rpc_reasm_map_t;
106107

107108
struct fd_rpc_history {
108109
fd_spad_t * spad;
@@ -111,8 +112,8 @@ struct fd_rpc_history {
111112
fd_rpc_txn_t * txn_map;
112113
fd_rpc_acct_map_t * acct_map;
113114
fd_rpc_acct_map_elem_t * acct_pool;
114-
fd_rpc_shred_map_t * shred_map;
115-
fd_rpc_shred_map_elem_t * shred_pool;
115+
fd_rpc_reasm_map_t * reasm_map;
116+
fd_rpc_reasm_map_elem_t * reasm_pool;
116117
ulong first_slot;
117118
ulong latest_slot;
118119
int file_fd;
@@ -138,12 +139,12 @@ fd_rpc_history_create(fd_rpcserver_args_t * args) {
138139
mem = fd_spad_alloc( spad, fd_rpc_acct_map_pool_align(), fd_rpc_acct_map_pool_footprint( args->acct_index_max ) );
139140
hist->acct_pool = fd_rpc_acct_map_pool_join( fd_rpc_acct_map_pool_new( mem, args->acct_index_max ) );
140141

141-
mem = fd_spad_alloc( spad, alignof(fd_rpc_shred_map_t), sizeof(fd_rpc_shred_map_t) );
142-
memset(mem, 0, sizeof(fd_rpc_shred_map_t));
143-
hist->shred_map = (fd_rpc_shred_map_t *)mem;
144-
static const ulong shred_pool_sz = 1024UL * 256UL;
145-
mem = fd_spad_alloc( spad, fd_rpc_shred_pool_align(), fd_rpc_shred_pool_footprint( shred_pool_sz ) );
146-
hist->shred_pool = fd_rpc_shred_pool_join( fd_rpc_shred_pool_new( mem, shred_pool_sz ) );
142+
mem = fd_spad_alloc( spad, alignof(fd_rpc_reasm_map_t), sizeof(fd_rpc_reasm_map_t) );
143+
memset(mem, 0, sizeof(fd_rpc_reasm_map_t));
144+
hist->reasm_map = (fd_rpc_reasm_map_t *)mem;
145+
static const ulong reasm_pool_sz = 1024UL;
146+
mem = fd_spad_alloc( spad, fd_rpc_reasm_pool_align(), fd_rpc_reasm_pool_footprint( reasm_pool_sz ) );
147+
hist->reasm_pool = fd_rpc_reasm_pool_join( fd_rpc_reasm_pool_new( mem, reasm_pool_sz ) );
147148

148149
hist->file_fd = open( args->history_file, O_CREAT | O_RDWR | O_TRUNC, 0644 );
149150
if( hist->file_fd == -1 ) FD_LOG_ERR(( "unable to open rpc history file: %s", args->history_file ));
@@ -179,16 +180,16 @@ fd_rpc_history_alloc_block(fd_rpc_history_t * hist, ulong slot) {
179180

180181
void
181182
fd_rpc_history_debug(fd_rpc_history_t * hist) {
182-
fd_rpc_shred_map_t * shred_map = hist->shred_map;
183+
fd_rpc_reasm_map_t * reasm_map = hist->reasm_map;
183184
ulong tot_cnt = 0;
184-
for( ulong slot = shred_map->tail; slot < shred_map->head; slot++ ) {
185-
ulong col_idx = slot & (FD_SHRED_MAP_COL_CNT - 1);
186-
struct fd_rpc_shred_map_column * col = &shred_map->cols[col_idx];
187-
FD_LOG_NOTICE(( "slot %lu: %lu shreds, %lu max, %d end_found", slot, col->used_cnt, col->max_idx, (int)col->end_found ));
185+
for( ulong slot = reasm_map->tail; slot < reasm_map->head; slot++ ) {
186+
ulong col_idx = slot & (FD_REASM_MAP_COL_CNT - 1);
187+
struct fd_rpc_reasm_map_column * col = &reasm_map->cols[col_idx];
188+
FD_LOG_NOTICE(( "slot %lu: %lu fecs, %lu max, %d end_found", slot, col->used_cnt, col->max_idx, (int)col->end_found ));
188189
tot_cnt += col->used_cnt;
189190
}
190-
FD_LOG_NOTICE(( "%lu head, %lu tail, %lu total shreds, %lu total blocks",
191-
shred_map->head, shred_map->tail, tot_cnt, shred_map->head - shred_map->tail ));
191+
FD_LOG_NOTICE(( "%lu head, %lu tail, %lu total fecs, %lu total blocks",
192+
reasm_map->head, reasm_map->tail, tot_cnt, reasm_map->head - reasm_map->tail ));
192193
}
193194

194195
void
@@ -260,24 +261,24 @@ fd_rpc_history_scan_block(fd_rpc_history_t * hist, ulong slot, ulong file_offset
260261
}
261262

262263
void
263-
fd_rpc_history_process_column(fd_rpc_history_t * hist, struct fd_rpc_shred_map_column * col, fd_shred_t * shred) {
264+
fd_rpc_history_process_column(fd_rpc_history_t * hist, struct fd_rpc_reasm_map_column * col, fd_reasm_fec_t * fec) {
264265
FD_SPAD_FRAME_BEGIN( hist->spad ) {
265266

266-
FD_LOG_NOTICE(( "assembling slot %lu block", shred->slot ));
267+
FD_LOG_NOTICE(( "assembling slot %lu block", fec->slot ));
267268

268269
/* Assemble the block */
269-
ulong slot = shred->slot;
270+
ulong slot = fec->slot;
270271
ulong blk_sz = 0;
271272
for( ulong i = 0; i < col->max_idx; i++ ) {
272-
fd_rpc_shred_map_elem_t * ele = col->ele[i];
273-
blk_sz += ele->data.payload_sz;
273+
fd_rpc_reasm_map_elem_t * ele = col->ele[i];
274+
blk_sz += ele->data.data_sz;
274275
}
275276
uchar * blk_data = fd_spad_alloc( hist->spad, alignof(ulong), blk_sz );
276277
ulong blk_off = 0;
277278
for( ulong i = 0; i < col->max_idx; i++ ) {
278-
fd_rpc_shred_map_elem_t * ele = col->ele[i];
279-
fd_memcpy( blk_data + blk_off, ele->data.payload, ele->data.payload_sz );
280-
blk_off += ele->data.payload_sz;
279+
fd_rpc_reasm_map_elem_t * ele = col->ele[i];
280+
fd_memcpy( blk_data + blk_off, ele->data.data, ele->data.data_sz );
281+
blk_off += ele->data.data_sz;
281282
}
282283
FD_TEST( blk_off == blk_sz );
283284

@@ -300,13 +301,13 @@ fd_rpc_history_process_column(fd_rpc_history_t * hist, struct fd_rpc_shred_map_c
300301
}
301302

302303
static void
303-
fd_rpc_history_discard_column(fd_rpc_shred_map_t * shred_map, fd_rpc_shred_map_elem_t * shred_pool, ulong slot) {
304-
ulong col_idx = slot & (FD_SHRED_MAP_COL_CNT - 1);
305-
struct fd_rpc_shred_map_column * col = &shred_map->cols[col_idx];
304+
fd_rpc_history_discard_column(fd_rpc_reasm_map_t * reasm_map, fd_rpc_reasm_map_elem_t * reasm_pool, ulong slot) {
305+
ulong col_idx = slot & (FD_REASM_MAP_COL_CNT - 1);
306+
struct fd_rpc_reasm_map_column * col = &reasm_map->cols[col_idx];
306307
for( ulong i = 0; i < col->max_idx; i++ ) {
307-
fd_rpc_shred_map_elem_t * ele = col->ele[i];
308+
fd_rpc_reasm_map_elem_t * ele = col->ele[i];
308309
if( ele ) {
309-
fd_rpc_shred_pool_ele_release( shred_pool, ele );
310+
fd_rpc_reasm_pool_ele_release( reasm_pool, ele );
310311
col->ele[i] = NULL;
311312
}
312313
}
@@ -316,59 +317,61 @@ fd_rpc_history_discard_column(fd_rpc_shred_map_t * shred_map, fd_rpc_shred_map_e
316317
}
317318

318319
void
319-
fd_rpc_history_save_shred(fd_rpc_history_t * hist, fd_shred_t * shred) {
320-
if( !fd_shred_is_data( shred->variant ) ) return;
321-
if( shred->idx >= FD_SHRED_BLK_MAX ) return;
320+
fd_rpc_history_save_fec(fd_rpc_history_t * hist, fd_store_t * store, fd_reasm_fec_t * fec_msg ) {
321+
fd_store_fec_t * fec_p = fd_store_query( store, &fec_msg->key );
322+
if( !fec_p ) return;
322323

323-
fd_rpc_shred_map_t * shred_map = hist->shred_map;
324-
fd_rpc_shred_map_elem_t * shred_pool = hist->shred_pool;
324+
fd_rpc_reasm_map_t * reasm_map = hist->reasm_map;
325+
fd_rpc_reasm_map_elem_t * reasm_pool = hist->reasm_pool;
325326

326-
if( shred_map->head == 0UL ) {
327-
shred_map->head = shred->slot+1;
328-
shred_map->tail = shred->slot;
327+
if( reasm_map->head == 0UL ) {
328+
reasm_map->head = fec_msg->slot+1;
329+
reasm_map->tail = fec_msg->slot;
329330
}
330-
if( shred->slot < shred_map->tail ) return; /* Do not go backwards */
331-
while( shred->slot >= shred_map->tail + FD_SHRED_MAP_COL_CNT ) {
332-
FD_TEST( shred_map->tail < shred_map->head );
333-
fd_rpc_history_discard_column( shred_map, shred_pool, shred_map->tail++ );
331+
if( fec_msg->slot < reasm_map->tail ) return; /* Do not go backwards */
332+
while( fec_msg->slot >= reasm_map->tail + FD_REASM_MAP_COL_CNT ) {
333+
FD_TEST( reasm_map->tail < reasm_map->head );
334+
fd_rpc_history_discard_column( reasm_map, reasm_pool, reasm_map->tail++ );
334335
}
335-
while( shred->slot >= shred_map->head ) {
336-
ulong col_idx = (shred_map->head++) & (FD_SHRED_MAP_COL_CNT - 1);
337-
struct fd_rpc_shred_map_column * col = &shred_map->cols[col_idx];
336+
while( fec_msg->slot >= reasm_map->head ) {
337+
ulong col_idx = (reasm_map->head++) & (FD_REASM_MAP_COL_CNT - 1);
338+
struct fd_rpc_reasm_map_column * col = &reasm_map->cols[col_idx];
338339
col->max_idx = 0;
339340
col->used_cnt = 0;
340341
col->end_found = 0;
341342
}
342-
FD_TEST( shred->slot >= shred_map->tail && shred->slot < shred_map->head && shred_map->head - shred_map->tail <= FD_SHRED_MAP_COL_CNT );
343+
FD_TEST( fec_msg->slot >= reasm_map->tail && fec_msg->slot < reasm_map->head && reasm_map->head - reasm_map->tail <= FD_REASM_MAP_COL_CNT );
343344

344-
ulong col_idx = shred->slot & (FD_SHRED_MAP_COL_CNT - 1);
345-
struct fd_rpc_shred_map_column * col = &shred_map->cols[col_idx];
345+
ulong col_idx = fec_msg->slot & (FD_REASM_MAP_COL_CNT - 1);
346+
struct fd_rpc_reasm_map_column * col = &reasm_map->cols[col_idx];
346347

347-
/* See if we've already received this shred */
348-
if( col->max_idx > shred->idx && col->ele[shred->idx] != NULL ) return;
349-
while( col->max_idx <= shred->idx ) {
348+
FD_TEST( fec_msg->fec_set_idx < FD_REASM_MAP_COL_HEIGHT );
349+
350+
/* See if we've already received this fec */
351+
if( col->max_idx > fec_msg->fec_set_idx && col->ele[fec_msg->fec_set_idx] != NULL ) return;
352+
while( col->max_idx <= fec_msg->fec_set_idx ) {
350353
col->ele[col->max_idx++] = NULL;
351354
}
352355

353356
/* Acquire space for the shred. If we've run out of space, discard the oldest column */
354-
fd_rpc_shred_map_elem_t * ele = NULL;
357+
fd_rpc_reasm_map_elem_t * ele = NULL;
355358
do {
356-
ele = fd_rpc_shred_pool_ele_acquire( shred_pool );
359+
ele = fd_rpc_reasm_pool_ele_acquire( reasm_pool );
357360
if( ele != NULL ) break;
358-
FD_TEST( shred_map->tail < shred_map->head );
359-
fd_rpc_history_discard_column( shred_map, shred_pool, shred_map->tail++ );
361+
FD_TEST( reasm_map->tail < reasm_map->head );
362+
fd_rpc_history_discard_column( reasm_map, reasm_pool, reasm_map->tail++ );
360363
} while( 1 );
361364

362-
ulong payload_sz = ele->data.payload_sz = fd_shred_payload_sz( shred );
363-
fd_memcpy( ele->data.payload, fd_shred_data_payload( shred ), payload_sz );
364-
col->ele[shred->idx] = ele;
365+
ulong data_sz = ele->data.data_sz = fec_p->data_sz;
366+
fd_memcpy( ele->data.data, fec_p->data, data_sz );
367+
col->ele[fec_msg->fec_set_idx] = ele;
365368
col->used_cnt++;
366-
if( FD_UNLIKELY( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE ) ) col->end_found = 1;
369+
if( FD_UNLIKELY( fec_msg->slot_complete ) ) col->end_found = 1;
367370

368371
if( col->end_found && col->used_cnt == col->max_idx ) {
369372
/* We've received all the shreds for this slot. Process it. */
370-
fd_rpc_history_process_column( hist, col, shred );
371-
fd_rpc_history_discard_column( shred_map, shred_pool, shred->slot );
373+
fd_rpc_history_process_column( hist, col, fec_msg );
374+
fd_rpc_history_discard_column( reasm_map, reasm_pool, fec_msg->slot );
372375
}
373376
}
374377

src/discof/rpcserver/fd_rpc_history.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ fd_rpc_history_t * fd_rpc_history_create(fd_rpcserver_args_t * args);
1616

1717
void fd_rpc_history_save_info(fd_rpc_history_t * hist, fd_replay_notif_msg_t * msg);
1818

19-
void fd_rpc_history_save_shred(fd_rpc_history_t * hist, fd_shred_t * shred);
19+
void fd_rpc_history_save_fec(fd_rpc_history_t * hist, fd_store_t * store, fd_reasm_fec_t * fec);
2020

2121
ulong fd_rpc_history_first_slot(fd_rpc_history_t * hist);
2222

src/discof/rpcserver/fd_rpc_service.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ struct fd_rpc_global_ctx {
8585
fd_spad_t * spad;
8686
fd_webserver_t ws;
8787
fd_funk_t * funk;
88+
fd_store_t * store;
8889
struct fd_ws_subscription sub_list[FD_WS_MAX_SUBS];
8990
ulong sub_cnt;
9091
ulong last_subsc_id;
@@ -2340,8 +2341,8 @@ fd_rpc_create_ctx(fd_rpcserver_args_t * args, fd_rpc_ctx_t ** ctx_p) {
23402341
void
23412342
fd_rpc_start_service(fd_rpcserver_args_t * args, fd_rpc_ctx_t * ctx) {
23422343
fd_rpc_global_ctx_t * gctx = ctx->global;
2343-
23442344
gctx->funk = args->funk;
2345+
gctx->store = args->store;
23452346
}
23462347

23472348
int
@@ -2440,13 +2441,13 @@ fd_rpc_stake_after_frag(fd_rpc_ctx_t * ctx) {
24402441

24412442
void
24422443
fd_rpc_repair_during_frag(fd_rpc_ctx_t * ctx, void const * msg, int sz) {
2443-
(void)sz;
2444+
FD_TEST( sz==(int)sizeof(fd_reasm_fec_t) );
24442445
fd_memcpy(ctx->global->buffer, msg, sizeof(fd_reasm_fec_t));
24452446
}
24462447

24472448
void
24482449
fd_rpc_repair_after_frag(fd_rpc_ctx_t * ctx) {
24492450
fd_rpc_global_ctx_t * subs = ctx->global;
24502451
fd_reasm_fec_t * fec_p = (fd_reasm_fec_t *)subs->buffer;
2451-
(void)fec_p;
2452+
fd_rpc_history_save_fec( subs->history, subs->store, fec_p );
24522453
}

src/discof/rpcserver/fd_rpcserv_tile.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,17 +128,17 @@ after_frag( fd_rpcserv_tile_ctx_t * ctx,
128128
(void)stem;
129129

130130
ulong * last_seq = &ctx->in_links[ in_idx ].last_seq;
131-
if( seq != *last_seq + 1 ) {
131+
if( FD_UNLIKELY( seq != *last_seq + 1 && seq != 0 ) ) {
132132
FD_LOG_NOTICE(( "seq jump from %lu to %lu on input %lu", *last_seq, seq, in_idx ));
133133
}
134134
*last_seq = seq;
135135

136136
uchar kind = ctx->in_kind[ in_idx ];
137-
if( FD_UNLIKELY( kind==IN_KIND_REPLAY_NOTIF ) ) {
137+
if( kind==IN_KIND_REPLAY_NOTIF ) {
138138
fd_rpc_replay_after_frag( ctx->ctx );
139-
} else if( FD_UNLIKELY( kind == IN_KIND_STAKE_OUT ) ) {
139+
} else if( kind == IN_KIND_STAKE_OUT ) {
140140
fd_rpc_stake_after_frag( ctx->ctx );
141-
} else if( FD_UNLIKELY( kind == IN_KIND_REPAIR_REPLAY ) ) {
141+
} else if( kind == IN_KIND_REPAIR_REPLAY ) {
142142
fd_rpc_repair_after_frag( ctx->ctx );
143143
} else {
144144
FD_LOG_ERR(("Unknown in_idx %lu for rpc", in_idx));

0 commit comments

Comments
 (0)