Skip to content

snapshots: send highest manifest slot #5932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions src/discof/replay/fd_replay_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,9 @@ on_snapshot_message( fd_replay_tile_ctx_t * ctx,
fd_stem_context_t * stem,
ulong in_idx,
ulong chunk,
ulong sig ) {
ulong sig,
ulong tsorig FD_PARAM_UNUSED,
ulong tspub FD_PARAM_UNUSED) {
ulong msg = fd_ssmsg_sig_message( sig );
if( FD_LIKELY( msg==FD_SSMSG_DONE ) ) {
/* An end of message notification indicates the snapshot is loaded.
Expand All @@ -801,19 +803,6 @@ on_snapshot_message( fd_replay_tile_ctx_t * ctx,
state machine and set the state here accordingly. */
FD_LOG_INFO(("Snapshot loaded, replay can start executing"));
ctx->snapshot_init_done = 1;
/* Kickoff repair orphans after the snapshots are done loading. If
we kickoff repair after we receive a full manifest, we might try
to repair a slot that is potentially a huge number of slots behind
turbine, causing our repair buffers to fill up. Instead, we should
wait until we are done receiving all the snapshots.

TODO: Eventually, this logic should be cased out more:
1. If we just have a full snapshot, load in the slot_ctx for the
slot ctx and kickoff repair as soon as the manifest is
received.
2. If we are loading a full and incremental snapshot, we should
only load in the slot_ctx and kickoff repair for the
incremental snapshot. */
kickoff_repair_orphans( ctx, stem );
init_from_snapshot( ctx, stem );
ulong curr_slot = fd_bank_slot_get( ctx->slot_ctx->bank );
Expand All @@ -824,6 +813,16 @@ on_snapshot_message( fd_replay_tile_ctx_t * ctx,
}

switch( msg ) {
case FD_SSMSG_HIGHEST_MANIFEST_SLOT: {
/* snapin sends the highest manifest slot so far to notify any
listeners of the highest rooted slot that will be loaded
by the snapshot tiles. The highest manifest slot may change
if the snapshot tiles retry snapshot loading, but it is
guaranteed to be monotonically increasing. */
/* TODO: currently a no-op. Repair cannot handle receiving an
incrementally changing rooted slot and stake weights yet. */
break;
}
case FD_SSMSG_MANIFEST_FULL:
case FD_SSMSG_MANIFEST_INCREMENTAL: {
/* We may either receive a full snapshot manifest or an
Expand Down Expand Up @@ -944,8 +943,8 @@ after_frag( fd_replay_tile_ctx_t * ctx,
ulong seq FD_PARAM_UNUSED,
ulong sig,
ulong sz FD_PARAM_UNUSED,
ulong tsorig FD_PARAM_UNUSED,
ulong tspub FD_PARAM_UNUSED,
ulong tsorig,
ulong tspub,
fd_stem_context_t * stem FD_PARAM_UNUSED ) {
if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOT ) ) {
ulong root = sig;
Expand Down Expand Up @@ -974,7 +973,7 @@ after_frag( fd_replay_tile_ctx_t * ctx,

fd_fseq_update( ctx->published_wmark, root );
} else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAP ) ) {
on_snapshot_message( ctx, stem, in_idx, ctx->_snap_out_chunk, sig );
on_snapshot_message( ctx, stem, in_idx, ctx->_snap_out_chunk, sig, tsorig, tspub );
} else if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {

/* Forks form a partial ordering over FEC sets. The Repair tile
Expand Down
1 change: 1 addition & 0 deletions src/discof/restore/Local.mk
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ endif
$(call add-objs,utils/fd_ssping,fd_discof)
$(call add-objs,utils/fd_sshttp,fd_discof)
$(call add-objs,utils/fd_ssarchive,fd_discof)
$(call add-objs,utils/fd_ssresolve,fd_discof)
11 changes: 8 additions & 3 deletions src/discof/restore/fd_snapdc_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ transition_malformed( fd_snapdc_tile_t * ctx,
static inline void
handle_control_frag( fd_snapdc_tile_t * ctx,
fd_stem_context_t * stem,
ulong sig ) {
ulong sig,
ulong tsorig,
ulong tspub ) {
/* 1. Pass the control message downstream to the next consumer. */
fd_stem_publish( stem, 0UL, sig, ctx->out.chunk, 0UL, 0UL, 0UL, 0UL );
fd_stem_publish( stem, 0UL, sig, ctx->out.chunk, 0UL, 0UL, tsorig, tspub );
ulong error = ZSTD_DCtx_reset( ctx->zstd, ZSTD_reset_session_only );
if( FD_UNLIKELY( ZSTD_isError( error ) ) ) FD_LOG_ERR(( "ZSTD_DCtx_reset failed (%lu-%s)", error, ZSTD_getErrorName( error ) ));

Expand Down Expand Up @@ -148,6 +150,9 @@ handle_control_frag( fd_snapdc_tile_t * ctx,
FD_TEST( ctx->state==FD_SNAPDC_STATE_DONE );
ctx->state = FD_SNAPDC_STATE_SHUTDOWN;
break;
case FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT:
/* Informational control message forwarded to snapin */
break;
default:
FD_LOG_ERR(( "unexpected control sig %lu", sig ));
return;
Expand Down Expand Up @@ -253,7 +258,7 @@ returnable_frag( fd_snapdc_tile_t * ctx,
FD_TEST( ctx->state!=FD_SNAPDC_STATE_SHUTDOWN );

if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, stem, chunk, sz );
else handle_control_frag( ctx,stem, sig );
else handle_control_frag( ctx,stem, sig, tsorig, tspub );

return 0;
}
Expand Down
11 changes: 9 additions & 2 deletions src/discof/restore/fd_snapin_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ handle_data_frag( fd_snapin_tile_t * ctx,
static void
handle_control_frag( fd_snapin_tile_t * ctx,
fd_stem_context_t * stem,
ulong sig ) {
ulong sig,
ulong tsorig,
ulong tspub ) {
switch( sig ) {
case FD_SNAPSHOT_MSG_CTRL_RESET_FULL:
ctx->full = 1;
Expand Down Expand Up @@ -257,6 +259,11 @@ handle_control_frag( fd_snapin_tile_t * ctx,
case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
ctx->state = FD_SNAPIN_STATE_SHUTDOWN;
break;
case FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT:
/* Informational control message forwarded to snap_out */
fd_stem_publish( stem, 0UL, FD_SSMSG_HIGHEST_MANIFEST_SLOT, 0UL, 0UL, 0UL, tsorig, tspub );
/* No need to send an ack for an informational control message */
return;
default:
FD_LOG_ERR(( "unexpected control sig %lu", sig ));
return;
Expand Down Expand Up @@ -288,7 +295,7 @@ returnable_frag( fd_snapin_tile_t * ctx,
FD_TEST( ctx->state!=FD_SNAPIN_STATE_SHUTDOWN );

if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) handle_data_frag( ctx, chunk, sz, stem );
else handle_control_frag( ctx, stem, sig );
else handle_control_frag( ctx, stem, sig, tsorig, tspub );

return 0;
}
Expand Down
79 changes: 58 additions & 21 deletions src/discof/restore/fd_snaprd_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "utils/fd_sshttp.h"
#include "utils/fd_ssctrl.h"
#include "utils/fd_ssarchive.h"
#include "utils/fd_ssmsg.h"

#include "../../disco/topo/fd_topo.h"
#include "../../disco/metrics/fd_metrics.h"
Expand Down Expand Up @@ -49,7 +50,7 @@ struct fd_snaprd_tile {
ulong ack_cnt;
int peer_selection;

fd_ip4_port_t addr;
fd_sspeer_t peer;

struct {
ulong write_buffer_pos;
Expand All @@ -73,6 +74,20 @@ struct fd_snaprd_tile {
char incremental_snapshot_path[ PATH_MAX ];
} local_in;

/* File names of the snapshots being fetched over HTTP.  They are
   formatted in after_credit from the selected peer's snapshot_info
   when a download is started, prefixed with "/" to form the HTTP
   request path, and reused by rename_snapshots as the final names
   for the "-partial" files. */
struct {

/* Full snapshot name, formatted as
   "snapshot-<slot>-<base58 hash>.tar.zst". */
struct {
char path[ PATH_MAX ];
/* Length reported by fd_cstr_printf_check when path is formatted
   (presumably excludes the NUL terminator -- TODO confirm).
   Callers pass len+1 as the length of the "/"-prefixed request
   path. */
ulong len;
} full;

/* Incremental snapshot name, formatted as
   "incremental-snapshot-<base slot>-<slot>-<base58 hash>.tar.zst".
   Only formatted when config.incremental_snapshot_fetch is set. */
struct {
char path[ PATH_MAX ];
/* Same meaning as full.len, for the incremental path. */
ulong len;
} incremental;

} http;

struct {
char path[ PATH_MAX ];
int do_download;
Expand Down Expand Up @@ -204,8 +219,8 @@ read_http_data( fd_snaprd_tile_t * ctx,
case FD_SSHTTP_ADVANCE_AGAIN: break;
case FD_SSHTTP_ADVANCE_ERROR: {
FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
fd_ssping_invalidate( ctx->ssping, ctx->addr, now );
FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port ));
fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, now );
fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
ctx->deadline_nanos = now;
Expand Down Expand Up @@ -319,16 +334,13 @@ drain_buffer( fd_snaprd_tile_t * ctx ) {
static void
rename_snapshots( fd_snaprd_tile_t * ctx ) {
if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return;
char const * full_snapshot_name;
char const * incremental_snapshot_name;
fd_sshttp_snapshot_names( ctx->sshttp, &full_snapshot_name, &incremental_snapshot_name );

if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) {
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, full_snapshot_name ) ) )
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->http.full.path ) ) )
FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
}
if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) {
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, incremental_snapshot_name ) ) )
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->http.incremental.path ) ) )
FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
}
}
Expand Down Expand Up @@ -357,8 +369,8 @@ after_credit( fd_snaprd_tile_t * ctx,

switch ( ctx->state ) {
case FD_SNAPRD_STATE_WAITING_FOR_PEERS: {
fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
if( FD_LIKELY( best.l ) ) {
fd_sspeer_t best = fd_ssping_best( ctx->ssping );
if( FD_LIKELY( best.addr.l ) ) {
ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
ctx->deadline_nanos = now + 500L*1000L*1000L;
}
Expand All @@ -367,8 +379,8 @@ after_credit( fd_snaprd_tile_t * ctx,
case FD_SNAPRD_STATE_COLLECTING_PEERS: {
if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;

fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
if( FD_UNLIKELY( !best.l ) ) {
fd_sspeer_t best = fd_ssping_best( ctx->ssping );
if( FD_UNLIKELY( !best.addr.l ) ) {
ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
break;
}
Expand All @@ -378,10 +390,31 @@ after_credit( fd_snaprd_tile_t * ctx,
FD_LOG_NOTICE(( "loading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
} else {
FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr ), best.port ));
ctx->addr = best;
char path[ PATH_MAX ];
char encoded_full_hash[ FD_BASE58_ENCODED_32_SZ ];

/* Generate the http paths */
fd_base58_encode_32( best.snapshot_info->full.hash, NULL, encoded_full_hash );
FD_TEST( fd_cstr_printf_check( ctx->http.full.path, PATH_MAX, &ctx->http.full.len, "snapshot-%lu-%s.tar.zst", best.snapshot_info->full.slot, encoded_full_hash ) );
FD_TEST( fd_cstr_printf_check( path, PATH_MAX, NULL, "/%s", ctx->http.full.path ) );

if( ctx->config.incremental_snapshot_fetch ) {
char encoded_incremental_hash[ FD_BASE58_ENCODED_32_SZ ];
fd_base58_encode_32( best.snapshot_info->incremental.hash, NULL, encoded_incremental_hash );
FD_TEST( fd_cstr_printf_check( ctx->http.incremental.path, PATH_MAX, &ctx->http.incremental.len, "incremental-snapshot-%lu-%lu-%s.tar.zst", best.snapshot_info->incremental.base_slot, best.snapshot_info->incremental.slot, encoded_incremental_hash ) );
}

uint low;
uint high;
/* send the highest manifest slot */
ulong highest_manifest_slot = ctx->config.incremental_snapshot_fetch ? best.snapshot_info->incremental.slot : best.snapshot_info->full.slot;
fd_ssmsg_slot_to_frag( highest_manifest_slot, &low, &high );
fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT, 0UL, 0UL, 0UL, low, high );

FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( best.addr.addr ), best.addr.port, ctx->http.full.path ));
ctx->peer = best;
ctx->state = FD_SNAPRD_STATE_READING_FULL_HTTP;
fd_sshttp_init( ctx->sshttp, best, "/snapshot.tar.bz2", 17UL, now );
fd_sshttp_init( ctx->sshttp, best.addr, path, ctx->http.full.len + 1UL, now );
}
break;
}
Expand Down Expand Up @@ -445,8 +478,10 @@ after_credit( fd_snaprd_tile_t * ctx,
break;
}

FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
fd_sshttp_init( ctx->sshttp, ctx->addr, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() );
char path[ PATH_MAX ];
FD_TEST( fd_cstr_printf_check( path, PATH_MAX, NULL, "/%s", ctx->http.incremental.path ) );
FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port, path));
fd_sshttp_init( ctx->sshttp, ctx->peer.addr, path, ctx->http.incremental.len + 1UL, fd_log_wallclock() );
ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP;
break;
case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
Expand Down Expand Up @@ -508,18 +543,18 @@ after_frag( fd_snaprd_tile_t * ctx,
case FD_SNAPRD_STATE_READING_FULL_HTTP:
case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port ));
fd_sshttp_cancel( ctx->sshttp );
fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
break;
case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP:
case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP:
FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port ));
fd_sshttp_cancel( ctx->sshttp );
fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
/* We would like to transition to FULL_HTTP_RESET, but we can't
do it just yet, because we have already sent a DONE control
fragment, and need to wait for acknowledges to come back
Expand Down Expand Up @@ -676,6 +711,8 @@ unprivileged_init( fd_topo_t * topo,
ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache, topo->links[ tile->out_link_id[ 0 ] ].mtu );
ctx->out.chunk = ctx->out.chunk0;
ctx->out.mtu = topo->links[ tile->out_link_id[ 0 ] ].mtu;

fd_memset( &ctx->peer, 0, sizeof(ctx->peer) );
}

#define STEM_BURST 2UL /* One control message, and one data message */
Expand Down
26 changes: 18 additions & 8 deletions src/discof/restore/utils/fd_ssctrl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,23 @@

