Skip to content

Commit 5978e08

Browse files
snapshots: resolve snapshots
1 parent 0b3b2b3 commit 5978e08

File tree

5 files changed

+521
-52
lines changed

5 files changed

+521
-52
lines changed

src/discof/restore/Local.mk

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@ endif
2323
$(call add-objs,utils/fd_ssping,fd_discof)
2424
$(call add-objs,utils/fd_sshttp,fd_discof)
2525
$(call add-objs,utils/fd_ssarchive,fd_discof)
26+
$(call add-objs,utils/fd_ssresolve,fd_discof)

src/discof/restore/utils/fd_sshttp.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ FD_FN_CONST ulong
5353
fd_sshttp_footprint( void ) {
5454
ulong l;
5555
l = FD_LAYOUT_INIT;
56-
l = FD_LAYOUT_APPEND( l, FD_SSHTTP_ALIGN, sizeof(fd_sshttp_t) );
56+
l = FD_LAYOUT_APPEND( l, FD_SSHTTP_ALIGN, sizeof(fd_sshttp_t) );
5757
return FD_LAYOUT_FINI( l, FD_SSHTTP_ALIGN );
5858
}
5959

src/discof/restore/utils/fd_ssping.c

Lines changed: 150 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
#define _GNU_SOURCE
12
#include "fd_ssping.h"
3+
#include "fd_ssresolve.h"
24

35
#include "../../../util/bits/fd_bits.h"
46
#include "../../../util/log/fd_log.h"
@@ -9,20 +11,39 @@
911
#include <sys/socket.h>
1012
#include <netinet/in.h>
1113
#include <netinet/ip_icmp.h>
14+
#include <netinet/tcp.h>
1215

1316
#define PEER_STATE_UNPINGED 0
1417
#define PEER_STATE_PINGED 1
1518
#define PEER_STATE_VALID 2
1619
#define PEER_STATE_REFRESHING 3
1720
#define PEER_STATE_INVALID 4
1821

19-
#define PEER_DEADLINE_NANOS_PING (1L*1000L*1000L*1000L) /* 1 second */
22+
#define PEER_DEADLINE_NANOS_PING (2L*1000L*1000L*1000L) /* 2 seconds */
2023
#define PEER_DEADLINE_NANOS_VALID (2L*60L*1000L*1000L*1000L) /* 2 minutes */
2124
#define PEER_DEADLINE_NANOS_INVALID (5L*60L*1000L*1000L*1000L) /* 5 minutes */
2225

2326
struct fd_ssping_peer {
24-
ulong refcnt;
25-
fd_ip4_port_t addr;
27+
ulong refcnt;
28+
fd_ip4_port_t addr;
29+
30+
fd_ssresolve_t * full_ssresolve;
31+
fd_ssresolve_t * inc_ssresolve;
32+
33+
struct {
34+
35+
struct {
36+
ulong slot;
37+
fd_hash_t hash;
38+
} full;
39+
40+
struct {
41+
ulong base_slot;
42+
ulong slot;
43+
fd_hash_t hash;
44+
} incremental;
45+
46+
} snapshot_info;
2647

2748
struct {
2849
ulong next;
@@ -50,6 +71,8 @@ struct fd_ssping_peer {
5071
} fd;
5172

5273
int state;
74+
ulong full_latency_nanos;
75+
ulong incremental_latency_nanos;
5376
ulong latency_nanos;
5477
long deadline_nanos;
5578
};
@@ -131,8 +154,13 @@ fd_ssping_footprint( ulong max_peers ) {
131154
l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
132155
l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
133156
l = FD_LAYOUT_APPEND( l, deadline_list_align(), deadline_list_footprint() );
134-
l = FD_LAYOUT_APPEND( l, sizeof(struct pollfd), max_peers*sizeof(struct pollfd) );
135-
l = FD_LAYOUT_APPEND( l, sizeof(ulong), max_peers*sizeof(ulong) );
157+
l = FD_LAYOUT_APPEND( l, sizeof(struct pollfd), max_peers*sizeof(struct pollfd)*2UL );
158+
l = FD_LAYOUT_APPEND( l, sizeof(ulong), max_peers*sizeof(ulong)*2UL );
159+
160+
for( ulong i=0UL; i<max_peers*2UL; i++ ) {
161+
l = FD_LAYOUT_APPEND( l, fd_ssresolve_align(), fd_ssresolve_footprint() );
162+
}
163+
136164
return FD_LAYOUT_FINI( l, FD_SSPING_ALIGN );
137165
}
138166

@@ -156,20 +184,20 @@ fd_ssping_new( void * shmem,
156184
}
157185

158186
FD_SCRATCH_ALLOC_INIT( l, shmem );
159-
fd_ssping_t * ssping = FD_SCRATCH_ALLOC_APPEND( l, FD_SSPING_ALIGN, sizeof(fd_ssping_t) );
160-
void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( max_peers ) );
161-
void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(), peer_map_footprint( max_peers ) );
162-
void * _score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
187+
fd_ssping_t * ssping = FD_SCRATCH_ALLOC_APPEND( l, FD_SSPING_ALIGN, sizeof(fd_ssping_t) );
188+
void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( max_peers ) );
189+
void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(), peer_map_footprint( max_peers ) );
190+
void * _score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
163191
void * _unpinged = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
164192
void * _pinged = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
165193
void * _valid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
166194
void * _refreshing = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
167195
void * _invalid = FD_SCRATCH_ALLOC_APPEND( l, deadline_list_align(), deadline_list_footprint() );
168-
struct pollfd * fds = FD_SCRATCH_ALLOC_APPEND( l, sizeof(struct pollfd), max_peers*sizeof(struct pollfd) );
169-
ulong * fds_idx = FD_SCRATCH_ALLOC_APPEND( l, sizeof(ulong), max_peers*sizeof(ulong) );
196+
struct pollfd * _fds = FD_SCRATCH_ALLOC_APPEND( l, sizeof(struct pollfd), max_peers*sizeof(struct pollfd)*2UL );
197+
ulong * fds_idx = FD_SCRATCH_ALLOC_APPEND( l, sizeof(ulong), max_peers*sizeof(ulong)*2UL );
170198

