Skip to content

Commit 6130a9e

Browse files
snapshots: full resolve
1 parent 5978e08 commit 6130a9e

File tree

5 files changed

+84
-131
lines changed

5 files changed

+84
-131
lines changed

src/discof/restore/fd_snaprd_tile.c

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ struct fd_snaprd_tile {
4949
ulong ack_cnt;
5050
int peer_selection;
5151

52-
fd_ip4_port_t addr;
52+
fd_sspeer_t peer;
5353

5454
struct {
5555
ulong write_buffer_pos;
@@ -73,6 +73,13 @@ struct fd_snaprd_tile {
7373
char incremental_snapshot_path[ PATH_MAX ];
7474
} local_in;
7575

76+
struct {
77+
char full_path[ PATH_MAX ];
78+
ulong full_len;
79+
char incremental_path[ PATH_MAX ];
80+
ulong incremental_len;
81+
} http;
82+
7683
struct {
7784
char path[ PATH_MAX ];
7885
int do_download;
@@ -204,8 +211,8 @@ read_http_data( fd_snaprd_tile_t * ctx,
204211
case FD_SSHTTP_ADVANCE_AGAIN: break;
205212
case FD_SSHTTP_ADVANCE_ERROR: {
206213
FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
207-
FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
208-
fd_ssping_invalidate( ctx->ssping, ctx->addr, now );
214+
FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port ));
215+
fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, now );
209216
fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
210217
ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
211218
ctx->deadline_nanos = now;
@@ -319,16 +326,13 @@ drain_buffer( fd_snaprd_tile_t * ctx ) {
319326
static void
320327
rename_snapshots( fd_snaprd_tile_t * ctx ) {
321328
if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return;
322-
char const * full_snapshot_name;
323-
char const * incremental_snapshot_name;
324-
fd_sshttp_snapshot_names( ctx->sshttp, &full_snapshot_name, &incremental_snapshot_name );
325329

326330
if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) {
327-
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, full_snapshot_name ) ) )
331+
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->http.full_path ) ) )
328332
FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
329333
}
330334
if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) {
331-
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, incremental_snapshot_name ) ) )
335+
if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->http.incremental_path ) ) )
332336
FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
333337
}
334338
}
@@ -357,8 +361,8 @@ after_credit( fd_snaprd_tile_t * ctx,
357361

358362
switch ( ctx->state ) {
359363
case FD_SNAPRD_STATE_WAITING_FOR_PEERS: {
360-
fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
361-
if( FD_LIKELY( best.l ) ) {
364+
fd_sspeer_t best = fd_ssping_best( ctx->ssping );
365+
if( FD_LIKELY( best.addr.l ) ) {
362366
ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
363367
ctx->deadline_nanos = now + 500L*1000L*1000L;
364368
}
@@ -367,8 +371,8 @@ after_credit( fd_snaprd_tile_t * ctx,
367371
case FD_SNAPRD_STATE_COLLECTING_PEERS: {
368372
if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
369373

370-
fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
371-
if( FD_UNLIKELY( !best.l ) ) {
374+
fd_sspeer_t best = fd_ssping_best( ctx->ssping );
375+
if( FD_UNLIKELY( !best.addr.l ) ) {
372376
ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
373377
break;
374378
}
@@ -378,10 +382,19 @@ after_credit( fd_snaprd_tile_t * ctx,
378382
FD_LOG_NOTICE(( "loading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
379383
ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
380384
} else {
381-
FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr ), best.port ));
382-
ctx->addr = best;
385+
char path[ PATH_MAX ];
386+
char encoded_full_hash[ FD_BASE58_ENCODED_32_SZ ];
387+
char encoded_incremental_hash[ FD_BASE58_ENCODED_32_SZ ];
388+
fd_base58_encode_32( best.snapshot_info->full.hash, NULL, encoded_full_hash );
389+
fd_base58_encode_32( best.snapshot_info->incremental.hash, NULL, encoded_incremental_hash );
390+
FD_TEST( fd_cstr_printf_check( ctx->http.full_path, PATH_MAX, &ctx->http.full_len, "snapshot-%lu-%s.tar.zst", best.snapshot_info->full.slot, encoded_full_hash ) );
391+
FD_TEST( fd_cstr_printf_check( ctx->http.incremental_path, PATH_MAX, &ctx->http.incremental_len, "incremental-snapshot-%lu-%lu-%s.tar.zst", best.snapshot_info->incremental.base_slot, best.snapshot_info->incremental.slot, encoded_incremental_hash ) );
392+
FD_TEST( fd_cstr_printf_check( path, PATH_MAX, NULL, "/%s", ctx->http.full_path ) );
393+
394+
FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( best.addr.addr ), best.addr.port, ctx->http.full_path ));
395+
ctx->peer = best;
383396
ctx->state = FD_SNAPRD_STATE_READING_FULL_HTTP;
384-
fd_sshttp_init( ctx->sshttp, best, "/snapshot.tar.bz2", 17UL, now );
397+
fd_sshttp_init( ctx->sshttp, best.addr, path, ctx->http.full_len + 1UL, now );
385398
}
386399
break;
387400
}
@@ -445,8 +458,10 @@ after_credit( fd_snaprd_tile_t * ctx,
445458
break;
446459
}
447460

448-
FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
449-
fd_sshttp_init( ctx->sshttp, ctx->addr, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() );
461+
char path[ PATH_MAX ];
462+
FD_TEST( fd_cstr_printf_check( path, PATH_MAX, NULL, "/%s", ctx->http.incremental_path ) );
463+
FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port, path));
464+
fd_sshttp_init( ctx->sshttp, ctx->peer.addr, path, ctx->http.incremental_len + 1UL, fd_log_wallclock() );
450465
ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP;
451466
break;
452467
case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
@@ -508,18 +523,18 @@ after_frag( fd_snaprd_tile_t * ctx,
508523
case FD_SNAPRD_STATE_READING_FULL_HTTP:
509524
case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
510525
FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
511-
FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
526+
FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port ));
512527
fd_sshttp_cancel( ctx->sshttp );
513-
fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
528+
fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
514529
fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
515530
ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
516531
break;
517532
case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP:
518533
case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP:
519534
FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
520-
FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
535+
FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port ));
521536
fd_sshttp_cancel( ctx->sshttp );
522-
fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
537+
fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
523538
/* We would like to transition to FULL_HTTP_RESET, but we can't
524539
do it just yet, because we have already sent a DONE control
525540
fragment, and need to wait for acknowledges to come back

src/discof/restore/utils/fd_sshttp.c

Lines changed: 3 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -182,92 +182,6 @@ send_request( fd_sshttp_t * http,
182182
return FD_SSHTTP_ADVANCE_AGAIN;
183183
}
184184

185-
static int
186-
follow_redirect( fd_sshttp_t * http,
187-
struct phr_header * headers,
188-
ulong header_cnt,
189-
long now ) {
190-
if( FD_UNLIKELY( !http->hops ) ) {
191-
FD_LOG_WARNING(( "too many redirects" ));
192-
fd_sshttp_cancel( http );
193-
return FD_SSHTTP_ADVANCE_ERROR;
194-
}
195-
196-
http->hops--;
197-
198-
ulong location_len;
199-
char const * location = NULL;
200-
201-
for( ulong i=0UL; i<header_cnt; i++ ) {
202-
if( FD_UNLIKELY( !strncasecmp( headers[ i ].name, "location", headers[ i ].name_len ) ) ) {
203-
if( FD_UNLIKELY( !headers [ i ].value_len || headers[ i ].value[ 0 ]!='/' ) ) {
204-
FD_LOG_WARNING(( "invalid location header `%.*s`", (int)headers[ i ].value_len, headers[ i ].value ));
205-
fd_sshttp_cancel( http );
206-
return FD_SSHTTP_ADVANCE_ERROR;
207-
}
208-
209-
location_len = headers[ i ].value_len;
210-
location = headers[ i ].value;
211-
212-
if( FD_UNLIKELY( location_len>=PATH_MAX-1UL ) ) {
213-
fd_sshttp_cancel( http );
214-
return FD_SSHTTP_ADVANCE_ERROR;
215-
}
216-
217-
char snapshot_name[ PATH_MAX ];
218-
fd_memcpy( snapshot_name, location+1UL, location_len-1UL );
219-
snapshot_name[ location_len-1UL ] = '\0';
220-
221-
ulong full_entry_slot, incremental_entry_slot;
222-
uchar decoded_hash[ FD_HASH_FOOTPRINT ];
223-
int err = fd_ssarchive_parse_filename( snapshot_name, &full_entry_slot, &incremental_entry_slot, decoded_hash );
224-
225-
if( FD_UNLIKELY( err ) ) {
226-
FD_LOG_WARNING(( "unrecognized snapshot file `%s` in redirect location header", snapshot_name ));
227-
fd_sshttp_cancel( http );
228-
return FD_SSHTTP_ADVANCE_ERROR;
229-
}
230-
231-
char encoded_hash[ FD_BASE58_ENCODED_32_SZ ];
232-
fd_base58_encode_32( decoded_hash, NULL, encoded_hash );
233-
234-
if( FD_LIKELY( incremental_entry_slot!=ULONG_MAX ) ) {
235-
FD_TEST( fd_cstr_printf_check( http->incremental_snapshot_name, PATH_MAX, NULL, "incremental-snapshot-%lu-%lu-%s.tar.zst", full_entry_slot, incremental_entry_slot, encoded_hash ) );
236-
} else {
237-
FD_TEST( fd_cstr_printf_check( http->full_snapshot_name, PATH_MAX, NULL, "snapshot-%lu-%s.tar.zst", full_entry_slot, encoded_hash ) );
238-
}
239-
break;
240-
}
241-
}
242-
243-
if( FD_UNLIKELY( !location ) ) {
244-
FD_LOG_WARNING(( "no location header in redirect response" ));
245-
fd_sshttp_cancel( http );
246-
return FD_SSHTTP_ADVANCE_ERROR;
247-
}
248-
249-
if( FD_UNLIKELY( !fd_cstr_printf_check( http->request, sizeof(http->request), &http->request_len,
250-
"GET %.*s HTTP/1.1\r\n"
251-
"User-Agent: Firedancer\r\n"
252-
"Accept: */*\r\n"
253-
"Accept-Encoding: identity\r\n"
254-
"Host: " FD_IP4_ADDR_FMT "\r\n\r\n",
255-
(int)location_len, location, FD_IP4_ADDR_FMT_ARGS( http->addr.addr ) ) ) ) {
256-
FD_LOG_WARNING(( "location header too long `%.*s`", (int)location_len, location ));
257-
fd_sshttp_cancel( http );
258-
return FD_SSHTTP_ADVANCE_ERROR;
259-
}
260-
261-
FD_LOG_NOTICE(( "following redirect to http://" FD_IP4_ADDR_FMT ":%hu%.*s",
262-
FD_IP4_ADDR_FMT_ARGS( http->addr.addr ), http->addr.port,
263-
(int)headers[ 0 ].value_len, headers[ 0 ].value ));
264-
265-
fd_sshttp_cancel( http );
266-
fd_sshttp_init( http, http->addr, location, location_len, now );
267-
268-
return FD_SSHTTP_ADVANCE_AGAIN;
269-
}
270-
271185
static int
272186
read_response( fd_sshttp_t * http,
273187
ulong * data_len,
@@ -314,7 +228,9 @@ read_response( fd_sshttp_t * http,
314228

315229
int is_redirect = (status==301) | (status==303) | (status==304) | (status==307) | (status==308);
316230
if( FD_UNLIKELY( is_redirect ) ) {
317-
return follow_redirect( http, headers, header_cnt, now );
231+
FD_LOG_WARNING(( "redirect response not allowed (%d)", status ));
232+
fd_sshttp_cancel( http );
233+
return FD_SSHTTP_ADVANCE_ERROR;
318234
}
319235

320236
if( FD_UNLIKELY( status!=200 ) ) {

src/discof/restore/utils/fd_ssping.c

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,7 @@ struct fd_ssping_peer {
3030
fd_ssresolve_t * full_ssresolve;
3131
fd_ssresolve_t * inc_ssresolve;
3232

33-
struct {
34-
35-
struct {
36-
ulong slot;
37-
fd_hash_t hash;
38-
} full;
39-
40-
struct {
41-
ulong base_slot;
42-
ulong slot;
43-
fd_hash_t hash;
44-
} incremental;
45-
46-
} snapshot_info;
33+
fd_ssinfo_t snapshot_info;
4734

4835
struct {
4936
ulong next;
@@ -418,6 +405,7 @@ poll_advance( fd_ssping_t * ssping,
418405
continue;
419406
} else { /* FD_SSRESOLVE_ADVANCE_SUCCESS */
420407
FD_TEST( peer->deadline_nanos>now );
408+
421409
if( resolve_result.base_slot==ULONG_MAX ) {
422410
peer->snapshot_info.full.slot = resolve_result.slot;
423411
memcpy( &peer->snapshot_info.full.hash, &resolve_result.hash, sizeof(fd_hash_t) );
@@ -426,8 +414,7 @@ poll_advance( fd_ssping_t * ssping,
426414
peer->snapshot_info.incremental.base_slot = resolve_result.base_slot;
427415
peer->snapshot_info.incremental.slot = resolve_result.slot;
428416
memcpy( &peer->snapshot_info.incremental.hash, &resolve_result.hash, sizeof(fd_hash_t) );
429-
peer->incremental_latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong)(peer->deadline_nanos - now);
430-
}
417+
peer->incremental_latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong)(peer->deadline_nanos - now); }
431418
}
432419
}
433420
}
@@ -592,11 +579,22 @@ fd_ssping_advance( fd_ssping_t * ssping,
592579
poll_advance( ssping, now );
593580
}
594581

595-
fd_ip4_port_t
582+
fd_sspeer_t
596583
fd_ssping_best( fd_ssping_t const * ssping ) {
597584
score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( ssping->score_treap, ssping->pool );
598-
if( FD_UNLIKELY( score_treap_fwd_iter_done( iter ) ) ) return (fd_ip4_port_t){ .l=0UL };
585+
if( FD_UNLIKELY( score_treap_fwd_iter_done( iter ) ) ) {
586+
return (fd_sspeer_t){
587+
.addr = {
588+
.l = 0UL
589+
},
590+
.snapshot_info = NULL,
591+
};
592+
}
599593

600594
fd_ssping_peer_t const * best = score_treap_fwd_iter_ele_const( iter, ssping->pool );
601-
return best->addr;
595+
596+
return (fd_sspeer_t){
597+
.addr = best->addr,
598+
.snapshot_info = &best->snapshot_info,
599+
};
602600
}

src/discof/restore/utils/fd_ssping.h

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "../../../util/fd_util_base.h"
1919
#include "../../../util/net/fd_net_headers.h"
20+
#include "../../../flamenco/types/fd_types_custom.h"
2021

2122
#define FD_SSPING_ALIGN (8UL)
2223

@@ -25,6 +26,29 @@
2526
struct fd_ssping_private;
2627
typedef struct fd_ssping_private fd_ssping_t;
2728

29+
/* fd_ssinfo stores the resolved snapshot information from a peer. */
30+
struct fd_ssinfo {
31+
struct {
32+
ulong slot; /* slot of the full snapshot */
33+
uchar hash[ FD_HASH_FOOTPRINT ]; /* base58 decoded hash of the full snapshot */
34+
ulong slots_behind; /* number of slots behind the latest full cluster slot */
35+
} full;
36+
37+
struct {
38+
ulong base_slot;
39+
ulong slot;
40+
uchar hash[ FD_HASH_FOOTPRINT ];
41+
ulong slots_behind;
42+
} incremental;
43+
};
44+
typedef struct fd_ssinfo fd_ssinfo_t;
45+
46+
struct fd_sspeer {
47+
fd_ip4_port_t addr;
48+
fd_ssinfo_t const * snapshot_info;
49+
};
50+
typedef struct fd_sspeer fd_sspeer_t;
51+
2852
FD_PROTOTYPES_BEGIN
2953

3054
FD_FN_CONST ulong
@@ -83,7 +107,7 @@ fd_ssping_advance( fd_ssping_t * ssping,
83107
/* Retrieve the best "active" peer right now, by lowest ping. If no
84108
peer is active or pingable, this returns 0.0.0.0:0. */
85109

86-
fd_ip4_port_t
110+
fd_sspeer_t
87111
fd_ssping_best( fd_ssping_t const * ssping );
88112

89113
FD_PROTOTYPES_END

src/discof/restore/utils/fd_ssresolve.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
#define FD_SSRESOLVE_ALIGN (8UL)
1010

1111
struct fd_ssresolve_result {
12-
ulong slot; /* slot of the snapshot */
13-
ulong base_slot; /* base slot of incremental snapshot or ULONG_MAX */
14-
fd_hash_t hash; /* base58 decoded hash of the snapshot */
12+
ulong slot; /* slot of the snapshot */
13+
ulong base_slot; /* base slot of incremental snapshot or ULONG_MAX */
14+
fd_hash_t hash; /* base58 decoded hash of the snapshot */
1515
};
1616

1717
typedef struct fd_ssresolve_result fd_ssresolve_result_t;

0 commit comments

Comments
 (0)