1818 this tile simply copies the stream to the next tile in the pipeline. */
1919
2020struct fd_snapdc_tile {
21- int full ;
22- int is_zstd ;
21+ uint full : 1 ;
22+ uint is_zstd : 1 ;
23+ uint dirty : 1 ; /* in the middle of a frame? */
2324 int state ;
2425
2526 ZSTD_DCtx * zstd ;
@@ -95,6 +96,7 @@ handle_control_frag( fd_snapdc_tile_t * ctx,
9596 /* All control messages cause us to want to reset the decompression stream */
9697 ulong error = ZSTD_DCtx_reset ( ctx -> zstd , ZSTD_reset_session_only );
9798 if ( FD_UNLIKELY ( ZSTD_isError ( error ) ) ) FD_LOG_ERR (( "ZSTD_DCtx_reset failed (%lu-%s)" , error , ZSTD_getErrorName ( error ) ));
99+ ctx -> dirty = 0 ;
98100
99101 switch ( sig ) {
100102 case FD_SNAPSHOT_MSG_CTRL_INIT_FULL : {
@@ -103,7 +105,7 @@ handle_control_frag( fd_snapdc_tile_t * ctx,
103105 fd_ssctrl_init_t const * msg = fd_chunk_to_laddr_const ( ctx -> in .wksp , chunk );
104106 ctx -> state = FD_SNAPSHOT_STATE_PROCESSING ;
105107 ctx -> full = 1 ;
106- ctx -> is_zstd = msg -> zstd ;
108+ ctx -> is_zstd = !! msg -> zstd ;
107109 ctx -> in .frag_pos = 0UL ;
108110 ctx -> metrics .full .compressed_bytes_read = 0UL ;
109111 ctx -> metrics .full .decompressed_bytes_written = 0UL ;
@@ -115,24 +117,23 @@ handle_control_frag( fd_snapdc_tile_t * ctx,
115117 fd_ssctrl_init_t const * msg = fd_chunk_to_laddr_const ( ctx -> in .wksp , chunk );
116118 ctx -> state = FD_SNAPSHOT_STATE_PROCESSING ;
117119 ctx -> full = 0 ;
118- ctx -> is_zstd = msg -> zstd ;
120+ ctx -> is_zstd = !! msg -> zstd ;
119121 ctx -> in .frag_pos = 0UL ;
120122 ctx -> metrics .incremental .compressed_bytes_read = 0UL ;
121123 ctx -> metrics .incremental .decompressed_bytes_written = 0UL ;
122124 break ;
123125 }
124126 case FD_SNAPSHOT_MSG_CTRL_FAIL :
125127 FD_TEST ( ctx -> state == FD_SNAPSHOT_STATE_PROCESSING ||
126- ctx -> state == FD_SNAPSHOT_STATE_FINISHING ||
127128 ctx -> state == FD_SNAPSHOT_STATE_ERROR );
128129 ctx -> state = FD_SNAPSHOT_STATE_IDLE ;
129130 break ;
130131 case FD_SNAPSHOT_MSG_CTRL_NEXT :
131132 case FD_SNAPSHOT_MSG_CTRL_DONE :
132133 FD_TEST ( ctx -> state == FD_SNAPSHOT_STATE_PROCESSING ||
133- ctx -> state == FD_SNAPSHOT_STATE_FINISHING ||
134134 ctx -> state == FD_SNAPSHOT_STATE_ERROR );
135- if ( FD_UNLIKELY ( ctx -> is_zstd && ctx -> state != FD_SNAPSHOT_STATE_FINISHING ) ) {
135+ if ( FD_UNLIKELY ( ctx -> is_zstd && ctx -> dirty ) ) {
136+ FD_LOG_WARNING (( "encountered end-of-file in the middle of a compressed frame" ));
136137 ctx -> state = FD_SNAPSHOT_STATE_ERROR ;
137138 fd_stem_publish ( stem , 0UL , FD_SNAPSHOT_MSG_CTRL_ERROR , 0UL , 0UL , 0UL , 0UL , 0UL );
138139 return ;
@@ -160,16 +161,7 @@ handle_data_frag( fd_snapdc_tile_t * ctx,
160161 fd_stem_context_t * stem ,
161162 ulong chunk ,
162163 ulong sz ) {
163- if ( FD_UNLIKELY ( ctx -> state == FD_SNAPSHOT_STATE_FINISHING ) ) {
164- /* We thought the snapshot was finished (we already read the full
165- frame) and then we got another data fragment from the reader.
166- This means the snapshot has extra padding or garbage on the end,
167- which we don't trust so just abandon it completely. */
168- ctx -> state = FD_SNAPSHOT_STATE_ERROR ;
169- fd_stem_publish ( stem , 0UL , FD_SNAPSHOT_MSG_CTRL_ERROR , 0UL , 0UL , 0UL , 0UL , 0UL );
170- return 0 ;
171- }
172- else if ( FD_UNLIKELY ( ctx -> state == FD_SNAPSHOT_STATE_ERROR ) ) {
164+ if ( FD_UNLIKELY ( ctx -> state == FD_SNAPSHOT_STATE_ERROR ) ) {
173165 /* Ignore all data frags after observing an error in the stream until
174166 we receive fail & init control messages to restart processing. */
175167 return 0 ;
@@ -206,14 +198,16 @@ handle_data_frag( fd_snapdc_tile_t * ctx,
206198 }
207199
208200 ulong in_consumed = 0UL , out_produced = 0UL ;
209- ulong error = ZSTD_decompressStream_simpleArgs ( ctx -> zstd ,
210- out ,
211- ctx -> out .mtu ,
212- & out_produced ,
213- in ,
214- sz - ctx -> in .frag_pos ,
215- & in_consumed );
216- if ( FD_UNLIKELY ( ZSTD_isError ( error ) ) ) {
201+ ulong frame_res = ZSTD_decompressStream_simpleArgs (
202+ ctx -> zstd ,
203+ out ,
204+ ctx -> out .mtu ,
205+ & out_produced ,
206+ in ,
207+ sz - ctx -> in .frag_pos ,
208+ & in_consumed );
209+ if ( FD_UNLIKELY ( ZSTD_isError ( frame_res ) ) ) {
210+ FD_LOG_WARNING (( "error while decompressing snapshot (%u-%s)" , ZSTD_getErrorCode ( frame_res ), ZSTD_getErrorName ( frame_res ) ));
217211 ctx -> state = FD_SNAPSHOT_STATE_ERROR ;
218212 fd_stem_publish ( stem , 0UL , FD_SNAPSHOT_MSG_CTRL_ERROR , 0UL , 0UL , 0UL , 0UL , 0UL );
219213 return 0 ;
@@ -235,22 +229,7 @@ handle_data_frag( fd_snapdc_tile_t * ctx,
235229 ctx -> metrics .incremental .decompressed_bytes_written += out_produced ;
236230 }
237231
238- if ( FD_UNLIKELY ( !error ) ) {
239- if ( FD_UNLIKELY ( ctx -> in .frag_pos != sz ) ) {
240- /* Zstandard finished decoding the snapshot frame (the whole
241- snapshot is a single frame), but, the fragment we got from
242- the snapshot reader has not been fully consumed, so there is
243- some trailing padding or garbage at the end of the snapshot.
244-
245- This is not valid under the snapshot format and indicates a
246- problem so we abandon the snapshot. */
247- ctx -> state = FD_SNAPSHOT_STATE_ERROR ;
248- fd_stem_publish ( stem , 0UL , FD_SNAPSHOT_MSG_CTRL_ERROR , 0UL , 0UL , 0UL , 0UL , 0UL );
249- return 0 ;
250- }
251-
252- ctx -> state = FD_SNAPSHOT_STATE_FINISHING ;
253- }
232+ ctx -> dirty = frame_res != 0UL ;
254233
255234 int maybe_more_output = out_produced == ctx -> out .mtu || ctx -> in .frag_pos < sz ;
256235 if ( FD_LIKELY ( !maybe_more_output ) ) ctx -> in .frag_pos = 0UL ;
@@ -316,6 +295,7 @@ unprivileged_init( fd_topo_t * topo,
316295 FD_TEST ( ctx -> zstd );
317296 FD_TEST ( ctx -> zstd == _zstd );
318297
298+ ctx -> dirty = 0 ;
319299 ctx -> in .frag_pos = 0UL ;
320300 fd_memset ( & ctx -> metrics , 0 , sizeof (ctx -> metrics ) );
321301
0 commit comments