171-
ssping->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) );
172-
ssping->map = peer_map_join( peer_map_new( _map, max_peers, seed ) );
199+
ssping->pool = peer_pool_join( peer_pool_new( _pool, max_peers ) );
200+
ssping->map = peer_map_join( peer_map_new( _map, max_peers, seed ) );
173201
ssping->score_treap = score_treap_join( score_treap_new( _score_treap, max_peers ) );
174202

175203
ssping->unpinged = deadline_list_join( deadline_list_new( _unpinged ) );
@@ -178,9 +206,17 @@ fd_ssping_new( void * shmem,
178206
ssping->refreshing = deadline_list_join( deadline_list_new( _refreshing ) );
179207
ssping->invalid = deadline_list_join( deadline_list_new( _invalid ) );
180208

181-
ssping->fds_len = 0UL;
182-
ssping->fds = fds;
183-
ssping->fds_idx = fds_idx;
209+
ssping->fds_len = 0UL;
210+
ssping->fds = _fds;
211+
ssping->fds_idx = fds_idx;
212+
213+
FD_TEST( peer_pool_max( ssping->pool )==max_peers );
214+
for( ulong i=0UL; i<peer_pool_max( ssping->pool ); i++ ) {
215+
void * _full_ssresolve = FD_SCRATCH_ALLOC_APPEND( l, fd_ssresolve_align(), fd_ssresolve_footprint() );
216+
void * _inc_ssresolve = FD_SCRATCH_ALLOC_APPEND( l, fd_ssresolve_align(), fd_ssresolve_footprint() );
217+
ssping->pool[ i ].full_ssresolve = fd_ssresolve_join( fd_ssresolve_new( _full_ssresolve ) );
218+
ssping->pool[ i ].inc_ssresolve = fd_ssresolve_join( fd_ssresolve_new( _inc_ssresolve ) );
219+
}
184220

185221
FD_COMPILER_MFENCE();
186222
FD_VOLATILE( ssping->magic ) = FD_SSPING_MAGIC;
@@ -222,6 +258,11 @@ fd_ssping_add( fd_ssping_t * ssping,
222258
peer->refcnt = 0UL;
223259
peer->state = PEER_STATE_UNPINGED;
224260
peer->addr = addr;
261+
peer->snapshot_info.full.slot = ULONG_MAX;
262+
peer->snapshot_info.incremental.base_slot = ULONG_MAX;
263+
peer->snapshot_info.incremental.slot = ULONG_MAX;
264+
peer->full_latency_nanos = 0UL;
265+
peer->incremental_latency_nanos = 0UL;
225266
peer_map_ele_insert( ssping->map, peer, ssping->pool );
226267
deadline_list_ele_push_tail( ssping->unpinged, peer, ssping->pool );
227268
}
@@ -238,13 +279,20 @@ remove_ping_fd( fd_ssping_t * ssping,
238279
return;
239280
}
240281

241-
ssping->fds[ idx ] = ssping->fds[ ssping->fds_len-1UL ];
242-
ssping->fds_idx[ idx ] = ssping->fds_idx[ ssping->fds_len-1UL ];
282+
ulong full_idx = idx;
283+
ulong inc_idx = idx+1UL;
243284

244-
fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ idx ] );
245-
peer->fd.idx = idx;
285+
ssping->fds[ inc_idx ] = ssping->fds[ ssping->fds_len-1UL ];
286+
ssping->fds_idx[ inc_idx ] = ssping->fds_idx[ ssping->fds_len-1UL ];
287+
ssping->fds_len--;
246288

