Skip to content

Generic userland RX flow steering #5993

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion book/api/metrics-generated.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@

| Metric | Type | Description |
|--------|------|-------------|
| <span class="metrics-name">net_&#8203;rx_&#8203;pkt_&#8203;cnt</span> | counter | Packet receive count. |
| <span class="metrics-name">net_&#8203;rx_&#8203;pkt_&#8203;cnt</span><br/>{pkt_&#8203;kind="<span class="metrics-enum">ip4_&#8203;udp</span>"} | counter | Packet receive count. (IPv4 UDP packet (no options)) |
| <span class="metrics-name">net_&#8203;rx_&#8203;pkt_&#8203;cnt</span><br/>{pkt_&#8203;kind="<span class="metrics-enum">ip4_&#8203;opt_&#8203;udp</span>"} | counter | Packet receive count. (IPv4 UDP packet (with options)) |
| <span class="metrics-name">net_&#8203;rx_&#8203;bytes_&#8203;total</span> | counter | Total number of bytes received (including Ethernet header). |
| <span class="metrics-name">net_&#8203;rx_&#8203;undersz_&#8203;cnt</span> | counter | Number of incoming packets dropped due to being too small. |
| <span class="metrics-name">net_&#8203;rx_&#8203;fill_&#8203;blocked_&#8203;cnt</span> | counter | Number of incoming packets dropped due to fill ring being full. |
Expand Down
9 changes: 6 additions & 3 deletions src/app/fdctl/topology.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,14 +380,17 @@ fd_topo_initialize( config_t * config ) {

FOR(net_tile_cnt) fd_topos_net_tile_finish( topo, i );

fd_topo_net_rx_t rx_rules = {0};
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_SHRED, "net_shred", config->tiles.shred.shred_listen_port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_TPU_QUIC, "net_quic" , config->tiles.quic.quic_transaction_listen_port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_TPU_UDP, "net_quic" , config->tiles.quic.regular_transaction_listen_port );

for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
fd_topo_tile_t * tile = &topo->tiles[ i ];

