Skip to content

Commit ae85e45

Browse files
committed
net 2.0: generic userland RX flow steering
Fixes a design flaw where the xdp and sock tiles hardcode the flow steering rules for the app tiles. Defines a generic 'topo_net_rx' struct that maps UDP ports to output links (and metadata like DST_PROTO IDs). Adds a 'find_16x16' API for AVX-accelerated fast port matching. Temporarily undoes the 'repair ping-pong' flow steering hack.
1 parent cf3ceb4 commit ae85e45

File tree

22 files changed

+321
-286
lines changed

22 files changed

+321
-286
lines changed

src/app/fdctl/topology.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -380,14 +380,17 @@ fd_topo_initialize( config_t * config ) {
380380

381381
FOR(net_tile_cnt) fd_topos_net_tile_finish( topo, i );
382382

383+
fd_topo_net_rx_t rx_rules = {0};
384+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_SHRED, "net_shred", config->tiles.shred.shred_listen_port );
385+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_TPU_QUIC, "net_quic" , config->tiles.quic.quic_transaction_listen_port );
386+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_TPU_UDP, "net_quic" , config->tiles.quic.regular_transaction_listen_port );
387+
383388
for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
384389
fd_topo_tile_t * tile = &topo->tiles[ i ];
385390

386391
if( FD_UNLIKELY( !strcmp( tile->name, "net" ) || !strcmp( tile->name, "sock" ) ) ) {
387392

388-
tile->net.shred_listen_port = config->tiles.shred.shred_listen_port;
389-
tile->net.quic_transaction_listen_port = config->tiles.quic.quic_transaction_listen_port;
390-
tile->net.legacy_transaction_listen_port = config->tiles.quic.regular_transaction_listen_port;
393+
tile->net.rx_rules = rx_rules;
391394

392395
} else if( FD_UNLIKELY( !strcmp( tile->name, "netlnk" ) ) ) {
393396

src/app/firedancer-dev/commands/backtest.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ backtest_topo( config_t * config ) {
323323

324324
for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
325325
fd_topo_tile_t * tile = &topo->tiles[ i ];
326-
if( !fd_topo_configure_tile( tile, config ) ) {
326+
if( !fd_topo_configure_tile( tile, config, NULL ) ) {
327327
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
328328
}
329329

src/app/firedancer-dev/commands/gossip.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ gossip_topo( config_t * config ) {
4242
if( net_tile_id==ULONG_MAX ) net_tile_id = fd_topo_find_tile( topo, "sock", 0UL );
4343
if( FD_UNLIKELY( net_tile_id==ULONG_MAX ) ) FD_LOG_ERR(( "net tile not found" ));
4444
fd_topo_tile_t * net_tile = &topo->tiles[ net_tile_id ];
45-
net_tile->net.gossip_listen_port = config->gossip.port;
45+
fd_topo_net_rx_rule_push( &net_tile->net.rx_rules, DST_PROTO_GOSSIP, "net_gossip", config->gossip.port );
4646

4747
fd_topob_wksp( topo, "gossip" );
4848
fd_topo_tile_t * gossip_tile = fd_topob_tile( topo, "gossip", "gossip", "metric_in", 0UL, 0, 0 );

src/app/firedancer-dev/commands/repair.c

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,9 @@ static void
5858
repair_topo( config_t * config ) {
5959
resolve_gossip_entrypoints( config );
6060

61-
ulong net_tile_cnt = config->layout.net_tile_count;
62-
ulong shred_tile_cnt = config->layout.shred_tile_count;
63-
ulong quic_tile_cnt = config->layout.quic_tile_count;
64-
ulong sign_tile_cnt = config->firedancer.layout.sign_tile_count;
61+
ulong net_tile_cnt = config->layout.net_tile_count;
62+
ulong shred_tile_cnt = config->layout.shred_tile_count;
63+
ulong sign_tile_cnt = config->firedancer.layout.sign_tile_count;
6564

6665
fd_topo_t * topo = { fd_topob_new( &config->topo, config->name ) };
6766
topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );
@@ -72,7 +71,6 @@ repair_topo( config_t * config ) {
7271
fd_topob_wksp( topo, "net_shred" );
7372
fd_topob_wksp( topo, "net_gossip" );
7473
fd_topob_wksp( topo, "net_repair" );
75-
fd_topob_wksp( topo, "net_quic" );
7674

7775
fd_topob_wksp( topo, "shred_repair" );
7876
fd_topob_wksp( topo, "stake_out" );
@@ -94,8 +92,6 @@ repair_topo( config_t * config ) {
9492
fd_topob_wksp( topo, "sign_repair" );
9593

9694
fd_topob_wksp( topo, "repair_repla" );
97-
fd_topob_wksp( topo, "gossip_send" );
98-
fd_topob_wksp( topo, "send_txns" );
9995

10096
fd_topob_wksp( topo, "shred" );
10197
fd_topob_wksp( topo, "sign" );
@@ -112,7 +108,6 @@ repair_topo( config_t * config ) {
112108
ulong pending_fec_shreds_depth = fd_ulong_min( fd_ulong_pow2_up( config->tiles.shred.max_pending_shred_sets * FD_REEDSOL_DATA_SHREDS_MAX ), USHORT_MAX + 1 /* dcache max */ );
113109

114110
/* topo, link_name, wksp_name, depth, mtu, burst */
115-
FOR(quic_tile_cnt) fd_topob_link( topo, "quic_net", "net_quic", config->net.ingress_buffer_size, FD_NET_MTU, 1UL );
116111
FOR(shred_tile_cnt) fd_topob_link( topo, "shred_net", "net_shred", config->net.ingress_buffer_size, FD_NET_MTU, 1UL );
117112
118113
/**/ fd_topob_link( topo, "stake_out", "stake_out", 128UL, 40UL + 40200UL * 40UL, 1UL );
@@ -128,7 +123,6 @@ repair_topo( config_t * config ) {
128123
129124
/**/ fd_topob_link( topo, "crds_shred", "crds_shred", 128UL, 8UL + 40200UL * 38UL, 1UL );
130125
/**/ fd_topob_link( topo, "gossip_repai", "gossip_repai", 128UL, 40200UL * 38UL, 1UL );
131-
/**/ fd_topob_link( topo, "gossip_send", "gossip_send", 128UL, 40200UL * 38UL, 1UL );
132126
133127
/**/ fd_topob_link( topo, "gossip_net", "net_gossip", config->net.ingress_buffer_size, FD_NET_MTU, 1UL );
134128
@@ -146,8 +140,6 @@ repair_topo( config_t * config ) {
146140
/**/ fd_topob_link( topo, "repair_repla", "repair_repla", 65536UL, sizeof(fd_reasm_fec_t), 1UL );
147141
/**/ fd_topob_link( topo, "poh_shred", "poh_shred", 16384UL, USHORT_MAX, 1UL );
148142

149-
/**/ fd_topob_link( topo, "send_txns", "send_txns", 128UL, FD_TXN_MTU, 1UL );
150-
151143
FD_TEST( sizeof(fd_snapshot_manifest_t)<=(5UL*(1UL<<30UL)) );
152144
/**/ fd_topob_link( topo, "snap_out", "snap_out", 2UL, 5UL*(1UL<<30UL), 1UL );
153145

@@ -182,7 +174,6 @@ repair_topo( config_t * config ) {
182174

183175
FOR(net_tile_cnt) fd_topos_net_rx_link( topo, "net_gossip", i, config->net.ingress_buffer_size );
184176
FOR(net_tile_cnt) fd_topos_net_rx_link( topo, "net_repair", i, config->net.ingress_buffer_size );
185-
FOR(net_tile_cnt) fd_topos_net_rx_link( topo, "net_quic", i, config->net.ingress_buffer_size );
186177
FOR(net_tile_cnt) fd_topos_net_rx_link( topo, "net_shred", i, config->net.ingress_buffer_size );
187178

188179
/* topo, tile_name, tile_wksp, metrics_wksp, cpu_idx, is_agave, uses_keyswitch */
@@ -260,10 +251,6 @@ repair_topo( config_t * config ) {
260251
/* topo, tile_name, tile_kind_id, fseq_wksp, link_name, link_kind_id, reliable, polled */
261252
for( ulong j=0UL; j<shred_tile_cnt; j++ )
262253
fd_topos_tile_in_net( topo, "metric_in", "shred_net", j, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
263-
for( ulong j=0UL; j<quic_tile_cnt; j++ )
264-
fd_topos_tile_in_net( topo, "metric_in", "quic_net", j, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
265-
266-
/**/ fd_topob_tile_in( topo, "gossip", 0UL, "metric_in", "send_txns", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
267254

268255
/**/ fd_topos_tile_in_net( topo, "metric_in", "gossip_net", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
269256
/**/ fd_topos_tile_in_net( topo, "metric_in", "repair_net", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
@@ -300,7 +287,6 @@ repair_topo( config_t * config ) {
300287
/**/ fd_topob_tile_out( topo, "gossip", 0UL, "gossip_sign", 0UL );
301288
/**/ fd_topob_tile_in( topo, "gossip", 0UL, "metric_in", "sign_gossip", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED );
302289
/**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_gossip", 0UL );
303-
/**/ fd_topob_tile_out( topo, "gossip", 0UL, "gossip_send", 0UL );
304290
/**/ fd_topob_tile_out( topo, "gossip", 0UL, "gossip_tower", 0UL );
305291

306292
FOR(net_tile_cnt) fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "net_repair", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
@@ -350,23 +336,25 @@ repair_topo( config_t * config ) {
350336
fd_topob_tile_out( topo, "scap", 0UL, "snap_out", 0UL );
351337
}
352338

353-
FD_TEST( link_permit_no_producers( topo, "quic_net" ) == quic_tile_cnt );
354-
FD_TEST( link_permit_no_producers( topo, "poh_shred" ) == 1UL );
355-
FD_TEST( link_permit_no_producers( topo, "send_txns" ) == 1UL );
356-
FD_TEST( link_permit_no_producers( topo, "repair_scap" ) == 1UL );
357-
FD_TEST( link_permit_no_producers( topo, "replay_scap" ) == 1UL );
339+
FD_TEST( link_permit_no_producers( topo, "poh_shred" ) == 1UL );
340+
FD_TEST( link_permit_no_producers( topo, "repair_scap" ) == 1UL );
341+
FD_TEST( link_permit_no_producers( topo, "replay_scap" ) == 1UL );
358342

359-
FD_TEST( link_permit_no_consumers( topo, "net_quic" ) == quic_tile_cnt );
360-
FD_TEST( link_permit_no_consumers( topo, "gossip_verif" ) == 1UL );
361-
FD_TEST( link_permit_no_consumers( topo, "gossip_tower" ) == 1UL );
362-
FD_TEST( link_permit_no_consumers( topo, "gossip_send" ) == 1UL );
363-
FD_TEST( link_permit_no_consumers( topo, "repair_repla" ) == 1UL );
343+
FD_TEST( link_permit_no_consumers( topo, "gossip_verif" ) == 1UL );
344+
FD_TEST( link_permit_no_consumers( topo, "gossip_tower" ) == 1UL );
345+
FD_TEST( link_permit_no_consumers( topo, "repair_repla" ) == 1UL );
364346

365347
FOR(net_tile_cnt) fd_topos_net_tile_finish( topo, i );
366348

349+
fd_topo_net_rx_t rx_rules = {0};
350+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_SHRED, "net_shred", config->tiles.shred.shred_listen_port );
351+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_GOSSIP, "net_gossip", config->gossip.port );
352+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_REPAIR, "net_repair", config->tiles.repair.repair_intake_listen_port );
353+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_REPAIR, "net_repair", config->tiles.repair.repair_serve_listen_port );
354+
367355
for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
368356
fd_topo_tile_t * tile = &topo->tiles[ i ];
369-
if( !fd_topo_configure_tile( tile, config ) ) {
357+
if( !fd_topo_configure_tile( tile, config, &rx_rules ) ) {
370358
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
371359
}
372360
}

src/app/firedancer-dev/commands/send_test/send_test.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,12 @@ send_test_topo( config_t * config ) {
122122

123123
FOR(net_tile_cnt) fd_topos_net_tile_finish( topo, i );
124124

125+
fd_topo_net_rx_t rx_rules = {0};
126+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_SEND, "net_send", config->tiles.send.send_src_port );
127+
125128
for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
126129
fd_topo_tile_t * tile = &topo->tiles[ i ];
127-
if( !fd_topo_configure_tile( tile, config ) ) {
130+
if( !fd_topo_configure_tile( tile, config, &rx_rules ) ) {
128131
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
129132
}
130133
}

src/app/firedancer-dev/commands/sim.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ sim_topo( config_t * config ) {
174174
} else {
175175
FD_LOG_NOTICE(( "Found archive file from config: %s", tile->archiver.rocksdb_path ));
176176
}
177-
} else if( !fd_topo_configure_tile( tile, config ) ) {
177+
} else if( !fd_topo_configure_tile( tile, config, NULL ) ) {
178178
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
179179
}
180180

src/app/firedancer-dev/commands/snapshot_load.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ snapshot_load_topo( config_t * config,
102102

103103
for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
104104
fd_topo_tile_t * tile = &topo->tiles[ i ];
105-
if( !fd_topo_configure_tile( tile, config ) ) {
105+
if( !fd_topo_configure_tile( tile, config, NULL ) ) {
106106
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
107107
}
108108
}

src/app/firedancer/topology.c

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -836,9 +836,18 @@ fd_topo_initialize( config_t * config ) {
836836

837837
FOR(net_tile_cnt) fd_topos_net_tile_finish( topo, i );
838838

839+
fd_topo_net_rx_t rx_rules = {0};
840+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_TPU_QUIC, "net_quic", config->tiles.quic.quic_transaction_listen_port );
841+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_TPU_QUIC, "net_quic", config->tiles.quic.quic_transaction_listen_port );
842+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_SHRED, "net_shred", config->tiles.shred.shred_listen_port );
843+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_GOSSIP, "net_gossip", config->gossip.port );
844+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_REPAIR, "net_repair", config->tiles.repair.repair_intake_listen_port );
845+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_REPAIR, "net_repair", config->tiles.repair.repair_serve_listen_port );
846+
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_SEND, "net_send", config->tiles.send.send_src_port );
847+
839848
for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
840849
fd_topo_tile_t * tile = &topo->tiles[ i ];
841-
if( !fd_topo_configure_tile( tile, config ) ) {
850+
if( !fd_topo_configure_tile( tile, config, &rx_rules ) ) {
842851
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
843852
}
844853
}
@@ -869,17 +878,12 @@ fd_topo_initialize( config_t * config ) {
869878
}
870879

871880
int
872-
fd_topo_configure_tile( fd_topo_tile_t * tile,
873-
fd_config_t * config ) {
874-
if( FD_UNLIKELY( !strcmp( tile->name, "net" ) || !strcmp( tile->name, "sock" ) ) ) {
875-
876-
tile->net.shred_listen_port = config->tiles.shred.shred_listen_port;
877-
tile->net.quic_transaction_listen_port = config->tiles.quic.quic_transaction_listen_port;
878-
tile->net.legacy_transaction_listen_port = config->tiles.quic.regular_transaction_listen_port;
879-
tile->net.gossip_listen_port = config->gossip.port;
880-
tile->net.repair_intake_listen_port = config->tiles.repair.repair_intake_listen_port;
881-
tile->net.repair_serve_listen_port = config->tiles.repair.repair_serve_listen_port;
882-
tile->net.send_src_port = config->tiles.send.send_src_port;
881+
fd_topo_configure_tile( fd_topo_tile_t * tile,
882+
fd_config_t * config,
883+
fd_topo_net_rx_t const * rx_rules ) {
884+
if( FD_UNLIKELY( rx_rules && (!strcmp( tile->name, "net" ) || !strcmp( tile->name, "sock" )) ) ) {
885+
886+
tile->net.rx_rules = *rx_rules;
883887

884888
} else if( FD_UNLIKELY( !strcmp( tile->name, "netlnk" ) ) ) {
885889

src/app/firedancer/topology.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ setup_topo_txncache( fd_topo_t * topo,
4949
ulong max_txn_per_slot );
5050

5151
int
52-
fd_topo_configure_tile( fd_topo_tile_t * tile,
53-
fd_config_t * config );
52+
fd_topo_configure_tile( fd_topo_tile_t * tile,
53+
fd_config_t * config,
54+
fd_topo_net_rx_t const * rx_rules );
5455

5556
FD_PROTOTYPES_END
5657

src/app/shared_dev/commands/pktgen/pktgen.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ pktgen_cmd_fn( args_t * args FD_PARAM_UNUSED,
197197
fd_topo_tile_t * metric_tile = &topo->tiles[ fd_topo_find_tile( topo, "metric", 0UL ) ];
198198

199199
ushort const listen_port = 9000;
200-
net_tile->net.legacy_transaction_listen_port = listen_port;
200+
fd_topo_net_rx_rule_push( &net_tile->net.rx_rules, DST_PROTO_TPU_UDP, "net_quic", listen_port );
201201

202202
if( FD_UNLIKELY( !fd_cstr_to_ip4_addr( config->tiles.metric.prometheus_listen_address, &metric_tile->metric.prometheus_listen_addr ) ) )
203203
FD_LOG_ERR(( "failed to parse prometheus listen address `%s`", config->tiles.metric.prometheus_listen_address ));

0 commit comments

Comments
 (0)