diff --git a/src/discof/replay/fd_replay_tile.c b/src/discof/replay/fd_replay_tile.c index 07b6820499c..4af2f6d65b7 100644 --- a/src/discof/replay/fd_replay_tile.c +++ b/src/discof/replay/fd_replay_tile.c @@ -792,7 +792,9 @@ on_snapshot_message( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem, ulong in_idx, ulong chunk, - ulong sig ) { + ulong sig, + ulong tsorig FD_PARAM_UNUSED, + ulong tspub FD_PARAM_UNUSED) { ulong msg = fd_ssmsg_sig_message( sig ); if( FD_LIKELY( msg==FD_SSMSG_DONE ) ) { /* An end of message notification indicates the snapshot is loaded. @@ -801,19 +803,6 @@ on_snapshot_message( fd_replay_tile_ctx_t * ctx, state machine and set the state here accordingly. */ FD_LOG_INFO(("Snapshot loaded, replay can start executing")); ctx->snapshot_init_done = 1; - /* Kickoff repair orphans after the snapshots are done loading. If - we kickoff repair after we receive a full manifest, we might try - to repair a slot that is potentially huge amount of slots behind - turbine causing our repair buffers to fill up. Instead, we should - wait until we are done receiving all the snapshots. - - TODO: Eventually, this logic should be cased out more: - 1. If we just have a full snapshot, load in the slot_ctx for the - slot ctx and kickoff repair as soon as the manifest is - received. - 2. If we are loading a full and incremental snapshot, we should - only load in the slot_ctx and kickoff repair for the - incremental snapshot. */ kickoff_repair_orphans( ctx, stem ); init_from_snapshot( ctx, stem ); ulong curr_slot = fd_bank_slot_get( ctx->slot_ctx->bank ); @@ -824,6 +813,16 @@ on_snapshot_message( fd_replay_tile_ctx_t * ctx, } switch( msg ) { + case FD_SSMSG_HIGHEST_MANIFEST_SLOT: { + /* snapin sends the highest manifest slot so far to notify any + listeners of the highest rooted slot that will be loaded + by the snapshot tiles. The highest manifest slot may change + if the snapshot tiles retry snapshot loading, but it is + guaranteed to be monotonically increasing. */ + /* TODO: currently a no-op. Repair cannot handle an receiving an + incrementally changing rooted slot and stake weights yet. */ + break; + } case FD_SSMSG_MANIFEST_FULL: case FD_SSMSG_MANIFEST_INCREMENTAL: { /* We may either receive a full snapshot manifest or an @@ -944,8 +943,8 @@ after_frag( fd_replay_tile_ctx_t * ctx, ulong seq FD_PARAM_UNUSED, ulong sig, ulong sz FD_PARAM_UNUSED, - ulong tsorig FD_PARAM_UNUSED, - ulong tspub FD_PARAM_UNUSED, + ulong tsorig, + ulong tspub, fd_stem_context_t * stem FD_PARAM_UNUSED ) { if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOT ) ) { ulong root = sig; @@ -974,7 +973,7 @@ after_frag( fd_replay_tile_ctx_t * ctx, fd_fseq_update( ctx->published_wmark, root ); } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAP ) ) { - on_snapshot_message( ctx, stem, in_idx, ctx->_snap_out_chunk, sig ); + on_snapshot_message( ctx, stem, in_idx, ctx->_snap_out_chunk, sig, tsorig, tspub ); } else if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) { /* Forks form a partial ordering over FEC sets. The Repair tile diff --git a/src/discof/restore/Local.mk b/src/discof/restore/Local.mk index a4c1d484542..e09a876709d 100644 --- a/src/discof/restore/Local.mk +++ b/src/discof/restore/Local.mk @@ -23,3 +23,4 @@ endif $(call add-objs,utils/fd_ssping,fd_discof) $(call add-objs,utils/fd_sshttp,fd_discof) $(call add-objs,utils/fd_ssarchive,fd_discof) +$(call add-objs,utils/fd_ssresolve,fd_discof) diff --git a/src/discof/restore/fd_snapdc_tile.c b/src/discof/restore/fd_snapdc_tile.c index 3baf7d6bde4..d63ed11750f 100644 --- a/src/discof/restore/fd_snapdc_tile.c +++ b/src/discof/restore/fd_snapdc_tile.c @@ -101,9 +101,11 @@ transition_malformed( fd_snapdc_tile_t * ctx, static inline void handle_control_frag( fd_snapdc_tile_t * ctx, fd_stem_context_t * stem, - ulong sig ) { + ulong sig, + ulong tsorig, + ulong tspub ) { /* 1. Pass the control message downstream to the next consumer. */ - fd_stem_publish( stem, 0UL, sig, ctx->out.chunk, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, 0UL, sig, ctx->out.chunk, 0UL, 0UL, tsorig, tspub ); ulong error = ZSTD_DCtx_reset( ctx->zstd, ZSTD_reset_session_only ); if( FD_UNLIKELY( ZSTD_isError( error ) ) ) FD_LOG_ERR(( "ZSTD_DCtx_reset failed (%lu-%s)", error, ZSTD_getErrorName( error ) )); @@ -148,6 +150,9 @@ handle_control_frag( fd_snapdc_tile_t * ctx, FD_TEST( ctx->state==FD_SNAPDC_STATE_DONE ); ctx->state = FD_SNAPDC_STATE_SHUTDOWN; break; + case FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT: + /* Informational control message forwarded to snapin */ + break; default: FD_LOG_ERR(( "unexpected control sig %lu", sig )); return; @@ -253,7 +258,7 @@ returnable_frag( fd_snapdc_tile_t * ctx, FD_TEST( ctx->state!=FD_SNAPDC_STATE_SHUTDOWN ); if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, stem, chunk, sz ); - else handle_control_frag( ctx,stem, sig ); + else handle_control_frag( ctx,stem, sig, tsorig, tspub ); return 0; } diff --git a/src/discof/restore/fd_snapin_tile.c b/src/discof/restore/fd_snapin_tile.c index 85c27470308..beb2040c029 100644 --- a/src/discof/restore/fd_snapin_tile.c +++ b/src/discof/restore/fd_snapin_tile.c @@ -214,7 +214,9 @@ handle_data_frag( fd_snapin_tile_t * ctx, static void handle_control_frag( fd_snapin_tile_t * ctx, fd_stem_context_t * stem, - ulong sig ) { + ulong sig, + ulong tsorig, + ulong tspub ) { switch( sig ) { case FD_SNAPSHOT_MSG_CTRL_RESET_FULL: ctx->full = 1; @@ -257,6 +259,11 @@ handle_control_frag( fd_snapin_tile_t * ctx, case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: ctx->state = FD_SNAPIN_STATE_SHUTDOWN; break; + case FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT: + /* Informational control message forwarded to snap_out */ + fd_stem_publish( stem, 0UL, FD_SSMSG_HIGHEST_MANIFEST_SLOT, 0UL, 0UL, 0UL, tsorig, tspub ); + /* No need to send an ack for an informational control message */ + return; default: FD_LOG_ERR(( "unexpected control sig %lu", sig )); return; @@ -288,7 +295,7 @@ returnable_frag( fd_snapin_tile_t * ctx, FD_TEST( ctx->state!=FD_SNAPIN_STATE_SHUTDOWN ); if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) handle_data_frag( ctx, chunk, sz, stem ); - else handle_control_frag( ctx, stem, sig ); + else handle_control_frag( ctx, stem, sig, tsorig, tspub ); return 0; } diff --git a/src/discof/restore/fd_snaprd_tile.c b/src/discof/restore/fd_snaprd_tile.c index 7465adcd0a2..906b009fca9 100644 --- a/src/discof/restore/fd_snaprd_tile.c +++ b/src/discof/restore/fd_snaprd_tile.c @@ -2,6 +2,7 @@ #include "utils/fd_sshttp.h" #include "utils/fd_ssctrl.h" #include "utils/fd_ssarchive.h" +#include "utils/fd_ssmsg.h" #include "../../disco/topo/fd_topo.h" #include "../../disco/metrics/fd_metrics.h" @@ -49,7 +50,7 @@ struct fd_snaprd_tile { ulong ack_cnt; int peer_selection; - fd_ip4_port_t addr; + fd_sspeer_t peer; struct { ulong write_buffer_pos; @@ -73,6 +74,20 @@ struct fd_snaprd_tile { char incremental_snapshot_path[ PATH_MAX ]; } local_in; + struct { + + struct { + char path[ PATH_MAX ]; + ulong len; + } full; + + struct { + char path[ PATH_MAX ]; + ulong len; + } incremental; + + } http; + struct { char path[ PATH_MAX ]; int do_download; @@ -204,8 +219,8 @@ read_http_data( fd_snaprd_tile_t * ctx, case FD_SSHTTP_ADVANCE_AGAIN: break; case FD_SSHTTP_ADVANCE_ERROR: { FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", - FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port )); - fd_ssping_invalidate( ctx->ssping, ctx->addr, now ); + FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port )); + fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, now ); fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET; ctx->deadline_nanos = now; @@ -319,16 +334,13 @@ drain_buffer( fd_snaprd_tile_t * ctx ) { static void rename_snapshots( fd_snaprd_tile_t * ctx ) { if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return; - char const * full_snapshot_name; - char const * incremental_snapshot_name; - fd_sshttp_snapshot_names( ctx->sshttp, &full_snapshot_name, &incremental_snapshot_name ); if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) { - if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, full_snapshot_name ) ) ) + if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->http.full.path ) ) ) FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) )); } if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) { - if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, incremental_snapshot_name ) ) ) + if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->http.incremental.path ) ) ) FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) )); } } @@ -357,8 +369,8 @@ after_credit( fd_snaprd_tile_t * ctx, switch ( ctx->state ) { case FD_SNAPRD_STATE_WAITING_FOR_PEERS: { - fd_ip4_port_t best = fd_ssping_best( ctx->ssping ); - if( FD_LIKELY( best.l ) ) { + fd_sspeer_t best = fd_ssping_best( ctx->ssping ); + if( FD_LIKELY( best.addr.l ) ) { ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS; ctx->deadline_nanos = now + 500L*1000L*1000L; } @@ -367,8 +379,8 @@ after_credit( fd_snaprd_tile_t * ctx, case FD_SNAPRD_STATE_COLLECTING_PEERS: { if( FD_UNLIKELY( nowdeadline_nanos ) ) break; - fd_ip4_port_t best = fd_ssping_best( ctx->ssping ); - if( FD_UNLIKELY( !best.l ) ) { + fd_sspeer_t best = fd_ssping_best( ctx->ssping ); + if( FD_UNLIKELY( !best.addr.l ) ) { ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS; break; } @@ -378,10 +390,31 @@ after_credit( fd_snaprd_tile_t * ctx, FD_LOG_NOTICE(( "loading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path )); ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE; } else { - FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr ), best.port )); - ctx->addr = best; + char path[ PATH_MAX ]; + char encoded_full_hash[ FD_BASE58_ENCODED_32_SZ ]; + + /* Generate the http paths */ + fd_base58_encode_32( best.snapshot_info->full.hash, NULL, encoded_full_hash ); + FD_TEST( fd_cstr_printf_check( ctx->http.full.path, PATH_MAX, &ctx->http.full.len, "snapshot-%lu-%s.tar.zst", best.snapshot_info->full.slot, encoded_full_hash ) ); + FD_TEST( fd_cstr_printf_check( path, PATH_MAX, NULL, "/%s", ctx->http.full.path ) ); + + if( ctx->config.incremental_snapshot_fetch ) { + char encoded_incremental_hash[ FD_BASE58_ENCODED_32_SZ ]; + fd_base58_encode_32( best.snapshot_info->incremental.hash, NULL, encoded_incremental_hash ); + FD_TEST( fd_cstr_printf_check( ctx->http.incremental.path, PATH_MAX, &ctx->http.incremental.len, "incremental-snapshot-%lu-%lu-%s.tar.zst", best.snapshot_info->incremental.base_slot, best.snapshot_info->incremental.slot, encoded_incremental_hash ) ); + } + + uint low; + uint high; + /* send the highest manifest slot */ + ulong highest_manifest_slot = ctx->config.incremental_snapshot_fetch ? best.snapshot_info->incremental.slot : best.snapshot_info->full.slot; + fd_ssmsg_slot_to_frag( highest_manifest_slot, &low, &high ); + fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT, 0UL, 0UL, 0UL, low, high ); + + FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( best.addr.addr ), best.addr.port, ctx->http.full.path )); + ctx->peer = best; ctx->state = FD_SNAPRD_STATE_READING_FULL_HTTP; - fd_sshttp_init( ctx->sshttp, best, "/snapshot.tar.bz2", 17UL, now ); + fd_sshttp_init( ctx->sshttp, best.addr, path, ctx->http.full.len + 1UL, now ); } break; } @@ -445,8 +478,10 @@ after_credit( fd_snaprd_tile_t * ctx, break; } - FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port )); - fd_sshttp_init( ctx->sshttp, ctx->addr, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() ); + char path[ PATH_MAX ]; + FD_TEST( fd_cstr_printf_check( path, PATH_MAX, NULL, "/%s", ctx->http.incremental.path ) ); + FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port, path)); + fd_sshttp_init( ctx->sshttp, ctx->peer.addr, path, ctx->http.incremental.len + 1UL, fd_log_wallclock() ); ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP; break; case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET: @@ -508,18 +543,18 @@ after_frag( fd_snaprd_tile_t * ctx, case FD_SNAPRD_STATE_READING_FULL_HTTP: case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", - FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port )); + FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port )); fd_sshttp_cancel( ctx->sshttp ); - fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() ); + fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() ); fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET; break; case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP: case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP: FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", - FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port )); + FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port )); fd_sshttp_cancel( ctx->sshttp ); - fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() ); + fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() ); /* We would like to transition to FULL_HTTP_RESET, but we can't do it just yet, because we have already sent a DONE control fragment, and need to wait for acknowledges to come back @@ -676,6 +711,8 @@ unprivileged_init( fd_topo_t * topo, ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache, topo->links[ tile->out_link_id[ 0 ] ].mtu ); ctx->out.chunk = ctx->out.chunk0; ctx->out.mtu = topo->links[ tile->out_link_id[ 0 ] ].mtu; + + fd_memset( &ctx->peer, 0, sizeof(ctx->peer) ); } #define STEM_BURST 2UL /* One control message, and one data message */ diff --git a/src/discof/restore/utils/fd_ssctrl.h b/src/discof/restore/utils/fd_ssctrl.h index f21fe288b49..ed7ba85f7b8 100644 --- a/src/discof/restore/utils/fd_ssctrl.h +++ b/src/discof/restore/utils/fd_ssctrl.h @@ -40,13 +40,23 @@ #define FD_SNAPSHOT_MSG_DATA (0UL) /* Fragment represents some snapshot data */ -#define FD_SNAPSHOT_MSG_CTRL_RESET_FULL (1UL) /* Reset to start loading a fresh full snapshot */ -#define FD_SNAPSHOT_MSG_CTRL_EOF_FULL (2UL) /* Full snapshot data is done, incremental data starting now */ -#define FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL (3UL) /* Incremental data being retried, start incremental over */ -#define FD_SNAPSHOT_MSG_CTRL_DONE (4UL) /* Snapshot load is over, data is finished for this tile */ -#define FD_SNAPSHOT_MSG_CTRL_SHUTDOWN (5UL) /* All tiles have acknowledged snapshot load is done, can now shutdown */ - -#define FD_SNAPSHOT_MSG_CTRL_ACK (6UL) /* Sent from tiles back to snaprd, meaning they ACK whatever control message was pending */ -#define FD_SNAPSHOT_MSG_CTRL_MALFORMED (7UL) /* Sent from tiles back to snaprd, meaning they consider the current snapshot malformed */ +/* The HIGHEST_MANIFEST_SLOT message is an informational control message + that is forwarded from the snaprd tile to the snapin tile, which + forwards the message to the snap_out link. The HIGHEST_MANIFEST_SLOT + message contains the highest manifest slot so far. It is typically + the incremental snapshot slot but can be the full snapshot slot if + incremental snapshots are disabled. It is guaranteed to be + monotonically increasing and is forwarded regardless of any snapshot + loading error / retry. */ +#define FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT (1UL) /* Fragment contains the highest manifest slot so far, guaranteed to be monotonically increasing */ + +#define FD_SNAPSHOT_MSG_CTRL_RESET_FULL (2UL) /* Reset to start loading a fresh full snapshot */ +#define FD_SNAPSHOT_MSG_CTRL_EOF_FULL (3UL) /* Full snapshot data is done, incremental data starting now */ +#define FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL (4UL) /* Incremental data being retried, start incremental over */ +#define FD_SNAPSHOT_MSG_CTRL_DONE (5UL) /* Snapshot load is over, data is finished for this tile */ +#define FD_SNAPSHOT_MSG_CTRL_SHUTDOWN (6UL) /* All tiles have acknowledged snapshot load is done, can now shutdown */ + +#define FD_SNAPSHOT_MSG_CTRL_ACK (7UL) /* Sent from tiles back to snaprd, meaning they ACK whatever control message was pending */ +#define FD_SNAPSHOT_MSG_CTRL_MALFORMED (8UL) /* Sent from tiles back to snaprd, meaning they consider the current snapshot malformed */ #endif /* HEADER_fd_src_discof_restore_utils_fd_ssctrl_h */ diff --git a/src/discof/restore/utils/fd_sshttp.c b/src/discof/restore/utils/fd_sshttp.c index 4746e02c81f..3c1ebc4378e 100644 --- a/src/discof/restore/utils/fd_sshttp.c +++ b/src/discof/restore/utils/fd_sshttp.c @@ -36,9 +36,6 @@ struct fd_sshttp_private { ulong response_len; char response[ USHORT_MAX ]; - char full_snapshot_name[ PATH_MAX ]; - char incremental_snapshot_name[ PATH_MAX ]; - ulong content_len; ulong magic; @@ -53,7 +50,7 @@ FD_FN_CONST ulong fd_sshttp_footprint( void ) { ulong l; l = FD_LAYOUT_INIT; - l = FD_LAYOUT_APPEND( l, FD_SSHTTP_ALIGN, sizeof(fd_sshttp_t) ); + l = FD_LAYOUT_APPEND( l, FD_SSHTTP_ALIGN, sizeof(fd_sshttp_t) ); return FD_LAYOUT_FINI( l, FD_SSHTTP_ALIGN ); } @@ -182,92 +179,6 @@ send_request( fd_sshttp_t * http, return FD_SSHTTP_ADVANCE_AGAIN; } -static int -follow_redirect( fd_sshttp_t * http, - struct phr_header * headers, - ulong header_cnt, - long now ) { - if( FD_UNLIKELY( !http->hops ) ) { - FD_LOG_WARNING(( "too many redirects" )); - fd_sshttp_cancel( http ); - return FD_SSHTTP_ADVANCE_ERROR; - } - - http->hops--; - - ulong location_len; - char const * location = NULL; - - for( ulong i=0UL; i=PATH_MAX-1UL ) ) { - fd_sshttp_cancel( http ); - return FD_SSHTTP_ADVANCE_ERROR; - } - - char snapshot_name[ PATH_MAX ]; - fd_memcpy( snapshot_name, location+1UL, location_len-1UL ); - snapshot_name[ location_len-1UL ] = '\0'; - - ulong full_entry_slot, incremental_entry_slot; - uchar decoded_hash[ FD_HASH_FOOTPRINT ]; - int err = fd_ssarchive_parse_filename( snapshot_name, &full_entry_slot, &incremental_entry_slot, decoded_hash ); - - if( FD_UNLIKELY( err ) ) { - FD_LOG_WARNING(( "unrecognized snapshot file `%s` in redirect location header", snapshot_name )); - fd_sshttp_cancel( http ); - return FD_SSHTTP_ADVANCE_ERROR; - } - - char encoded_hash[ FD_BASE58_ENCODED_32_SZ ]; - fd_base58_encode_32( decoded_hash, NULL, encoded_hash ); - - if( FD_LIKELY( incremental_entry_slot!=ULONG_MAX ) ) { - FD_TEST( fd_cstr_printf_check( http->incremental_snapshot_name, PATH_MAX, NULL, "incremental-snapshot-%lu-%lu-%s.tar.zst", full_entry_slot, incremental_entry_slot, encoded_hash ) ); - } else { - FD_TEST( fd_cstr_printf_check( http->full_snapshot_name, PATH_MAX, NULL, "snapshot-%lu-%s.tar.zst", full_entry_slot, encoded_hash ) ); - } - break; - } - } - - if( FD_UNLIKELY( !location ) ) { - FD_LOG_WARNING(( "no location header in redirect response" )); - fd_sshttp_cancel( http ); - return FD_SSHTTP_ADVANCE_ERROR; - } - - if( FD_UNLIKELY( !fd_cstr_printf_check( http->request, sizeof(http->request), &http->request_len, - "GET %.*s HTTP/1.1\r\n" - "User-Agent: Firedancer\r\n" - "Accept: */*\r\n" - "Accept-Encoding: identity\r\n" - "Host: " FD_IP4_ADDR_FMT "\r\n\r\n", - (int)location_len, location, FD_IP4_ADDR_FMT_ARGS( http->addr.addr ) ) ) ) { - FD_LOG_WARNING(( "location header too long `%.*s`", (int)location_len, location )); - fd_sshttp_cancel( http ); - return FD_SSHTTP_ADVANCE_ERROR; - } - - FD_LOG_NOTICE(( "following redirect to http://" FD_IP4_ADDR_FMT ":%hu%.*s", - FD_IP4_ADDR_FMT_ARGS( http->addr.addr ), http->addr.port, - (int)headers[ 0 ].value_len, headers[ 0 ].value )); - - fd_sshttp_cancel( http ); - fd_sshttp_init( http, http->addr, location, location_len, now ); - - return FD_SSHTTP_ADVANCE_AGAIN; -} - static int read_response( fd_sshttp_t * http, ulong * data_len, @@ -314,7 +225,9 @@ read_response( fd_sshttp_t * http, int is_redirect = (status==301) | (status==303) | (status==304) | (status==307) | (status==308); if( FD_UNLIKELY( is_redirect ) ) { - return follow_redirect( http, headers, header_cnt, now ); + FD_LOG_WARNING(( "redirect response not allowed (%d)", status )); + fd_sshttp_cancel( http ); + return FD_SSHTTP_ADVANCE_ERROR; } if( FD_UNLIKELY( status!=200 ) ) { @@ -376,14 +289,6 @@ read_body( fd_sshttp_t * http, return FD_SSHTTP_ADVANCE_DATA; } -void -fd_sshttp_snapshot_names( fd_sshttp_t * http, - char const ** full_snapshot_name, - char const ** incremental_snapshot_name ) { - *full_snapshot_name = http->full_snapshot_name; - *incremental_snapshot_name = http->incremental_snapshot_name; -} - int fd_sshttp_advance( fd_sshttp_t * http, ulong * data_len, diff --git a/src/discof/restore/utils/fd_sshttp.h b/src/discof/restore/utils/fd_sshttp.h index e718ff9e8b5..a70ac266655 100644 --- a/src/discof/restore/utils/fd_sshttp.h +++ b/src/discof/restore/utils/fd_sshttp.h @@ -24,12 +24,6 @@ fd_sshttp_new( void * shmem ); fd_sshttp_t * fd_sshttp_join( void * sshttp ); -/* Sets points to snapshot names */ -void -fd_sshttp_snapshot_names( fd_sshttp_t * http, - char const ** full_snapshot_name, - char const ** incremental_snapshot_name ); - void fd_sshttp_init( fd_sshttp_t * http, fd_ip4_port_t addr, diff --git a/src/discof/restore/utils/fd_ssmsg.h b/src/discof/restore/utils/fd_ssmsg.h index 28fbfe2161b..3fd722132ba 100644 --- a/src/discof/restore/utils/fd_ssmsg.h +++ b/src/discof/restore/utils/fd_ssmsg.h @@ -3,9 +3,10 @@ #include "../../../flamenco/types/fd_types.h" -#define FD_SSMSG_MANIFEST_FULL (0) /* A snapshot manifest message from the full snapshot */ -#define FD_SSMSG_MANIFEST_INCREMENTAL (1) /* A snapshot manifest message from the incremental snapshot */ -#define FD_SSMSG_DONE (2) /* Indicates the snapshot is fully loaded and tiles are shutting down */ +#define FD_SSMSG_MANIFEST_FULL (0) /* A snapshot manifest message from the full snapshot */ +#define FD_SSMSG_MANIFEST_INCREMENTAL (1) /* A snapshot manifest message from the incremental snapshot */ +#define FD_SSMSG_HIGHEST_MANIFEST_SLOT (2) /* The highest manifest slot so far, guaranteed to be monotonically increasing */ +#define FD_SSMSG_DONE (3) /* Indicates the snapshot is fully loaded and tiles are shutting down */ /* This number is exhagerately high, but is consisent with Frankendancer. We need this to be about the number of validators, so roughly @@ -24,6 +25,24 @@ fd_ssmsg_sig( ulong message, FD_FN_CONST static inline ulong fd_ssmsg_sig_manifest_size( ulong sig ) { return (sig >> 2); } FD_FN_CONST static inline ulong fd_ssmsg_sig_message( ulong sig ) { return (sig & 0x3UL); } +/* The FD_SSMSG_HIGHEST_MANIFEST_SLOT uses the tsorig and tspub fields + of the fd_frag_meta_t struct to store the low and high 32 bits of + the slot number. */ +static inline void +fd_ssmsg_slot_to_frag( ulong slot, + uint* low, + uint* high ) { + *low = (uint)(slot & 0xFFFFFFFFUL); + *high = (uint)((slot >> 32UL) & 0xFFFFFFFFUL); +} + +static inline void +fd_ssmsg_frag_to_slot( ulong low, + ulong high, + ulong* slot ) { + *slot = (high << 32UL) | low; +} + struct fd_snapshot_manifest_vote_account { /* The pubkey of the vote account */ uchar vote_account_pubkey[ 32UL ]; diff --git a/src/discof/restore/utils/fd_ssping.c b/src/discof/restore/utils/fd_ssping.c index 0e3ce6ade50..f2f9187c80b 100644 --- a/src/discof/restore/utils/fd_ssping.c +++ b/src/discof/restore/utils/fd_ssping.c @@ -1,4 +1,6 @@ +#define _GNU_SOURCE #include "fd_ssping.h" +#include "fd_ssresolve.h" #include "../../../util/bits/fd_bits.h" #include "../../../util/log/fd_log.h" @@ -9,6 +11,7 @@ #include #include #include +#include #define PEER_STATE_UNPINGED 0 #define PEER_STATE_PINGED 1 @@ -16,13 +19,18 @@ #define PEER_STATE_REFRESHING 3 #define PEER_STATE_INVALID 4 -#define PEER_DEADLINE_NANOS_PING (1L*1000L*1000L*1000L) /* 1 second */ +#define PEER_DEADLINE_NANOS_PING (2L*1000L*1000L*1000L) /* 2 seconds */ #define PEER_DEADLINE_NANOS_VALID (2L*60L*1000L*1000L*1000L) /* 2 minutes */ #define PEER_DEADLINE_NANOS_INVALID (5L*60L*1000L*1000L*1000L) /* 5 minutes */ struct fd_ssping_peer { - ulong refcnt; - fd_ip4_port_t addr; + ulong refcnt; + fd_ip4_port_t addr; + + fd_ssresolve_t * full_ssresolve; + fd_ssresolve_t * inc_ssresolve; + + fd_ssinfo_t snapshot_info; struct { ulong next; @@ -50,6 +58,8 @@ struct fd_ssping_peer { } fd; int state; + ulong full_latency_nanos; + ulong incremental_latency_nanos; ulong latency_nanos; long deadline_nanos; }; @@ -110,6 +120,8 @@ struct fd_ssping_private { struct pollfd * fds; ulong * fds_idx; + ulong slot; /* highest slot of selected peer */ + ulong magic; /* ==FD_SSPING_MAGIC */ }; @@ -131,8 +143,13 @@ fd_ssping_footprint( ulong max_peers ) { l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() ); l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() ); l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() ); - l = FD_LAYOUT_APPEND( l, sizeof(struct pollfd), max_peers*sizeof(struct pollfd) ); - l = FD_LAYOUT_APPEND( l, sizeof(ulong), max_peers*sizeof(ulong) ); + l = FD_LAYOUT_APPEND( l, sizeof(struct pollfd), max_peers*sizeof(struct pollfd)*2UL ); + l = FD_LAYOUT_APPEND( l, sizeof(ulong), max_peers*sizeof(ulong)*2UL ); + + for( ulong i=0UL; ipool = peer_pool_join( peer_pool_new( _pool, max_peers ) ); - ssping->map = peer_map_join( peer_map_new( _map, max_peers, seed ) ); + ssping->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) ); + ssping->map = peer_map_join( peer_map_new( _map, max_peers, seed ) ); ssping->score_treap = score_treap_join( score_treap_new( _score_treap, max_peers ) ); ssping->unpinged = deadline_list_join( deadline_list_new( _unpinged ) ); @@ -178,9 +195,19 @@ fd_ssping_new( void * shmem, ssping->refreshing = deadline_list_join( deadline_list_new( _refreshing ) ); ssping->invalid = deadline_list_join( deadline_list_new( _invalid ) ); - ssping->fds_len = 0UL; - ssping->fds = fds; - ssping->fds_idx = fds_idx; + ssping->fds_len = 0UL; + ssping->fds = _fds; + ssping->fds_idx = fds_idx; + + ssping->slot = 0UL; + + FD_TEST( peer_pool_max( ssping->pool )==max_peers ); + for( ulong i=0UL; ipool ); i++ ) { + void * _full_ssresolve = FD_SCRATCH_ALLOC_APPEND( l, fd_ssresolve_align(), fd_ssresolve_footprint() ); + void * _inc_ssresolve = FD_SCRATCH_ALLOC_APPEND( l, fd_ssresolve_align(), fd_ssresolve_footprint() ); + ssping->pool[ i ].full_ssresolve = fd_ssresolve_join( fd_ssresolve_new( _full_ssresolve ) ); + ssping->pool[ i ].inc_ssresolve = fd_ssresolve_join( fd_ssresolve_new( _inc_ssresolve ) ); + } FD_COMPILER_MFENCE(); FD_VOLATILE( ssping->magic ) = FD_SSPING_MAGIC; @@ -219,9 +246,14 @@ fd_ssping_add( fd_ssping_t * ssping, if( FD_UNLIKELY( !peer_pool_free( ssping->pool ) ) ) return; peer = peer_pool_ele_acquire( ssping->pool ); FD_TEST( peer ); - peer->refcnt = 0UL; - peer->state = PEER_STATE_UNPINGED; - peer->addr = addr; + peer->refcnt = 0UL; + peer->state = PEER_STATE_UNPINGED; + peer->addr = addr; + peer->snapshot_info.full.slot = ULONG_MAX; + peer->snapshot_info.incremental.base_slot = ULONG_MAX; + peer->snapshot_info.incremental.slot = ULONG_MAX; + peer->full_latency_nanos = 0UL; + peer->incremental_latency_nanos = 0UL; peer_map_ele_insert( ssping->map, peer, ssping->pool ); deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool ); } @@ -238,13 +270,20 @@ remove_ping_fd( fd_ssping_t * ssping, return; } - ssping->fds[ idx ] = ssping->fds[ ssping->fds_len-1UL ]; - ssping->fds_idx[ idx ] = ssping->fds_idx[ ssping->fds_len-1UL ]; + ulong full_idx = idx; + ulong inc_idx = idx+1UL; - fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ idx ] ); - peer->fd.idx = idx; + ssping->fds[ inc_idx ] = ssping->fds[ ssping->fds_len-1UL ]; + ssping->fds_idx[ inc_idx ] = ssping->fds_idx[ ssping->fds_len-1UL ]; + ssping->fds_len--; + ssping->fds[ full_idx ] = ssping->fds[ ssping->fds_len-1UL ]; + ssping->fds_idx[ full_idx ] = ssping->fds_idx[ ssping->fds_len-1UL ]; ssping->fds_len--; + + FD_TEST( ssping->fds_idx[ full_idx ]== ssping->fds_idx[ inc_idx ] ); + fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ full_idx ] ); + peer->fd.idx = full_idx; } void @@ -333,43 +372,71 @@ poll_advance( fd_ssping_t * ssping, else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll failed (%i-%s)", errno, strerror( errno ) )); for( ulong i=0UL; ifds_len; i++ ) { + fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ); struct pollfd * pfd = &ssping->fds[ i ]; if( FD_UNLIKELY( pfd->revents & (POLLERR|POLLHUP) ) ) { unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now ); continue; } - if( FD_LIKELY( pfd->revents & POLLOUT ) ) { - struct icmphdr icmp_hdr = (struct icmphdr){ - .type = ICMP_ECHO, - .code = 0, - .un.echo.id = 0, /* Automatically set by kernel for a ping socket */ - .un.echo.sequence = 0, /* Only one ping goes out per socket, so nothing to change */ - .checksum = 0 /* Will be calculated by the kernel */ - }; + int full = i&1UL ? 0 : 1; /* even indices are full, odd indices are incremental */ + fd_ssresolve_t * ssresolve = full ? peer->full_ssresolve : peer->inc_ssresolve; + if( FD_UNLIKELY( now>peer->deadline_nanos ) ) { + unping_peer( ssping, peer, now ); + continue; + } + + if( FD_LIKELY( !fd_ssresolve_is_done( ssresolve ) ) ) { + if( FD_LIKELY( pfd->revents & POLLOUT ) ) { + int res = fd_ssresolve_advance_poll_out( ssresolve ); - long result = send( pfd->fd, &icmp_hdr, sizeof(icmp_hdr), 0 ); - if( FD_UNLIKELY( !result ) ) continue; - if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) continue; - else if( FD_UNLIKELY( -1==result ) ) { - unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now ); - continue; + if( FD_UNLIKELY( res==FD_SSRESOLVE_ADVANCE_ERROR ) ) { + unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now ); + continue; + } + + pfd->revents &= ~POLLOUT; } - pfd->revents &= ~POLLOUT; - } - if( FD_LIKELY( pfd->revents & POLLIN ) ) { - struct icmphdr icmp_hdr; - long result = recv( pfd->fd, &icmp_hdr, sizeof(icmp_hdr), 0 ); - if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) continue; - else if( FD_UNLIKELY( -1==result || (ulong)resultpool, ssping->fds_idx[ i ] ), now ); - continue; + if( FD_LIKELY( pfd->revents & POLLIN ) ) { + fd_ssresolve_result_t resolve_result; + int res = fd_ssresolve_advance_poll_in( ssresolve, &resolve_result ); + + if( FD_UNLIKELY( res==FD_SSRESOLVE_ADVANCE_ERROR ) ) { + unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now ); + continue; + } else if( FD_UNLIKELY( res==FD_SSRESOLVE_ADVANCE_AGAIN ) ) { + continue; + } else { /* FD_SSRESOLVE_ADVANCE_SUCCESS */ + FD_TEST( peer->deadline_nanos>now ); + + if( resolve_result.base_slot==ULONG_MAX ) { + peer->snapshot_info.full.slot = resolve_result.slot; + memcpy( &peer->snapshot_info.full.hash, &resolve_result.hash, sizeof(fd_hash_t) ); + peer->full_latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong)(peer->deadline_nanos - now); + } else { + peer->snapshot_info.incremental.base_slot = resolve_result.base_slot; + peer->snapshot_info.incremental.slot = resolve_result.slot; + memcpy( &peer->snapshot_info.incremental.hash, &resolve_result.hash, sizeof(fd_hash_t) ); + peer->incremental_latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong)(peer->deadline_nanos - now); } + } } + } - fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ); - FD_TEST( peer->deadline_nanos>now ); - peer->latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong)(peer->deadline_nanos - now); + /* Once both the full and incremental snapshots are resolved, we can + mark the peer valid and remove the peer from the list of peers to + ping. */ + if( fd_ssresolve_is_done( peer->full_ssresolve ) && + fd_ssresolve_is_done( peer->inc_ssresolve ) ) { + FD_LOG_NOTICE(("successfully resolved snapshots for peer " FD_IP4_ADDR_FMT ":%hu " + "with full slot %lu, incremental base slot %lu and incremental slot %lu", + FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), peer->addr.port, + peer->snapshot_info.full.slot, + peer->snapshot_info.incremental.base_slot, + peer->snapshot_info.incremental.slot )); + peer->latency_nanos = (peer->full_latency_nanos + peer->incremental_latency_nanos) / 2UL; + FD_LOG_NOTICE(( "full latency is %lu, incremental latency is %lu, latency is %lu", + peer->full_latency_nanos, peer->incremental_latency_nanos, peer->latency_nanos )); if( FD_LIKELY( peer->state==PEER_STATE_REFRESHING ) ) { score_treap_ele_remove( ssping->score_treap, peer, ssping->pool ); @@ -382,17 +449,22 @@ poll_advance( fd_ssping_t * ssping, deadline_list_ele_remove( ssping->pinged, peer, ssping->pool ); deadline_list_ele_push_tail( ssping->valid, peer, ssping->pool ); score_treap_ele_insert( ssping->score_treap, peer, ssping->pool ); - remove_ping_fd( ssping, i ); + remove_ping_fd( ssping, peer->fd.idx ); } } } static int -peer_connect( fd_ssping_t * ssping, - fd_ssping_peer_t * peer ) { - int sockfd = socket( PF_INET, SOCK_DGRAM|SOCK_NONBLOCK, IPPROTO_ICMP ); +create_socket( fd_ssping_t * ssping, + fd_ssping_peer_t * peer ) { + int sockfd = socket( PF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 ); if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket failed (%i-%s)", errno, strerror( errno ) )); + int optval = 1; + if( FD_UNLIKELY( -1==setsockopt( sockfd, SOL_TCP, TCP_NODELAY, &optval, sizeof(int) ) ) ) { + FD_LOG_ERR(( "setsockopt() failed (%d-%s)", errno, fd_io_strerror( errno ) )); + } + struct sockaddr_in addr = { .sin_family = AF_INET, .sin_port = fd_ushort_bswap( peer->addr.port ), @@ -409,10 +481,28 @@ peer_connect( fd_ssping_t * ssping, .events = POLLIN|POLLOUT, .revents = 0 }; + + return 0; +} + +static int +peer_connect( fd_ssping_t * ssping, + fd_ssping_peer_t * peer ) { + int err; + err = create_socket( ssping, peer ); /* full */ + if( FD_UNLIKELY( err ) ) return err; ssping->fds_idx[ ssping->fds_len ] = peer_pool_idx( ssping->pool, peer ); peer->fd.idx = ssping->fds_len; ssping->fds_len++; + err = create_socket( ssping, peer ); /* incremental */ + if( FD_UNLIKELY( err ) ) return err; + ssping->fds_idx[ ssping->fds_len ] = peer_pool_idx( ssping->pool, peer ); + ssping->fds_len++; + + fd_ssresolve_init( peer->full_ssresolve, peer->addr, ssping->fds[ peer->fd.idx ].fd, 1 ); + fd_ssresolve_init( peer->inc_ssresolve, peer->addr, ssping->fds[ peer->fd.idx+1UL ].fd, 0 ); + return 0; } @@ -493,11 +583,25 @@ fd_ssping_advance( fd_ssping_t * ssping, poll_advance( ssping, now ); } -fd_ip4_port_t -fd_ssping_best( fd_ssping_t const * ssping ) { - score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( ssping->score_treap, ssping->pool ); - if( FD_UNLIKELY( score_treap_fwd_iter_done( iter ) ) ) return (fd_ip4_port_t){ .l=0UL }; +fd_sspeer_t +fd_ssping_best( fd_ssping_t * ssping ) { + for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( ssping->score_treap, ssping->pool ); + !score_treap_fwd_iter_done( iter ); + iter = score_treap_fwd_iter_next( iter, ssping->pool ) ) { + fd_ssping_peer_t const * best = score_treap_fwd_iter_ele_const( iter, ssping->pool ); + if( FD_LIKELY( best->snapshot_info.incremental.slot>=ssping->slot ) ) { + ssping->slot = best->snapshot_info.incremental.slot; + return (fd_sspeer_t){ + .addr = best->addr, + .snapshot_info = &best->snapshot_info, + }; + } + } - fd_ssping_peer_t const * best = score_treap_fwd_iter_ele_const( iter, ssping->pool ); - return best->addr; + return (fd_sspeer_t){ + .addr = { + .l = 0UL + }, + .snapshot_info = NULL, + }; } diff --git a/src/discof/restore/utils/fd_ssping.h b/src/discof/restore/utils/fd_ssping.h index f2fa7f3981c..3c5a083cd54 100644 --- a/src/discof/restore/utils/fd_ssping.h +++ b/src/discof/restore/utils/fd_ssping.h @@ -17,6 +17,7 @@ #include "../../../util/fd_util_base.h" #include "../../../util/net/fd_net_headers.h" +#include "../../../flamenco/types/fd_types_custom.h" #define FD_SSPING_ALIGN (8UL) @@ -25,6 +26,30 @@ struct fd_ssping_private; typedef struct fd_ssping_private fd_ssping_t; +/* fd_ssinfo stores the resolved snapshot information from a peer. */ +struct fd_ssinfo { + struct { + ulong slot; /* slot of the full snapshot */ + uchar hash[ FD_HASH_FOOTPRINT ]; /* base58 decoded hash of the full snapshot */ + ulong slots_behind; /* number of slots behind the latest full cluster slot */ + } full; + + struct { + ulong base_slot; /* slot of the full snapshot */ + ulong slot; /* slot of the incremental snapshot */ + uchar hash[ FD_HASH_FOOTPRINT ]; /* base58 decoded hash of the incremental snapshot */ + ulong slots_behind; /* number of slots behind latest incremental cluster slot */ + } incremental; +}; +typedef struct fd_ssinfo fd_ssinfo_t; + +/* fd_sspeer stores a peer's address and its resolved snapshot info. */ +struct fd_sspeer { + fd_ip4_port_t addr; + fd_ssinfo_t const * snapshot_info; +}; +typedef struct fd_sspeer fd_sspeer_t; + FD_PROTOTYPES_BEGIN FD_FN_CONST ulong @@ -80,11 +105,13 @@ void fd_ssping_advance( fd_ssping_t * ssping, long now ); -/* Retrieve the best "active" peer right now, by lowest ping. If no - peer is active or pingable, this returns 0.0.0.0:0. */ +/* Retrieve the best "active" peer right now, by lowest ping. Peer's + resolved snapshot slot must be greater than the highest slot + parameter. If no peer is active or pingable, this returns an empty + peer with address 0.0.0.0:0. */ -fd_ip4_port_t -fd_ssping_best( fd_ssping_t const * ssping ); +fd_sspeer_t +fd_ssping_best( fd_ssping_t * ssping ); FD_PROTOTYPES_END diff --git a/src/discof/restore/utils/fd_ssresolve.c b/src/discof/restore/utils/fd_ssresolve.c new file mode 100644 index 00000000000..1e71e478a4b --- /dev/null +++ b/src/discof/restore/utils/fd_ssresolve.c @@ -0,0 +1,348 @@ +#include "fd_ssresolve.h" +#include "fd_ssarchive.h" + +#include "../../../waltz/http/picohttpparser.h" +#include "../../../util/log/fd_log.h" + +#include +#include +#include +#include + +#include +#include +#include + +#define FD_SSRESOLVE_STATE_REQ (0) /* sending request for snapshot */ +#define FD_SSRESOLVE_STATE_RESP (1) /* receiving snapshot response */ +#define FD_SSRESOLVE_STATE_DONE (2) /* done */ + +struct fd_ssresolve_private { + int state; + long deadline; + + fd_ip4_port_t addr; + int sockfd; + int full; + + char request[ 4096UL ]; + ulong request_sent; + ulong request_len; + + ulong response_len; + char response[ USHORT_MAX ]; + + ulong magic; +}; + +FD_FN_CONST ulong +fd_ssresolve_align( void ) { + return FD_SSRESOLVE_ALIGN; +} + +FD_FN_CONST ulong +fd_ssresolve_footprint( void ) { + ulong l; + l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, FD_SSRESOLVE_ALIGN, sizeof(fd_ssresolve_t) ); + return FD_LAYOUT_FINI( l, FD_SSRESOLVE_ALIGN ); +} + +void * +fd_ssresolve_new( void * shmem ) { + if( FD_UNLIKELY( !shmem ) ) { + FD_LOG_WARNING(( "NULL shmem" )); + return NULL; + } + + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_ssresolve_align() ) ) ) { + FD_LOG_WARNING(( "unaligned shmem" )); + return NULL; + } + + FD_SCRATCH_ALLOC_INIT( l, shmem ); + fd_ssresolve_t * ssresolve = FD_SCRATCH_ALLOC_APPEND( l, FD_SSRESOLVE_ALIGN, sizeof(fd_ssresolve_t) ); + + ssresolve->state = FD_SSRESOLVE_STATE_REQ; + ssresolve->request_sent = 0UL; + ssresolve->request_len = 0UL; + ssresolve->response_len = 0UL; + + FD_COMPILER_MFENCE(); + FD_VOLATILE( ssresolve->magic ) = FD_SSRESOLVE_MAGIC; + FD_COMPILER_MFENCE(); + + return (void *)ssresolve; +} + +fd_ssresolve_t * +fd_ssresolve_join( void * shresolve ) { + if( FD_UNLIKELY( !shresolve ) ) { + FD_LOG_WARNING(( "NULL ssresolve" )); + return NULL; + } + + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shresolve, fd_ssresolve_align() ) ) ) { + FD_LOG_WARNING(( "misaligned ssresolve" )); + return NULL; + } + + fd_ssresolve_t * ssresolve = (fd_ssresolve_t *)shresolve; + + if( FD_UNLIKELY( ssresolve->magic!=FD_SSRESOLVE_MAGIC ) ) { + FD_LOG_WARNING(( "bad magic" )); + return NULL; + } + + return ssresolve; +} + +void * +fd_ssresolve_leave( fd_ssresolve_t * ssresolve ) { + if( FD_UNLIKELY( !ssresolve ) ) { + FD_LOG_WARNING(( "NULL ssresolve" )); + return NULL; + } + + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)ssresolve, fd_ssresolve_align() ) ) ) { + FD_LOG_WARNING(( "misaligned ssresolve" )); + return NULL; + } + + if( FD_UNLIKELY( ssresolve->magic!=FD_SSRESOLVE_MAGIC ) ) { + FD_LOG_WARNING(( "bad magic" )); + return NULL; + } + + return (void *)ssresolve; +} + +void * +fd_ssresolve_delete( void * shresolve ) { + if( FD_UNLIKELY( !shresolve ) ) { + FD_LOG_WARNING(( "NULL ssresolve" )); + return NULL; + } + + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shresolve, fd_ssresolve_align() ) ) ) { + FD_LOG_WARNING(( "misaligned ssresolve" )); + return NULL; + } + + fd_ssresolve_t * ssresolve = (fd_ssresolve_t *)shresolve; + + if( FD_UNLIKELY( ssresolve->magic!=FD_SSRESOLVE_MAGIC ) ) { + FD_LOG_WARNING(( "bad magic" )); + return NULL; + } + + FD_COMPILER_MFENCE(); + ssresolve->magic = 0UL; + FD_COMPILER_MFENCE(); + + return shresolve; +} + +void +fd_ssresolve_init( fd_ssresolve_t * ssresolve, + fd_ip4_port_t addr, + int sockfd, + int full ) { + ssresolve->addr = addr; + ssresolve->sockfd = sockfd; + ssresolve->full = full; + + ssresolve->state = FD_SSRESOLVE_STATE_REQ; + ssresolve->request_sent = 0UL; + ssresolve->request_len = 0UL; + ssresolve->response_len = 0UL; +} + +static void +fd_ssresolve_render_req( fd_ssresolve_t * ssresolve, + fd_ip4_port_t addr ) { + if( FD_LIKELY( ssresolve->full ) ) { + FD_TEST( fd_cstr_printf_check( ssresolve->request, sizeof(ssresolve->request), &ssresolve->request_len, + "GET %.*s HTTP/1.1\r\n" + "User-Agent: Firedancer\r\n" + "Accept: */*\r\n" + "Accept-Encoding: identity\r\n" + "Host: " FD_IP4_ADDR_FMT "\r\n\r\n", + 17, "/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( addr.addr ) ) ); + } else { + FD_TEST( fd_cstr_printf_check( ssresolve->request, sizeof(ssresolve->request), &ssresolve->request_len, + "GET %.*s HTTP/1.1\r\n" + "User-Agent: Firedancer\r\n" + "Accept: */*\r\n" + "Accept-Encoding: identity\r\n" + "Host: " FD_IP4_ADDR_FMT "\r\n\r\n", + 29, "/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( addr.addr ) ) ); + } +} + +static int +fd_ssresolve_send_request( fd_ssresolve_t * ssresolve ) { + FD_TEST( ssresolve->state==FD_SSRESOLVE_STATE_REQ ); + + if( FD_UNLIKELY( !ssresolve->request_len ) ) { + fd_ssresolve_render_req( ssresolve, ssresolve->addr ); + } + + long sent = send( ssresolve->sockfd, ssresolve->request+ssresolve->request_sent, ssresolve->request_len-ssresolve->request_sent, 0 ); + if( FD_UNLIKELY( -1==sent && errno==EAGAIN ) ) return FD_SSRESOLVE_ADVANCE_AGAIN; + else if( FD_UNLIKELY( -1==sent ) ) { + return FD_SSRESOLVE_ADVANCE_ERROR; + } + + ssresolve->request_sent += (ulong)sent; + if( FD_UNLIKELY( ssresolve->request_sent==ssresolve->request_len ) ) { + ssresolve->state = FD_SSRESOLVE_STATE_RESP; + return FD_SSRESOLVE_ADVANCE_SUCCESS; + } + + return FD_SSRESOLVE_ADVANCE_AGAIN; +} + +static int +fd_ssresolve_parse_redirect( fd_ssresolve_t * ssresolve, + struct phr_header * headers, + ulong header_cnt, + fd_ssresolve_result_t * result ) { + ulong location_len = 0UL; + char const * location = NULL; + + for( ulong i=0UL; i=PATH_MAX-1UL ) ) return FD_SSRESOLVE_ADVANCE_ERROR; + + char snapshot_name[ PATH_MAX ]; + fd_memcpy( snapshot_name, location+1UL, location_len-1UL ); + snapshot_name[ location_len-1UL ] = '\0'; + + ulong full_entry_slot, incremental_entry_slot; + uchar decoded_hash[ FD_HASH_FOOTPRINT ]; + int err = fd_ssarchive_parse_filename( snapshot_name, &full_entry_slot, &incremental_entry_slot, decoded_hash ); + + if( FD_UNLIKELY( err ) ) { + FD_LOG_WARNING(( "unrecognized snapshot file `%s` in redirect location header", snapshot_name )); + return FD_SSRESOLVE_ADVANCE_ERROR; + } + + if( FD_LIKELY( incremental_entry_slot==ULONG_MAX ) ) { + result->slot = full_entry_slot; + result->base_slot = ULONG_MAX; + } else { + result->slot = incremental_entry_slot; + result->base_slot = full_entry_slot; + } + fd_memcpy( result->hash.hash, decoded_hash, FD_HASH_FOOTPRINT ); + + ssresolve->state = FD_SSRESOLVE_STATE_DONE; + return FD_SSRESOLVE_ADVANCE_SUCCESS; +} + +static int +fd_ssresolve_read_response( fd_ssresolve_t * ssresolve, + fd_ssresolve_result_t * result ) { + FD_TEST( ssresolve->state==FD_SSRESOLVE_STATE_RESP ); + long read = recv( ssresolve->sockfd, ssresolve->response+ssresolve->response_len, sizeof(ssresolve->response)-ssresolve->response_len, 0 ); + if( FD_UNLIKELY( -1==read && errno==EAGAIN ) ) return FD_SSRESOLVE_ADVANCE_AGAIN; + else if( FD_UNLIKELY( -1==read ) ) { + FD_LOG_WARNING(( "recv() failed (%d-%s)", errno, fd_io_strerror( errno ) )); + return FD_SSRESOLVE_ADVANCE_ERROR; + } + + ssresolve->response_len += (ulong)read; + + int minor_version; + int status; + const char * message; + ulong message_len; + struct phr_header headers[ 128UL ]; + ulong header_cnt = 128UL; + int parsed = phr_parse_response( ssresolve->response, + ssresolve->response_len, + &minor_version, + &status, + &message, + &message_len, + headers, + &header_cnt, + ssresolve->response_len - (ulong)read ); + if( FD_UNLIKELY( parsed==-1 ) ) { + FD_LOG_WARNING(( "malformed response body" )); + return FD_SSRESOLVE_ADVANCE_ERROR; + } else if( parsed==-2 ) { + return FD_SSRESOLVE_ADVANCE_ERROR; + } + + int is_redirect = (status==301) | (status==303) | (status==304) | (status==307) | (status==308); + if( FD_UNLIKELY( is_redirect ) ) { + return fd_ssresolve_parse_redirect( ssresolve, headers, header_cnt, result ); + } + + if( FD_UNLIKELY( status!=200 ) ) { + FD_LOG_WARNING(( "unexpected response status %d", status )); + return FD_SSRESOLVE_ADVANCE_ERROR; + } + + return FD_SSRESOLVE_ADVANCE_ERROR; +} + +int +fd_ssresolve_advance_poll_out( fd_ssresolve_t * ssresolve ) { + int res; + switch( ssresolve->state ) { + case FD_SSRESOLVE_STATE_REQ: { + res = fd_ssresolve_send_request( ssresolve ); + break; + } + case FD_SSRESOLVE_STATE_RESP: { + res = FD_SSRESOLVE_ADVANCE_AGAIN; + break; + } + default: { + FD_LOG_ERR(( "unexpected state %d", ssresolve->state )); + return FD_SSRESOLVE_ADVANCE_ERROR; + } + } + return res; +} + +int +fd_ssresolve_advance_poll_in( fd_ssresolve_t * ssresolve, + fd_ssresolve_result_t * result ) { + int res; + switch( ssresolve->state ) { + case FD_SSRESOLVE_STATE_RESP: { + res = fd_ssresolve_read_response( ssresolve, result ); + break; + } + case FD_SSRESOLVE_STATE_REQ: { + res = FD_SSRESOLVE_ADVANCE_AGAIN; + break; + } + default: { + FD_LOG_ERR(( "unexpected state %d", ssresolve->state )); + return FD_SSRESOLVE_ADVANCE_ERROR; + } + } + + return res; +} + +int +fd_ssresolve_is_done( fd_ssresolve_t * ssresolve ) { + return ssresolve->state==FD_SSRESOLVE_STATE_DONE; +} diff --git a/src/discof/restore/utils/fd_ssresolve.h b/src/discof/restore/utils/fd_ssresolve.h new file mode 100644 index 00000000000..a3146d11a94 --- /dev/null +++ b/src/discof/restore/utils/fd_ssresolve.h @@ -0,0 +1,81 @@ +#ifndef HEADER_fd_src_discof_restore_utils_fd_ssresolve_h +#define HEADER_fd_src_discof_restore_utils_fd_ssresolve_h + +#include "../../../util/fd_util_base.h" +#include "../../../flamenco/types/fd_types_custom.h" +#include "../../../util/net/fd_net_headers.h" + +#define FD_SSRESOLVE_MAGIC (0xF17EDA2CE55E510) /* FIREDANCER HTTP RESOLVE V0 */ +#define FD_SSRESOLVE_ALIGN (8UL) + +/* fd_ssresolve_result contains the resolved snapshot info from + making an http request to a snapshot peer. */ +struct fd_ssresolve_result { + ulong slot; /* slot of the snapshot */ + ulong base_slot; /* base slot of incremental snapshot or ULONG_MAX */ + fd_hash_t hash; /* base58 decoded hash of the snapshot */ +}; + +typedef struct fd_ssresolve_result fd_ssresolve_result_t; + +/* fd_ssresolve is responsible for resolving snapshots from a given + peer by sending a http request and parsing a http redirect response. + + It is used by fd_ssping_t to ping and resolve snapshots for each + peer. */ +struct fd_ssresolve_private; +typedef struct fd_ssresolve_private fd_ssresolve_t; + +FD_PROTOTYPES_BEGIN + +FD_FN_CONST ulong +fd_ssresolve_align( void ); + +FD_FN_CONST ulong +fd_ssresolve_footprint( void ); + +void * +fd_ssresolve_new( void * shmem ); + +fd_ssresolve_t * +fd_ssresolve_join( void * shresolve ); + +void * +fd_ssresolve_leave( fd_ssresolve_t * ssresolve ); + +void * +fd_ssresolve_delete( void * shresolve ); + +/* fd_ssresolve_init initializes a fd_ssresolve_t object with a peer's + address, a socket file descriptor, and whether the resolve request + is for a full or incremental snapshot. */ +void +fd_ssresolve_init( fd_ssresolve_t * ssresolve, + fd_ip4_port_t addr, + int sockfd, + int full ); + +#define FD_SSRESOLVE_ADVANCE_ERROR (-1) /* fatal error */ +#define FD_SSRESOLVE_ADVANCE_AGAIN ( 0) /* try again */ +#define FD_SSRESOLVE_ADVANCE_SUCCESS ( 1) /* success */ + +/* fd_ssresolve_advance_poll_out advances the ssresolve state machine + when its socket file descriptor is ready for sending data. */ +int +fd_ssresolve_advance_poll_out( fd_ssresolve_t * ssresolve ); + +/* fd_ssresolve_advance_poll_in advances the ssresolve state machine + when its socket file descriptor is ready for receiving data. */ +int +fd_ssresolve_advance_poll_in( fd_ssresolve_t * ssresolve, + fd_ssresolve_result_t * result ); + +/* fd_ssresolve_is_done returns whether the ssresolve state machine + is done. The ssresolve object must be reset via fd_ssresolve_init + to restart the state machine. */ +int +fd_ssresolve_is_done( fd_ssresolve_t * ssresolve ); + +FD_PROTOTYPES_END + +#endif /* HEADER_fd_src_discof_restore_utils_fd_ssresolve_h */