if( FD_UNLIKELY( !strcmp( tile->name, "net" ) || !strcmp( tile->name, "sock" ) ) ) {

tile->net.shred_listen_port = config->tiles.shred.shred_listen_port;
tile->net.quic_transaction_listen_port = config->tiles.quic.quic_transaction_listen_port;
tile->net.legacy_transaction_listen_port = config->tiles.quic.regular_transaction_listen_port;
tile->net.rx_rules = rx_rules;

} else if( FD_UNLIKELY( !strcmp( tile->name, "netlnk" ) ) ) {

Expand Down
2 changes: 1 addition & 1 deletion src/app/firedancer-dev/commands/backtest.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ backtest_topo( config_t * config ) {

for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
fd_topo_tile_t * tile = &topo->tiles[ i ];
if( !fd_topo_configure_tile( tile, config ) ) {
if( !fd_topo_configure_tile( tile, config, NULL ) ) {
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
}

Expand Down
2 changes: 1 addition & 1 deletion src/app/firedancer-dev/commands/gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ gossip_topo( config_t * config ) {
if( net_tile_id==ULONG_MAX ) net_tile_id = fd_topo_find_tile( topo, "sock", 0UL );
if( FD_UNLIKELY( net_tile_id==ULONG_MAX ) ) FD_LOG_ERR(( "net tile not found" ));
fd_topo_tile_t * net_tile = &topo->tiles[ net_tile_id ];
net_tile->net.gossip_listen_port = config->gossip.port;
fd_topo_net_rx_rule_push( &net_tile->net.rx_rules, DST_PROTO_GOSSIP, "net_gossip", config->gossip.port );

fd_topob_wksp( topo, "gossip" );
fd_topo_tile_t * gossip_tile = fd_topob_tile( topo, "gossip", "gossip", "metric_in", 0UL, 0, 0 );
Expand Down
42 changes: 15 additions & 27 deletions src/app/firedancer-dev/commands/repair.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ static void
repair_topo( config_t * config ) {
resolve_gossip_entrypoints( config );

ulong net_tile_cnt = config->layout.net_tile_count;
ulong shred_tile_cnt = config->layout.shred_tile_count;
ulong quic_tile_cnt = config->layout.quic_tile_count;
ulong net_tile_cnt = config->layout.net_tile_count;
ulong shred_tile_cnt = config->layout.shred_tile_count;

fd_topo_t * topo = { fd_topob_new( &config->topo, config->name ) };
topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );
Expand All @@ -71,7 +70,6 @@ repair_topo( config_t * config ) {
fd_topob_wksp( topo, "net_shred" );
fd_topob_wksp( topo, "net_gossip" );
fd_topob_wksp( topo, "net_repair" );
fd_topob_wksp( topo, "net_quic" );

fd_topob_wksp( topo, "shred_repair" );
fd_topob_wksp( topo, "stake_out" );
Expand All @@ -93,8 +91,6 @@ repair_topo( config_t * config ) {
fd_topob_wksp( topo, "sign_repair" );

fd_topob_wksp( topo, "repair_repla" );
fd_topob_wksp( topo, "gossip_send" );
fd_topob_wksp( topo, "send_txns" );

fd_topob_wksp( topo, "shred" );
fd_topob_wksp( topo, "sign" );
Expand All @@ -111,7 +107,6 @@ repair_topo( config_t * config ) {
ulong pending_fec_shreds_depth = fd_ulong_min( fd_ulong_pow2_up( config->tiles.shred.max_pending_shred_sets * FD_REEDSOL_DATA_SHREDS_MAX ), USHORT_MAX + 1 /* dcache max */ );

/* topo, link_name, wksp_name, depth, mtu, burst */
FOR(quic_tile_cnt) fd_topob_link( topo, "quic_net", "net_quic", config->net.ingress_buffer_size, FD_NET_MTU, 1UL );
FOR(shred_tile_cnt) fd_topob_link( topo, "shred_net", "net_shred", config->net.ingress_buffer_size, FD_NET_MTU, 1UL );

/**/ fd_topob_link( topo, "stake_out", "stake_out", 128UL, 40UL + 40200UL * 40UL, 1UL );
Expand All @@ -127,7 +122,6 @@ repair_topo( config_t * config ) {

/**/ fd_topob_link( topo, "crds_shred", "crds_shred", 128UL, 8UL + 40200UL * 38UL, 1UL );
/**/ fd_topob_link( topo, "gossip_repai", "gossip_repai", 128UL, 40200UL * 38UL, 1UL );
/**/ fd_topob_link( topo, "gossip_send", "gossip_send", 128UL, 40200UL * 38UL, 1UL );

/**/ fd_topob_link( topo, "gossip_net", "net_gossip", config->net.ingress_buffer_size, FD_NET_MTU, 1UL );

Expand All @@ -140,8 +134,6 @@ repair_topo( config_t * config ) {
/**/ fd_topob_link( topo, "repair_repla", "repair_repla", 65536UL, sizeof(fd_reasm_fec_t), 1UL );
/**/ fd_topob_link( topo, "poh_shred", "poh_shred", 16384UL, USHORT_MAX, 1UL );

/**/ fd_topob_link( topo, "send_txns", "send_txns", 128UL, FD_TXN_MTU, 1UL );

FD_TEST( sizeof(fd_snapshot_manifest_t)<=(5UL*(1UL<<30UL)) );
/**/ fd_topob_link( topo, "snap_out", "snap_out", 2UL, 5UL*(1UL<<30UL), 1UL );

Expand Down Expand Up @@ -176,7 +168,6 @@ repair_topo( config_t * config ) {

FOR(net_tile_cnt) fd_topos_net_rx_link( topo, "net_gossip", i, config->net.ingress_buffer_size );
FOR(net_tile_cnt) fd_topos_net_rx_link( topo, "net_repair", i, config->net.ingress_buffer_size );
FOR(net_tile_cnt) fd_topos_net_rx_link( topo, "net_quic", i, config->net.ingress_buffer_size );
FOR(net_tile_cnt) fd_topos_net_rx_link( topo, "net_shred", i, config->net.ingress_buffer_size );

/* topo, tile_name, tile_wksp, metrics_wksp, cpu_idx, is_agave, uses_keyswitch */
Expand Down Expand Up @@ -254,10 +245,6 @@ repair_topo( config_t * config ) {
/* topo, tile_name, tile_kind_id, fseq_wksp, link_name, link_kind_id, reliable, polled */
for( ulong j=0UL; j<shred_tile_cnt; j++ )
fd_topos_tile_in_net( topo, "metric_in", "shred_net", j, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
for( ulong j=0UL; j<quic_tile_cnt; j++ )
fd_topos_tile_in_net( topo, "metric_in", "quic_net", j, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */

/**/ fd_topob_tile_in( topo, "gossip", 0UL, "metric_in", "send_txns", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );

/**/ fd_topos_tile_in_net( topo, "metric_in", "gossip_net", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
/**/ fd_topos_tile_in_net( topo, "metric_in", "repair_net", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
Expand Down Expand Up @@ -294,7 +281,6 @@ repair_topo( config_t * config ) {
/**/ fd_topob_tile_out( topo, "gossip", 0UL, "gossip_sign", 0UL );
/**/ fd_topob_tile_in( topo, "gossip", 0UL, "metric_in", "sign_gossip", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED );
/**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_gossip", 0UL );
/**/ fd_topob_tile_out( topo, "gossip", 0UL, "gossip_send", 0UL );
/**/ fd_topob_tile_out( topo, "gossip", 0UL, "gossip_tower", 0UL );

FOR(net_tile_cnt) fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "net_repair", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
Expand Down Expand Up @@ -339,23 +325,25 @@ repair_topo( config_t * config ) {
fd_topob_tile_out( topo, "scap", 0UL, "snap_out", 0UL );
}

FD_TEST( link_permit_no_producers( topo, "quic_net" ) == quic_tile_cnt );
FD_TEST( link_permit_no_producers( topo, "poh_shred" ) == 1UL );
FD_TEST( link_permit_no_producers( topo, "send_txns" ) == 1UL );
FD_TEST( link_permit_no_producers( topo, "repair_scap" ) == 1UL );
FD_TEST( link_permit_no_producers( topo, "replay_scap" ) == 1UL );
FD_TEST( link_permit_no_producers( topo, "poh_shred" ) == 1UL );
FD_TEST( link_permit_no_producers( topo, "repair_scap" ) == 1UL );
FD_TEST( link_permit_no_producers( topo, "replay_scap" ) == 1UL );

FD_TEST( link_permit_no_consumers( topo, "net_quic" ) == quic_tile_cnt );
FD_TEST( link_permit_no_consumers( topo, "gossip_verif" ) == 1UL );
FD_TEST( link_permit_no_consumers( topo, "gossip_tower" ) == 1UL );
FD_TEST( link_permit_no_consumers( topo, "gossip_send" ) == 1UL );
FD_TEST( link_permit_no_consumers( topo, "repair_repla" ) == 1UL );
FD_TEST( link_permit_no_consumers( topo, "gossip_verif" ) == 1UL );
FD_TEST( link_permit_no_consumers( topo, "gossip_tower" ) == 1UL );
FD_TEST( link_permit_no_consumers( topo, "repair_repla" ) == 1UL );

FOR(net_tile_cnt) fd_topos_net_tile_finish( topo, i );

fd_topo_net_rx_t rx_rules = {0};
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_SHRED, "net_shred", config->tiles.shred.shred_listen_port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_GOSSIP, "net_gossip", config->gossip.port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_REPAIR, "net_repair", config->tiles.repair.repair_intake_listen_port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_REPAIR, "net_repair", config->tiles.repair.repair_serve_listen_port );

for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
fd_topo_tile_t * tile = &topo->tiles[ i ];
if( !fd_topo_configure_tile( tile, config ) ) {
if( !fd_topo_configure_tile( tile, config, &rx_rules ) ) {
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/app/firedancer-dev/commands/sim.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ sim_topo( config_t * config ) {
} else {
FD_LOG_NOTICE(( "Found archive file from config: %s", tile->archiver.rocksdb_path ));
}
} else if( !fd_topo_configure_tile( tile, config ) ) {
} else if( !fd_topo_configure_tile( tile, config, NULL ) ) {
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
}

Expand Down
2 changes: 1 addition & 1 deletion src/app/firedancer-dev/commands/snapshot_load.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ snapshot_load_topo( config_t * config,

for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
fd_topo_tile_t * tile = &topo->tiles[ i ];
if( !fd_topo_configure_tile( tile, config ) ) {
if( !fd_topo_configure_tile( tile, config, NULL ) ) {
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
}
}
Expand Down
28 changes: 16 additions & 12 deletions src/app/firedancer/topology.c
Original file line number Diff line number Diff line change
Expand Up @@ -825,9 +825,18 @@ fd_topo_initialize( config_t * config ) {

FOR(net_tile_cnt) fd_topos_net_tile_finish( topo, i );

fd_topo_net_rx_t rx_rules = {0};
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_TPU_QUIC, "net_quic", config->tiles.quic.quic_transaction_listen_port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_TPU_QUIC, "net_quic", config->tiles.quic.quic_transaction_listen_port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_SHRED, "net_shred", config->tiles.shred.shred_listen_port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_GOSSIP, "net_gossip", config->gossip.port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_REPAIR, "net_repair", config->tiles.repair.repair_intake_listen_port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_REPAIR, "net_repair", config->tiles.repair.repair_serve_listen_port );
fd_topo_net_rx_rule_push( &rx_rules, DST_PROTO_SEND, "net_send", config->tiles.send.send_src_port );

for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
fd_topo_tile_t * tile = &topo->tiles[ i ];
if( !fd_topo_configure_tile( tile, config ) ) {
if( !fd_topo_configure_tile( tile, config, &rx_rules ) ) {
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
}
}
Expand Down Expand Up @@ -858,17 +867,12 @@ fd_topo_initialize( config_t * config ) {
}

int
fd_topo_configure_tile( fd_topo_tile_t * tile,
fd_config_t * config ) {
if( FD_UNLIKELY( !strcmp( tile->name, "net" ) || !strcmp( tile->name, "sock" ) ) ) {

tile->net.shred_listen_port = config->tiles.shred.shred_listen_port;
tile->net.quic_transaction_listen_port = config->tiles.quic.quic_transaction_listen_port;
tile->net.legacy_transaction_listen_port = config->tiles.quic.regular_transaction_listen_port;
tile->net.gossip_listen_port = config->gossip.port;
tile->net.repair_intake_listen_port = config->tiles.repair.repair_intake_listen_port;
tile->net.repair_serve_listen_port = config->tiles.repair.repair_serve_listen_port;
tile->net.send_src_port = config->tiles.send.send_src_port;
fd_topo_configure_tile( fd_topo_tile_t * tile,
fd_config_t * config,
fd_topo_net_rx_t const * rx_rules ) {
if( FD_UNLIKELY( rx_rules && (!strcmp( tile->name, "net" ) || !strcmp( tile->name, "sock" )) ) ) {

tile->net.rx_rules = *rx_rules;

} else if( FD_UNLIKELY( !strcmp( tile->name, "netlnk" ) ) ) {

Expand Down
5 changes: 3 additions & 2 deletions src/app/firedancer/topology.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ setup_topo_txncache( fd_topo_t * topo,
ulong max_txn_per_slot );

int
fd_topo_configure_tile( fd_topo_tile_t * tile,
fd_config_t * config );
fd_topo_configure_tile( fd_topo_tile_t * tile,
fd_config_t * config,
fd_topo_net_rx_t const * rx_rules );

FD_PROTOTYPES_END

Expand Down
2 changes: 1 addition & 1 deletion src/app/shared/commands/get_identity.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ get_identity_cmd_fn( args_t * args FD_PARAM_UNUSED,
fd_topo_join_workspace( &config->topo, &config->topo.workspaces[ shred_wksp_id ], FD_SHMEM_JOIN_MODE_READ_ONLY );

/* Cast to shred context structure */
fd_shred_ctx_t const * shred_ctx = fd_topo_obj_laddr( &config->topo, shred_tile->tile_obj_id );
fd_shred_ctx_hdr_t const * shred_ctx = fd_topo_obj_laddr( &config->topo, shred_tile->tile_obj_id );
if( FD_UNLIKELY( !shred_ctx ) ) {
fd_topo_leave_workspaces( &config->topo );
FD_LOG_ERR(( "Failed to access shred tile object" ));
Expand Down
2 changes: 1 addition & 1 deletion src/app/shared_dev/commands/pktgen/pktgen.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ pktgen_cmd_fn( args_t * args FD_PARAM_UNUSED,
fd_topo_tile_t * metric_tile = &topo->tiles[ fd_topo_find_tile( topo, "metric", 0UL ) ];

ushort const listen_port = 9000;
net_tile->net.legacy_transaction_listen_port = listen_port;
fd_topo_net_rx_rule_push( &net_tile->net.rx_rules, DST_PROTO_TPU_UDP, "net_quic", listen_port );

if( FD_UNLIKELY( !fd_cstr_to_ip4_addr( config->tiles.metric.prometheus_listen_address, &metric_tile->metric.prometheus_listen_addr ) ) )
FD_LOG_ERR(( "failed to parse prometheus listen address `%s`", config->tiles.metric.prometheus_listen_address ));
Expand Down
2 changes: 1 addition & 1 deletion src/app/shared_dev/commands/udpecho/udpecho.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ udpecho_cmd_fn( args_t * args,
fd_topo_tile_t * net_tile = &topo->tiles[ fd_topo_find_tile( topo, "net", 0UL ) ];
fd_topo_tile_t * metric_tile = &topo->tiles[ fd_topo_find_tile( topo, "metric", 0UL ) ];

net_tile->net.legacy_transaction_listen_port = args->udpecho.listen_port;
fd_topo_net_rx_rule_push( &net_tile->net.rx_rules, DST_PROTO_TPU_UDP, "net_quic", args->udpecho.listen_port );

if( FD_UNLIKELY( !fd_cstr_to_ip4_addr( config->tiles.metric.prometheus_listen_address, &metric_tile->metric.prometheus_listen_addr ) ) )
FD_LOG_ERR(( "failed to parse prometheus listen address `%s`", config->tiles.metric.prometheus_listen_address ));
Expand Down
7 changes: 7 additions & 0 deletions src/disco/metrics/generated/fd_metrics_enums.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
#define FD_METRICS_ENUM_TILE_REGIME_V_PROCESSING_POSTFRAG_IDX 7
#define FD_METRICS_ENUM_TILE_REGIME_V_PROCESSING_POSTFRAG_NAME "processing_postfrag"

#define FD_METRICS_ENUM_PKT_KIND_NAME "pkt_kind"
#define FD_METRICS_ENUM_PKT_KIND_CNT (2UL)
#define FD_METRICS_ENUM_PKT_KIND_V_IP4_UDP_IDX 0
#define FD_METRICS_ENUM_PKT_KIND_V_IP4_UDP_NAME "ip4_udp"
#define FD_METRICS_ENUM_PKT_KIND_V_IP4_OPT_UDP_IDX 1
#define FD_METRICS_ENUM_PKT_KIND_V_IP4_OPT_UDP_NAME "ip4_opt_udp"

#define FD_METRICS_ENUM_SOCK_ERR_NAME "sock_err"
#define FD_METRICS_ENUM_SOCK_ERR_CNT (6UL)
#define FD_METRICS_ENUM_SOCK_ERR_V_NO_ERROR_IDX 0
Expand Down
3 changes: 2 additions & 1 deletion src/disco/metrics/generated/fd_metrics_net.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
#include "fd_metrics_net.h"

const fd_metrics_meta_t FD_METRICS_NET[FD_METRICS_NET_TOTAL] = {
DECLARE_METRIC( NET_RX_PKT_CNT, COUNTER ),
DECLARE_METRIC_ENUM( NET_RX_PKT_CNT, COUNTER, PKT_KIND, IP4_UDP ),
DECLARE_METRIC_ENUM( NET_RX_PKT_CNT, COUNTER, PKT_KIND, IP4_OPT_UDP ),
DECLARE_METRIC( NET_RX_BYTES_TOTAL, COUNTER ),
DECLARE_METRIC( NET_RX_UNDERSZ_CNT, COUNTER ),
DECLARE_METRIC( NET_RX_FILL_BLOCKED_CNT, COUNTER ),
Expand Down
Loading
Loading