1
#define _GNU_SOURCE
#include "fd_ssping.h"
#include "fd_ssresolve.h"

#include "../../../util/bits/fd_bits.h"
#include "../../../util/log/fd_log.h"

#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip_icmp.h>
#include <netinet/tcp.h>
12
15
13
16
#define PEER_STATE_UNPINGED 0
14
17
#define PEER_STATE_PINGED 1
15
18
#define PEER_STATE_VALID 2
16
19
#define PEER_STATE_REFRESHING 3
17
20
#define PEER_STATE_INVALID 4
18
21
19
- #define PEER_DEADLINE_NANOS_PING (1L *1000L*1000L*1000L) /* 1 second */
22
+ #define PEER_DEADLINE_NANOS_PING (2L *1000L*1000L*1000L) /* 2 seconds */
20
23
#define PEER_DEADLINE_NANOS_VALID (2L*60L*1000L*1000L*1000L) /* 2 minutes */
21
24
#define PEER_DEADLINE_NANOS_INVALID (5L*60L*1000L*1000L*1000L) /* 5 minutes */
22
25
23
26
struct fd_ssping_peer {
24
- ulong refcnt ;
25
- fd_ip4_port_t addr ;
27
+ ulong refcnt ;
28
+ fd_ip4_port_t addr ;
29
+
30
+ fd_ssresolve_t * full_ssresolve ;
31
+ fd_ssresolve_t * inc_ssresolve ;
32
+
33
+ struct {
34
+
35
+ struct {
36
+ ulong slot ;
37
+ fd_hash_t hash ;
38
+ } full ;
39
+
40
+ struct {
41
+ ulong base_slot ;
42
+ ulong slot ;
43
+ fd_hash_t hash ;
44
+ } incremental ;
45
+
46
+ } snapshot_info ;
26
47
27
48
struct {
28
49
ulong next ;
@@ -50,6 +71,8 @@ struct fd_ssping_peer {
50
71
} fd ;
51
72
52
73
int state ;
74
+ ulong full_latency_nanos ;
75
+ ulong incremental_latency_nanos ;
53
76
ulong latency_nanos ;
54
77
long deadline_nanos ;
55
78
};
@@ -131,8 +154,13 @@ fd_ssping_footprint( ulong max_peers ) {
131
154
l = FD_LAYOUT_APPEND ( l , deadline_list_align (), deadline_list_footprint () );
132
155
l = FD_LAYOUT_APPEND ( l , deadline_list_align (), deadline_list_footprint () );
133
156
l = FD_LAYOUT_APPEND ( l , deadline_list_align (), deadline_list_footprint () );
134
- l = FD_LAYOUT_APPEND ( l , sizeof (struct pollfd ), max_peers * sizeof (struct pollfd ) );
135
- l = FD_LAYOUT_APPEND ( l , sizeof (ulong ), max_peers * sizeof (ulong ) );
157
+ l = FD_LAYOUT_APPEND ( l , sizeof (struct pollfd ), max_peers * sizeof (struct pollfd )* 2UL );
158
+ l = FD_LAYOUT_APPEND ( l , sizeof (ulong ), max_peers * sizeof (ulong )* 2UL );
159
+
160
+ for ( ulong i = 0UL ; i < max_peers * 2UL ; i ++ ) {
161
+ l = FD_LAYOUT_APPEND ( l , fd_ssresolve_align (), fd_ssresolve_footprint () );
162
+ }
163
+
136
164
return FD_LAYOUT_FINI ( l , FD_SSPING_ALIGN );
137
165
}
138
166
@@ -156,20 +184,20 @@ fd_ssping_new( void * shmem,
156
184
}
157
185
158
186
FD_SCRATCH_ALLOC_INIT ( l , shmem );
159
- fd_ssping_t * ssping = FD_SCRATCH_ALLOC_APPEND ( l , FD_SSPING_ALIGN , sizeof (fd_ssping_t ) );
160
- void * _pool = FD_SCRATCH_ALLOC_APPEND ( l , peer_pool_align (), peer_pool_footprint ( max_peers ) );
161
- void * _map = FD_SCRATCH_ALLOC_APPEND ( l , peer_map_align (), peer_map_footprint ( max_peers ) );
162
- void * _score_treap = FD_SCRATCH_ALLOC_APPEND ( l , score_treap_align (), score_treap_footprint ( max_peers ) );
187
+ fd_ssping_t * ssping = FD_SCRATCH_ALLOC_APPEND ( l , FD_SSPING_ALIGN , sizeof (fd_ssping_t ) );
188
+ void * _pool = FD_SCRATCH_ALLOC_APPEND ( l , peer_pool_align (), peer_pool_footprint ( max_peers ) );
189
+ void * _map = FD_SCRATCH_ALLOC_APPEND ( l , peer_map_align (), peer_map_footprint ( max_peers ) );
190
+ void * _score_treap = FD_SCRATCH_ALLOC_APPEND ( l , score_treap_align (), score_treap_footprint ( max_peers ) );
163
191
void * _unpinged = FD_SCRATCH_ALLOC_APPEND ( l , deadline_list_align (), deadline_list_footprint () );
164
192
void * _pinged = FD_SCRATCH_ALLOC_APPEND ( l , deadline_list_align (), deadline_list_footprint () );
165
193
void * _valid = FD_SCRATCH_ALLOC_APPEND ( l , deadline_list_align (), deadline_list_footprint () );
166
194
void * _refreshing = FD_SCRATCH_ALLOC_APPEND ( l , deadline_list_align (), deadline_list_footprint () );
167
195
void * _invalid = FD_SCRATCH_ALLOC_APPEND ( l , deadline_list_align (), deadline_list_footprint () );
168
- struct pollfd * fds = FD_SCRATCH_ALLOC_APPEND ( l , sizeof (struct pollfd ), max_peers * sizeof (struct pollfd ) );
169
- ulong * fds_idx = FD_SCRATCH_ALLOC_APPEND ( l , sizeof (ulong ), max_peers * sizeof (ulong ) );
196
+ struct pollfd * _fds = FD_SCRATCH_ALLOC_APPEND ( l , sizeof (struct pollfd ), max_peers * sizeof (struct pollfd )* 2UL );
197
+ ulong * fds_idx = FD_SCRATCH_ALLOC_APPEND ( l , sizeof (ulong ), max_peers * sizeof (ulong )* 2UL );
170
198
171
- ssping -> pool = peer_pool_join ( peer_pool_new ( _pool , max_peers ) );
172
- ssping -> map = peer_map_join ( peer_map_new ( _map , max_peers , seed ) );
199
+ ssping -> pool = peer_pool_join ( peer_pool_new ( _pool , max_peers ) );
200
+ ssping -> map = peer_map_join ( peer_map_new ( _map , max_peers , seed ) );
173
201
ssping -> score_treap = score_treap_join ( score_treap_new ( _score_treap , max_peers ) );
174
202
175
203
ssping -> unpinged = deadline_list_join ( deadline_list_new ( _unpinged ) );
@@ -178,9 +206,17 @@ fd_ssping_new( void * shmem,
178
206
ssping -> refreshing = deadline_list_join ( deadline_list_new ( _refreshing ) );
179
207
ssping -> invalid = deadline_list_join ( deadline_list_new ( _invalid ) );
180
208
181
- ssping -> fds_len = 0UL ;
182
- ssping -> fds = fds ;
183
- ssping -> fds_idx = fds_idx ;
209
+ ssping -> fds_len = 0UL ;
210
+ ssping -> fds = _fds ;
211
+ ssping -> fds_idx = fds_idx ;
212
+
213
+ FD_TEST ( peer_pool_max ( ssping -> pool )== max_peers );
214
+ for ( ulong i = 0UL ; i < peer_pool_max ( ssping -> pool ); i ++ ) {
215
+ void * _full_ssresolve = FD_SCRATCH_ALLOC_APPEND ( l , fd_ssresolve_align (), fd_ssresolve_footprint () );
216
+ void * _inc_ssresolve = FD_SCRATCH_ALLOC_APPEND ( l , fd_ssresolve_align (), fd_ssresolve_footprint () );
217
+ ssping -> pool [ i ].full_ssresolve = fd_ssresolve_join ( fd_ssresolve_new ( _full_ssresolve ) );
218
+ ssping -> pool [ i ].inc_ssresolve = fd_ssresolve_join ( fd_ssresolve_new ( _inc_ssresolve ) );
219
+ }
184
220
185
221
FD_COMPILER_MFENCE ();
186
222
FD_VOLATILE ( ssping -> magic ) = FD_SSPING_MAGIC ;
@@ -222,6 +258,11 @@ fd_ssping_add( fd_ssping_t * ssping,
222
258
peer -> refcnt = 0UL ;
223
259
peer -> state = PEER_STATE_UNPINGED ;
224
260
peer -> addr = addr ;
261
+ peer -> snapshot_info .full .slot = ULONG_MAX ;
262
+ peer -> snapshot_info .incremental .base_slot = ULONG_MAX ;
263
+ peer -> snapshot_info .incremental .slot = ULONG_MAX ;
264
+ peer -> full_latency_nanos = 0UL ;
265
+ peer -> incremental_latency_nanos = 0UL ;
225
266
peer_map_ele_insert ( ssping -> map , peer , ssping -> pool );
226
267
deadline_list_ele_push_tail ( ssping -> unpinged , peer , ssping -> pool );
227
268
}
@@ -238,13 +279,20 @@ remove_ping_fd( fd_ssping_t * ssping,
238
279
return ;
239
280
}
240
281
241
- ssping -> fds [ idx ] = ssping -> fds [ ssping -> fds_len - 1UL ] ;
242
- ssping -> fds_idx [ idx ] = ssping -> fds_idx [ ssping -> fds_len - 1UL ] ;
282
+ ulong full_idx = idx ;
283
+ ulong inc_idx = idx + 1UL ;
243
284
244
- fd_ssping_peer_t * peer = peer_pool_ele ( ssping -> pool , ssping -> fds_idx [ idx ] );
245
- peer -> fd .idx = idx ;
285
+ ssping -> fds [ inc_idx ] = ssping -> fds [ ssping -> fds_len - 1UL ];
286
+ ssping -> fds_idx [ inc_idx ] = ssping -> fds_idx [ ssping -> fds_len - 1UL ];
287
+ ssping -> fds_len -- ;
246
288
289
+ ssping -> fds [ full_idx ] = ssping -> fds [ ssping -> fds_len - 1UL ];
290
+ ssping -> fds_idx [ full_idx ] = ssping -> fds_idx [ ssping -> fds_len - 1UL ];
247
291
ssping -> fds_len -- ;
292
+
293
+ FD_TEST ( ssping -> fds_idx [ full_idx ]== ssping -> fds_idx [ inc_idx ] );
294
+ fd_ssping_peer_t * peer = peer_pool_ele ( ssping -> pool , ssping -> fds_idx [ full_idx ] );
295
+ peer -> fd .idx = full_idx ;
248
296
}
249
297
250
298
void
@@ -333,43 +381,71 @@ poll_advance( fd_ssping_t * ssping,
333
381
else if ( FD_UNLIKELY ( -1 == nfds ) ) FD_LOG_ERR (( "poll failed (%i-%s)" , errno , strerror ( errno ) ));
334
382
335
383
for ( ulong i = 0UL ; i < ssping -> fds_len ; i ++ ) {
384
+ fd_ssping_peer_t * peer = peer_pool_ele ( ssping -> pool , ssping -> fds_idx [ i ] );
336
385
struct pollfd * pfd = & ssping -> fds [ i ];
337
386
if ( FD_UNLIKELY ( pfd -> revents & (POLLERR |POLLHUP ) ) ) {
338
387
unping_peer ( ssping , peer_pool_ele ( ssping -> pool , ssping -> fds_idx [ i ] ), now );
339
388
continue ;
340
389
}
341
390
342
- if ( FD_LIKELY ( pfd -> revents & POLLOUT ) ) {
343
- struct icmphdr icmp_hdr = (struct icmphdr ){
344
- .type = ICMP_ECHO ,
345
- .code = 0 ,
346
- .un .echo .id = 0 , /* Automatically set by kernel for a ping socket */
347
- .un .echo .sequence = 0 , /* Only one ping goes out per socket, so nothing to change */
348
- .checksum = 0 /* Will be calculated by the kernel */
349
- };
350
-
351
- long result = send ( pfd -> fd , & icmp_hdr , sizeof (icmp_hdr ), 0 );
352
- if ( FD_UNLIKELY ( !result ) ) continue ;
353
- if ( FD_UNLIKELY ( -1 == result && errno == EAGAIN ) ) continue ;
354
- else if ( FD_UNLIKELY ( -1 == result ) ) {
355
- unping_peer ( ssping , peer_pool_ele ( ssping -> pool , ssping -> fds_idx [ i ] ), now );
356
- continue ;
357
- }
358
- pfd -> revents &= ~POLLOUT ;
391
+ int full = i & 1UL ? 0 : 1 ; /* even indices are full, odd indices are incremental */
392
+ fd_ssresolve_t * ssresolve = full ? peer -> full_ssresolve : peer -> inc_ssresolve ;
393
+ if ( FD_UNLIKELY ( now > peer -> deadline_nanos ) ) {
394
+ unping_peer ( ssping , peer , now );
395
+ continue ;
359
396
}
360
397
361
- if ( FD_LIKELY ( pfd -> revents & POLLIN ) ) {
362
- struct icmphdr icmp_hdr ;
363
- long result = recv ( pfd -> fd , & icmp_hdr , sizeof (icmp_hdr ), 0 );
364
- if ( FD_UNLIKELY ( -1 == result && errno == EAGAIN ) ) continue ;
365
- else if ( FD_UNLIKELY ( -1 == result || (ulong )result < sizeof (icmp_hdr ) || icmp_hdr .type != ICMP_ECHOREPLY ) ) {
366
- unping_peer ( ssping , peer_pool_ele ( ssping -> pool , ssping -> fds_idx [ i ] ), now );
367
- continue ;
398
+ if ( FD_LIKELY ( !fd_ssresolve_is_done ( ssresolve ) ) ) {
399
+ if ( FD_LIKELY ( pfd -> revents & POLLOUT ) ) {
400
+ int res = fd_ssresolve_advance_poll_out ( ssresolve );
401
+
402
+ if ( FD_UNLIKELY ( res == FD_SSRESOLVE_ADVANCE_ERROR ) ) {
403
+ unping_peer ( ssping , peer_pool_ele ( ssping -> pool , ssping -> fds_idx [ i ] ), now );
404
+ continue ;
405
+ }
406
+
407
+ pfd -> revents &= ~POLLOUT ;
368
408
}
369
409
370
- fd_ssping_peer_t * peer = peer_pool_ele ( ssping -> pool , ssping -> fds_idx [ i ] );
371
- FD_TEST ( peer -> deadline_nanos > now );
372
- peer -> latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong )(peer -> deadline_nanos - now );
410
+ if ( FD_LIKELY ( pfd -> revents & POLLIN ) ) {
411
+ fd_ssresolve_result_t resolve_result ;
412
+ int res = fd_ssresolve_advance_poll_in ( ssresolve , & resolve_result );
413
+
414
+ if ( FD_UNLIKELY ( res == FD_SSRESOLVE_ADVANCE_ERROR ) ) {
415
+ unping_peer ( ssping , peer_pool_ele ( ssping -> pool , ssping -> fds_idx [ i ] ), now );
416
+ continue ;
417
+ } else if ( FD_UNLIKELY ( res == FD_SSRESOLVE_ADVANCE_AGAIN ) ) {
418
+ continue ;
419
+ } else { /* FD_SSRESOLVE_ADVANCE_SUCCESS */
420
+ FD_TEST ( peer -> deadline_nanos > now );
421
+ if ( resolve_result .base_slot == ULONG_MAX ) {
422
+ peer -> snapshot_info .full .slot = resolve_result .slot ;
423
+ memcpy ( & peer -> snapshot_info .full .hash , & resolve_result .hash , sizeof (fd_hash_t ) );
424
+ peer -> full_latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong )(peer -> deadline_nanos - now );
425
+ } else {
426
+ peer -> snapshot_info .incremental .base_slot = resolve_result .base_slot ;
427
+ peer -> snapshot_info .incremental .slot = resolve_result .slot ;
428
+ memcpy ( & peer -> snapshot_info .incremental .hash , & resolve_result .hash , sizeof (fd_hash_t ) );
429
+ peer -> incremental_latency_nanos = PEER_DEADLINE_NANOS_PING - (ulong )(peer -> deadline_nanos - now );
430
+ }
431
+ }
432
+ }
433
+ }
434
+
435
+ /* Once both the full and incremental snapshots are resolved, we can
436
+ mark the peer valid and remove the peer from the list of peers to
437
+ ping. */
438
+ if ( fd_ssresolve_is_done ( peer -> full_ssresolve ) &&
439
+ fd_ssresolve_is_done ( peer -> inc_ssresolve ) ) {
440
+ FD_LOG_NOTICE (("successfully resolved snapshots for peer " FD_IP4_ADDR_FMT ":%hu "
441
+ "with full slot %lu, incremental base slot %lu and incremental slot %lu" ,
442
+ FD_IP4_ADDR_FMT_ARGS ( peer -> addr .addr ), peer -> addr .port ,
443
+ peer -> snapshot_info .full .slot ,
444
+ peer -> snapshot_info .incremental .base_slot ,
445
+ peer -> snapshot_info .incremental .slot ));
446
+ peer -> latency_nanos = (peer -> full_latency_nanos + peer -> incremental_latency_nanos ) / 2UL ;
447
+ FD_LOG_NOTICE (( "full latency is %lu, incremental latency is %lu, latency is %lu" ,
448
+ peer -> full_latency_nanos , peer -> incremental_latency_nanos , peer -> latency_nanos ));
373
449
374
450
if ( FD_LIKELY ( peer -> state == PEER_STATE_REFRESHING ) ) {
375
451
score_treap_ele_remove ( ssping -> score_treap , peer , ssping -> pool );
@@ -382,17 +458,22 @@ poll_advance( fd_ssping_t * ssping,
382
458
deadline_list_ele_remove ( ssping -> pinged , peer , ssping -> pool );
383
459
deadline_list_ele_push_tail ( ssping -> valid , peer , ssping -> pool );
384
460
score_treap_ele_insert ( ssping -> score_treap , peer , ssping -> pool );
385
- remove_ping_fd ( ssping , i );
461
+ remove_ping_fd ( ssping , peer -> fd . idx );
386
462
}
387
463
}
388
464
}
389
465
390
466
static int
391
- peer_connect ( fd_ssping_t * ssping ,
392
- fd_ssping_peer_t * peer ) {
393
- int sockfd = socket ( PF_INET , SOCK_DGRAM |SOCK_NONBLOCK , IPPROTO_ICMP );
467
+ create_socket ( fd_ssping_t * ssping ,
468
+ fd_ssping_peer_t * peer ) {
469
+ int sockfd = socket ( PF_INET , SOCK_STREAM |SOCK_NONBLOCK , 0 );
394
470
if ( FD_UNLIKELY ( -1 == sockfd ) ) FD_LOG_ERR (( "socket failed (%i-%s)" , errno , strerror ( errno ) ));
395
471
472
+ int optval = 1 ;
473
+ if ( FD_UNLIKELY ( -1 == setsockopt ( sockfd , SOL_TCP , TCP_NODELAY , & optval , sizeof (int ) ) ) ) {
474
+ FD_LOG_ERR (( "setsockopt() failed (%d-%s)" , errno , fd_io_strerror ( errno ) ));
475
+ }
476
+
396
477
struct sockaddr_in addr = {
397
478
.sin_family = AF_INET ,
398
479
.sin_port = fd_ushort_bswap ( peer -> addr .port ),
@@ -409,10 +490,28 @@ peer_connect( fd_ssping_t * ssping,
409
490
.events = POLLIN |POLLOUT ,
410
491
.revents = 0
411
492
};
493
+
494
+ return 0 ;
495
+ }
496
+
497
+ static int
498
+ peer_connect ( fd_ssping_t * ssping ,
499
+ fd_ssping_peer_t * peer ) {
500
+ int err ;
501
+ err = create_socket ( ssping , peer ); /* full */
502
+ if ( FD_UNLIKELY ( err ) ) return err ;
412
503
ssping -> fds_idx [ ssping -> fds_len ] = peer_pool_idx ( ssping -> pool , peer );
413
504
peer -> fd .idx = ssping -> fds_len ;
414
505
ssping -> fds_len ++ ;
415
506
507
+ err = create_socket ( ssping , peer ); /* incremental */
508
+ if ( FD_UNLIKELY ( err ) ) return err ;
509
+ ssping -> fds_idx [ ssping -> fds_len ] = peer_pool_idx ( ssping -> pool , peer );
510
+ ssping -> fds_len ++ ;
511
+
512
+ fd_ssresolve_init ( peer -> full_ssresolve , peer -> addr , ssping -> fds [ peer -> fd .idx ].fd , 1 );
513
+ fd_ssresolve_init ( peer -> inc_ssresolve , peer -> addr , ssping -> fds [ peer -> fd .idx + 1UL ].fd , 0 );
514
+
416
515
return 0 ;
417
516
}
418
517
0 commit comments