Skip to content

Commit 3bdaafc

Browse files
snapshots: highest manifest slot
1 parent b04bb32 commit 3bdaafc

File tree

10 files changed

+215
-83
lines changed

10 files changed

+215
-83
lines changed

src/discof/replay/fd_replay_tile.c

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,9 @@ on_snapshot_message( fd_replay_tile_ctx_t * ctx,
921921
fd_stem_context_t * stem,
922922
ulong in_idx,
923923
ulong chunk,
924-
ulong sig ) {
924+
ulong sig,
925+
ulong tsorig FD_PARAM_UNUSED,
926+
ulong tspub FD_PARAM_UNUSED) {
925927
ulong msg = fd_ssmsg_sig_message( sig );
926928
if( FD_LIKELY( msg==FD_SSMSG_DONE ) ) {
927929
/* An end of message notification indicates the snapshot is loaded.
@@ -930,25 +932,22 @@ on_snapshot_message( fd_replay_tile_ctx_t * ctx,
930932
state machine and set the state here accordingly. */
931933
FD_LOG_INFO(("Snapshot loaded, replay can start executing"));
932934
ctx->snapshot_init_done = 1;
933-
/* Kickoff repair orphans after the snapshots are done loading. If
934-
we kickoff repair after we receive a full manifest, we might try
935-
to repair a slot that is potentially huge amount of slots behind
936-
turbine causing our repair buffers to fill up. Instead, we should
937-
wait until we are done receiving all the snapshots.
938-
939-
TODO: Eventually, this logic should be cased out more:
940-
1. If we just have a full snapshot, load in the slot_ctx for the
941-
slot ctx and kickoff repair as soon as the manifest is
942-
received.
943-
2. If we are loading a full and incremental snapshot, we should
944-
only load in the slot_ctx and kickoff repair for the
945-
incremental snapshot. */
946935
kickoff_repair_orphans( ctx, stem );
947936
init_from_snapshot( ctx, stem );
948937
return;
949938
}
950939

951940
switch( msg ) {
941+
case FD_SSMSG_HIGHEST_MANIFEST_SLOT: {
942+
/* snapin sends the highest manifest slot so far to notify any
943+
listeners of the highest rooted slot that will be loaded
944+
by the snapshot tiles. The highest manifest slot may change
945+
if the snapshot tiles retry snapshot loading, but it is
946+
guaranteed to be monotonically increasing. */
947+
/* TODO: currently a no-op. Repair cannot handle an receiving an
948+
incrementally changing rooted slot and stake weights yet. */
949+
break;
950+
}
952951
case FD_SSMSG_MANIFEST_FULL:
953952
case FD_SSMSG_MANIFEST_INCREMENTAL: {
954953
/* We may either receive a full snapshot manifest or an
@@ -1043,8 +1042,8 @@ after_frag( fd_replay_tile_ctx_t * ctx,
10431042
ulong seq FD_PARAM_UNUSED,
10441043
ulong sig,
10451044
ulong sz FD_PARAM_UNUSED,
1046-
ulong tsorig FD_PARAM_UNUSED,
1047-
ulong tspub FD_PARAM_UNUSED,
1045+
ulong tsorig,
1046+
ulong tspub,
10481047
fd_stem_context_t * stem FD_PARAM_UNUSED ) {
10491048
if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_TOWER ) ) {
10501049
ulong root = sig;
@@ -1074,7 +1073,7 @@ after_frag( fd_replay_tile_ctx_t * ctx,
10741073

10751074
fd_fseq_update( ctx->published_wmark, root );
10761075
} else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAP ) ) {
1077-
on_snapshot_message( ctx, stem, in_idx, ctx->_snap_out_chunk, sig );
1076+
on_snapshot_message( ctx, stem, in_idx, ctx->_snap_out_chunk, sig, tsorig, tspub );
10781077
} else if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
10791078

10801079
/* Forks form a partial ordering over FEC sets. The Repair tile

src/discof/restore/fd_snapdc_tile.c

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,11 @@ transition_malformed( fd_snapdc_tile_t * ctx,
101101
static inline void
102102
handle_control_frag( fd_snapdc_tile_t * ctx,
103103
fd_stem_context_t * stem,
104-
ulong sig ) {
104+
ulong sig,
105+
ulong tsorig,
106+
ulong tspub ) {
105107
/* 1. Pass the control message downstream to the next consumer. */
106-
fd_stem_publish( stem, 0UL, sig, ctx->out.chunk, 0UL, 0UL, 0UL, 0UL );
108+
fd_stem_publish( stem, 0UL, sig, ctx->out.chunk, 0UL, 0UL, tsorig, tspub );
107109
ulong error = ZSTD_DCtx_reset( ctx->zstd, ZSTD_reset_session_only );
108110
if( FD_UNLIKELY( ZSTD_isError( error ) ) ) FD_LOG_ERR(( "ZSTD_DCtx_reset failed (%lu-%s)", error, ZSTD_getErrorName( error ) ));
109111

@@ -148,6 +150,9 @@ handle_control_frag( fd_snapdc_tile_t * ctx,
148150
FD_TEST( ctx->state==FD_SNAPDC_STATE_DONE );
149151
ctx->state = FD_SNAPDC_STATE_SHUTDOWN;
150152
break;
153+
case FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT:
154+
/* Informational control message forwarded to snapin */
155+
break;
151156
default:
152157
FD_LOG_ERR(( "unexpected control sig %lu", sig ));
153158
return;
@@ -254,7 +259,7 @@ returnable_frag( fd_snapdc_tile_t * ctx,
254259
FD_TEST( ctx->state!=FD_SNAPDC_STATE_SHUTDOWN );
255260

256261
if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, stem, chunk, sz );
257-
else handle_control_frag( ctx,stem, sig );
262+
else handle_control_frag( ctx,stem, sig, tsorig, tspub );
258263

259264
return 0;
260265
}

src/discof/restore/fd_snapin_tile.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,9 @@ handle_data_frag( fd_snapin_tile_t * ctx,
214214
static void
215215
handle_control_frag( fd_snapin_tile_t * ctx,
216216
fd_stem_context_t * stem,
217-
ulong sig ) {
217+
ulong sig,
218+
ulong tsorig,
219+
ulong tspub ) {
218220
switch( sig ) {
219221
case FD_SNAPSHOT_MSG_CTRL_RESET_FULL:
220222
ctx->full = 1;
@@ -257,6 +259,11 @@ handle_control_frag( fd_snapin_tile_t * ctx,
257259
case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
258260
ctx->state = FD_SNAPIN_STATE_SHUTDOWN;
259261
break;
262+
case FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT:
263+
/* Informational control message forwarded to snap_out */
264+
fd_stem_publish( stem, 0UL, FD_SSMSG_HIGHEST_MANIFEST_SLOT, 0UL, 0UL, 0UL, tsorig, tspub );
265+
/* No need to send an ack for an informational control message */
266+
return;
260267
default:
261268
FD_LOG_ERR(( "unexpected control sig %lu", sig ));
262269
return;
@@ -289,7 +296,7 @@ returnable_frag( fd_snapin_tile_t * ctx,
289296
FD_TEST( ctx->state!=FD_SNAPIN_STATE_SHUTDOWN );
290297

291298
if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) handle_data_frag( ctx, chunk, sz, stem );
292-
else handle_control_frag( ctx, stem, sig );
299+
else handle_control_frag( ctx, stem, sig, tsorig, tspub );
293300

294301
return 0;
295302
}

src/discof/restore/fd_snaprd_tile.c

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "utils/fd_sshttp.h"
33
#include "utils/fd_ssctrl.h"
44
#include "utils/fd_ssarchive.h"
5+
#include "utils/fd_ssmsg.h"
56

67
#include "../../disco/topo/fd_topo.h"
78
#include "../../disco/metrics/fd_metrics.h"
@@ -74,10 +75,17 @@ struct fd_snaprd_tile {
7475
} local_in;
7576

7677
struct {
77-
char full_path[ PATH_MAX ];
78-
ulong full_len;
79-
char incremental_path[ PATH_MAX ];
80-
ulong incremental_len;
78+
79+
struct {
80+
char path[ PATH_MAX ];
81+
ulong len;
82+
} full;
83+
84+
struct {
85+
char path[ PATH_MAX ];
86+
ulong len;
87+
} incremental;
88+
8189
} http;
8290

8391
struct {
@@ -328,11 +336,11 @@ rename_snapshots( fd_snaprd_tile_t * ctx ) {
328336
if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return;
329337

330338
if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) {
331-
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->http.full_path ) ) )
339+
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->http.full.path ) ) )
332340
FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
333341
}
334342
if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) {
335-
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->http.incremental_path ) ) )
343+
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->http.incremental.path ) ) )
336344
FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
337345
}
338346
}
@@ -362,7 +370,7 @@ after_credit( fd_snaprd_tile_t * ctx,
362370

363371
switch ( ctx->state ) {
364372
case FD_SNAPRD_STATE_WAITING_FOR_PEERS: {
365-
fd_sspeer_t best = fd_ssping_best( ctx->ssping );
373+
fd_sspeer_t best = fd_ssping_best( ctx->ssping, 0UL );
366374
if( FD_LIKELY( best.addr.l ) ) {
367375
ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
368376
ctx->deadline_nanos = now + 500L*1000L*1000L;
@@ -372,7 +380,14 @@ after_credit( fd_snaprd_tile_t * ctx,
372380
case FD_SNAPRD_STATE_COLLECTING_PEERS: {
373381
if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
374382

375-
fd_sspeer_t best = fd_ssping_best( ctx->ssping );
383+
ulong highest_slot = 0UL;
384+
if( FD_LIKELY( ctx->peer.addr.l ) ) {
385+
highest_slot = ctx->peer.snapshot_info->incremental.slot!=ULONG_MAX ?
386+
ctx->peer.snapshot_info->incremental.slot :
387+
ctx->peer.snapshot_info->full.slot;
388+
}
389+
390+
fd_sspeer_t best = fd_ssping_best( ctx->ssping, highest_slot );
376391
if( FD_UNLIKELY( !best.addr.l ) ) {
377392
ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
378393
break;
@@ -385,17 +400,29 @@ after_credit( fd_snaprd_tile_t * ctx,
385400
} else {
386401
char path[ PATH_MAX ];
387402
char encoded_full_hash[ FD_BASE58_ENCODED_32_SZ ];
388-
char encoded_incremental_hash[ FD_BASE58_ENCODED_32_SZ ];
403+
404+
/* Generate the http paths */
389405
fd_base58_encode_32( best.snapshot_info->full.hash, NULL, encoded_full_hash );
390-
fd_base58_encode_32( best.snapshot_info->incremental.hash, NULL, encoded_incremental_hash );
391-
FD_TEST( fd_cstr_printf_check( ctx->http.full_path, PATH_MAX, &ctx->http.full_len, "snapshot-%lu-%s.tar.zst", best.snapshot_info->full.slot, encoded_full_hash ) );
392-
FD_TEST( fd_cstr_printf_check( ctx->http.incremental_path, PATH_MAX, &ctx->http.incremental_len, "incremental-snapshot-%lu-%lu-%s.tar.zst", best.snapshot_info->incremental.base_slot, best.snapshot_info->incremental.slot, encoded_incremental_hash ) );
393-
FD_TEST( fd_cstr_printf_check( path, PATH_MAX, NULL, "/%s", ctx->http.full_path ) );
406+
FD_TEST( fd_cstr_printf_check( ctx->http.full.path, PATH_MAX, &ctx->http.full.len, "snapshot-%lu-%s.tar.zst", best.snapshot_info->full.slot, encoded_full_hash ) );
407+
FD_TEST( fd_cstr_printf_check( path, PATH_MAX, NULL, "/%s", ctx->http.full.path ) );
408+
409+
if( ctx->config.incremental_snapshot_fetch ) {
410+
char encoded_incremental_hash[ FD_BASE58_ENCODED_32_SZ ];
411+
fd_base58_encode_32( best.snapshot_info->incremental.hash, NULL, encoded_incremental_hash );
412+
FD_TEST( fd_cstr_printf_check( ctx->http.incremental.path, PATH_MAX, &ctx->http.incremental.len, "incremental-snapshot-%lu-%lu-%s.tar.zst", best.snapshot_info->incremental.base_slot, best.snapshot_info->incremental.slot, encoded_incremental_hash ) );
413+
}
414+
415+
uint low;
416+
uint high;
417+
/* send the highest manifest slot */
418+
ulong highest_manifest_slot = ctx->config.incremental_snapshot_fetch ? best.snapshot_info->incremental.slot : best.snapshot_info->full.slot;
419+
fd_ssmsg_slot_to_frag( highest_manifest_slot, &low, &high );
420+
fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT, 0UL, 0UL, 0UL, low, high );
394421

395-
FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( best.addr.addr ), best.addr.port, ctx->http.full_path ));
422+
FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( best.addr.addr ), best.addr.port, ctx->http.full.path ));
396423
ctx->peer = best;
397424
ctx->state = FD_SNAPRD_STATE_READING_FULL_HTTP;
398-
fd_sshttp_init( ctx->sshttp, best.addr, path, ctx->http.full_len + 1UL, now );
425+
fd_sshttp_init( ctx->sshttp, best.addr, path, ctx->http.full.len + 1UL, now );
399426
}
400427
break;
401428
}
@@ -460,9 +487,9 @@ after_credit( fd_snaprd_tile_t * ctx,
460487
}
461488

462489
char path[ PATH_MAX ];
463-
FD_TEST( fd_cstr_printf_check( path, PATH_MAX, NULL, "/%s", ctx->http.incremental_path ) );
490+
FD_TEST( fd_cstr_printf_check( path, PATH_MAX, NULL, "/%s", ctx->http.incremental.path ) );
464491
FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port, path));
465-
fd_sshttp_init( ctx->sshttp, ctx->peer.addr, path, ctx->http.incremental_len + 1UL, fd_log_wallclock() );
492+
fd_sshttp_init( ctx->sshttp, ctx->peer.addr, path, ctx->http.incremental.len + 1UL, fd_log_wallclock() );
466493
ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP;
467494
break;
468495
case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
@@ -692,6 +719,8 @@ unprivileged_init( fd_topo_t * topo,
692719
ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache, topo->links[ tile->out_link_id[ 0 ] ].mtu );
693720
ctx->out.chunk = ctx->out.chunk0;
694721
ctx->out.mtu = topo->links[ tile->out_link_id[ 0 ] ].mtu;
722+
723+
fd_memset( &ctx->peer, 0, sizeof(ctx->peer) );
695724
}
696725

