From 2561736ab2ad493234e1210f952c06230e0d94e3 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Mon, 24 Nov 2025 08:43:32 +0000 Subject: [PATCH 1/3] restore: parallel snapshot decompression --- .../firedancer-dev/commands/snapshot_load.c | 42 ++- src/app/firedancer/config/default.toml | 20 +- src/app/shared/fd_action.h | 1 + src/app/shared/fd_config.h | 1 + src/app/shared/fd_config_parse.c | 1 + src/disco/stem/fd_stem.c | 13 + src/disco/stem/fd_stem.h | 2 + src/discof/restore/Local.mk | 6 + src/discof/restore/fd_snapdc_tile.c | 136 +++++++- src/discof/restore/fd_snapin_tile.c | 160 +++++++-- src/discof/restore/fd_snapin_tile_private.h | 42 ++- src/discof/restore/utils/fd_ssctrl.h | 1 + src/discof/restore/utils/fd_zstd_dskip.c | 306 ++++++++++++++++++ src/discof/restore/utils/fd_zstd_dskip.h | 40 +++ src/discof/restore/utils/test_zstd_dskip.c | 64 ++++ 15 files changed, 759 insertions(+), 76 deletions(-) create mode 100644 src/discof/restore/utils/fd_zstd_dskip.c create mode 100644 src/discof/restore/utils/fd_zstd_dskip.h create mode 100644 src/discof/restore/utils/test_zstd_dskip.c diff --git a/src/app/firedancer-dev/commands/snapshot_load.c b/src/app/firedancer-dev/commands/snapshot_load.c index 492f9636d8d..33e55e39baa 100644 --- a/src/app/firedancer-dev/commands/snapshot_load.c +++ b/src/app/firedancer-dev/commands/snapshot_load.c @@ -37,6 +37,8 @@ snapshot_load_topo( config_t * config, fd_topob_new( &config->topo, config->name ); topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size ); +#define FOR(cnt) for( ulong i=0UL; ifiredancer.runtime.max_live_slots, @@ -51,6 +53,7 @@ snapshot_load_topo( config_t * config, int snapshot_lthash_disabled = config->development.snapshots.disable_lthash_verification; ulong lta_tile_cnt = config->firedancer.layout.snapla_tile_count; + ulong dc_tile_cnt = config->development.snapshots.decompress_tile_count; if( config->firedancer.vinyl.enabled ) { setup_topo_vinyl_meta( topo, &config->firedancer ); @@ -81,8 +84,7 @@ snapshot_load_topo( config_t * config, /* "snapdc": Zstandard decompress tile */ fd_topob_wksp( topo, "snapdc" ); - fd_topo_tile_t * snapdc_tile = fd_topob_tile( topo, "snapdc", "snapdc", "metric_in", ULONG_MAX, 0, 0 ); - snapdc_tile->allow_shutdown = 1; + FOR( dc_tile_cnt ) fd_topob_tile( topo, "snapdc", "snapdc", "metric_in", ULONG_MAX, 0, 0 )->allow_shutdown = 1; /* "snapin": Snapshot parser tile */ fd_topob_wksp( topo, "snapin" ); @@ -122,8 +124,6 @@ snapshot_load_topo( config_t * config, fd_topob_wksp( topo, "snapls_ct" ); } -#define FOR(cnt) for( ulong i=0UL; iallow_shutdown = 1; /**/ fd_topob_tile( topo, "snapls", "snapls", "metric_in", ULONG_MAX, 0, 0 )->allow_shutdown = 1; @@ -131,7 +131,7 @@ snapshot_load_topo( config_t * config, fd_topob_link( topo, "snapct_ld", "snapct_ld", 128UL, sizeof(fd_ssctrl_init_t), 1UL ); fd_topob_link( topo, "snapld_dc", "snapld_dc", 16384UL, USHORT_MAX, 1UL ); - fd_topob_link( topo, "snapdc_in", "snapdc_in", 16384UL, USHORT_MAX, 1UL ); + FOR( dc_tile_cnt ) fd_topob_link( topo, "snapdc_in", "snapdc_in", 16384UL, USHORT_MAX, 1UL ); if( FD_UNLIKELY( snapshot_lthash_disabled ) ) { fd_topob_link( topo, "snapin_ct", "snapin_ct", 128UL, 0UL, 1UL ); } @@ -159,9 +159,9 @@ snapshot_load_topo( config_t * config, fd_topob_tile_out( topo, "snapct", 0UL, "snapct_repr", 0UL ); fd_topob_tile_in ( topo, "snapld", 0UL, "metric_in", "snapct_ld", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); fd_topob_tile_out( topo, "snapld", 0UL, "snapld_dc", 0UL ); - fd_topob_tile_in ( topo, "snapdc", 0UL, "metric_in", "snapld_dc", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); - fd_topob_tile_out( topo, "snapdc", 0UL, "snapdc_in", 0UL ); - fd_topob_tile_in ( topo, "snapin", 0UL, "metric_in", "snapdc_in", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR( dc_tile_cnt ) fd_topob_tile_in ( topo, "snapdc", i, "metric_in", "snapld_dc", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR( dc_tile_cnt ) fd_topob_tile_out( topo, "snapdc", i, "snapdc_in", i ); + FOR( dc_tile_cnt ) fd_topob_tile_in ( topo, "snapin", 0UL, "metric_in", "snapdc_in", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); if( FD_UNLIKELY( snapshot_lthash_disabled ) ) { fd_topob_tile_out( topo, "snapin", 0UL, "snapin_ct", 0UL ); } else { @@ -262,6 +262,7 @@ snapshot_load_args( int * pargc, " --fsck After loading, run database integrity checks\n" " --lthash After loading, recompute the account DB lthash\n" " --accounts-hist After loading, analyze account size distribution\n" + " --dc-tiles Number of snapdc tiles\n" "\n" "Vinyl database flags:\n" " --vinyl-server After loading, indefinitely run a vinyl DB server\n" @@ -295,6 +296,7 @@ snapshot_load_args( int * pargc, char const * vinyl_io = fd_env_strip_cmdline_cstr ( pargc, pargv, "--vinyl-io", NULL, "bd" ); float cache_sz = fd_env_strip_cmdline_float ( pargc, pargv, "--cache-sz", NULL, 0.0f ); float cache_rec_max = fd_env_strip_cmdline_float ( pargc, pargv, "--cache-rec-max",NULL, 0.0f ); + uint dc_tile_cnt = fd_env_strip_cmdline_uint ( pargc, pargv, "--dc-tiles", NULL, 1U ); fd_cstr_ncpy( args->snapshot_load.snapshot_dir, snapshot_dir, sizeof(args->snapshot_load.snapshot_dir) ); args->snapshot_load.fsck = fsck; @@ -314,6 +316,7 @@ snapshot_load_args( int * pargc, args->snapshot_load.db_rec_max = (ulong)db_rec_max; args->snapshot_load.cache_sz = (ulong)cache_sz; args->snapshot_load.cache_rec_max = (ulong)cache_rec_max; + args->snapshot_load.dc_tile_cnt = (uint)dc_tile_cnt; fd_cstr_ncpy( args->snapshot_load.vinyl_path, vinyl_path, sizeof(args->snapshot_load.vinyl_path) ); @@ -595,6 +598,7 @@ fixup_config( config_t * config, } config->development.snapshots.disable_lthash_verification = !args->snapshot_load.lthash; + config->development.snapshots.decompress_tile_count = args->snapshot_load.dc_tile_cnt; /* FIXME Unfortunately, the fdctl boot procedure constructs the topology before parsing command-line arguments. So, here, @@ -658,8 +662,6 @@ snapshot_load_cmd_fn( args_t * args, ulong total_off_old = 0UL; ulong decomp_off_old = 0UL; ulong vinyl_off_old = 0UL; - ulong snapct_backp_old = 0UL; - ulong snapct_wait_old = 0UL; ulong snapld_backp_old = 0UL; ulong snapld_wait_old = 0UL; ulong snapdc_backp_old = 0UL; @@ -684,8 +686,8 @@ snapshot_load_cmd_fn( args_t * args, puts( "" ); fputs( "--------------------------------------------", stdout ); if( snapwr_tile ) fputs( "--------------", stdout ); - if( snapls_tile ) fputs( "[ct],[ld],[dc],[in],[lts]--------[ct],[ld],[dc],[in],[lts]", stdout ); - else fputs( "[ct],[ld],[dc],[in]--------[ct],[ld],[dc],[in]", stdout ); + if( snapls_tile ) fputs( "[ld],[dc],[in],[lts]--------[ld],[dc],[in],[lts]", stdout ); + else fputs( "[ld],[dc],[in]--------[ld],[dc],[in]", stdout ); if( snapwr_tile ) fputs( ",[wh],[wr]" , stdout ); puts( "--------------" ); } @@ -712,8 +714,6 @@ snapshot_load_cmd_fn( args_t * args, ulong decomp_off = snapdc_metrics[ MIDX( GAUGE, SNAPDC, FULL_DECOMPRESSED_BYTES_WRITTEN ) ] + snapdc_metrics[ MIDX( GAUGE, SNAPDC, INCREMENTAL_DECOMPRESSED_BYTES_WRITTEN ) ]; ulong vinyl_off = snapwr_tile ? snapwr_metrics[ MIDX( GAUGE, SNAPWR, VINYL_BYTES_WRITTEN ) ] : 0UL; - ulong snapct_backp = snapct_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ]; - ulong snapct_wait = snapct_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] + snapct_backp; ulong snapld_backp = snapld_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ]; ulong snapld_wait = snapld_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] + snapld_backp; ulong snapdc_backp = snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ]; @@ -744,29 +744,25 @@ snapshot_load_cmd_fn( args_t * args, printf( " vinyl=%4.0fMB/s", (double)( vinyl_off - vinyl_off_old )/1e6 ); } if( !snapls_tile ) { - printf( " backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%", - ( (double)( snapct_backp-snapct_backp_old )*ns_per_tick )/1e7, + printf( " backp=(%3.0f%%,%3.0f%%,%3.0f%%", ( (double)( snapld_backp-snapld_backp_old )*ns_per_tick )/1e7, ( (double)( snapdc_backp-snapdc_backp_old )*ns_per_tick )/1e7, ( (double)( snapin_backp-snapin_backp_old )*ns_per_tick )/1e7 ); } else { - printf( " backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%", - ( (double)( snapct_backp-snapct_backp_old )*ns_per_tick )/1e7, + printf( " backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%", ( (double)( snapld_backp-snapld_backp_old )*ns_per_tick )/1e7, ( (double)( snapdc_backp-snapdc_backp_old )*ns_per_tick )/1e7, ( (double)( snapin_backp-snapin_backp_old )*ns_per_tick )/1e7, ( (double)( snapls_backp-snapls_backp_old )*ns_per_tick )/1e7 ); } if( !snapls_tile ) { - printf( ") busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%", - 100-( ( (double)( snapct_wait-snapct_wait_old )*ns_per_tick )/1e7 ), + printf( ") busy=(%3.0f%%,%3.0f%%,%3.0f%%", 100-( ( (double)( snapld_wait-snapld_wait_old )*ns_per_tick )/1e7 ), 100-( ( (double)( snapdc_wait-snapdc_wait_old )*ns_per_tick )/1e7 ), 100-( ( (double)( snapin_wait-snapin_wait_old )*ns_per_tick )/1e7 ) ); } else { - printf( ") busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%", - 100-( ( (double)( snapct_wait-snapct_wait_old )*ns_per_tick )/1e7 ), + printf( ") busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%", 100-( ( (double)( snapld_wait-snapld_wait_old )*ns_per_tick )/1e7 ), 100-( ( (double)( snapdc_wait-snapdc_wait_old )*ns_per_tick )/1e7 ), 100-( ( (double)( snapin_wait-snapin_wait_old )*ns_per_tick )/1e7 ), @@ -784,8 +780,6 @@ snapshot_load_cmd_fn( args_t * args, total_off_old = total_off; decomp_off_old = decomp_off; vinyl_off_old = vinyl_off; - snapct_backp_old = snapct_backp; - snapct_wait_old = snapct_wait; snapld_backp_old = snapld_backp; snapld_wait_old = snapld_wait; snapdc_backp_old = snapdc_backp; diff --git a/src/app/firedancer/config/default.toml b/src/app/firedancer/config/default.toml index 7d92d5a5472..4e95dff54bc 100644 --- a/src/app/firedancer/config/default.toml +++ b/src/app/firedancer/config/default.toml @@ -1778,12 +1778,18 @@ user = "" [development.udpecho] affinity = "auto" - # Set to true to disable verification of the lthash in the - # snapshot loader. This is not safe or supported in production - # and should only be used for development and testing purposes. - # - # If lthash verification is disabled, the validator will not - # start any lthash tiles and the value of `snapla_tile_count` - # in the layout will be ignored. + # Experimental snapshot loading options [development.snapshots] + # Set to true to disable verification of the lthash in the + # snapshot loader. This is not safe or supported in production + # and should only be used for development and testing purposes. + # + # If lthash verification is disabled, the validator will not + # start any lthash tiles and the value of `snapla_tile_count` + # in the layout will be ignored. disable_lthash_verification = true + + # Set numebr of decompression threads. Values other than 1 only + # have an effect with snapshot files with an experimental + # compression format. See src/discof/restore/fd_snapmk_para.c + decompress_tile_count = 1 diff --git a/src/app/shared/fd_action.h b/src/app/shared/fd_action.h index a0eabd0c041..2edb22dd78b 100644 --- a/src/app/shared/fd_action.h +++ b/src/app/shared/fd_action.h @@ -140,6 +140,7 @@ union fdctl_args { ulong db_rec_max; ulong cache_sz; ulong cache_rec_max; + uint dc_tile_cnt; } snapshot_load; struct { diff --git a/src/app/shared/fd_config.h b/src/app/shared/fd_config.h index ffbffad9e95..217a6e9a521 100644 --- a/src/app/shared/fd_config.h +++ b/src/app/shared/fd_config.h @@ -348,6 +348,7 @@ struct fd_config { struct { int disable_lthash_verification; + uint decompress_tile_count; } snapshots; struct { diff --git a/src/app/shared/fd_config_parse.c b/src/app/shared/fd_config_parse.c index b3dde44de4c..c2d32f35a24 100644 --- a/src/app/shared/fd_config_parse.c +++ b/src/app/shared/fd_config_parse.c @@ -315,6 +315,7 @@ fd_config_extract_pod( uchar * pod, CFG_POP ( cstr, development.udpecho.affinity ); CFG_POP ( bool, development.snapshots.disable_lthash_verification ); + CFG_POP ( uint, development.snapshots.decompress_tile_count ); if( FD_UNLIKELY( !config->is_firedancer ) ) { CFG_POP ( bool, development.gui.websocket_compression ); diff --git a/src/disco/stem/fd_stem.c b/src/disco/stem/fd_stem.c index 5b138670133..982fec00a17 100644 --- a/src/disco/stem/fd_stem.c +++ b/src/disco/stem/fd_stem.c @@ -194,6 +194,10 @@ #define STEM_LAZY (0L) #endif +#ifndef STEM_CUSTOM_IN_SELECT +#define STEM_CUSTOM_IN_SELECT 0 +#endif + #define STEM_SHUTDOWN_SEQ (ULONG_MAX-1UL) static inline void @@ -473,6 +477,7 @@ STEM_(run1)( ulong in_cnt, if( FD_UNLIKELY( event_seq>=event_cnt ) ) { event_seq = 0UL; +# if !STEM_CUSTOM_IN_SELECT /* Randomize the order of event processing for the next event event_cnt events to avoid lighthousing effects causing input credit starvation at extreme fan in/fan out, extreme in load @@ -494,6 +499,7 @@ STEM_(run1)( ulong in_cnt, in[ swap_idx ] = in[ 0 ]; in[ 0 ] = in_tmp; } +# endif } /* Reload housekeeping timer */ @@ -581,9 +587,15 @@ STEM_(run1)( ulong in_cnt, } #endif +# if STEM_CUSTOM_IN_SELECT + in_seq = stem.next_in_idx; +# endif + fd_stem_tile_in_t * this_in = &in[ in_seq ]; +# if !STEM_CUSTOM_IN_SELECT in_seq++; if( in_seq>=in_cnt ) in_seq = 0UL; /* cmov */ +# endif /* Check if this in has any new fragments to mux */ @@ -824,3 +836,4 @@ STEM_(run)( fd_topo_t * topo, #undef STEM_CALLBACK_RETURNABLE_FRAG #undef STEM_CALLBACK_AFTER_FRAG #undef STEM_CALLBACK_AFTER_POLL_OVERRUN +#undef STEM_CUSTOM_IN_SELECT diff --git a/src/disco/stem/fd_stem.h b/src/disco/stem/fd_stem.h index d8e9777fab8..417afc76b44 100644 --- a/src/disco/stem/fd_stem.h +++ b/src/disco/stem/fd_stem.h @@ -13,6 +13,8 @@ struct fd_stem_context { ulong * cr_avail; ulong * min_cr_avail; ulong cr_decrement_amount; + + ulong next_in_idx; }; typedef struct fd_stem_context fd_stem_context_t; diff --git a/src/discof/restore/Local.mk b/src/discof/restore/Local.mk index b3c2bd429ae..531bf7b1935 100644 --- a/src/discof/restore/Local.mk +++ b/src/discof/restore/Local.mk @@ -28,6 +28,12 @@ $(call make-unit-test,test_sspeer_selector,utils/test_sspeer_selector,fd_discof $(call run-unit-test,test_slot_delta_parser) $(call run-unit-test,test_sspeer_selector) endif +ifdef FD_HAS_ZSTD +$(call add-objs,utils/fd_zstd_dskip,fd_discof) +ifdef FD_HAS_HOSTED +$(call make-unit-test,test_zstd_dskip,utils/test_zstd_dskip,fd_discof fd_flamenco fd_ballet fd_util) +endif +endif ifdef FD_HAS_HOSTED $(call make-fuzz-test,fuzz_snapshot_parser,utils/fuzz_snapshot_parser,fd_discof fd_flamenco fd_ballet fd_util) diff --git a/src/discof/restore/fd_snapdc_tile.c b/src/discof/restore/fd_snapdc_tile.c index 51a9c1a6b17..bb97e18ff15 100644 --- a/src/discof/restore/fd_snapdc_tile.c +++ b/src/discof/restore/fd_snapdc_tile.c @@ -1,4 +1,5 @@ #include "utils/fd_ssctrl.h" +#include "utils/fd_zstd_dskip.h" #include "../../disco/topo/fd_topo.h" #include "../../disco/metrics/fd_metrics.h" @@ -12,6 +13,10 @@ #define ZSTD_WINDOW_SZ (1UL<<25UL) /* 32MiB */ +#define FD_SNAP_PARA_MAGIC (0xf212f209fd944ba2UL) +#define FD_SNAP_PARA_ENABLE (0x72701281047a55b8UL) +#define FD_SNAP_PARA_DISABLE (0xd629be3208ad6fb4UL) + /* The snapdc tile is a state machine that decompresses the full and optionally incremental snapshot byte stream that it receives from the snapld tile. In the event that the snapshot is already uncompressed, @@ -21,9 +26,23 @@ struct fd_snapdc_tile { uint full : 1; uint is_zstd : 1; uint dirty : 1; /* in the middle of a frame? */ - int state; + uint para : 1; /* parallel decompress enabled? */ + int state; + + uint tile_idx; + uint tile_cnt; ZSTD_DCtx * zstd; + fd_zstd_dskip_t skip[1]; + + /* Window to peek into the first few bytes of each Zstandard frame. + Used to detect Zstandard skippable frames for signaling. */ +# define FRAME_PEEK (128UL) + uchar peek[ FRAME_PEEK ]; + ulong peek_off; + + ulong frame_idx; /* index of current frame */ + ulong frame_off; /* offset within current frame */ struct { fd_wksp_t * wksp; @@ -96,7 +115,13 @@ handle_control_frag( fd_snapdc_tile_t * ctx, /* All control messages cause us to want to reset the decompression stream */ ulong error = ZSTD_DCtx_reset( ctx->zstd, ZSTD_reset_session_only ); if( FD_UNLIKELY( ZSTD_isError( error ) ) ) FD_LOG_ERR(( "ZSTD_DCtx_reset failed (%lu-%s)", error, ZSTD_getErrorName( error ) )); - ctx->dirty = 0; + fd_zstd_dskip_init( ctx->skip ); + ctx->dirty = 0; + ctx->peek_off = 0UL; + ctx->frame_idx = 0UL; + ctx->frame_off = 0UL; + if( ctx->tile_idx==0 && ctx->para ) FD_LOG_INFO(( "parallel decompress disable" )); + ctx->para = 0; switch( sig ) { case FD_SNAPSHOT_MSG_CTRL_INIT_FULL: { @@ -156,6 +181,28 @@ handle_control_frag( fd_snapdc_tile_t * ctx, fd_stem_publish( stem, 0UL, sig, 0UL, 0UL, 0UL, 0UL, 0UL ); } +__attribute__((cold)) static void +handle_skippable_frame( fd_snapdc_tile_t * ctx, + fd_stem_context_t * stem ) { + uchar const * peek = ctx->peek; + uint zstd_magic_idx; + ulong magic; + if( ZSTD_isError( ZSTD_readSkippableFrame( &magic, sizeof(ulong), &zstd_magic_idx, peek, FRAME_PEEK ) ) ) { + return; + } + if( zstd_magic_idx!=0 ) return; + + if( magic==FD_SNAP_PARA_ENABLE && !ctx->para ) { + if( ctx->tile_idx==0 ) FD_LOG_INFO(( "parallel decompress enable" )); + ctx->para = 1; + } + if( magic==FD_SNAP_PARA_DISABLE && ctx->para ) { + if( ctx->tile_idx==0 ) FD_LOG_INFO(( "parallel decompress disable" )); + ctx->para = 0; + fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_BARRIER, 0UL, 0UL, 0UL, 0UL, 0UL ); + } +} + static inline int handle_data_frag( fd_snapdc_tile_t * ctx, fd_stem_context_t * stem, @@ -176,6 +223,7 @@ handle_data_frag( fd_snapdc_tile_t * ctx, uchar * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk ); if( FD_UNLIKELY( !ctx->is_zstd ) ) { + if( ctx->tile_idx!=0UL ) return 0; FD_TEST( ctx->in.frag_posin.frag_pos, ctx->out.mtu ); fd_memcpy( out, in, cpy ); @@ -197,6 +245,65 @@ handle_data_frag( fd_snapdc_tile_t * ctx, return 0; } + /* Detect Zstandard skippable frames */ + + ulong peek_consumed = 0UL; + if( FD_UNLIKELY( ctx->peek_offpeek + ctx->peek_off; + ulong peek_free = FRAME_PEEK - ctx->peek_off; + peek_consumed = fd_ulong_min( peek_free, sz-ctx->in.frag_pos ); + fd_memcpy( peek, in, peek_consumed ); + ctx->peek_off += peek_consumed; + + /* Try to find skippable signaling frames */ + if( ctx->peek_off>=16UL ) { + if( ZSTD_isSkippableFrame( peek, FRAME_PEEK ) ) { + handle_skippable_frame( ctx, stem ); + } + } + } + + /* Are we responsible for this frame? */ + + _Bool ignore_frame = 0; + if( ctx->para ) { + ignore_frame = ( ctx->frame_idx%ctx->tile_cnt )!=(ulong)ctx->tile_idx; + } else { + ignore_frame = ctx->tile_idx!=0UL; + } + + /* Skip over frames */ + + if( ignore_frame ) { + ulong in_consumed = 0UL; + ulong frame_res = fd_zstd_dskip_advance( ctx->skip, in, sz-ctx->in.frag_pos, &in_consumed ); + if( FD_UNLIKELY( frame_res==ULONG_MAX ) ) { + FD_LOG_WARNING(( "error while skipping compressed frame" )); + ctx->state = FD_SNAPSHOT_STATE_ERROR; + fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL ); + return 0; + } + FD_TEST( in_consumed<=sz-ctx->in.frag_pos ); + + ctx->in.frag_pos += in_consumed; + FD_TEST( ctx->in.frag_pos<=sz ); + + ctx->frame_off += in_consumed; + ctx->dirty = frame_res!=0UL; + if( !ctx->dirty ) { + ctx->frame_idx++; + ctx->frame_off = 0UL; + ctx->peek_off = 0UL; + } + + if( FD_LIKELY( ctx->in.frag_posin.frag_pos = 0UL; + return 0; + } + + /* Actually decompress frame */ + ulong in_consumed = 0UL, out_produced = 0UL; ulong frame_res = ZSTD_decompressStream_simpleArgs( ctx->zstd, @@ -214,7 +321,8 @@ handle_data_frag( fd_snapdc_tile_t * ctx, } if( FD_LIKELY( out_produced ) ) { - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, out_produced, 0UL, 0UL, 0UL ); + ulong ctl = fd_frag_meta_ctl( 0UL, 0, frame_res==0UL, 0 ); + fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, out_produced, ctl, 0UL, 0UL ); ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, out_produced, ctx->out.chunk0, ctx->out.wmark ); } @@ -229,7 +337,13 @@ handle_data_frag( fd_snapdc_tile_t * ctx, ctx->metrics.incremental.decompressed_bytes_written += out_produced; } + ctx->frame_off += in_consumed; ctx->dirty = frame_res!=0UL; + if( !ctx->dirty ) { + ctx->frame_idx++; + ctx->frame_off = 0UL; + ctx->peek_off = 0UL; + } int maybe_more_output = out_produced==ctx->out.mtu || ctx->in.frag_posin.frag_pos = 0UL; @@ -289,11 +403,21 @@ unprivileged_init( fd_topo_t * topo, fd_snapdc_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapdc_tile_t), sizeof(fd_snapdc_tile_t) ); void * _zstd = FD_SCRATCH_ALLOC_APPEND( l, 32UL, ZSTD_estimateDStreamSize( ZSTD_WINDOW_SZ ) ); - ctx->state = FD_SNAPSHOT_STATE_IDLE; + ctx->state = FD_SNAPSHOT_STATE_IDLE; + ctx->tile_idx = (uint)tile->kind_id; + ctx->tile_cnt = (uint)fd_topo_tile_name_cnt( topo, tile->name );; + ctx->dirty = 0; + ctx->para = 0; + if( ctx->tile_idx==0 && ctx->tile_cnt>1 ) FD_LOG_INFO(( "parallel decompress disable" )); ctx->zstd = ZSTD_initStaticDStream( _zstd, ZSTD_estimateDStreamSize( ZSTD_WINDOW_SZ ) ); FD_TEST( ctx->zstd ); FD_TEST( ctx->zstd==_zstd ); + fd_zstd_dskip_init( ctx->skip ); + + ctx->frame_idx = 0UL; + ctx->frame_off = 0UL; + ctx->peek_off = 0UL; ctx->dirty = 0; ctx->in.frag_pos = 0UL; @@ -325,8 +449,8 @@ unprivileged_init( fd_topo_t * topo, (ulong)scratch + scratch_footprint( tile ) )); } -/* handle_data_frag can publish one data frag plus an error frag */ -#define STEM_BURST 2UL +/* handle_data_frag can publish one data frag, a barrier frag, and an error frag */ +#define STEM_BURST 3UL #define STEM_LAZY 1000L diff --git a/src/discof/restore/fd_snapin_tile.c b/src/discof/restore/fd_snapin_tile.c index 1d396514dc2..d6e74bbc78c 100644 --- a/src/discof/restore/fd_snapin_tile.c +++ b/src/discof/restore/fd_snapin_tile.c @@ -428,13 +428,20 @@ process_manifest( fd_snapin_tile_t * ctx ) { ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), ctx->manifest_out.chunk0, ctx->manifest_out.wmark ); } - static int handle_data_frag( fd_snapin_tile_t * ctx, + ulong in_idx, ulong chunk, ulong sz, + ulong ctl, fd_stem_context_t * stem ) { + if( in_idx!=0UL && !ctx->flags.manifest_processed ) return 1; + if( ctx->dirty ) FD_TEST( in_idx==ctx->dirty_idx ); + ctx->dirty = !fd_frag_meta_ctl_eom( ctl ); + if( ctx->dirty ) ctx->dirty_idx = in_idx; + if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) ) { + FD_LOG_WARNING(( "invariant violation: received data frag after finishing snapshot" )); transition_malformed( ctx, stem ); return 0; } @@ -447,23 +454,26 @@ handle_data_frag( fd_snapin_tile_t * ctx, FD_LOG_ERR(( "invalid state for data frag %d", ctx->state )); } - FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu ); + fd_snapin_in_t * in = &ctx->in[ in_idx ]; + FD_TEST( chunk>=in->chunk0 && chunk<=in->wmark && sz<=in->mtu ); if( FD_UNLIKELY( !ctx->lthash_disabled && ctx->buffered_batch.batch_cnt>0UL ) ) { fd_snapin_process_account_batch( ctx, NULL, &ctx->buffered_batch ); + ctx->dirty = 1; ctx->dirty_idx = in_idx; return 1; } for(;;) { - if( FD_UNLIKELY( sz-ctx->in.pos==0UL ) ) break; + if( FD_UNLIKELY( sz-in->pos==0UL ) ) break; - uchar const * data = (uchar const *)fd_chunk_to_laddr_const( ctx->in.wksp, chunk ) + ctx->in.pos; + uchar const * data = (uchar const *)fd_chunk_to_laddr_const( in->wksp, chunk ) + in->pos; int early_exit = 0; fd_ssparse_advance_result_t result[1]; - int res = fd_ssparse_advance( ctx->ssparse, data, sz-ctx->in.pos, result ); + int res = fd_ssparse_advance( ctx->ssparse, data, sz-in->pos, result ); switch( res ) { case FD_SSPARSE_ADVANCE_ERROR: + FD_LOG_WARNING(( "snapshot parsing failed" )); transition_malformed( ctx, stem ); return 0; case FD_SSPARSE_ADVANCE_AGAIN: @@ -475,6 +485,7 @@ handle_data_frag( fd_snapin_tile_t * ctx, result->manifest.acc_vec_map, result->manifest.acc_vec_pool ); if( FD_UNLIKELY( res==FD_SSMANIFEST_PARSER_ADVANCE_ERROR ) ) { + FD_LOG_WARNING(( "invalid snapshot manifest" )); transition_malformed( ctx, stem ); return 0; } else if( FD_LIKELY( res==FD_SSMANIFEST_PARSER_ADVANCE_DONE ) ) { @@ -492,6 +503,7 @@ handle_data_frag( fd_snapin_tile_t * ctx, bytes_remaining, sd_result ); if( FD_UNLIKELY( res<0 ) ) { + FD_LOG_WARNING(( "invalid slot deltas in snapshot" )); transition_malformed( ctx, stem ); return 0; } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_GROUP ) ) { @@ -551,25 +563,76 @@ handle_data_frag( fd_snapin_tile_t * ctx, ctx->flags.manifest_processed = 1; } - ctx->in.pos += result->bytes_consumed; + in->pos += result->bytes_consumed; if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += result->bytes_consumed; else ctx->metrics.incremental_bytes_read += result->bytes_consumed; if( FD_UNLIKELY( early_exit ) ) break; } - int reprocess_frag = ctx->in.posin.pos = 0UL; + int reprocess_frag = in->pospos = 0UL; + } else { + ctx->dirty = 1; + ctx->dirty_idx = in_idx; + } return reprocess_frag; } -static void +static fd_snapdc_barrier_t * +fd_snapdc_barrier_init( fd_snapdc_barrier_t * barrier, + uint tile_cnt ) { + *barrier = (fd_snapdc_barrier_t) { + .rem_set = 0UL, + .ctrl_type = 0U, + .cnt = tile_cnt, + .active = 0 + }; + return barrier; +} + +static int +fd_snapdc_barrier_recv( fd_snapdc_barrier_t * barrier, + uint in_idx, + ulong ctrl_type ) { + if( FD_UNLIKELY( barrier->primed ) ) { + /* All tiles reached barrier, now let them pass */ + barrier->rem_set = fd_ulong_clear_bit( barrier->rem_set, (int)in_idx ); + barrier->primed = !!barrier->rem_set; + return 1; + } + /* Still waiting for tiles to reach barrier */ + if( !barrier->active ) { + barrier->active = 1; + barrier->ctrl_type = (uint)ctrl_type; + barrier->rem_set = (1UL<cnt)-1UL; + } else { + if( FD_UNLIKELY( barrier->ctrl_type!=ctrl_type ) ) { + FD_LOG_ERR(( "received conflicting control messages from snapdc tiles: %u vs %u", + barrier->ctrl_type, (uint)ctrl_type )); + } + } + barrier->rem_set = fd_ulong_clear_bit( barrier->rem_set, (int)in_idx ); + if( !barrier->rem_set ) { + barrier->primed = 1; + barrier->active = 0; + barrier->rem_set = (1UL<cnt)-1UL; + } + return 0; +} + +static int handle_control_frag( fd_snapin_tile_t * ctx, - fd_stem_context_t * stem, - ulong sig ) { + uint in_idx, + ulong sig, + fd_stem_context_t * stem ) { + ctx->dirty = 0; switch( sig ) { case FD_SNAPSHOT_MSG_CTRL_INIT_FULL: case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: + if( !fd_snapdc_barrier_recv( &ctx->dc_barrier, in_idx, sig ) ) return 1; + if( in_idx!=0UL ) return 0; fd_ssparse_batch_enable( ctx->ssparse, ctx->use_vinyl || sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL ); FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE ); ctx->state = FD_SNAPSHOT_STATE_PROCESSING; @@ -594,6 +657,7 @@ handle_control_frag( fd_snapin_tile_t * ctx, FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING || ctx->state==FD_SNAPSHOT_STATE_FINISHING || ctx->state==FD_SNAPSHOT_STATE_ERROR ); + FD_LOG_DEBUG(( "state transition FAIL->IDLE" )); ctx->state = FD_SNAPSHOT_STATE_IDLE; if( ctx->use_vinyl ) { @@ -615,9 +679,12 @@ handle_control_frag( fd_snapin_tile_t * ctx, FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING || ctx->state==FD_SNAPSHOT_STATE_FINISHING || ctx->state==FD_SNAPSHOT_STATE_ERROR ); + if( !fd_snapdc_barrier_recv( &ctx->dc_barrier, in_idx, sig ) ) return 1; + if( in_idx!=0UL ) return 0; if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) { - transition_malformed( ctx, stem ); - return; + FD_LOG_WARNING(( "corrupt snapshot: unexpected end of TAR file" )); + transition_malformed( ctx, stem ); + return 0; } ctx->state = FD_SNAPSHOT_STATE_IDLE; @@ -638,9 +705,12 @@ handle_control_frag( fd_snapin_tile_t * ctx, FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING || ctx->state==FD_SNAPSHOT_STATE_FINISHING || ctx->state==FD_SNAPSHOT_STATE_ERROR ); + if( !fd_snapdc_barrier_recv( &ctx->dc_barrier, in_idx, sig ) ) return 1; + if( in_idx!=0UL ) return 0; if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) { + FD_LOG_WARNING(( "corrupt snapshot: unexpected end of TAR file" )); transition_malformed( ctx, stem ); - return; + return 0; } ctx->state = FD_SNAPSHOT_STATE_IDLE; @@ -674,6 +744,8 @@ handle_control_frag( fd_snapin_tile_t * ctx, } case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: + if( !fd_snapdc_barrier_recv( &ctx->dc_barrier, in_idx, sig ) ) return 1; + if( in_idx!=0UL ) return 0; FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE ); ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN; if( ctx->use_vinyl ) fd_snapin_vinyl_shutdown( ctx ); @@ -689,34 +761,48 @@ handle_control_frag( fd_snapin_tile_t * ctx, } break; + case FD_SNAPSHOT_MSG_CTRL_BARRIER: + if( !fd_snapdc_barrier_recv( &ctx->dc_barrier, in_idx, sig ) ) return 1; + if( in_idx!=0UL ) return 0; + break; + default: FD_LOG_ERR(( "unexpected control sig %lu", sig )); - return; } /* Forward the control message down the pipeline */ fd_stem_publish( stem, ctx->out_ct_idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL ); + return 0; +} + +static void +before_credit( fd_snapin_tile_t * ctx, + fd_stem_context_t * stem, + int * charge_busy ) { + (void)charge_busy; + if( !ctx->dirty ) { + ctx->in_idx++; + if( ctx->in_idx==ctx->in_cnt ) ctx->in_idx = 0; + } + stem->next_in_idx = ctx->in_idx; } static inline int returnable_frag( fd_snapin_tile_t * ctx, - ulong in_idx FD_PARAM_UNUSED, + ulong in_idx, ulong seq FD_PARAM_UNUSED, ulong sig, ulong chunk, ulong sz, - ulong ctl FD_PARAM_UNUSED, + ulong ctl, ulong tsorig FD_PARAM_UNUSED, ulong tspub FD_PARAM_UNUSED, fd_stem_context_t * stem ) { FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN ); ctx->stem = stem; - if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, chunk, sz, stem ); - else handle_control_frag( ctx, stem, sig ); - ctx->stem = NULL; - - return 0; + if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, in_idx, chunk, sz, ctl, stem ); + else return handle_control_frag( ctx, (uint)in_idx, sig, stem ); } static ulong @@ -807,6 +893,7 @@ unprivileged_init( fd_topo_t * topo, ctx->full = 1; ctx->state = FD_SNAPSHOT_STATE_IDLE; ctx->lthash_disabled = tile->snapin.lthash_disabled; + ctx->dirty = 0; ctx->boot_timestamp = fd_log_wallclock(); @@ -835,7 +922,8 @@ unprivileged_init( fd_topo_t * topo, fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) ); if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" )); - if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt )); + if( FD_UNLIKELY( !tile->in_cnt ) ) FD_LOG_ERR(( "tile `" NAME "` has no ins" )); + if( FD_UNLIKELY( tile->in_cnt > DC_TILE_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` has too many ins" )); ctx->manifest_out = out1( topo, tile, "snapin_manif" ); ctx->gui_out = out1( topo, tile, "snapin_gui" ); @@ -855,15 +943,20 @@ unprivileged_init( fd_topo_t * topo, fd_ssparse_reset( ctx->ssparse ); fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.mem, ctx->manifest_out.chunk ) ); fd_slot_delta_parser_init( ctx->slot_delta_parser ); - - fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ]; - FD_TEST( 0==strcmp( in_link->name, "snapdc_in" ) ); - fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ]; - ctx->in.wksp = in_wksp->wksp; - ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache ); - ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu ); - ctx->in.mtu = in_link->mtu; - ctx->in.pos = 0UL; + fd_snapdc_barrier_init( &ctx->dc_barrier, (uint)tile->in_cnt ); + + ctx->in_cnt = tile->in_cnt; + ctx->in_idx = 0UL; + for( ulong i=0UL; iin_cnt; i++ ) { + fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ i ] ]; + FD_TEST( 0==strcmp( in_link->name, "snapdc_in" ) ); + fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ]; + ctx->in[ i ].wksp = in_wksp->wksp; + ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].wksp, in_link->dcache ); + ctx->in[ i ].wmark = fd_dcache_compact_wmark( ctx->in[ i ].wksp, in_link->dcache, in_link->mtu ); + ctx->in[ i ].mtu = in_link->mtu; + ctx->in[ i ].pos = 0UL; + } ctx->buffered_batch.batch_cnt = 0UL; ctx->buffered_batch.remaining_idx = 0UL; @@ -887,8 +980,11 @@ unprivileged_init( fd_topo_t * topo, #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown #define STEM_CALLBACK_METRICS_WRITE metrics_write +#define STEM_CALLBACK_BEFORE_CREDIT before_credit #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag +#define STEM_CUSTOM_IN_SELECT 1 + #include "../../disco/stem/fd_stem.c" fd_topo_run_tile_t fd_tile_snapin = { diff --git a/src/discof/restore/fd_snapin_tile_private.h b/src/discof/restore/fd_snapin_tile_private.h index e3a535bc560..718be231b52 100644 --- a/src/discof/restore/fd_snapin_tile_private.h +++ b/src/discof/restore/fd_snapin_tile_private.h @@ -17,6 +17,8 @@ #include "../../vinyl/io/fd_vinyl_io.h" #include "../../vinyl/meta/fd_vinyl_meta.h" +#define DC_TILE_MAX (64UL) + struct blockhash_group { uchar blockhash[ 32UL ]; ulong txnhash_offset; @@ -44,11 +46,41 @@ struct buffered_account_batch { typedef struct buffered_account_batch buffered_account_batch_t; +struct fd_snapin_in { + fd_wksp_t * wksp; + ulong chunk0; + ulong wmark; + ulong mtu; + ulong pos; +}; + +typedef struct fd_snapin_in fd_snapin_in_t; + +/* fd_snapdc_barrier synchronizes all snapdc tiles to the same control + frag event. E.g. when a 'snapshot read finished' message is received + from an in link, this barrier polls all other links until the same + message is received. */ + +struct fd_snapdc_barrier { + ulong rem_set; + uint ctrl_type; + uint cnt; + uint active : 1; + uint primed : 1; +}; + +typedef struct fd_snapdc_barrier fd_snapdc_barrier_t; + struct fd_snapin_tile { int state; uint full : 1; /* loading a full snapshot? */ uint use_vinyl : 1; /* using vinyl-backed accdb? */ uint lthash_disabled : 1; /* disable lthash checking? */ + uint dirty : 1; /* in the middle of a dc in burst? */ + + ulong dirty_idx; + ulong in_idx; + ulong in_cnt; ulong seed; long boot_timestamp; @@ -67,6 +99,8 @@ struct fd_snapin_tile { fd_ssmanifest_parser_t * manifest_parser; fd_slot_delta_parser_t * slot_delta_parser; + fd_snapdc_barrier_t dc_barrier; + buffered_account_batch_t buffered_batch; struct { @@ -91,13 +125,7 @@ struct fd_snapin_tile { ulong accounts_inserted; } metrics; - struct { - fd_wksp_t * wksp; - ulong chunk0; - ulong wmark; - ulong mtu; - ulong pos; - } in; + fd_snapin_in_t in[ DC_TILE_MAX ]; ulong out_ct_idx; fd_snapin_out_link_t manifest_out; diff --git a/src/discof/restore/utils/fd_ssctrl.h b/src/discof/restore/utils/fd_ssctrl.h index b95cc3d2d68..811427ada79 100644 --- a/src/discof/restore/utils/fd_ssctrl.h +++ b/src/discof/restore/utils/fd_ssctrl.h @@ -70,6 +70,7 @@ #define FD_SNAPSHOT_MSG_CTRL_DONE (6UL) /* Current snapshot succeeded, commit work, go idle, and expect shutdown */ #define FD_SNAPSHOT_MSG_CTRL_SHUTDOWN (7UL) /* No work left to do, perform final cleanup and shut down */ #define FD_SNAPSHOT_MSG_CTRL_ERROR (8UL) /* Some tile encountered an error with the current stream */ +#define FD_SNAPSHOT_MSG_CTRL_BARRIER (9UL) /* Synchronize all tiles up to this point in the pipeline */ /* snapla -> snapls */ #define FD_SNAPSHOT_HASH_MSG_RESULT_ADD (9UL) /* Hash result sent from snapla to snapls */ diff --git a/src/discof/restore/utils/fd_zstd_dskip.c b/src/discof/restore/utils/fd_zstd_dskip.c new file mode 100644 index 00000000000..e539abf8bc9 --- /dev/null +++ b/src/discof/restore/utils/fd_zstd_dskip.c @@ -0,0 +1,306 @@ +#include "fd_zstd_dskip.h" +#include "../../../util/fd_util.h" +#include + +/* Zstandard format constants */ +#define FD_ZSTD_MAGICNUMBER 0xFD2FB528U +#define FD_ZSTD_MAGIC_SKIPPABLE_START 0x184D2A50U +#define FD_ZSTD_MAGIC_SKIPPABLE_MASK 0xFFFFFFF0U +#define FD_ZSTD_FRAMEIDSIZE 4 +#define FD_ZSTD_SKIPPABLEHEADERSIZE 8 +#define FD_ZSTD_BLOCKHEADERSIZE 3 +#define FD_ZSTD_FRAMEHEADERSIZE_PREFIX 5 /* min bytes to determine frame header size */ +#define FD_ZSTD_FRAMECHECKSUMSIZE 4 + +/* Parser states */ +#define FD_ZSTD_DSKIP_STATE_MAGIC 0 /* Reading magic number */ +#define FD_ZSTD_DSKIP_STATE_SKIP_SIZE 1 /* Reading skippable frame size */ +#define FD_ZSTD_DSKIP_STATE_SKIP_DATA 2 /* Skipping skippable frame data */ +#define FD_ZSTD_DSKIP_STATE_FRAME_HDR 3 /* Reading frame header */ +#define FD_ZSTD_DSKIP_STATE_BLOCK_HDR 4 /* Reading block header */ +#define FD_ZSTD_DSKIP_STATE_BLOCK_DATA 5 /* Skipping block data */ +#define FD_ZSTD_DSKIP_STATE_CHECKSUM 6 /* Skipping frame checksum */ + +fd_zstd_dskip_t * +fd_zstd_dskip_init( fd_zstd_dskip_t * dskip ) { + memset( dskip, 0, sizeof( fd_zstd_dskip_t ) ); + dskip->state = FD_ZSTD_DSKIP_STATE_MAGIC; + return dskip; +} + +/* Helper to read little-endian integers */ +static inline uint +fd_zstd_read_le32( uchar const * buf ) { + return (uint)buf[0] | ((uint)buf[1]<<8) | ((uint)buf[2]<<16) | ((uint)buf[3]<<24); +} + +static inline uint +fd_zstd_read_le24( uchar const * buf ) { + return (uint)buf[0] | ((uint)buf[1]<<8) | ((uint)buf[2]<<16); +} + +/* Helper to calculate frame header size from frame header descriptor byte + Based on ZSTD_frameHeaderSize_internal */ +static inline ulong +fd_zstd_frame_header_size( uchar fhd ) { + static uchar const did_field_size[4] = { 0, 1, 2, 4 }; + static uchar const fcs_field_size[4] = { 0, 2, 4, 8 }; + + uint dict_id = fhd & 3; + uint single_seg = (fhd >> 5) & 1; + uint fcs_id = fhd >> 6; + + return (ulong)FD_ZSTD_FRAMEHEADERSIZE_PREFIX + (ulong)(!single_seg) + + (ulong)did_field_size[dict_id] + (ulong)fcs_field_size[fcs_id] + + (ulong)(single_seg && !fcs_id); +} + +ulong +fd_zstd_dskip_advance( fd_zstd_dskip_t * dskip, + void const * src, + ulong src_sz, + ulong * src_consumed ) { + + uchar const * src_ptr = (uchar const *)src; + ulong consumed = 0UL; + + while( consumed < src_sz ) { + ulong avail = src_sz - consumed; + + switch( dskip->state ) { + + case FD_ZSTD_DSKIP_STATE_MAGIC: { + /* Need to buffer the 4-byte magic number */ + ulong need = FD_ZSTD_FRAMEIDSIZE - dskip->buf_sz; + ulong copy = (avail < need) ? avail : need; + memcpy( dskip->buf + dskip->buf_sz, src_ptr + consumed, copy ); + dskip->buf_sz += copy; + consumed += copy; + + if( dskip->buf_sz < FD_ZSTD_FRAMEIDSIZE ) { + /* Need more data */ + *src_consumed = consumed; + return 1UL; + } + + /* Parse magic number */ + uint magic = fd_zstd_read_le32( dskip->buf ); + + if( (magic & FD_ZSTD_MAGIC_SKIPPABLE_MASK) == FD_ZSTD_MAGIC_SKIPPABLE_START ) { + /* Skippable frame */ + dskip->state = FD_ZSTD_DSKIP_STATE_SKIP_SIZE; + dskip->buf_sz = 0; + } else if( magic == FD_ZSTD_MAGICNUMBER ) { + /* Regular Zstandard frame - keep magic in buffer for FRAME_HDR */ + dskip->state = FD_ZSTD_DSKIP_STATE_FRAME_HDR; + /* buf_sz = 4, keep the magic bytes in buffer */ + } else { + /* Invalid magic number */ + *src_consumed = consumed; + return ULONG_MAX; + } + break; + } + + case FD_ZSTD_DSKIP_STATE_SKIP_SIZE: { + /* Read 4-byte size field for skippable frame */ + ulong need = FD_ZSTD_FRAMEIDSIZE - dskip->buf_sz; + ulong copy = (avail < need) ? avail : need; + memcpy( dskip->buf + dskip->buf_sz, src_ptr + consumed, copy ); + dskip->buf_sz += copy; + consumed += copy; + + if( dskip->buf_sz < FD_ZSTD_FRAMEIDSIZE ) { + *src_consumed = consumed; + return 1UL; + } + + uint size = fd_zstd_read_le32( dskip->buf ); + dskip->skip_rem = (ulong)size; + dskip->buf_sz = 0; + + if( dskip->skip_rem == 0 ) { + /* Empty skippable frame - done with this frame */ + dskip->state = FD_ZSTD_DSKIP_STATE_MAGIC; + *src_consumed = consumed; + return 0UL; + } + + dskip->state = FD_ZSTD_DSKIP_STATE_SKIP_DATA; + break; + } + + case FD_ZSTD_DSKIP_STATE_SKIP_DATA: { + /* Skip over skippable frame data */ + ulong skip = (avail < dskip->skip_rem) ? avail : dskip->skip_rem; + consumed += skip; + dskip->skip_rem -= skip; + + if( dskip->skip_rem == 0 ) { + /* Done with skippable frame */ + dskip->state = FD_ZSTD_DSKIP_STATE_MAGIC; + *src_consumed = consumed; + return 0UL; + } + + *src_consumed = consumed; + return 1UL; + } + + case FD_ZSTD_DSKIP_STATE_FRAME_HDR: { + /* Need to read enough to determine frame header size */ + if( dskip->buf_sz < FD_ZSTD_FRAMEHEADERSIZE_PREFIX ) { + ulong need = FD_ZSTD_FRAMEHEADERSIZE_PREFIX - dskip->buf_sz; + ulong copy = (avail < need) ? avail : need; + memcpy( dskip->buf + dskip->buf_sz, src_ptr + consumed, copy ); + dskip->buf_sz += copy; + consumed += copy; + + if( dskip->buf_sz < FD_ZSTD_FRAMEHEADERSIZE_PREFIX ) { + *src_consumed = consumed; + return 1UL; + } + } + + /* Calculate full frame header size */ + uchar fhd = dskip->buf[ FD_ZSTD_FRAMEHEADERSIZE_PREFIX - 1 ]; + ulong hdr_size = fd_zstd_frame_header_size( fhd ); + + /* Read the rest of the header */ + ulong need = hdr_size - dskip->buf_sz; + avail = src_sz - consumed; /* Recalculate avail after consuming bytes above */ + ulong copy = (avail < need) ? avail : need; + memcpy( dskip->buf + dskip->buf_sz, src_ptr + consumed, copy ); + dskip->buf_sz += copy; + consumed += copy; + + if( dskip->buf_sz < hdr_size ) { + *src_consumed = consumed; + return 1UL; + } + + /* Parse frame header descriptor for checksum flag */ + dskip->has_checksum = (fhd >> 2) & 1; + dskip->buf_sz = 0; + dskip->state = FD_ZSTD_DSKIP_STATE_BLOCK_HDR; + break; + } + + case FD_ZSTD_DSKIP_STATE_BLOCK_HDR: { + /* Read 3-byte block header */ + ulong need = FD_ZSTD_BLOCKHEADERSIZE - dskip->buf_sz; + ulong copy = (avail < need) ? avail : need; + memcpy( dskip->buf + dskip->buf_sz, src_ptr + consumed, copy ); + dskip->buf_sz += copy; + consumed += copy; + + if( dskip->buf_sz < FD_ZSTD_BLOCKHEADERSIZE ) { + *src_consumed = consumed; + return 1UL; + } + + /* Parse block header */ + uint block_hdr = fd_zstd_read_le24( dskip->buf ); + uint last_block = block_hdr & 1; + uint block_type = (block_hdr >> 1) & 3; + uint block_size = block_hdr >> 3; + + /* Block types: 0=raw, 1=rle, 2=compressed, 3=reserved */ + + /* Check for reserved block type */ + if( block_type == 3 ) { + *src_consumed = consumed; + return ULONG_MAX; + } + + /* RLE blocks store only 1 byte (the byte to repeat) */ + if( block_type == 1 ) { + block_size = 1; + } + + dskip->skip_rem = (ulong)block_size; + dskip->last_block = last_block; + dskip->buf_sz = 0; + + if( dskip->skip_rem == 0 ) { + /* Empty block */ + if( last_block ) { + /* Last block and empty, check for checksum */ + if( dskip->has_checksum ) { + dskip->skip_rem = FD_ZSTD_FRAMECHECKSUMSIZE; + dskip->state = FD_ZSTD_DSKIP_STATE_CHECKSUM; + } else { + /* Frame complete */ + dskip->state = FD_ZSTD_DSKIP_STATE_MAGIC; + *src_consumed = consumed; + return 0UL; + } + } else { + /* Not last block and empty, go to next block */ + dskip->state = FD_ZSTD_DSKIP_STATE_BLOCK_HDR; + } + } else { + /* Block has data to skip */ + dskip->state = FD_ZSTD_DSKIP_STATE_BLOCK_DATA; + } + break; + } + + case FD_ZSTD_DSKIP_STATE_BLOCK_DATA: { + /* Skip over block data */ + ulong skip = (avail < dskip->skip_rem) ? avail : dskip->skip_rem; + consumed += skip; + dskip->skip_rem -= skip; + + if( dskip->skip_rem > 0 ) { + *src_consumed = consumed; + return 1UL; + } + + /* Block data consumed, check if this was the last block */ + if( dskip->last_block ) { + /* Last block completed, check for checksum */ + if( dskip->has_checksum ) { + dskip->skip_rem = FD_ZSTD_FRAMECHECKSUMSIZE; + dskip->state = FD_ZSTD_DSKIP_STATE_CHECKSUM; + } else { + /* Frame complete */ + dskip->state = FD_ZSTD_DSKIP_STATE_MAGIC; + *src_consumed = consumed; + return 0UL; + } + } else { + /* More blocks to read */ + dskip->buf_sz = 0; + dskip->state = FD_ZSTD_DSKIP_STATE_BLOCK_HDR; + } + break; + } + + case FD_ZSTD_DSKIP_STATE_CHECKSUM: { + /* Skip 4-byte checksum */ + ulong skip = (avail < dskip->skip_rem) ? avail : dskip->skip_rem; + consumed += skip; + dskip->skip_rem -= skip; + + if( dskip->skip_rem > 0 ) { + *src_consumed = consumed; + return 1UL; + } + + /* Frame complete */ + dskip->state = FD_ZSTD_DSKIP_STATE_MAGIC; + *src_consumed = consumed; + return 0UL; + } + + default: + *src_consumed = consumed; + return ULONG_MAX; + } + } + + /* Consumed all input but haven't finished the frame yet */ + *src_consumed = consumed; + return 1UL; +} diff --git a/src/discof/restore/utils/fd_zstd_dskip.h b/src/discof/restore/utils/fd_zstd_dskip.h new file mode 100644 index 00000000000..7f4378c16fa --- /dev/null +++ b/src/discof/restore/utils/fd_zstd_dskip.h @@ -0,0 +1,40 @@ +#ifndef HEADER_fd_discof_restore_utils_fd_zstd_dskip_h +#define HEADER_fd_discof_restore_utils_fd_zstd_dskip_h + +/* fd_zstd_dskip.h provides an API to skip through Zstandard compressed + frames. This is useful for when multiple threads take turns + decompressing frames from the same compressed byte stream. */ + +#include "../../../util/fd_util_base.h" + +struct fd_zstd_dskip { + uchar buf[ 32 ]; /* Buffer for partial headers */ + ulong buf_sz; /* Number of bytes in buffer */ + ulong skip_rem; /* Bytes remaining to skip in current element */ + uint state; /* Current parser state */ + uint has_checksum; /* Whether current frame has checksum */ + uint last_block; /* Whether current block is the last in frame */ +}; + +typedef struct fd_zstd_dskip fd_zstd_dskip_t; + +FD_PROTOTYPES_BEGIN + +fd_zstd_dskip_t * +fd_zstd_dskip_init( fd_zstd_dskip_t * dskip ); + +/* fd_zstd_dskip_advance skips through Zstandard compressed data. + *src_consumed is set to the number of bytes consumed. Returns + ULONG_MAX on decompress error. Returns 1UL if everything was + skipped, but the current frame has not yet ended. Returns 0UL on end + of Zstandard frame. */ + +ulong +fd_zstd_dskip_advance( fd_zstd_dskip_t * dskip, + void const * src, + ulong src_sz, + ulong * src_consumed ); + +FD_PROTOTYPES_END + +#endif /* HEADER_fd_discof_restore_utils_fd_zstd_dskip_h */ diff --git a/src/discof/restore/utils/test_zstd_dskip.c b/src/discof/restore/utils/test_zstd_dskip.c new file mode 100644 index 00000000000..b0a4502a577 --- /dev/null +++ b/src/discof/restore/utils/test_zstd_dskip.c @@ -0,0 +1,64 @@ +#include "fd_zstd_dskip.h" +#include "../../../util/fd_util.h" +#include +#include +#include + +int +main( int argc, + char ** argv ) { + fd_boot( &argc, &argv ); + if( argc!=2 ) { + fprintf( stderr, "Usage: %s file.zst\n", argv[0] ); + return EXIT_FAILURE; + } + + FILE * file = fopen( argv[1], "rb" ); + if( FD_UNLIKELY( !file ) ) FD_LOG_ERR(( "fopen(%s,\"rb\") failed (%i-%s)", argv[1], errno, fd_io_strerror( errno ) )); + + fd_zstd_dskip_t dskip[1]; + fd_zstd_dskip_init( dskip ); + + ulong frame_idx = 0UL; + ulong frame_start = 0UL; + ulong total_offset = 0UL; + + for(;;) { + uchar buf[ 2 ]; + size_t nread = fread( buf, 1, sizeof(buf), file ); + if( nread==0 ) { + if( feof( file ) ) break; + FD_LOG_ERR(( "fread(%s) failed (%i-%s)", argv[1], errno, fd_io_strerror( errno ) )); + } + ulong offset = 0UL; + while( offsetstate, dskip->buf_sz, dskip->skip_rem )); + } + if( FD_UNLIKELY( src_consumed>(nread-offset) ) ) { + FD_LOG_ERR(( "src_consumed=%lu > avail=%lu (state=%u, buf_sz=%lu)", + src_consumed, nread-offset, dskip->state, dskip->buf_sz )); + } + FD_TEST( src_consumed<=(nread-offset) ); + offset += src_consumed; + total_offset += src_consumed; + if( res==0UL ) { + FD_LOG_NOTICE(( "Frame %lu at [%lu,%lu) bytes", frame_idx, frame_start, total_offset )); + frame_idx++; + frame_start = total_offset; + } + } + FD_TEST( offset==nread ); + } + + if( FD_UNLIKELY( 0!=fclose( file ) ) ) { + FD_LOG_ERR(( "fclose(%s) failed (%i-%s)", argv[1], errno, fd_io_strerror( errno ) )); + } + + FD_LOG_NOTICE(( "pass" )); + fd_halt(); + return EXIT_SUCCESS; +} From 341648d9f3481e47c51740a95d06cc454ef2a30d Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Tue, 25 Nov 2025 21:53:37 +0000 Subject: [PATCH 2/3] restore: add fuzz_zstd_dskip fuzzer --- src/discof/restore/Local.mk | 1 + src/discof/restore/utils/fuzz_zstd_dskip.c | 69 ++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 src/discof/restore/utils/fuzz_zstd_dskip.c diff --git a/src/discof/restore/Local.mk b/src/discof/restore/Local.mk index 531bf7b1935..569e06ea063 100644 --- a/src/discof/restore/Local.mk +++ b/src/discof/restore/Local.mk @@ -32,6 +32,7 @@ ifdef FD_HAS_ZSTD $(call add-objs,utils/fd_zstd_dskip,fd_discof) ifdef FD_HAS_HOSTED $(call make-unit-test,test_zstd_dskip,utils/test_zstd_dskip,fd_discof fd_flamenco fd_ballet fd_util) +$(call make-fuzz-test,fuzz_zstd_dskip,utils/fuzz_zstd_dskip,fd_discof fd_flamenco fd_ballet fd_util) endif endif diff --git a/src/discof/restore/utils/fuzz_zstd_dskip.c b/src/discof/restore/utils/fuzz_zstd_dskip.c new file mode 100644 index 00000000000..5ebbfa0095b --- /dev/null +++ b/src/discof/restore/utils/fuzz_zstd_dskip.c @@ -0,0 +1,69 @@ +#include "fd_zstd_dskip.h" + +#include "../../../util/fd_util.h" +#include "../../../util/sanitize/fd_fuzz.h" + +#if !FD_HAS_HOSTED +#error "This target requires FD_HAS_HOSTED" +#endif + +#include +#include + +int +LLVMFuzzerInitialize( int * argc, + char *** argv ) { + putenv( "FD_LOG_BACKTRACE=0" ); + fd_boot( argc, argv ); + atexit( fd_halt ); + //fd_log_level_stderr_set(4); + //fd_log_level_logfile_set(4); + //fd_log_level_core_set(4); + return 0; +} + +int +LLVMFuzzerTestOneInput( uchar const * const data, + ulong const size ) { + if( FD_UNLIKELY( size<8UL ) ) return -1; + uint rng_seed = (uint)fd_ulong_hash( FD_LOAD( ulong, data+size-8 ) ); + fd_rng_t _rng[1]; fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, (uint)rng_seed, 0UL ) ); + + fd_zstd_dskip_t dskip0, dskip1; + fd_zstd_dskip_init( &dskip0 ); + fd_zstd_dskip_init( &dskip1 ); + + ulong off = 0UL; + while( off Date: Tue, 25 Nov 2025 21:54:20 +0000 Subject: [PATCH 3/3] restore: add zstd_dskip differential fuzzer Ensures that our streaming Zstandard frame boundary detector computes the same result as libzstd. --- src/discof/restore/Local.mk | 1 + .../restore/utils/fuzz_zstd_dskip_diff.c | 108 ++++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100644 src/discof/restore/utils/fuzz_zstd_dskip_diff.c diff --git a/src/discof/restore/Local.mk b/src/discof/restore/Local.mk index 569e06ea063..35a455f7aa0 100644 --- a/src/discof/restore/Local.mk +++ b/src/discof/restore/Local.mk @@ -33,6 +33,7 @@ $(call add-objs,utils/fd_zstd_dskip,fd_discof) ifdef FD_HAS_HOSTED $(call make-unit-test,test_zstd_dskip,utils/test_zstd_dskip,fd_discof fd_flamenco fd_ballet fd_util) $(call make-fuzz-test,fuzz_zstd_dskip,utils/fuzz_zstd_dskip,fd_discof fd_flamenco fd_ballet fd_util) +$(call make-fuzz-test,fuzz_zstd_dskip_diff,utils/fuzz_zstd_dskip_diff,fd_discof fd_flamenco fd_ballet fd_util) endif endif diff --git a/src/discof/restore/utils/fuzz_zstd_dskip_diff.c b/src/discof/restore/utils/fuzz_zstd_dskip_diff.c new file mode 100644 index 00000000000..d0ab3992391 --- /dev/null +++ b/src/discof/restore/utils/fuzz_zstd_dskip_diff.c @@ -0,0 +1,108 @@ +#include "fd_zstd_dskip.h" + +#include "../../../util/fd_util.h" +#include "../../../util/sanitize/fd_fuzz.h" + +#if !FD_HAS_HOSTED +#error "This target requires FD_HAS_HOSTED" +#endif + +#include +#include +#include +#include +#include + +/* fuzz_zstd_dskip_diff compares the behavior of fd_zstd_dskip against + libzstd's ZSTD_findFrameCompressedSize. The fuzzer chunks the input + data randomly to test streaming behavior. */ + +int +LLVMFuzzerInitialize( int * argc, + char *** argv ) { + putenv( "FD_LOG_BACKTRACE=0" ); + fd_boot( argc, argv ); + atexit( fd_halt ); + + /* Disable parsing error logging but allow printf */ + fd_log_level_stderr_set(4); + fd_log_level_logfile_set(4); + fd_log_level_core_set(4); + return 0; +} + +int +LLVMFuzzerTestOneInput( uchar const * const data, + ulong const size ) { + if( FD_UNLIKELY( size<8UL ) ) return -1; + uint rng_seed = (uint)fd_ulong_hash( FD_LOAD( ulong, data+size-8 ) ); + fd_rng_t _rng[1]; fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, (uint)rng_seed, 0UL ) ); + + fd_zstd_dskip_t dskip0, dskip1; + fd_zstd_dskip_init( &dskip0 ); + fd_zstd_dskip_init( &dskip1 ); + + ulong off = 0UL; + while( off