Skip to content

Commit 0182f9d

Browse files
committed
work
1 parent c7533a3 commit 0182f9d

File tree

3 files changed

+222
-84
lines changed

3 files changed

+222
-84
lines changed

src/discof/rpcserver/fd_rpc_history.c

Lines changed: 218 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include "../../ballet/shred/fd_shred.h"
77
#include "../../flamenco/runtime/fd_system_ids.h"
88

9+
#include <string.h>
10+
911
struct fd_rpc_block {
1012
ulong slot;
1113
ulong next;
@@ -75,13 +77,42 @@ typedef struct fd_rpc_acct_map_elem fd_rpc_acct_map_elem_t;
7577
#define POOL_T fd_rpc_acct_map_elem_t
7678
#include "../../util/tmpl/fd_pool.c"
7779

80+
struct fd_rpc_shred_map_elem {
81+
union {
82+
ulong next;
83+
struct {
84+
ulong payload_sz;
85+
uchar payload[FD_SHRED_DATA_PAYLOAD_MAX];
86+
} data;
87+
};
88+
};
89+
typedef struct fd_rpc_shred_map_elem fd_rpc_shred_map_elem_t;
90+
#define POOL_NAME fd_rpc_shred_pool
91+
#define POOL_T fd_rpc_shred_map_elem_t
92+
#include "../../util/tmpl/fd_pool.c"
93+
94+
#define FD_SHRED_MAP_COL_CNT (64UL)
95+
struct fd_rpc_shred_map {
96+
struct fd_rpc_shred_map_column {
97+
ulong max_idx; /* The max shred index set in this column + 1 */
98+
ulong used_cnt; /* The number of shreds received in this column */
99+
uchar end_found; /* Whether the last shred of the slot has been found */
100+
fd_rpc_shred_map_elem_t * ele[FD_SHRED_BLK_MAX];
101+
} cols[FD_SHRED_MAP_COL_CNT];
102+
ulong head; /* Next open column */
103+
ulong tail; /* Oldest column */
104+
};
105+
typedef struct fd_rpc_shred_map fd_rpc_shred_map_t;
106+
78107
struct fd_rpc_history {
79108
fd_spad_t * spad;
80109
fd_rpc_block_t * block_map;
81110
ulong block_cnt;
82111
fd_rpc_txn_t * txn_map;
83112
fd_rpc_acct_map_t * acct_map;
84113
fd_rpc_acct_map_elem_t * acct_pool;
114+
fd_rpc_shred_map_t * shred_map;
115+
fd_rpc_shred_map_elem_t * shred_pool;
85116
ulong first_slot;
86117
ulong latest_slot;
87118
int file_fd;
@@ -107,111 +138,220 @@ fd_rpc_history_create(fd_rpcserver_args_t * args) {
107138
mem = fd_spad_alloc( spad, fd_rpc_acct_map_pool_align(), fd_rpc_acct_map_pool_footprint( args->acct_index_max ) );
108139
hist->acct_pool = fd_rpc_acct_map_pool_join( fd_rpc_acct_map_pool_new( mem, args->acct_index_max ) );
109140

141+
mem = fd_spad_alloc( spad, alignof(fd_rpc_shred_map_t), sizeof(fd_rpc_shred_map_t) );
142+
memset(mem, 0, sizeof(fd_rpc_shred_map_t));
143+
hist->shred_map = (fd_rpc_shred_map_t *)mem;
144+
static const ulong shred_pool_sz = 1024UL * 128UL;
145+
mem = fd_spad_alloc( spad, fd_rpc_shred_pool_align(), fd_rpc_shred_pool_footprint( shred_pool_sz ) );
146+
hist->shred_pool = fd_rpc_shred_pool_join( fd_rpc_shred_pool_new( mem, shred_pool_sz ) );
147+
110148
hist->file_fd = open( args->history_file, O_CREAT | O_RDWR | O_TRUNC, 0644 );
111149
if( hist->file_fd == -1 ) FD_LOG_ERR(( "unable to open rpc history file: %s", args->history_file ));
112150
hist->file_totsz = 0;
113151

114152
return hist;
115153
}
116154

155+
static fd_rpc_block_t *
156+
fd_rpc_history_alloc_block(fd_rpc_history_t * hist, ulong slot) {
157+
fd_rpc_block_t * blk = fd_rpc_block_map_query(hist->block_map, &slot, NULL);
158+
if( blk ) return blk;
159+
if( fd_rpc_block_map_is_full( hist->block_map ) ) return NULL; /* Out of space */
160+
blk = fd_rpc_block_map_insert( hist->block_map, &slot );
161+
if( blk == NULL ) {
162+
FD_LOG_ERR(( "unable to save slot %lu block", slot ));
163+
return NULL;
164+
}
165+
blk->slot = slot;
166+
memset( &blk->info, 0, sizeof(fd_replay_notif_msg_t) );
167+
blk->info.slot_exec.slot = slot;
168+
if( hist->first_slot == ULONG_MAX ) {
169+
hist->first_slot = hist->latest_slot = slot;
170+
} else {
171+
if( slot < hist->first_slot ) hist->first_slot = slot;
172+
else if( slot > hist->latest_slot ) hist->latest_slot = slot;
173+
}
174+
return blk;
175+
}
176+
117177
void
118-
fd_rpc_history_save(fd_rpc_history_t * hist, fd_replay_notif_msg_t * info) {
119-
FD_SPAD_FRAME_BEGIN( hist->spad ) {
120-
if( fd_rpc_block_map_is_full( hist->block_map ) ) return; /* Out of space */
178+
fd_rpc_history_save_info(fd_rpc_history_t * hist, fd_replay_notif_msg_t * info) {
179+
fd_rpc_block_t * blk = fd_rpc_history_alloc_block( hist, info->slot_exec.slot );
180+
if( blk == NULL ) return;
181+
blk->info = *info;
182+
}
121183

122-
ulong blk_max = info->slot_exec.shred_cnt * FD_SHRED_MAX_SZ;
123-
uchar * blk_data = fd_spad_alloc( hist->spad, 1, blk_max );
124-
ulong blk_sz = 0;
125-
// if( fd_blockstore_slice_query( blockstore, info->slot_exec.slot, 0, (uint)(info->slot_exec.shred_cnt-1), blk_max, blk_data, &blk_sz) ) {
126-
// FD_LOG_WARNING(( "unable to read slot %lu block", info->slot_exec.slot ));
127-
// return;
128-
// }
184+
static void
185+
fd_rpc_history_scan_block(fd_rpc_history_t * hist, ulong slot, ulong file_offset, uchar * blk_data, ulong blk_sz) {
186+
ulong blockoff = 0;
187+
while (blockoff < blk_sz) {
188+
if ( blockoff + sizeof(ulong) > blk_sz )
189+
return;
190+
ulong mcount = *(const ulong *)(blk_data + blockoff);
191+
blockoff += sizeof(ulong);
192+
193+
/* Loop across microblocks */
194+
for (ulong mblk = 0; mblk < mcount; ++mblk) {
195+
if ( blockoff + sizeof(fd_microblock_hdr_t) > blk_sz )
196+
FD_LOG_ERR(("premature end of block"));
197+
fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)((const uchar *)blk_data + blockoff);
198+
blockoff += sizeof(fd_microblock_hdr_t);
199+
200+
/* Loop across transactions */
201+
for ( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
202+
uchar txn_out[FD_TXN_MAX_SZ];
203+
ulong pay_sz = 0;
204+
const uchar* raw = (const uchar *)blk_data + blockoff;
205+
ulong txn_sz = fd_txn_parse_core(raw, fd_ulong_min(blk_sz - blockoff, FD_TXN_MTU), txn_out, NULL, &pay_sz);
206+
if ( txn_sz == 0 || txn_sz > FD_TXN_MAX_SZ ) {
207+
FD_LOG_ERR( ( "failed to parse transaction %lu in microblock %lu", txn_idx, mblk ) );
208+
}
209+
fd_txn_t * txn = (fd_txn_t *)txn_out;
210+
211+
/* Loop across signatures */
212+
fd_ed25519_sig_t const * sigs = (fd_ed25519_sig_t const *)(raw + txn->signature_off);
213+
for ( uchar j = 0; j < txn->signature_cnt; j++ ) {
214+
if( fd_rpc_txn_map_is_full( hist->txn_map ) ) break; /* Out of space */
215+
fd_rpc_txn_key_t key;
216+
memcpy(&key, (const uchar*)&sigs[j], sizeof(key));
217+
fd_rpc_txn_t * ent = fd_rpc_txn_map_insert( hist->txn_map, &key );
218+
ent->file_offset = file_offset + blockoff;
219+
ent->file_size = pay_sz;
220+
ent->slot = slot;
221+
}
129222

130-
FD_LOG_NOTICE(( "saving slot %lu block", info->slot_exec.slot ));
223+
/* Loop across accounts */
224+
fd_rpc_txn_key_t sig0;
225+
memcpy(&sig0, (const uchar*)sigs, sizeof(sig0));
226+
fd_pubkey_t * accs = (fd_pubkey_t *)((uchar *)raw + txn->acct_addr_off);
227+
for( ulong i = 0UL; i < txn->acct_addr_cnt; i++ ) {
228+
if( !memcmp(&accs[i], fd_solana_vote_program_id.key, sizeof(fd_pubkey_t)) ) continue; /* Ignore votes */
229+
if( !fd_rpc_acct_map_pool_free( hist->acct_pool ) ) break;
230+
fd_rpc_acct_map_elem_t * ele = fd_rpc_acct_map_pool_ele_acquire( hist->acct_pool );
231+
ele->key = accs[i];
232+
ele->slot = slot;
233+
ele->sig = sig0;
234+
fd_rpc_acct_map_ele_insert( hist->acct_map, ele, hist->acct_pool );
235+
}
131236

132-
if( hist->first_slot == ULONG_MAX ) hist->first_slot = info->slot_exec.slot;
133-
hist->latest_slot = info->slot_exec.slot;
237+
blockoff += pay_sz;
238+
}
239+
}
240+
}
241+
if ( blockoff != blk_sz )
242+
FD_LOG_ERR(("garbage at end of block"));
243+
}
134244

135-
fd_rpc_block_t * blk = fd_rpc_block_map_insert( hist->block_map, &info->slot_exec.slot );
136-
if( blk == NULL ) {
137-
FD_LOG_ERR(( "unable to save slot %lu block", info->slot_exec.slot ));
138-
return;
245+
void
246+
fd_rpc_history_process_column(fd_rpc_history_t * hist, struct fd_rpc_shred_map_column * col, fd_shred_t * shred) {
247+
FD_SPAD_FRAME_BEGIN( hist->spad ) {
248+
249+
FD_LOG_NOTICE(( "assembling slot %lu block", shred->slot ));
250+
251+
/* Assemble the block */
252+
ulong slot = shred->slot;
253+
ulong blk_sz = 0;
254+
for( ulong i = 0; i < col->max_idx; i++ ) {
255+
fd_rpc_shred_map_elem_t * ele = col->ele[i];
256+
blk_sz += ele->data.payload_sz;
257+
}
258+
uchar * blk_data = fd_spad_alloc( hist->spad, alignof(ulong), blk_sz );
259+
ulong blk_off = 0;
260+
for( ulong i = 0; i < col->max_idx; i++ ) {
261+
fd_rpc_shred_map_elem_t * ele = col->ele[i];
262+
fd_memcpy( blk_data + blk_off, ele->data.payload, ele->data.payload_sz );
263+
blk_off += ele->data.payload_sz;
139264
}
140-
blk->info = *info;
265+
FD_TEST( blk_off == blk_sz );
141266

267+
/* Get a block from the map */
268+
fd_rpc_block_t * blk = fd_rpc_history_alloc_block( hist, slot );
269+
if( blk == NULL ) return;
270+
271+
/* Write the block to the file */
142272
if( pwrite( hist->file_fd, blk_data, blk_sz, (long)hist->file_totsz ) != (ssize_t)blk_sz ) {
143273
FD_LOG_ERR(( "unable to write to rpc history file" ));
144274
}
145-
ulong base_offset = blk->file_offset = hist->file_totsz;
275+
ulong file_offset = blk->file_offset = hist->file_totsz;
146276
blk->file_size = blk_sz;
147277
hist->file_totsz += blk_sz;
148278
hist->block_cnt ++;
149279

150-
ulong blockoff = 0;
151-
while (blockoff < blk_sz) {
152-
if ( blockoff + sizeof(ulong) > blk_sz )
153-
return;
154-
ulong mcount = *(const ulong *)(blk_data + blockoff);
155-
blockoff += sizeof(ulong);
156-
157-
/* Loop across microblocks */
158-
for (ulong mblk = 0; mblk < mcount; ++mblk) {
159-
if ( blockoff + sizeof(fd_microblock_hdr_t) > blk_sz )
160-
FD_LOG_ERR(("premature end of block"));
161-
fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)((const uchar *)blk_data + blockoff);
162-
blockoff += sizeof(fd_microblock_hdr_t);
163-
164-
/* Loop across transactions */
165-
for ( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
166-
uchar txn_out[FD_TXN_MAX_SZ];
167-
ulong pay_sz = 0;
168-
const uchar* raw = (const uchar *)blk_data + blockoff;
169-
ulong txn_sz = fd_txn_parse_core(raw, fd_ulong_min(blk_sz - blockoff, FD_TXN_MTU), txn_out, NULL, &pay_sz);
170-
if ( txn_sz == 0 || txn_sz > FD_TXN_MAX_SZ ) {
171-
FD_LOG_ERR( ( "failed to parse transaction %lu in microblock %lu", txn_idx, mblk ) );
172-
}
173-
fd_txn_t * txn = (fd_txn_t *)txn_out;
174-
175-
/* Loop across signatures */
176-
fd_ed25519_sig_t const * sigs = (fd_ed25519_sig_t const *)(raw + txn->signature_off);
177-
for ( uchar j = 0; j < txn->signature_cnt; j++ ) {
178-
if( fd_rpc_txn_map_is_full( hist->txn_map ) ) break; /* Out of space */
179-
fd_rpc_txn_key_t key;
180-
memcpy(&key, (const uchar*)&sigs[j], sizeof(key));
181-
fd_rpc_txn_t * ent = fd_rpc_txn_map_insert( hist->txn_map, &key );
182-
ent->file_offset = base_offset + blockoff;
183-
ent->file_size = pay_sz;
184-
ent->slot = info->slot_exec.slot;
185-
}
186-
187-
/* Loop across accoounts */
188-
fd_rpc_txn_key_t sig0;
189-
memcpy(&sig0, (const uchar*)sigs, sizeof(sig0));
190-
fd_pubkey_t * accs = (fd_pubkey_t *)((uchar *)raw + txn->acct_addr_off);
191-
for( ulong i = 0UL; i < txn->acct_addr_cnt; i++ ) {
192-
if( !memcmp(&accs[i], fd_solana_vote_program_id.key, sizeof(fd_pubkey_t)) ) continue; /* Ignore votes */
193-
if( !fd_rpc_acct_map_pool_free( hist->acct_pool ) ) break;
194-
fd_rpc_acct_map_elem_t * ele = fd_rpc_acct_map_pool_ele_acquire( hist->acct_pool );
195-
ele->key = accs[i];
196-
ele->slot = info->slot_exec.slot;
197-
ele->sig = sig0;
198-
fd_rpc_acct_map_ele_insert( hist->acct_map, ele, hist->acct_pool );
199-
}
200-
201-
blockoff += pay_sz;
202-
}
203-
}
204-
}
205-
if ( blockoff != blk_sz )
206-
FD_LOG_ERR(("garbage at end of block"));
280+
/* Scan the block */
281+
fd_rpc_history_scan_block( hist, slot, file_offset, blk_data, blk_sz );
207282

208283
} FD_SPAD_FRAME_END;
209284
}
210285

286+
static void
287+
fd_rpc_history_discard_column(fd_rpc_shred_map_t * shred_map, fd_rpc_shred_map_elem_t * shred_pool, ulong slot) {
288+
ulong col_idx = slot & (FD_SHRED_MAP_COL_CNT - 1);
289+
struct fd_rpc_shred_map_column * col = &shred_map->cols[col_idx];
290+
for( ulong i = 0; i < col->max_idx; i++ ) {
291+
fd_rpc_shred_map_elem_t * ele = col->ele[i];
292+
if( ele ) {
293+
fd_rpc_shred_pool_ele_release( shred_pool, ele );
294+
col->ele[i] = NULL;
295+
}
296+
}
297+
col->max_idx = 0;
298+
col->used_cnt = 0;
299+
col->end_found = 0;
300+
}
301+
211302
void
212303
fd_rpc_history_save_shred(fd_rpc_history_t * hist, fd_shred_t * shred) {
213-
(void)hist;
214-
(void)shred;
304+
if( !fd_shred_is_data( shred->variant ) ) return;
305+
if( shred->idx >= FD_SHRED_BLK_MAX ) return;
306+
307+
fd_rpc_shred_map_t * shred_map = hist->shred_map;
308+
fd_rpc_shred_map_elem_t * shred_pool = hist->shred_pool;
309+
310+
if( shred_map->head == 0UL ) {
311+
shred_map->head = shred->slot+1;
312+
shred_map->tail = shred->slot;
313+
}
314+
while( shred->slot > shred_map->tail + FD_SHRED_MAP_COL_CNT ) {
315+
FD_TEST( shred_map->tail < shred_map->head );
316+
fd_rpc_history_discard_column( shred_map, shred_pool, shred_map->tail++ );
317+
}
318+
while( shred->slot >= shred_map->head ) {
319+
ulong col_idx = (shred_map->head++) & (FD_SHRED_MAP_COL_CNT - 1);
320+
struct fd_rpc_shred_map_column * col = &shred_map->cols[col_idx];
321+
col->max_idx = 0;
322+
col->used_cnt = 0;
323+
col->end_found = 0;
324+
}
325+
326+
ulong col_idx = shred->slot & (FD_SHRED_MAP_COL_CNT - 1);
327+
struct fd_rpc_shred_map_column * col = &shred_map->cols[col_idx];
328+
329+
/* See if we've already received this shred */
330+
if( col->max_idx > shred->idx && col->ele[shred->idx] != NULL ) return;
331+
while( col->max_idx <= shred->idx ) {
332+
col->ele[col->max_idx++] = NULL;
333+
}
334+
335+
/* Acquire space for the shred. If we've run out of space, discard the oldest column */
336+
fd_rpc_shred_map_elem_t * ele = NULL;
337+
do {
338+
ele = fd_rpc_shred_pool_ele_acquire( shred_pool );
339+
if( ele != NULL ) break;
340+
FD_TEST( shred_map->tail < shred_map->head );
341+
fd_rpc_history_discard_column( shred_map, shred_pool, shred_map->tail++ );
342+
} while( 1 );
343+
344+
ulong payload_sz = ele->data.payload_sz = fd_shred_payload_sz( shred );
345+
fd_memcpy( ele->data.payload, fd_shred_data_payload( shred ), payload_sz );
346+
col->ele[shred->idx] = ele;
347+
col->used_cnt++;
348+
if( FD_UNLIKELY( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE ) ) col->end_found = 1;
349+
350+
if( col->end_found && col->used_cnt == col->max_idx ) {
351+
/* We've received all the shreds for this slot. Process it. */
352+
fd_rpc_history_process_column( hist, col, shred );
353+
fd_rpc_history_discard_column( shred_map, shred_pool, shred->slot );
354+
}
215355
}
216356

217357
ulong

src/discof/rpcserver/fd_rpc_history.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ typedef struct fd_rpc_txn_key fd_rpc_txn_key_t;
1414

1515
fd_rpc_history_t * fd_rpc_history_create(fd_rpcserver_args_t * args);
1616

17-
void fd_rpc_history_save(fd_rpc_history_t * hist, fd_replay_notif_msg_t * msg);
17+
void fd_rpc_history_save_info(fd_rpc_history_t * hist, fd_replay_notif_msg_t * msg);
1818

1919
void fd_rpc_history_save_shred(fd_rpc_history_t * hist, fd_shred_t * shred);
2020

src/discof/rpcserver/fd_rpc_service.c

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2407,7 +2407,7 @@ fd_rpc_replay_after_frag(fd_rpc_ctx_t * ctx, fd_replay_notif_msg_t * msg) {
24072407

24082408
if( msg->slot_exec.shred_cnt == 0 ) return;
24092409

2410-
fd_rpc_history_save( subs->history, msg );
2410+
fd_rpc_history_save_info( subs->history, msg );
24112411

24122412
for( ulong j = 0; j < subs->sub_cnt; ++j ) {
24132413
struct fd_ws_subscription * sub = &subs->sub_list[ j ];
@@ -2446,7 +2446,5 @@ void
24462446
fd_rpc_shred_repair_after_frag(fd_rpc_ctx_t * ctx, uchar * shred) {
24472447
fd_rpc_global_ctx_t * subs = ctx->global;
24482448
fd_shred_t * shred_p = (fd_shred_t *)shred;
2449-
if( fd_shred_is_data( shred_p->variant ) ) {
2450-
fd_rpc_history_save_shred( subs->history, shred_p );
2451-
}
2452-
}
2449+
fd_rpc_history_save_shred( subs->history, shred_p );
2450+
}

0 commit comments

Comments
 (0)