697726
#define STEM_BURST 2UL /* One control message, and one data message */

src/discof/restore/utils/fd_ssctrl.h

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,20 @@
4040

4141
#define FD_SNAPSHOT_MSG_DATA (0UL) /* Fragment represents some snapshot data */
4242

43-
#define FD_SNAPSHOT_MSG_CTRL_RESET_FULL (1UL) /* Reset to start loading a fresh full snapshot */
44-
#define FD_SNAPSHOT_MSG_CTRL_EOF_FULL (2UL) /* Full snapshot data is done, incremental data starting now */
45-
#define FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL (3UL) /* Incremental data being retried, start incremental over */
46-
#define FD_SNAPSHOT_MSG_CTRL_DONE (4UL) /* Snapshot load is over, data is finished for this tile */
47-
#define FD_SNAPSHOT_MSG_CTRL_SHUTDOWN (5UL) /* All tiles have acknowledged snapshot load is done, can now shutdown */
48-
49-
#define FD_SNAPSHOT_MSG_CTRL_ACK (6UL) /* Sent from tiles back to snaprd, meaning they ACK whatever control message was pending */
50-
#define FD_SNAPSHOT_MSG_CTRL_MALFORMED (7UL) /* Sent from tiles back to snaprd, meaning they consider the current snapshot malformed */
43+
/* The HIGHEST_MANIFEST_SLOT message is an informational control message
44+
that is forwarded from the snaprd tile to the snapin tile, which
45+
forwards the message to the snap_out link. The HIGHEST_MANIFEST_SLOT
46+
message is guaranteed to be monotonically increasing and is forwarded
47+
regardless of any snapshot loading error / retry. */
48+
#define FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT (1UL) /* Fragment contains the highest manifest slot so far, guaranteed to be monotonically increasing */
49+
50+
#define FD_SNAPSHOT_MSG_CTRL_RESET_FULL (2UL) /* Reset to start loading a fresh full snapshot */
51+
#define FD_SNAPSHOT_MSG_CTRL_EOF_FULL (3UL) /* Full snapshot data is done, incremental data starting now */
52+
#define FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL (4UL) /* Incremental data being retried, start incremental over */
53+
#define FD_SNAPSHOT_MSG_CTRL_DONE (5UL) /* Snapshot load is over, data is finished for this tile */
54+
#define FD_SNAPSHOT_MSG_CTRL_SHUTDOWN (6UL) /* All tiles have acknowledged snapshot load is done, can now shutdown */
55+
56+
#define FD_SNAPSHOT_MSG_CTRL_ACK (7UL) /* Sent from tiles back to snaprd, meaning they ACK whatever control message was pending */
57+
#define FD_SNAPSHOT_MSG_CTRL_MALFORMED (8UL) /* Sent from tiles back to snaprd, meaning they consider the current snapshot malformed */
5158

