2
2
#include "utils/fd_sshttp.h"
3
3
#include "utils/fd_ssctrl.h"
4
4
#include "utils/fd_ssarchive.h"
5
+ #include "utils/fd_ssmsg.h"
5
6
6
7
#include "../../disco/topo/fd_topo.h"
7
8
#include "../../disco/metrics/fd_metrics.h"
@@ -74,10 +75,17 @@ struct fd_snaprd_tile {
74
75
} local_in ;
75
76
76
77
struct {
77
- char full_path [ PATH_MAX ];
78
- ulong full_len ;
79
- char incremental_path [ PATH_MAX ];
80
- ulong incremental_len ;
78
+
79
+ struct {
80
+ char path [ PATH_MAX ];
81
+ ulong len ;
82
+ } full ;
83
+
84
+ struct {
85
+ char path [ PATH_MAX ];
86
+ ulong len ;
87
+ } incremental ;
88
+
81
89
} http ;
82
90
83
91
struct {
@@ -328,11 +336,11 @@ rename_snapshots( fd_snaprd_tile_t * ctx ) {
328
336
if ( FD_UNLIKELY ( -1 == ctx -> local_out .dir_fd ) ) return ;
329
337
330
338
if ( FD_LIKELY ( -1 != ctx -> local_out .full_snapshot_fd ) ) {
331
- if ( FD_UNLIKELY ( -1 == renameat ( ctx -> local_out .dir_fd , "snapshot.tar.bz2-partial" , ctx -> local_out .dir_fd , ctx -> http .full_path ) ) )
339
+ if ( FD_UNLIKELY ( -1 == renameat ( ctx -> local_out .dir_fd , "snapshot.tar.bz2-partial" , ctx -> local_out .dir_fd , ctx -> http .full . path ) ) )
332
340
FD_LOG_ERR (( "renameat() failed (%i-%s)" , errno , fd_io_strerror ( errno ) ));
333
341
}
334
342
if ( FD_LIKELY ( -1 != ctx -> local_out .incremental_snapshot_fd ) ) {
335
- if ( FD_UNLIKELY ( -1 == renameat ( ctx -> local_out .dir_fd , "incremental-snapshot.tar.bz2-partial" , ctx -> local_out .dir_fd , ctx -> http .incremental_path ) ) )
343
+ if ( FD_UNLIKELY ( -1 == renameat ( ctx -> local_out .dir_fd , "incremental-snapshot.tar.bz2-partial" , ctx -> local_out .dir_fd , ctx -> http .incremental . path ) ) )
336
344
FD_LOG_ERR (( "renameat() failed (%i-%s)" , errno , fd_io_strerror ( errno ) ));
337
345
}
338
346
}
@@ -362,7 +370,7 @@ after_credit( fd_snaprd_tile_t * ctx,
362
370
363
371
switch ( ctx -> state ) {
364
372
case FD_SNAPRD_STATE_WAITING_FOR_PEERS : {
365
- fd_sspeer_t best = fd_ssping_best ( ctx -> ssping );
373
+ fd_sspeer_t best = fd_ssping_best ( ctx -> ssping , 0UL );
366
374
if ( FD_LIKELY ( best .addr .l ) ) {
367
375
ctx -> state = FD_SNAPRD_STATE_COLLECTING_PEERS ;
368
376
ctx -> deadline_nanos = now + 500L * 1000L * 1000L ;
@@ -372,7 +380,14 @@ after_credit( fd_snaprd_tile_t * ctx,
372
380
case FD_SNAPRD_STATE_COLLECTING_PEERS : {
373
381
if ( FD_UNLIKELY ( now < ctx -> deadline_nanos ) ) break ;
374
382
375
- fd_sspeer_t best = fd_ssping_best ( ctx -> ssping );
383
+ ulong highest_slot = 0UL ;
384
+ if ( FD_LIKELY ( ctx -> peer .addr .l ) ) {
385
+ highest_slot = ctx -> peer .snapshot_info -> incremental .slot != ULONG_MAX ?
386
+ ctx -> peer .snapshot_info -> incremental .slot :
387
+ ctx -> peer .snapshot_info -> full .slot ;
388
+ }
389
+
390
+ fd_sspeer_t best = fd_ssping_best ( ctx -> ssping , highest_slot );
376
391
if ( FD_UNLIKELY ( !best .addr .l ) ) {
377
392
ctx -> state = FD_SNAPRD_STATE_WAITING_FOR_PEERS ;
378
393
break ;
@@ -385,17 +400,29 @@ after_credit( fd_snaprd_tile_t * ctx,
385
400
} else {
386
401
char path [ PATH_MAX ];
387
402
char encoded_full_hash [ FD_BASE58_ENCODED_32_SZ ];
388
- char encoded_incremental_hash [ FD_BASE58_ENCODED_32_SZ ];
403
+
404
+ /* Generate the http paths */
389
405
fd_base58_encode_32 ( best .snapshot_info -> full .hash , NULL , encoded_full_hash );
390
- fd_base58_encode_32 ( best .snapshot_info -> incremental .hash , NULL , encoded_incremental_hash );
391
- FD_TEST ( fd_cstr_printf_check ( ctx -> http .full_path , PATH_MAX , & ctx -> http .full_len , "snapshot-%lu-%s.tar.zst" , best .snapshot_info -> full .slot , encoded_full_hash ) );
392
- FD_TEST ( fd_cstr_printf_check ( ctx -> http .incremental_path , PATH_MAX , & ctx -> http .incremental_len , "incremental-snapshot-%lu-%lu-%s.tar.zst" , best .snapshot_info -> incremental .base_slot , best .snapshot_info -> incremental .slot , encoded_incremental_hash ) );
393
- FD_TEST ( fd_cstr_printf_check ( path , PATH_MAX , NULL , "/%s" , ctx -> http .full_path ) );
406
+ FD_TEST ( fd_cstr_printf_check ( ctx -> http .full .path , PATH_MAX , & ctx -> http .full .len , "snapshot-%lu-%s.tar.zst" , best .snapshot_info -> full .slot , encoded_full_hash ) );
407
+ FD_TEST ( fd_cstr_printf_check ( path , PATH_MAX , NULL , "/%s" , ctx -> http .full .path ) );
408
+
409
+ if ( ctx -> config .incremental_snapshot_fetch ) {
410
+ char encoded_incremental_hash [ FD_BASE58_ENCODED_32_SZ ];
411
+ fd_base58_encode_32 ( best .snapshot_info -> incremental .hash , NULL , encoded_incremental_hash );
412
+ FD_TEST ( fd_cstr_printf_check ( ctx -> http .incremental .path , PATH_MAX , & ctx -> http .incremental .len , "incremental-snapshot-%lu-%lu-%s.tar.zst" , best .snapshot_info -> incremental .base_slot , best .snapshot_info -> incremental .slot , encoded_incremental_hash ) );
413
+ }
414
+
415
+ uint low ;
416
+ uint high ;
417
+ /* send the highest manifest slot */
418
+ ulong highest_manifest_slot = ctx -> config .incremental_snapshot_fetch ? best .snapshot_info -> incremental .slot : best .snapshot_info -> full .slot ;
419
+ fd_ssmsg_slot_to_frag ( highest_manifest_slot , & low , & high );
420
+ fd_stem_publish ( stem , 0UL , FD_SNAPSHOT_MSG_HIGHEST_MANIFEST_SLOT , 0UL , 0UL , 0UL , low , high );
394
421
395
- FD_LOG_NOTICE (( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s" , FD_IP4_ADDR_FMT_ARGS ( best .addr .addr ), best .addr .port , ctx -> http .full_path ));
422
+ FD_LOG_NOTICE (( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s" , FD_IP4_ADDR_FMT_ARGS ( best .addr .addr ), best .addr .port , ctx -> http .full . path ));
396
423
ctx -> peer = best ;
397
424
ctx -> state = FD_SNAPRD_STATE_READING_FULL_HTTP ;
398
- fd_sshttp_init ( ctx -> sshttp , best .addr , path , ctx -> http .full_len + 1UL , now );
425
+ fd_sshttp_init ( ctx -> sshttp , best .addr , path , ctx -> http .full . len + 1UL , now );
399
426
}
400
427
break ;
401
428
}
@@ -460,9 +487,9 @@ after_credit( fd_snaprd_tile_t * ctx,
460
487
}
461
488
462
489
char path [ PATH_MAX ];
463
- FD_TEST ( fd_cstr_printf_check ( path , PATH_MAX , NULL , "/%s" , ctx -> http .incremental_path ) );
490
+ FD_TEST ( fd_cstr_printf_check ( path , PATH_MAX , NULL , "/%s" , ctx -> http .incremental . path ) );
464
491
FD_LOG_NOTICE (( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s" , FD_IP4_ADDR_FMT_ARGS ( ctx -> peer .addr .addr ), ctx -> peer .addr .port , path ));
465
- fd_sshttp_init ( ctx -> sshttp , ctx -> peer .addr , path , ctx -> http .incremental_len + 1UL , fd_log_wallclock () );
492
+ fd_sshttp_init ( ctx -> sshttp , ctx -> peer .addr , path , ctx -> http .incremental . len + 1UL , fd_log_wallclock () );
466
493
ctx -> state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP ;
467
494
break ;
468
495
case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET :
@@ -692,6 +719,8 @@ unprivileged_init( fd_topo_t * topo,
692
719
ctx -> out .wmark = fd_dcache_compact_wmark ( ctx -> out .wksp , topo -> links [ tile -> out_link_id [ 0 ] ].dcache , topo -> links [ tile -> out_link_id [ 0 ] ].mtu );
693
720
ctx -> out .chunk = ctx -> out .chunk0 ;
694
721
ctx -> out .mtu = topo -> links [ tile -> out_link_id [ 0 ] ].mtu ;
722
+
723
+ fd_memset ( & ctx -> peer , 0 , sizeof (ctx -> peer ) );
695
724
}
696
725
697
726
#define STEM_BURST 2UL /* One control message, and one data message */
0 commit comments