289+
ssping->fds[ full_idx ] = ssping->fds[ ssping->fds_len-1UL ];
290+
ssping->fds_idx[ full_idx ] = ssping->fds_idx[ ssping->fds_len-1UL ];
247291
ssping->fds_len--;
292+
293+
FD_TEST( ssping->fds_idx[ full_idx ]== ssping->fds_idx[ inc_idx ] );
294+
fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ full_idx ] );
295+
peer->fd.idx = full_idx;
248296
}
249297

250298
void
@@ -333,43 +381,71 @@ poll_advance( fd_ssping_t * ssping,
333381
else if( FD_UNLIKELY( -1==nfds ) ) FD_LOG_ERR(( "poll failed (%i-%s)", errno, strerror( errno ) ));
334382

335383
for( ulong i=0UL; i<ssping->fds_len; i++ ) {
384+
fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] );
336385
struct pollfd * pfd = &ssping->fds[ i ];
337386
if( FD_UNLIKELY( pfd->revents & (POLLERR|POLLHUP) ) ) {
338387
unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
339388
continue;
340389
}
341390

342-
if( FD_LIKELY( pfd->revents & POLLOUT ) ) {
343-
struct icmphdr icmp_hdr = (struct icmphdr){
344-
.type = ICMP_ECHO,
345-
.code = 0,
346-
.un.echo.id = 0, /* Automatically set by kernel for a ping socket */
347-
.un.echo.sequence = 0, /* Only one ping goes out per socket, so nothing to change */
348-
.checksum = 0 /* Will be calculated by the kernel */
349-
};
350-
351-
long result = send( pfd->fd, &icmp_hdr, sizeof(icmp_hdr), 0 );
352-
if( FD_UNLIKELY( !result ) ) continue;
353-
if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) continue;
354-
else if( FD_UNLIKELY( -1==result ) ) {
355-
unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
356-
continue;
357-
}
358-
pfd->revents &= ~POLLOUT;
391+
int full = i&1UL ? 0 : 1; /* even indices are full, odd indices are incremental */
392+
fd_ssresolve_t * ssresolve = full ? peer->full_ssresolve : peer->inc_ssresolve;
393+
if( FD_UNLIKELY( now>peer->deadline_nanos ) ) {
394+
unping_peer( ssping, peer, now );
395+
continue;
359396
}
360397

361-
if( FD_LIKELY( pfd->revents & POLLIN ) ) {
362-
struct icmphdr icmp_hdr;
363-
long result = recv( pfd->fd, &icmp_hdr, sizeof(icmp_hdr), 0 );
364-
if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) continue;
365-
else if( FD_UNLIKELY( -1==result || (ulong)result<sizeof(icmp_hdr) || icmp_hdr.type!=ICMP_ECHOREPLY ) ) {
366-
unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
367-
continue;
398+
if( FD_LIKELY( !fd_ssresolve_is_done( ssresolve ) ) ) {
399+
if( FD_LIKELY( pfd->revents & POLLOUT ) ) {
400+
int res = fd_ssresolve_advance_poll_out( ssresolve );
401+
402+
if( FD_UNLIKELY( res==FD_SSRESOLVE_ADVANCE_ERROR ) ) {
403+
unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
404+
continue;
405+
}
406+
407+
pfd->revents &= ~POLLOUT;
368408
}
369409

370-
fd_ssping_peer_t * peer = peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] );
371-
FD_TEST( peer->deadline_nanos>now );
372-
peer->latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong)(peer->deadline_nanos - now);
410+
if( FD_LIKELY( pfd->revents & POLLIN ) ) {
411+
fd_ssresolve_result_t resolve_result;
412+
int res = fd_ssresolve_advance_poll_in( ssresolve, &resolve_result );
413+
414+
if( FD_UNLIKELY( res==FD_SSRESOLVE_ADVANCE_ERROR ) ) {
415+
unping_peer( ssping, peer_pool_ele( ssping->pool, ssping->fds_idx[ i ] ), now );
416+
continue;
417+
} else if( FD_UNLIKELY( res==FD_SSRESOLVE_ADVANCE_AGAIN ) ) {
418+
continue;
419+
} else { /* FD_SSRESOLVE_ADVANCE_SUCCESS */
420+
FD_TEST( peer->deadline_nanos>now );
421+
if( resolve_result.base_slot==ULONG_MAX ) {
422+
peer->snapshot_info.full.slot = resolve_result.slot;
423+
memcpy( &peer->snapshot_info.full.hash, &resolve_result.hash, sizeof(fd_hash_t) );
424+
peer->full_latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong)(peer->deadline_nanos - now);
425+
} else {
426+
peer->snapshot_info.incremental.base_slot = resolve_result.base_slot;
427+
peer->snapshot_info.incremental.slot = resolve_result.slot;
428+
memcpy( &peer->snapshot_info.incremental.hash, &resolve_result.hash, sizeof(fd_hash_t) );
429+
peer->incremental_latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong)(peer->deadline_nanos - now);
430+
}
431+
}
432+
}
433+
}
434+
435+
/* Once both the full and incremental snapshots are resolved, we can
436+
mark the peer valid and remove the peer from the list of peers to
437+
ping. */
438+
if( fd_ssresolve_is_done( peer->full_ssresolve ) &&
439+
fd_ssresolve_is_done( peer->inc_ssresolve ) ) {
440+
FD_LOG_NOTICE(("successfully resolved snapshots for peer " FD_IP4_ADDR_FMT ":%hu "
441+
"with full slot %lu, incremental base slot %lu and incremental slot %lu",
442+
FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), peer->addr.port,
443+
peer->snapshot_info.full.slot,
444+
peer->snapshot_info.incremental.base_slot,
445+
peer->snapshot_info.incremental.slot ));
446+
peer->latency_nanos = (peer->full_latency_nanos + peer->incremental_latency_nanos) / 2UL;
447+
FD_LOG_NOTICE(( "full latency is %lu, incremental latency is %lu, latency is %lu",
448+
peer->full_latency_nanos, peer->incremental_latency_nanos, peer->latency_nanos ));
373449