5259
#endif /* HEADER_fd_src_discof_restore_utils_fd_ssctrl_h */

src/discof/restore/utils/fd_ssmsg.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33

44
#include "../../../flamenco/types/fd_types.h"
55

6-
#define FD_SSMSG_MANIFEST_FULL (0) /* A snapshot manifest message from the full snapshot */
7-
#define FD_SSMSG_MANIFEST_INCREMENTAL (1) /* A snapshot manifest message from the incremental snapshot */
8-
#define FD_SSMSG_DONE (2) /* Indicates the snapshot is fully loaded and tiles are shutting down */
6+
#define FD_SSMSG_MANIFEST_FULL (0) /* A snapshot manifest message from the full snapshot */
7+
#define FD_SSMSG_MANIFEST_INCREMENTAL (1) /* A snapshot manifest message from the incremental snapshot */
8+
#define FD_SSMSG_HIGHEST_MANIFEST_SLOT (2) /* The highest manifest slot so far, guaranteed to be monotonically increasing */
9+
#define FD_SSMSG_DONE (3) /* Indicates the snapshot is fully loaded and tiles are shutting down */
910

1011
/* This number is exhagerately high, but is consisent with Frankendancer.
1112
We need this to be about the number of validators, so roughly
@@ -24,6 +25,24 @@ fd_ssmsg_sig( ulong message,
2425
FD_FN_CONST static inline ulong fd_ssmsg_sig_manifest_size( ulong sig ) { return (sig >> 2); }
2526
FD_FN_CONST static inline ulong fd_ssmsg_sig_message( ulong sig ) { return (sig & 0x3UL); }
2627

28+
/* The FD_SSMSG_HIGHEST_MANIFEST_SLOT uses the tsorig and tspub fields
29+
of the fd_frag_meta_t struct to store the low and high 32 bits of
30+
the slot number. */
31+
static inline void
32+
fd_ssmsg_slot_to_frag( ulong slot,
33+
uint* low,
34+
uint* high ) {
35+
*low = (uint)(slot & 0xFFFFFFFFUL);
36+
*high = (uint)((slot >> 32UL) & 0xFFFFFFFFUL);
37+
}
38+
39+
static inline void
40+
fd_ssmsg_frag_to_slot( ulong low,
41+
ulong high,
42+
ulong* slot ) {
43+
*slot = (high << 32UL) | low;
44+
}
45+
2746
struct fd_snapshot_manifest_vote_account {
2847
/* The pubkey of the vote account */
2948
uchar vote_account_pubkey[ 32UL ];

src/discof/restore/utils/fd_ssping.c

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -242,14 +242,14 @@ fd_ssping_add( fd_ssping_t * ssping,
242242
if( FD_UNLIKELY( !peer_pool_free( ssping->pool ) ) ) return;
243243
peer = peer_pool_ele_acquire( ssping->pool );
244244
FD_TEST( peer );
245-
peer->refcnt = 0UL;
246-
peer->state = PEER_STATE_UNPINGED;
247-
peer->addr = addr;
245+
peer->refcnt = 0UL;
246+
peer->state = PEER_STATE_UNPINGED;
247+
peer->addr = addr;
248248
peer->snapshot_info.full.slot = ULONG_MAX;
249249
peer->snapshot_info.incremental.base_slot = ULONG_MAX;
250250
peer->snapshot_info.incremental.slot = ULONG_MAX;
251-
peer->full_latency_nanos = 0UL;
252-
peer->incremental_latency_nanos = 0UL;
251+
peer->full_latency_nanos = 0UL;
252+
peer->incremental_latency_nanos = 0UL;
253253
peer_map_ele_insert( ssping->map, peer, ssping->pool );
254254
deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
255255
}
@@ -580,21 +580,24 @@ fd_ssping_advance( fd_ssping_t * ssping,
580580
}
581581

582582
fd_sspeer_t
583-
fd_ssping_best( fd_ssping_t const * ssping ) {
584-
score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( ssping->score_treap, ssping->pool );
585-
if( FD_UNLIKELY( score_treap_fwd_iter_done( iter ) ) ) {
586-
return (fd_sspeer_t){
587-
.addr = {
588-
.l = 0UL
589-
},
590-
.snapshot_info = NULL,
591-
};
583+
fd_ssping_best( fd_ssping_t const * ssping,
584+
ulong highest_slot ) {
585+
for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( ssping->score_treap, ssping->pool );
586+
!score_treap_fwd_iter_done( iter );
587+
iter = score_treap_fwd_iter_next( iter, ssping->pool ) ) {
588+
fd_ssping_peer_t const * best = score_treap_fwd_iter_ele_const( iter, ssping->pool );
589+
if( FD_LIKELY( best->snapshot_info.incremental.slot>=highest_slot ) ) {
590+
return (fd_sspeer_t){
591+
.addr = best->addr,
592+
.snapshot_info = &best->snapshot_info,
593+
};
594+
}
592595
}
593596

594-
fd_ssping_peer_t const * best = score_treap_fwd_iter_ele_const( iter, ssping->pool );
595-
596597
return (fd_sspeer_t){
597-
.addr = best->addr,
598-
.snapshot_info = &best->snapshot_info,
598+
.addr = {
599+
.l = 0UL
600+
},
601+
.snapshot_info = NULL,
599602
};
600603
}

0 commit comments

Comments
 (0)