#define FD_SNAPSHOT_MSG_DATA (0UL) /* Fragment represents some snapshot data */

#define FD_SNAPSHOT_MSG_CTRL_RESET_FULL (1UL) /* Reset to start loading a fresh full snapshot */
#define FD_SNAPSHOT_MSG_CTRL_EOF_FULL (2UL) /* Full snapshot data is done, incremental data starting now */
#define FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL (3UL) /* Incremental data being retried, start incremental over */
#define FD_SNAPSHOT_MSG_CTRL_DONE (4UL) /* Snapshot load is over, data is finished for this tile */
#define FD_SNAPSHOT_MSG_CTRL_SHUTDOWN (5UL) /* All tiles have acknowledged snapshot load is done, can now shutdown */

#define FD_SNAPSHOT_MSG_CTRL_ACK (6UL) /* Sent from tiles back to snaprd, meaning they ACK whatever control message was pending */
#define FD_SNAPSHOT_MSG_CTRL_MALFORMED (7UL) /* Sent from tiles back to snaprd, meaning they consider the current snapshot malformed */
/* The HIGHEST_MANIFEST_SLOT message is an informational control message
that is forwarded from the snaprd tile to the snapin tile, which
forwards the message to the snap_out link. The HIGHEST_MANIFEST_SLOT
message contains the highest manifest slot so far. It is typically
the incremental snapshot slot but can be the full snapshot slot if
incremental snapshots are disabled. It is guaranteed to be
monotonically increasing and is forwarded regardless of any snapshot
loading error / retry. */
#define FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT (1UL) /* Fragment contains the highest manifest slot so far, guaranteed to be monotonically increasing */

#define FD_SNAPSHOT_MSG_CTRL_RESET_FULL (2UL) /* Reset to start loading a fresh full snapshot */
#define FD_SNAPSHOT_MSG_CTRL_EOF_FULL (3UL) /* Full snapshot data is done, incremental data starting now */
#define FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL (4UL) /* Incremental data being retried, start incremental over */
#define FD_SNAPSHOT_MSG_CTRL_DONE (5UL) /* Snapshot load is over, data is finished for this tile */
#define FD_SNAPSHOT_MSG_CTRL_SHUTDOWN (6UL) /* All tiles have acknowledged snapshot load is done, can now shutdown */

#define FD_SNAPSHOT_MSG_CTRL_ACK (7UL) /* Sent from tiles back to snaprd, meaning they ACK whatever control message was pending */
#define FD_SNAPSHOT_MSG_CTRL_MALFORMED (8UL) /* Sent from tiles back to snaprd, meaning they consider the current snapshot malformed */

#endif /* HEADER_fd_src_discof_restore_utils_fd_ssctrl_h */
Loading
Loading