374450
if( FD_LIKELY( peer->state==PEER_STATE_REFRESHING ) ) {
375451
score_treap_ele_remove( ssping->score_treap, peer, ssping->pool );
@@ -382,17 +458,22 @@ poll_advance( fd_ssping_t * ssping,
382458
deadline_list_ele_remove( ssping->pinged, peer, ssping->pool );
383459
deadline_list_ele_push_tail( ssping->valid, peer, ssping->pool );
384460
score_treap_ele_insert( ssping->score_treap, peer, ssping->pool );
385-
remove_ping_fd( ssping, i );
461+
remove_ping_fd( ssping, peer->fd.idx );
386462
}
387463
}
388464
}
389465

390466
static int
391-
peer_connect( fd_ssping_t * ssping,
392-
fd_ssping_peer_t * peer ) {
393-
int sockfd = socket( PF_INET, SOCK_DGRAM|SOCK_NONBLOCK, IPPROTO_ICMP );
467+
create_socket( fd_ssping_t * ssping,
468+
fd_ssping_peer_t * peer ) {
469+
int sockfd = socket( PF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
394470
if( FD_UNLIKELY( -1==sockfd ) ) FD_LOG_ERR(( "socket failed (%i-%s)", errno, strerror( errno ) ));
395471

472+
int optval = 1;
473+
if( FD_UNLIKELY( -1==setsockopt( sockfd, SOL_TCP, TCP_NODELAY, &optval, sizeof(int) ) ) ) {
474+
FD_LOG_ERR(( "setsockopt() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
475+
}
476+
396477
struct sockaddr_in addr = {
397478
.sin_family = AF_INET,
398479
.sin_port = fd_ushort_bswap( peer->addr.port ),
@@ -409,10 +490,28 @@ peer_connect( fd_ssping_t * ssping,
409490
.events = POLLIN|POLLOUT,
410491
.revents = 0
411492
};
493+
494+
return 0;
495+
}
496+
497+
static int
498+
peer_connect( fd_ssping_t * ssping,
499+
fd_ssping_peer_t * peer ) {
500+
int err;
501+
err = create_socket( ssping, peer ); /* full */
502+
if( FD_UNLIKELY( err ) ) return err;
412503
ssping->fds_idx[ ssping->fds_len ] = peer_pool_idx( ssping->pool, peer );
413504
peer->fd.idx = ssping->fds_len;
414505
ssping->fds_len++;
415506

507+
err = create_socket( ssping, peer ); /* incremental */
508+
if( FD_UNLIKELY( err ) ) return err;
509+
ssping->fds_idx[ ssping->fds_len ] = peer_pool_idx( ssping->pool, peer );
510+
ssping->fds_len++;
511+
512+
fd_ssresolve_init( peer->full_ssresolve, peer->addr, ssping->fds[ peer->fd.idx ].fd, 1 );
513+
fd_ssresolve_init( peer->inc_ssresolve, peer->addr, ssping->fds[ peer->fd.idx+1UL ].fd, 0 );
514+
416515
return 0;
417516
}
418517

0 commit comments

Comments
 (0)