diff --git a/book/api/metrics-generated.md b/book/api/metrics-generated.md index aa8dae65a95..ddbcdffdc91 100644 --- a/book/api/metrics-generated.md +++ b/book/api/metrics-generated.md @@ -1204,3 +1204,16 @@ | tower_​hard_​forks_​active | gauge | Currently active hard forks | + +## Ibeth Tile + +
+ +| Metric | Type | Description | +|--------|------|-------------| +| ibeth_​rx_​pkt_​cnt | counter | Packet receive count. | +| ibeth_​rx_​bytes_​total | counter | Total number of bytes received (including Ethernet header). | +| ibeth_​tx_​pkt_​cnt | counter | Number of packet transmit jobs marked as completed by the kernel. | +| ibeth_​tx_​bytes_​total | counter | Total number of bytes transmitted (including Ethernet header). | + +
diff --git a/config/extra/with-ibverbs.mk b/config/extra/with-ibverbs.mk new file mode 100644 index 00000000000..a906521caae --- /dev/null +++ b/config/extra/with-ibverbs.mk @@ -0,0 +1,3 @@ +FD_HAS_IBVERBS:=1 +CPPFLAGS+=-DFD_HAS_IBVERBS=1 +LDFLAGS+=-libverbs diff --git a/src/app/fdctl/config/default.toml b/src/app/fdctl/config/default.toml index 56215251816..d54ed3e9b6b 100644 --- a/src/app/fdctl/config/default.toml +++ b/src/app/fdctl/config/default.toml @@ -894,8 +894,13 @@ dynamic_port_range = "8900-9000" # do not support XDP, and to support experimental features that are # not yet implemented in the XDP stack. # + # "ibverbs" (experimental) + # Use libibverbs (RDMA) in 'raw packet' mode for networking. + # Compatible with Mellanox ConnectX 5 and newer. Requires a + # Firedancer build with 'make EXTRAS=ibverbs'. + # # Using the XDP networking stack is strongly preferred where possible, - # as it is faster and better tested by the development team. + # as it provides good performance and is battle tested. provider = "xdp" # Which interface to bind to for network traffic. 
Currently, diff --git a/src/app/fdctl/main.c b/src/app/fdctl/main.c index 95bc8547d84..3c639095d24 100644 --- a/src/app/fdctl/main.c +++ b/src/app/fdctl/main.c @@ -44,6 +44,7 @@ configure_stage_t * STAGES[] = { extern fd_topo_run_tile_t fd_tile_net; extern fd_topo_run_tile_t fd_tile_netlnk; extern fd_topo_run_tile_t fd_tile_sock; +extern fd_topo_run_tile_t fd_tile_ibeth; extern fd_topo_run_tile_t fd_tile_quic; extern fd_topo_run_tile_t fd_tile_bundle; extern fd_topo_run_tile_t fd_tile_verify; @@ -64,6 +65,9 @@ fd_topo_run_tile_t * TILES[] = { &fd_tile_net, &fd_tile_netlnk, &fd_tile_sock, +# if FD_HAS_IBVERBS + &fd_tile_ibeth, +# endif &fd_tile_quic, &fd_tile_bundle, &fd_tile_verify, diff --git a/src/app/fddev/main.h b/src/app/fddev/main.h index a58f8e42242..82c3f410a57 100644 --- a/src/app/fddev/main.h +++ b/src/app/fddev/main.h @@ -54,6 +54,7 @@ configure_stage_t * STAGES[] = { extern fd_topo_run_tile_t fd_tile_net; extern fd_topo_run_tile_t fd_tile_netlnk; extern fd_topo_run_tile_t fd_tile_sock; +extern fd_topo_run_tile_t fd_tile_ibeth; extern fd_topo_run_tile_t fd_tile_quic; extern fd_topo_run_tile_t fd_tile_bundle; extern fd_topo_run_tile_t fd_tile_verify; @@ -79,6 +80,9 @@ fd_topo_run_tile_t * TILES[] = { &fd_tile_net, &fd_tile_netlnk, &fd_tile_sock, +# if FD_HAS_IBVERBS + &fd_tile_ibeth, +# endif &fd_tile_quic, &fd_tile_bundle, &fd_tile_verify, diff --git a/src/app/firedancer-dev/main.c b/src/app/firedancer-dev/main.c index 56eed7c0ea6..b18c62c7cef 100644 --- a/src/app/firedancer-dev/main.c +++ b/src/app/firedancer-dev/main.c @@ -72,6 +72,7 @@ configure_stage_t * STAGES[] = { extern fd_topo_run_tile_t fd_tile_net; extern fd_topo_run_tile_t fd_tile_netlnk; extern fd_topo_run_tile_t fd_tile_sock; +extern fd_topo_run_tile_t fd_tile_ibeth; extern fd_topo_run_tile_t fd_tile_quic; extern fd_topo_run_tile_t fd_tile_verify; extern fd_topo_run_tile_t fd_tile_dedup; @@ -122,6 +123,9 @@ fd_topo_run_tile_t * TILES[] = { &fd_tile_net, &fd_tile_netlnk, &fd_tile_sock, +# if 
FD_HAS_IBVERBS + &fd_tile_ibeth, +# endif &fd_tile_quic, &fd_tile_verify, &fd_tile_dedup, diff --git a/src/app/firedancer/config/default.toml b/src/app/firedancer/config/default.toml index 7d92d5a5472..80bdbe5051d 100644 --- a/src/app/firedancer/config/default.toml +++ b/src/app/firedancer/config/default.toml @@ -889,6 +889,11 @@ user = "" # for hosts which do not support XDP, and to support experimental # features that are not yet implemented in the XDP stack. # + # "ibverbs" (experimental) + # Use libibverbs (RDMA) in 'raw packet' mode for networking. + # Compatible with Mellanox ConnectX 5 and newer. Requires a + # Firedancer build with 'make EXTRAS=ibverbs'. + # # Using the XDP networking stack is strongly preferred where # possible, as it is faster and better tested by the development # team. diff --git a/src/app/firedancer/main.c b/src/app/firedancer/main.c index 32a022ffead..a057e96dbbb 100644 --- a/src/app/firedancer/main.c +++ b/src/app/firedancer/main.c @@ -57,6 +57,7 @@ configure_stage_t * STAGES[] = { extern fd_topo_run_tile_t fd_tile_net; extern fd_topo_run_tile_t fd_tile_netlnk; extern fd_topo_run_tile_t fd_tile_sock; +extern fd_topo_run_tile_t fd_tile_ibeth; extern fd_topo_run_tile_t fd_tile_quic; extern fd_topo_run_tile_t fd_tile_verify; extern fd_topo_run_tile_t fd_tile_dedup; @@ -96,6 +97,9 @@ fd_topo_run_tile_t * TILES[] = { &fd_tile_net, &fd_tile_netlnk, &fd_tile_sock, +# if FD_HAS_IBVERBS + &fd_tile_ibeth, +# endif &fd_tile_quic, &fd_tile_verify, &fd_tile_dedup, diff --git a/src/app/firedancer/topology.c b/src/app/firedancer/topology.c index 0e95578a617..400cd6af19d 100644 --- a/src/app/firedancer/topology.c +++ b/src/app/firedancer/topology.c @@ -1044,8 +1044,11 @@ fd_topo_configure_tile( fd_topo_tile_t * tile, FD_LOG_ERR(( "failed to parse prometheus listen address `%s`", config->tiles.metric.prometheus_listen_address )); tile->metric.prometheus_listen_port = config->tiles.metric.prometheus_listen_port; - } else if( FD_UNLIKELY( !strcmp( 
tile->name, "net" ) || !strcmp( tile->name, "sock" ) ) ) { + } else if( FD_UNLIKELY( !strcmp( tile->name, "net" ) || + !strcmp( tile->name, "sock" ) || + !strcmp( tile->name, "ibeth" ) ) ) { + tile->net.bind_address = config->net.bind_address_parsed; tile->net.shred_listen_port = config->tiles.shred.shred_listen_port; tile->net.quic_transaction_listen_port = config->tiles.quic.quic_transaction_listen_port; tile->net.legacy_transaction_listen_port = config->tiles.quic.regular_transaction_listen_port; diff --git a/src/app/shared/commands/configure/sysctl.c b/src/app/shared/commands/configure/sysctl.c index 74944a9b3bf..3a84bb40e3c 100644 --- a/src/app/shared/commands/configure/sysctl.c +++ b/src/app/shared/commands/configure/sysctl.c @@ -184,7 +184,7 @@ check( config_t const * config, sock_params[ 1 ].value = config->net.socket.send_buffer_size; r = check_param_list( sock_params ); } else { - FD_LOG_ERR(( "unknown net provider: %s", config->net.provider )); + CONFIGURE_OK(); } if( r.result!=CONFIGURE_OK ) return r; diff --git a/src/app/shared/fd_action.h b/src/app/shared/fd_action.h index a0eabd0c041..a0bbe56af4a 100644 --- a/src/app/shared/fd_action.h +++ b/src/app/shared/fd_action.h @@ -147,6 +147,10 @@ union fdctl_args { ulong max_contact; int compact_mode; } gossip; + + struct { + ushort listen_port; + } pktgen; }; typedef union fdctl_args args_t; diff --git a/src/app/shared/fd_config.c b/src/app/shared/fd_config.c index 4ddb44ab586..4e49ec51502 100644 --- a/src/app/shared/fd_config.c +++ b/src/app/shared/fd_config.c @@ -546,8 +546,10 @@ fd_config_validate( fd_config_t const * config ) { } else if( 0==strcmp( config->net.provider, "socket" ) ) { CFG_HAS_NON_ZERO( net.socket.receive_buffer_size ); CFG_HAS_NON_ZERO( net.socket.send_buffer_size ); + } else if( 0==strcmp( config->net.provider, "ibverbs" ) ) { + /**/ } else { - FD_LOG_ERR(( "invalid `net.provider`: must be \"xdp\" or \"socket\"" )); + FD_LOG_ERR(( "invalid `net.provider`: must be \"xdp\", \"socket\", 
or \"ibverbs\"" )); } CFG_HAS_NON_ZERO( tiles.netlink.max_routes ); diff --git a/src/app/shared/fd_config.h b/src/app/shared/fd_config.h index ffbffad9e95..df9e37817cf 100644 --- a/src/app/shared/fd_config.h +++ b/src/app/shared/fd_config.h @@ -173,7 +173,7 @@ struct fd_configf { typedef struct fd_configf fd_configf_t; struct fd_config_net { - char provider[ 8 ]; /* "xdp" or "socket" */ + char provider[ 8 ]; /* "xdp", "socket", or "ibverbs" */ char interface[ IF_NAMESIZE ]; char bind_address[ 16 ]; diff --git a/src/app/shared_dev/commands/pktgen/pktgen.c b/src/app/shared_dev/commands/pktgen/pktgen.c index 1156b899a18..201f171ec92 100644 --- a/src/app/shared_dev/commands/pktgen/pktgen.c +++ b/src/app/shared_dev/commands/pktgen/pktgen.c @@ -18,6 +18,19 @@ extern fd_topo_obj_callbacks_t * CALLBACKS[]; fd_topo_run_tile_t fdctl_tile_run( fd_topo_tile_t const * tile ); +static char const * +net_tile_name( char const * provider ) { + if( 0==strcmp( provider, "xdp" ) ) { + return "net"; + } else if( 0==strcmp( provider, "socket" ) ) { + return "socket"; + } else if( 0==strcmp( provider, "ibverbs" ) ) { + return "ibeth"; + } else { + FD_LOG_ERR(( "Invalid [net.provider]: %s", provider )); + } +} + static void pktgen_topo( config_t * config ) { char const * affinity = config->development.pktgen.affinity; @@ -43,7 +56,7 @@ pktgen_topo( config_t * config ) { } if( FD_LIKELY( !is_auto_affinity ) ) { if( FD_UNLIKELY( affinity_tile_cnt!=4UL ) ) - FD_LOG_ERR(( "Invalid [development.pktgen.affinity]: must include exactly three CPUs" )); + FD_LOG_ERR(( "Invalid [development.pktgen.affinity]: must include exactly 4 CPUs" )); } /* Reset topology from scratch */ @@ -56,6 +69,8 @@ pktgen_topo( config_t * config ) { fd_topos_net_tiles( topo, config->layout.net_tile_count, &config->net, config->tiles.netlink.max_routes, config->tiles.netlink.max_peer_routes, config->tiles.netlink.max_neighbors, tile_to_cpu ); fd_topob_tile( topo, "metric", "metric", "metric_in", tile_to_cpu[ 
topo->tile_cnt ], 0, 0 ); + char const * net_tile = net_tile_name( config->net.provider ); + fd_topob_wksp( topo, "pktgen" ); fd_topo_tile_t * pktgen_tile = fd_topob_tile( topo, "pktgen", "pktgen", "pktgen", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); if( FD_UNLIKELY( !fd_cstr_to_ip4_addr( config->development.pktgen.fake_dst_ip, &pktgen_tile->pktgen.fake_dst_ip ) ) ) { @@ -63,7 +78,7 @@ pktgen_topo( config_t * config ) { } fd_topob_link( topo, "pktgen_out", "pktgen", 2048UL, FD_NET_MTU, 1UL ); fd_topob_tile_out( topo, "pktgen", 0UL, "pktgen_out", 0UL ); - fd_topob_tile_in( topo, "net", 0UL, "metric_in", "pktgen_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); + fd_topob_tile_in( topo, net_tile, 0UL, "metric_in", "pktgen_out", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* Create dummy RX link */ fd_topos_net_rx_link( topo, "net_quic", 0UL, config->net.ingress_buffer_size ); @@ -80,8 +95,7 @@ void pktgen_cmd_args( int * pargc, char *** pargv, args_t * args ) { - /* FIXME add config options here */ - (void)pargc; (void)pargv; (void)args; + args->pktgen.listen_port = fd_env_strip_cmdline_ushort( pargc, pargv, "--listen-port", NULL, 9000 ); } /* Hacky: Since the pktgen runs in the same process, use globals to @@ -91,8 +105,20 @@ extern uint fd_pktgen_active; /* render_status prints statistics at the top of the screen. Should be called at a low rate (~500ms). 
*/ +union net_abstract_metrics { + ulong volatile const * a[4]; + struct { + ulong volatile const * rx_pkt_cnt; + ulong volatile const * rx_bytes_total; + ulong volatile const * tx_pkt_cnt; + ulong volatile const * tx_bytes_total; + }; +}; +typedef union net_abstract_metrics net_abstract_metrics_t; + static void -render_status( ulong volatile const * net_metrics ) { +render_status( ulong volatile const * net_metrics, + net_abstract_metrics_t const * abstract ) { fputs( "\0337" /* save cursor position */ "\033[H" /* move cursor to (0,0) */ "\033[2K\n", /* create an empty line to avoid spamming look back buffer */ @@ -130,15 +156,15 @@ render_status( ulong volatile const * net_metrics ) { /* */ cum_tick_now += net_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_PROCESSING_PREFRAG ) ]; /* */ cum_tick_now += net_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ]; /* */ cum_tick_now += net_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_PROCESSING_POSTFRAG ) ]; - ulong rx_ok_now = net_metrics[ MIDX( COUNTER, NET, RX_PKT_CNT ) ]; - ulong rx_byte_now = net_metrics[ MIDX( COUNTER, NET, RX_BYTES_TOTAL ) ]; + ulong rx_ok_now = abstract->rx_pkt_cnt[0]; + ulong rx_byte_now = abstract->rx_bytes_total[0]; ulong rx_drop_now = net_metrics[ MIDX( COUNTER, NET, RX_FILL_BLOCKED_CNT ) ]; /* */ rx_drop_now += net_metrics[ MIDX( COUNTER, NET, RX_BACKPRESSURE_CNT ) ]; /* */ rx_drop_now += net_metrics[ MIDX( COUNTER, NET, XDP_RX_DROPPED_OTHER ) ]; /* */ rx_drop_now += net_metrics[ MIDX( COUNTER, NET, XDP_RX_INVALID_DESCS ) ]; /* */ rx_drop_now += net_metrics[ MIDX( COUNTER, NET, XDP_RX_RING_FULL ) ]; - ulong tx_ok_now = net_metrics[ MIDX( COUNTER, NET, TX_COMPLETE_CNT ) ]; - ulong tx_byte_now = net_metrics[ MIDX( COUNTER, NET, TX_BYTES_TOTAL ) ]; + ulong tx_ok_now = abstract->tx_pkt_cnt[0]; + ulong tx_byte_now = abstract->tx_bytes_total[0]; ulong cum_idle_delta = cum_idle_now-cum_idle_last; ulong cum_tick_delta = cum_tick_now-cum_tick_last; @@ -189,15 
+215,14 @@ render_status( ulong volatile const * net_metrics ) { /* FIXME fixup screen on window size changes */ void -pktgen_cmd_fn( args_t * args FD_PARAM_UNUSED, +pktgen_cmd_fn( args_t * args, config_t * config ) { pktgen_topo( config ); fd_topo_t * topo = &config->topo; - fd_topo_tile_t * net_tile = &topo->tiles[ fd_topo_find_tile( topo, "net", 0UL ) ]; + fd_topo_tile_t * net_tile = &topo->tiles[ fd_topo_find_tile( topo, net_tile_name( config->net.provider ), 0UL ) ]; fd_topo_tile_t * metric_tile = &topo->tiles[ fd_topo_find_tile( topo, "metric", 0UL ) ]; - ushort const listen_port = 9000; - net_tile->net.legacy_transaction_listen_port = listen_port; + net_tile->net.legacy_transaction_listen_port = args->pktgen.listen_port; if( FD_UNLIKELY( !fd_cstr_to_ip4_addr( config->tiles.metric.prometheus_listen_address, &metric_tile->metric.prometheus_listen_addr ) ) ) FD_LOG_ERR(( "failed to parse prometheus listen address `%s`", config->tiles.metric.prometheus_listen_address )); @@ -209,7 +234,6 @@ pktgen_cmd_fn( args_t * args FD_PARAM_UNUSED, configure_stage( &fd_cfg_stage_ethtool_channels, CONFIGURE_CMD_INIT, config ); configure_stage( &fd_cfg_stage_ethtool_offloads, CONFIGURE_CMD_INIT, config ); - fdctl_check_configure( config ); /* FIXME this allocates lots of memory unnecessarily */ initialize_workspaces( config ); initialize_stacks( config ); @@ -240,14 +264,31 @@ pktgen_cmd_fn( args_t * args FD_PARAM_UNUSED, for( ulong i=0UL; inet.provider, "xdp" ) ) { + abstract.rx_pkt_cnt = &net_metrics[ MIDX( COUNTER, NET, RX_PKT_CNT ) ]; + abstract.rx_bytes_total = &net_metrics[ MIDX( COUNTER, NET, RX_BYTES_TOTAL ) ]; + abstract.tx_pkt_cnt = &net_metrics[ MIDX( COUNTER, NET, TX_COMPLETE_CNT ) ]; + abstract.tx_bytes_total = &net_metrics[ MIDX( COUNTER, NET, TX_BYTES_TOTAL ) ]; + } else if( 0==strcmp( config->net.provider, "ibverbs" ) ) { + abstract.rx_pkt_cnt = &net_metrics[ MIDX( COUNTER, IBETH, RX_PKT_CNT ) ]; + abstract.rx_bytes_total = &net_metrics[ MIDX( COUNTER, IBETH, 
RX_BYTES_TOTAL ) ]; + abstract.tx_pkt_cnt = &net_metrics[ MIDX( COUNTER, IBETH, TX_PKT_CNT ) ]; + abstract.tx_bytes_total = &net_metrics[ MIDX( COUNTER, IBETH, TX_BYTES_TOTAL ) ]; + } + /* Simple REPL loop */ puts( "Running fddev pktgen" ); - printf( "XDP socket listening on port %u\n", (uint)listen_port ); + printf( "%s listening on port %u\n", config->net.provider, (uint)net_tile->net.legacy_transaction_listen_port ); puts( "Available commands: start, stop, quit" ); puts( "" ); char input[ 256 ] = {0}; for(;;) { - render_status( net_metrics ); + render_status( net_metrics, &abstract ); fputs( "pktgen> ", stdout ); fflush( stdout ); @@ -255,7 +296,7 @@ pktgen_cmd_fn( args_t * args FD_PARAM_UNUSED, struct pollfd fds[1] = {{ .fd=STDIN_FILENO, .events=POLLIN }}; int poll_res = poll( fds, 1, 500 ); if( poll_res==0 ) { - render_status( net_metrics ); + render_status( net_metrics, &abstract ); continue; } else if( poll_res>0 ) { break; diff --git a/src/disco/metrics/generate/types.py b/src/disco/metrics/generate/types.py index 08f47ddc327..20bc666cb6e 100644 --- a/src/disco/metrics/generate/types.py +++ b/src/disco/metrics/generate/types.py @@ -43,6 +43,7 @@ class Tile(Enum): SNAPLA = 37 SNAPLS = 38 TOWER = 39 + IBETH = 40 class MetricType(Enum): COUNTER = 0 diff --git a/src/disco/metrics/generated/fd_metrics_all.c b/src/disco/metrics/generated/fd_metrics_all.c index 54a7bbf8513..57f08bfc34f 100644 --- a/src/disco/metrics/generated/fd_metrics_all.c +++ b/src/disco/metrics/generated/fd_metrics_all.c @@ -73,6 +73,7 @@ const char * FD_METRICS_TILE_KIND_NAMES[FD_METRICS_TILE_KIND_CNT] = { "snapla", "snapls", "tower", + "ibeth", }; const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT] = { @@ -113,6 +114,7 @@ const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT] = { FD_METRICS_SNAPLA_TOTAL, FD_METRICS_SNAPLS_TOTAL, FD_METRICS_TOWER_TOTAL, + FD_METRICS_IBETH_TOTAL, }; const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT] = { 
FD_METRICS_NET, @@ -152,4 +154,5 @@ const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT] FD_METRICS_SNAPLA, FD_METRICS_SNAPLS, FD_METRICS_TOWER, + FD_METRICS_IBETH, }; diff --git a/src/disco/metrics/generated/fd_metrics_all.h b/src/disco/metrics/generated/fd_metrics_all.h index 766bb7f9d28..2b484654371 100644 --- a/src/disco/metrics/generated/fd_metrics_all.h +++ b/src/disco/metrics/generated/fd_metrics_all.h @@ -42,6 +42,7 @@ #include "fd_metrics_benchs.h" #include "fd_metrics_tower.h" #include "fd_metrics_gui.h" +#include "fd_metrics_ibeth.h" /* Start of LINK OUT metrics */ #define FD_METRICS_COUNTER_LINK_SLOW_COUNT_OFF (0UL) @@ -178,7 +179,7 @@ extern const fd_metrics_meta_t FD_METRICS_ALL_LINK_OUT[FD_METRICS_ALL_LINK_OUT_T #define FD_METRICS_TOTAL_SZ (8UL*254UL) -#define FD_METRICS_TILE_KIND_CNT 37 +#define FD_METRICS_TILE_KIND_CNT 38 extern const char * FD_METRICS_TILE_KIND_NAMES[FD_METRICS_TILE_KIND_CNT]; extern const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT]; extern const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT]; diff --git a/src/disco/metrics/generated/fd_metrics_ibeth.c b/src/disco/metrics/generated/fd_metrics_ibeth.c new file mode 100644 index 00000000000..adbfc284e07 --- /dev/null +++ b/src/disco/metrics/generated/fd_metrics_ibeth.c @@ -0,0 +1,9 @@ +/* THIS FILE IS GENERATED BY gen_metrics.py. DO NOT HAND EDIT. 
*/ +#include "fd_metrics_ibeth.h" + +const fd_metrics_meta_t FD_METRICS_IBETH[FD_METRICS_IBETH_TOTAL] = { + DECLARE_METRIC( IBETH_RX_PKT_CNT, COUNTER ), + DECLARE_METRIC( IBETH_RX_BYTES_TOTAL, COUNTER ), + DECLARE_METRIC( IBETH_TX_PKT_CNT, COUNTER ), + DECLARE_METRIC( IBETH_TX_BYTES_TOTAL, COUNTER ), +}; diff --git a/src/disco/metrics/generated/fd_metrics_ibeth.h b/src/disco/metrics/generated/fd_metrics_ibeth.h new file mode 100644 index 00000000000..155e55f5bce --- /dev/null +++ b/src/disco/metrics/generated/fd_metrics_ibeth.h @@ -0,0 +1,36 @@ +#ifndef HEADER_fd_src_disco_metrics_generated_fd_metrics_ibeth_h +#define HEADER_fd_src_disco_metrics_generated_fd_metrics_ibeth_h + +/* THIS FILE IS GENERATED BY gen_metrics.py. DO NOT HAND EDIT. */ + +#include "../fd_metrics_base.h" +#include "fd_metrics_enums.h" + +#define FD_METRICS_COUNTER_IBETH_RX_PKT_CNT_OFF (16UL) +#define FD_METRICS_COUNTER_IBETH_RX_PKT_CNT_NAME "ibeth_rx_pkt_cnt" +#define FD_METRICS_COUNTER_IBETH_RX_PKT_CNT_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_IBETH_RX_PKT_CNT_DESC "Packet receive count." +#define FD_METRICS_COUNTER_IBETH_RX_PKT_CNT_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_COUNTER_IBETH_RX_BYTES_TOTAL_OFF (17UL) +#define FD_METRICS_COUNTER_IBETH_RX_BYTES_TOTAL_NAME "ibeth_rx_bytes_total" +#define FD_METRICS_COUNTER_IBETH_RX_BYTES_TOTAL_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_IBETH_RX_BYTES_TOTAL_DESC "Total number of bytes received (including Ethernet header)." +#define FD_METRICS_COUNTER_IBETH_RX_BYTES_TOTAL_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_COUNTER_IBETH_TX_PKT_CNT_OFF (18UL) +#define FD_METRICS_COUNTER_IBETH_TX_PKT_CNT_NAME "ibeth_tx_pkt_cnt" +#define FD_METRICS_COUNTER_IBETH_TX_PKT_CNT_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_IBETH_TX_PKT_CNT_DESC "Number of packet transmit jobs marked as completed by the kernel." 
+#define FD_METRICS_COUNTER_IBETH_TX_PKT_CNT_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_COUNTER_IBETH_TX_BYTES_TOTAL_OFF (19UL) +#define FD_METRICS_COUNTER_IBETH_TX_BYTES_TOTAL_NAME "ibeth_tx_bytes_total" +#define FD_METRICS_COUNTER_IBETH_TX_BYTES_TOTAL_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_IBETH_TX_BYTES_TOTAL_DESC "Total number of bytes transmitted (including Ethernet header)." +#define FD_METRICS_COUNTER_IBETH_TX_BYTES_TOTAL_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_IBETH_TOTAL (4UL) +extern const fd_metrics_meta_t FD_METRICS_IBETH[FD_METRICS_IBETH_TOTAL]; + +#endif /* HEADER_fd_src_disco_metrics_generated_fd_metrics_ibeth_h */ diff --git a/src/disco/metrics/metrics.xml b/src/disco/metrics/metrics.xml index 5f543a988a0..d2027ecd701 100644 --- a/src/disco/metrics/metrics.xml +++ b/src/disco/metrics/metrics.xml @@ -1156,4 +1156,11 @@ metric introduced. + + + + + + + diff --git a/src/disco/net/fd_net_router.h b/src/disco/net/fd_net_router.h new file mode 100644 index 00000000000..bdeba757c56 --- /dev/null +++ b/src/disco/net/fd_net_router.h @@ -0,0 +1,170 @@ +#ifndef HEADER_fd_src_disco_net_xdp_fd_xdp_route_h +#define HEADER_fd_src_disco_net_xdp_fd_xdp_route_h + +/* fd_net_router.h provides an internal API for userland routing. 
*/ + +#include "../../waltz/ip/fd_fib4.h" +#include "../../waltz/mib/fd_dbl_buf.h" +#include "../../waltz/mib/fd_netdev_tbl.h" +#include "../../waltz/neigh/fd_neigh4_map.h" +#include "../netlink/fd_netlink_tile.h" /* neigh4_solicit */ + +#include /* ARPHRD_LOOPBACK */ + +struct fd_net_router { + /* Route and neighbor tables */ + fd_fib4_t fib_local[1]; + fd_fib4_t fib_main[1]; + fd_neigh4_hmap_t neigh4[1]; + fd_netlink_neigh4_solicit_link_t neigh4_solicit[1]; + + /* Netdev table */ + fd_dbl_buf_t * netdev_dbl_buf; /* remote copy of device table */ + uchar * netdev_buf; /* local copy of device table */ + ulong netdev_buf_sz; + fd_netdev_tbl_join_t netdev_tbl; /* join to local copy of device table */ + int has_gre_interface; /* enable GRE support? */ + + uint if_virt; + uint bind_address; + uint default_address; + + /* Details pertaining to an inflight send op */ + struct { + uchar mac_addrs[12]; /* First 12 bytes of Ethernet header */ + uint src_ip; /* net order */ + + uint use_gre; /* The tx packet will be GRE-encapsulated */ + uint gre_outer_src_ip; /* For GRE: Outer iphdr's src_ip in net order */ + uint gre_outer_dst_ip; /* For GRE: Outer iphdr's dst_ip in net order */ + } tx_op; + + struct { + ulong tx_route_fail_cnt; + ulong tx_neigh_fail_cnt; + } metrics; +}; +typedef struct fd_net_router fd_net_router_t; + +FD_PROTOTYPES_BEGIN + +/* fd_net_tx_route resolves the destination interface index, src MAC + address, and dst MAC address. Returns 1 on success, 0 on failure. + On success, tx_op->{if_idx,mac_addrs} is set. 
*/ + +static int +fd_net_tx_route( fd_net_router_t * ctx, + uint dst_ip, + uint * is_gre_inf ) { + + /* Route lookup */ + + fd_fib4_hop_t hop[2] = {0}; + hop[0] = fd_fib4_lookup( ctx->fib_local, dst_ip, 0UL ); + hop[1] = fd_fib4_lookup( ctx->fib_main, dst_ip, 0UL ); + fd_fib4_hop_t const * next_hop = fd_fib4_hop_or( hop+0, hop+1 ); + + uint rtype = next_hop->rtype; + uint if_idx = next_hop->if_idx; + uint ip4_src = next_hop->ip4_src; + + if( FD_UNLIKELY( rtype==FD_FIB4_RTYPE_LOCAL ) ) { + rtype = FD_FIB4_RTYPE_UNICAST; + if_idx = 1; + } + + if( FD_UNLIKELY( rtype!=FD_FIB4_RTYPE_UNICAST ) ) { + ctx->metrics.tx_route_fail_cnt++; + return 0; + } + + fd_netdev_t * netdev = fd_netdev_tbl_query( &ctx->netdev_tbl, if_idx ); + if( !netdev ) { + ctx->metrics.tx_route_fail_cnt++; + return 0; + } + + ip4_src = fd_uint_if( !!ctx->bind_address, ctx->bind_address, ip4_src ); + ctx->tx_op.src_ip = ip4_src; + + FD_TEST( is_gre_inf ); + *is_gre_inf = 0; + if( netdev->dev_type==ARPHRD_LOOPBACK ) { + /* FIXME loopback support */ + return 0; + } else if( netdev->dev_type==ARPHRD_IPGRE ) { + /* skip MAC addrs lookup for GRE inner dst ip */ + if( netdev->gre_src_ip ) ctx->tx_op.gre_outer_src_ip = netdev->gre_src_ip; + ctx->tx_op.gre_outer_dst_ip = netdev->gre_dst_ip; + *is_gre_inf = 1; + return 1; + } + + if( FD_UNLIKELY( netdev->dev_type!=ARPHRD_ETHER ) ) { + ctx->metrics.tx_route_fail_cnt++; + return 0; + } + + if( FD_UNLIKELY( if_idx!=ctx->if_virt ) ) { + ctx->metrics.tx_route_fail_cnt++; + return 0; + } + + /* Neighbor resolve */ + uint neigh_ip = next_hop->ip4_gw; + if( !neigh_ip ) neigh_ip = dst_ip; + + fd_neigh4_entry_t neigh[1]; + int neigh_res = fd_neigh4_hmap_query_entry( ctx->neigh4, neigh_ip, neigh ); + if( FD_UNLIKELY( neigh_res!=FD_MAP_SUCCESS ) ) { + /* Neighbor not found */ + fd_netlink_neigh4_solicit( ctx->neigh4_solicit, neigh_ip, if_idx, fd_frag_meta_ts_comp( fd_tickcount() ) ); + ctx->metrics.tx_neigh_fail_cnt++; + return 0; + } + if( FD_UNLIKELY( neigh->state != 
FD_NEIGH4_STATE_ACTIVE ) ) { + ctx->metrics.tx_neigh_fail_cnt++; + return 0; + } + ip4_src = fd_uint_if( !ip4_src, ctx->default_address, ip4_src ); + ctx->tx_op.src_ip = ip4_src; + memcpy( ctx->tx_op.mac_addrs+0, neigh->mac_addr, 6 ); + memcpy( ctx->tx_op.mac_addrs+6, netdev->mac_addr, 6 ); + + return 1; +} + +/* fd_net_tx_fill_addrs sets the Ethernet src and dst MAC, and optionally + the IPv4 source address. */ + +static int +fd_net_tx_fill_addrs( fd_net_router_t * ctx, + uchar * packet, + ulong sz ) { + /* Select Ethernet addresses */ + memcpy( packet, ctx->tx_op.mac_addrs, 12 ); + + /* Select IPv4 source address */ + uint ihl = packet[ 14 ] & 0x0f; + ushort ethertype = FD_LOAD( ushort, packet+12 ); + uint ip4_saddr = FD_LOAD( uint, packet+26 ); + if( ethertype==fd_ushort_bswap( FD_ETH_HDR_TYPE_IP ) && ip4_saddr==0 ) { + if( FD_UNLIKELY( ctx->tx_op.src_ip==0 || + ihl<5 || (14+(ihl<<2))>sz ) ) { + /* Outgoing IPv4 packet with unknown src IP or invalid IHL */ + /* FIXME should select first IPv4 address of device table here */ + ctx->metrics.tx_route_fail_cnt++; + return 0; + } + + /* Recompute checksum after changing header */ + FD_STORE( uint, packet+26, ctx->tx_op.src_ip ); + FD_STORE( ushort, packet+24, 0 ); + FD_STORE( ushort, packet+24, fd_ip4_hdr_check( packet+14 ) ); + } + return 1; +} + +FD_PROTOTYPES_END + +#endif /* HEADER_fd_src_disco_net_xdp_fd_xdp_route_h */ diff --git a/src/disco/net/fd_net_tile_topo.c b/src/disco/net/fd_net_tile_topo.c index 1ee60697e87..c5ebffb6d99 100644 --- a/src/disco/net/fd_net_tile_topo.c +++ b/src/disco/net/fd_net_tile_topo.c @@ -71,6 +71,40 @@ setup_sock_tile( fd_topo_t * topo, tile->sock.so_sndbuf = (int)net_cfg->socket.send_buffer_size ; } +#if FD_HAS_IBVERBS + +static void +setup_ibeth_tile( fd_topo_t * topo, + fd_topo_tile_t * netlink_tile, + ulong const * tile_to_cpu, + fd_config_net_t const * net_cfg ) { + fd_topo_tile_t * tile = fd_topob_tile( topo, "ibeth", "ibeth", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); 
+ fd_topob_link( topo, "net_netlnk", "net_netlnk", 128UL, 0UL, 0UL ); + fd_topob_tile_in( topo, "netlnk", 0UL, "metric_in", "net_netlnk", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); + fd_topob_tile_out( topo, "ibeth", 0UL, "net_netlnk", 0UL ); + fd_netlink_topo_join( topo, netlink_tile, tile ); + + fd_topo_obj_t * umem_obj = fd_topob_obj( topo, "dcache", "net_umem" ); + fd_topob_tile_uses( topo, tile, umem_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); + fd_pod_insertf_ulong( topo->props, umem_obj->id, "net.%lu.umem", 0UL ); /* FIXME multi queue support */ + + fd_cstr_fini( fd_cstr_append_cstr_safe( + fd_cstr_init( tile->ibeth.if_name ), + net_cfg->interface, + IF_NAMESIZE-1UL ) ); + tile->ibeth.rx_queue_size = 1024U; /* FIXME */ + tile->ibeth.tx_queue_size = 1024U; /* FIXME */ + + tile->ibeth.umem_dcache_obj_id = umem_obj->id; + tile->ibeth.netdev_dbl_buf_obj_id = netlink_tile->netlink.netdev_dbl_buf_obj_id; + tile->ibeth.fib4_main_obj_id = netlink_tile->netlink.fib4_main_obj_id; + tile->ibeth.fib4_local_obj_id = netlink_tile->netlink.fib4_local_obj_id; + tile->ibeth.neigh4_obj_id = netlink_tile->netlink.neigh4_obj_id; + tile->ibeth.neigh4_ele_obj_id = netlink_tile->netlink.neigh4_ele_obj_id; +} + +#endif + void fd_topos_net_tiles( fd_topo_t * topo, ulong net_tile_cnt, @@ -82,6 +116,8 @@ fd_topos_net_tiles( fd_topo_t * topo, /* net_umem: Packet buffers */ fd_topob_wksp( topo, "net_umem" ); + fd_pod_insert_cstr( topo->props, "net.provider", net_cfg->provider ); + /* Create workspaces */ if( 0==strcmp( net_cfg->provider, "xdp" ) ) { @@ -150,22 +186,34 @@ fd_topos_net_tiles( fd_topo_t * topo, setup_sock_tile( topo, tile_to_cpu, net_cfg ); } + } else if( 0==strcmp( net_cfg->provider, "ibverbs" ) ) { + +# if !FD_HAS_IBVERBS + FD_LOG_ERR(( "[net.provider] is 'ibverbs', but Firedancer was built without ibverbs support.\n" + "Please do a clean rebuild with 'make EXTRAS=ibverbs'." 
)); +# else + + /* ibeth: private working memory of the ibeth tiles */ + fd_topob_wksp( topo, "ibeth" ); + /* netlnk: private working memory of the netlnk tile */ + fd_topob_wksp( topo, "netlnk" ); + /* netbase: shared network config (config plane) */ + fd_topob_wksp( topo, "netbase" ); + /* net_netlnk: net->netlnk ARP requests */ + fd_topob_wksp( topo, "net_netlnk" ); + + fd_topo_tile_t * netlink_tile = fd_topob_tile( topo, "netlnk", "netlnk", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); + fd_netlink_topo_create( netlink_tile, topo, netlnk_max_routes, netlnk_max_neighbors, net_cfg->interface ); + + setup_ibeth_tile( topo, netlink_tile, tile_to_cpu, net_cfg ); + +# endif + } else { FD_LOG_ERR(( "invalid `net.provider`" )); } } -static int -topo_is_xdp( fd_topo_t * topo ) { - /* FIXME hacky */ - for( ulong j=0UL; j<(topo->tile_cnt); j++ ) { - if( 0==strcmp( topo->tiles[ j ].name, "net" ) ) { - return 1; - } - } - return 0; -} - static void add_xdp_rx_link( fd_topo_t * topo, char const * link_name, @@ -203,12 +251,16 @@ fd_topos_net_rx_link( fd_topo_t * topo, char const * link_name, ulong net_kind_id, ulong depth ) { - if( topo_is_xdp( topo ) ) { + char const * provider = fd_pod_query_cstr( topo->props, "net.provider", "" ); + if( 0==strcmp( provider, "xdp" ) ) { add_xdp_rx_link( topo, link_name, net_kind_id, depth ); fd_topob_tile_out( topo, "net", net_kind_id, link_name, net_kind_id ); - } else { + } else if( 0==strcmp( provider, "socket" ) ) { fd_topob_link( topo, link_name, "net_umem", depth, FD_NET_MTU, 64 ); fd_topob_tile_out( topo, "sock", net_kind_id, link_name, net_kind_id ); + } else if( 0==strcmp( provider, "ibverbs" ) ) { + fd_topob_link( topo, link_name, "net_umem", depth, FD_NET_MTU, 64 ); + fd_topob_tile_out( topo, "ibeth", net_kind_id, link_name, net_kind_id ); } } @@ -220,26 +272,23 @@ fd_topos_tile_in_net( fd_topo_t * topo, int reliable, int polled ) { for( ulong j=0UL; j<(topo->tile_cnt); j++ ) { - if( 0==strcmp( topo->tiles[ j ].name, "net" 
) || - 0==strcmp( topo->tiles[ j ].name, "sock" ) ) { + if( 0==strcmp( topo->tiles[ j ].name, "net" ) || + 0==strcmp( topo->tiles[ j ].name, "sock" ) || + 0==strcmp( topo->tiles[ j ].name, "ibeth" ) ) { fd_topob_tile_in( topo, topo->tiles[ j ].name, topo->tiles[ j ].kind_id, fseq_wksp, link_name, link_kind_id, reliable, polled ); } } } -void -fd_topos_net_tile_finish( fd_topo_t * topo, - ulong net_kind_id ) { - if( !topo_is_xdp( topo ) ) return; - - fd_topo_tile_t * net_tile = &topo->tiles[ fd_topo_find_tile( topo, "net", net_kind_id ) ]; - +static void +fd_topos_xdp_setup_mem( fd_topo_t * topo, + fd_topo_tile_t * net_tile ) { ulong rx_depth = net_tile->xdp.xdp_rx_queue_size; ulong tx_depth = net_tile->xdp.xdp_tx_queue_size; rx_depth += (rx_depth/2UL); tx_depth += (tx_depth/2UL); - if( net_kind_id==0 ) { + if( net_tile->kind_id==0 ) { /* Double it for loopback XSK */ rx_depth *= 2UL; tx_depth *= 2UL; @@ -259,7 +308,7 @@ fd_topos_net_tile_finish( fd_topo_t * topo, /* Create a dcache object */ - ulong umem_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "net.%lu.umem", net_kind_id ); + ulong umem_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "net.%lu.umem", net_tile->kind_id ); FD_TEST( umem_obj_id!=ULONG_MAX ); FD_TEST( net_tile->net.umem_dcache_obj_id > 0 ); @@ -359,6 +408,57 @@ fd_topo_install_xdp( fd_topo_t const * topo, FD_LOG_ERR(( "close() failed (%i-%s)", errno, fd_io_strerror( errno ) )); } } +} # undef ADD_IF_IDX + +static void +fd_topos_ibeth_setup_mem( fd_topo_t * topo, + fd_topo_tile_t * ibeth_tile ) { + ulong cum_frame_cnt = 0UL; + + ulong const rx_depth = ibeth_tile->ibeth.rx_queue_size; + ulong const tx_depth = ibeth_tile->ibeth.tx_queue_size; + cum_frame_cnt += rx_depth + tx_depth; + + /* Count up the depth of all RX mcaches */ + + for( ulong j=0UL; j<(ibeth_tile->out_cnt); j++ ) { + ulong link_id = ibeth_tile->out_link_id[ j ]; + ulong mcache_obj_id = topo->links[ link_id ].mcache_obj_id; + ulong depth = fd_pod_queryf_ulong( 
topo->props, ULONG_MAX, "obj.%lu.depth", mcache_obj_id ); + if( FD_UNLIKELY( depth==ULONG_MAX ) ) FD_LOG_ERR(( "Didn't find depth for mcache %s", topo->links[ link_id ].name )); + cum_frame_cnt += depth + 1UL; + } + + /* Create a dcache object */ + + ulong umem_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "net.%lu.umem", ibeth_tile->kind_id ); + FD_TEST( umem_obj_id!=ULONG_MAX ); + + FD_TEST( ibeth_tile->ibeth.umem_dcache_obj_id > 0 ); + fd_pod_insertf_ulong( topo->props, cum_frame_cnt, "obj.%lu.depth", umem_obj_id ); + fd_pod_insertf_ulong( topo->props, 2UL, "obj.%lu.burst", umem_obj_id ); /* 4096 byte padding */ + fd_pod_insertf_ulong( topo->props, FD_NET_MTU, "obj.%lu.mtu", umem_obj_id ); +} + +void +fd_topos_net_tile_finish( fd_topo_t * topo, + ulong net_kind_id ) { + char const * provider = fd_pod_query_cstr( topo->props, "net.provider", "" ); + if( 0==strcmp( provider, "xdp" ) ) { + ulong tile_id = fd_topo_find_tile( topo, "net", net_kind_id ); + if( FD_UNLIKELY( tile_id==ULONG_MAX ) ) { + FD_LOG_ERR(( "tile net:%lu not found", net_kind_id )); + } + fd_topos_xdp_setup_mem( topo, &topo->tiles[ tile_id ] ); + } else if( 0==strcmp( provider, "ibverbs" ) ) { + ulong tile_id = fd_topo_find_tile( topo, "ibeth", net_kind_id ); + if( FD_UNLIKELY( tile_id==ULONG_MAX ) ) { + FD_LOG_ERR(( "tile ibeth:%lu not found", net_kind_id )); + } + fd_topos_ibeth_setup_mem( topo, &topo->tiles[ tile_id ] ); + } else { + return; + } } diff --git a/src/disco/net/ibeth/Local.mk b/src/disco/net/ibeth/Local.mk new file mode 100644 index 00000000000..02784548df4 --- /dev/null +++ b/src/disco/net/ibeth/Local.mk @@ -0,0 +1,4 @@ +ifdef FD_HAS_IBVERBS +$(call add-objs,fd_ibeth_tile,fd_disco) +$(call make-unit-test,test_ibeth_tile,test_ibeth_tile,fd_disco fd_waltz fd_tango fd_util,$(IBVERBS_LIBS)) +endif diff --git a/src/disco/net/ibeth/fd_ibeth_tile.c b/src/disco/net/ibeth/fd_ibeth_tile.c new file mode 100644 index 00000000000..af1a9ca9739 --- /dev/null +++ 
b/src/disco/net/ibeth/fd_ibeth_tile.c @@ -0,0 +1,859 @@ +/* The ibeth tile translates Ethernet frames between InfiniBand devices + in 'raw packet' mode and fd_tango traffic. Works best on Mellanox + ConnectX. */ + +#include "../fd_net_router.h" +#include "../../metrics/fd_metrics.h" +#include "../../topo/fd_topo.h" +#include "../../../util/net/fd_eth.h" +#include "../../../util/net/fd_ip4.h" +#include "../../../util/net/fd_udp.h" +#include <infiniband/verbs.h> +#include <dirent.h> +#include <errno.h> +#include <fcntl.h> +#include <net/if.h> +#include <poll.h> + +#define FD_IBETH_UDP_PORT_MAX (8UL) +#define FD_IBETH_TXQ_MAX (32UL) +#define FD_IBETH_PENDING_MAX (14UL) + +#define DEQUE_NAME tx_free +#define DEQUE_T uint +#include "../../../util/tmpl/fd_deque_dynamic.c" + +/* fd_ibeth_tile_t is private tile state */ + +struct fd_ibeth_txq { + void * base; + ulong chunk0; + ulong wmark; +}; +typedef struct fd_ibeth_txq fd_ibeth_txq_t; + +struct fd_ibeth_recv_wr { + struct ibv_recv_wr wr [1]; + struct ibv_sge sge[1]; +}; +typedef struct fd_ibeth_recv_wr fd_ibeth_recv_wr_t; + +struct fd_ibeth_tile { + /* ibverbs resources */ + struct ibv_context * ibv_ctx; + struct ibv_cq_ex * cq; /* completion queue */ + struct ibv_qp * qp; /* queue pair */ + uint mr_lkey; + uint rx_pending_rem; + + /* UMEM frame region within dcache */ + uchar * umem_base; /* Workspace base */ + uchar * umem_frame0; /* First UMEM frame */ + ulong umem_sz; /* Usable UMEM size starting at frame0 */ + + /* UMEM chunk region within workspace */ + uint umem_chunk0; /* Lowest allowed chunk number */ + uint umem_wmark; /* Highest allowed chunk number */ + + /* TX */ + ulong txq_cnt; + fd_ibeth_txq_t txq[ FD_IBETH_TXQ_MAX ]; + + /* TX free ring */ + uint * tx_free; + + /* Router */ + fd_net_router_t r; + uint main_if_idx; + + /* Port matcher */ + uint dst_port_cnt; + ushort dst_ports [ FD_IBETH_UDP_PORT_MAX ]; + uchar dst_protos [ FD_IBETH_UDP_PORT_MAX ]; + uchar dst_out_idx[ FD_IBETH_UDP_PORT_MAX ]; + + /* Batch RX work requests */ + fd_ibeth_recv_wr_t rx_pending[
FD_IBETH_PENDING_MAX ]; + + /* Out links */ + uchar rx_link_cnt; + uchar rx_link_out_idx[ FD_IBETH_UDP_PORT_MAX ]; + + /* RX frame range */ + uint rx_chunk0; + uint rx_chunk1; + + /* TX frame range */ + uint tx_chunk0; + uint tx_chunk1; + + struct { + ulong rx_pkt_cnt; + ulong rx_bytes_total; + ulong tx_pkt_cnt; + ulong tx_bytes_total; + } metrics; +}; +typedef struct fd_ibeth_tile fd_ibeth_tile_t; + +static ulong +scratch_align( void ) { + return fd_ulong_max( alignof(fd_ibeth_tile_t), tx_free_align() ); +} + +static ulong +scratch_footprint( fd_topo_tile_t const * tile ) { + ulong l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, alignof(fd_ibeth_tile_t), sizeof(fd_ibeth_tile_t) ); + l = FD_LAYOUT_APPEND( l, tx_free_align(), tx_free_footprint( tile->ibeth.tx_queue_size ) ); + return FD_LAYOUT_FINI( l, scratch_align() ); +} + +/* fd_ibeth_dev_contains_if returns 1 if the PCIe device backing an ibverbs + device manages the specified interface, 0 otherwise. Useful as a quick + way to find which Linux interface corresponds to which ibverbs device. */ + +static int +fd_ibeth_dev_contains_if( struct ibv_device * dev, + char const * ifname ) { + char sysfs_net[ PATH_MAX ]; + if( FD_UNLIKELY( strlen( dev->ibdev_path )+11+1 > PATH_MAX ) ) { + return 0; + } + char * p = fd_cstr_init( sysfs_net ); + p = fd_cstr_append_cstr( p, dev->ibdev_path ); + p = fd_cstr_append_cstr( p, "/device/net" ); + fd_cstr_fini( p ); + + DIR * dir = opendir( sysfs_net ); + if( FD_UNLIKELY( !dir ) ) { + FD_LOG_WARNING(( "opendir(%s) failed (%i-%s), skipping ibverbs device %s", + sysfs_net, errno, fd_io_strerror( errno ), dev->name )); + return 0; + } + int found = 0; + struct dirent * entry; + while( (entry = readdir( dir )) ) { + if( entry->d_name[0] == '.'
) continue; + if( 0==strcmp( entry->d_name, ifname ) ) { + found = 1; + break; + } + } + if( FD_UNLIKELY( 0!=closedir( dir ) ) ) { + FD_LOG_ERR(( "closedir(%s) failed (%i-%s)", sysfs_net, errno, fd_io_strerror( errno ) )); + } + return found; +} + +/* fd_ibeth_dev_open attempts to open an ibv_context for the device + specified by tile configuration. */ + +static struct ibv_context * +fd_ibeth_dev_open( fd_topo_tile_t const * tile ) { + int device_cnt = 0; + struct ibv_device ** dev_list = ibv_get_device_list( &device_cnt ); + if( FD_UNLIKELY( !dev_list ) ) { + FD_LOG_ERR(( "ibv_get_device_list failed (%i-%s)", errno, fd_io_strerror( errno ) )); + } + if( FD_UNLIKELY( !device_cnt ) ) { + FD_LOG_ERR(( "No ibverbs devices found" )); + } + FD_LOG_DEBUG(( "Found %i ibverbs devices", device_cnt )); + + /* Scan device list for interface */ + struct ibv_device * dev = NULL; + for( int i=0; i<device_cnt; i++ ) { + struct ibv_device * dev_candidate = dev_list[ i ]; + if( fd_ibeth_dev_contains_if( dev_candidate, tile->ibeth.if_name ) ) { + dev = dev_candidate; + break; + } + } + if( FD_UNLIKELY( !dev ) ) { + FD_LOG_ERR(( "ibverbs device for interface `%s` not found", tile->ibeth.if_name )); + } + + FD_LOG_NOTICE(( "Opening ibverbs device `%s`", dev->name )); + + struct ibv_context * ibv_context = ibv_open_device( dev ); + if( FD_UNLIKELY( !ibv_context ) ) { + FD_LOG_ERR(( "ibv_open_device(%s) failed", dev->name )); + } + + ibv_free_device_list( dev_list ); + return ibv_context; +} + +/* fd_ibeth_rx_recycle sends a RX work request to the queue pair. It + contains a packet buffer that the NIC eventually fills.
*/ + +static inline void +fd_ibeth_rx_recycle( fd_ibeth_tile_t * ctx, + ulong chunk, + int flush ) { + fd_ibeth_recv_wr_t * verb = &ctx->rx_pending[ --ctx->rx_pending_rem ]; + verb->sge[0] = (struct ibv_sge) { + .addr = (ulong)fd_chunk_to_laddr( ctx->umem_base, chunk ), + .length = FD_NET_MTU, + .lkey = ctx->mr_lkey + }; + verb->wr[0].wr_id = chunk; + + if( !ctx->rx_pending_rem || flush ) { + struct ibv_recv_wr * bad_wr; + if( FD_UNLIKELY( ibv_post_recv( ctx->qp, verb->wr, &bad_wr ) ) ) { + FD_LOG_ERR(( "ibv_post_recv failed (%i-%s)", errno, fd_io_strerror( errno ) )); + } + ctx->rx_pending_rem = FD_IBETH_PENDING_MAX; + } +} + +/* rxq_assign adds a routing rule. All incoming IPv4 UDP ports with the + specified dst port will be redirected to the first output link in the + topology with the specified names. frag_meta descriptors are annotated + with the given 'dst_proto' value. */ + +static void +rxq_assign( fd_ibeth_tile_t * ctx, + fd_topo_t * topo, + fd_topo_tile_t * tile, + ulong dst_proto, + char const * out_link, + ushort dst_port ) { + ulong out_idx = fd_topo_find_tile_out_link( topo, tile, out_link, 0UL ); + if( FD_UNLIKELY( out_idx==ULONG_MAX || !dst_port ) ) return; + if( FD_UNLIKELY( ctx->dst_port_cnt >= FD_IBETH_UDP_PORT_MAX ) ) { + FD_LOG_ERR(( "ibeth tile rxq link count exceeds max of %lu", FD_IBETH_UDP_PORT_MAX )); + } + uint const idx = ctx->dst_port_cnt; + ctx->dst_protos [ idx ] = (uchar)dst_proto; + ctx->dst_ports [ idx ] = dst_port; + ctx->dst_out_idx[ idx ] = (uchar)out_idx; + ctx->dst_port_cnt++; + + for( ulong i=0UL; i<ctx->rx_link_cnt; i++ ) { + if( ctx->rx_link_out_idx[ i ]==out_idx ) { + goto registered; + } + } + FD_TEST( ctx->rx_link_cnt < FD_IBETH_UDP_PORT_MAX ); + ctx->rx_link_out_idx[ ctx->rx_link_cnt++ ] = (uchar)out_idx; +registered: + if(0){} +} + +static void +rxq_assign_all( fd_ibeth_tile_t * ctx, + fd_topo_t * topo, + fd_topo_tile_t * tile ) { + rxq_assign( ctx, topo, tile, DST_PROTO_TPU_UDP, "net_quic",
tile->ibeth.net.legacy_transaction_listen_port ); + rxq_assign( ctx, topo, tile, DST_PROTO_TPU_QUIC, "net_quic", tile->ibeth.net.quic_transaction_listen_port ); + rxq_assign( ctx, topo, tile, DST_PROTO_SHRED, "net_shred", tile->ibeth.net.shred_listen_port ); + rxq_assign( ctx, topo, tile, DST_PROTO_GOSSIP, "net_gossip", tile->ibeth.net.gossip_listen_port ); + rxq_assign( ctx, topo, tile, DST_PROTO_REPAIR, "net_shred", tile->ibeth.net.repair_intake_listen_port ); + rxq_assign( ctx, topo, tile, DST_PROTO_REPAIR, "net_repair", tile->ibeth.net.repair_serve_listen_port ); +} + +/* privileged_init does various ibverbs configuration via userspace verbs + (/dev/interface/uverbs*). */ + +FD_FN_UNUSED static void +privileged_init( fd_topo_t * topo, + fd_topo_tile_t * tile ) { + fd_ibeth_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id ); + memset( ctx, 0, sizeof(fd_ibeth_tile_t) ); + + /* Load up dcache containing UMEM */ + void * const dcache_mem = fd_topo_obj_laddr( topo, tile->ibeth.umem_dcache_obj_id ); + void * const umem_dcache = fd_dcache_join( dcache_mem ); + FD_TEST( umem_dcache ); + ulong const umem_dcache_data_sz = fd_dcache_data_sz( umem_dcache ); + ulong const umem_frame_sz = 2048UL; + if( FD_UNLIKELY( !umem_dcache ) ) { + FD_LOG_ERR(( "fd_dcache_join(ibeth.umem_dcache_obj_id failed" )); + } + + /* Left shrink UMEM region to be 4096 byte aligned */ + void * const umem_frame0 = (void *)fd_ulong_align_up( (ulong)umem_dcache, 4096UL ); + ulong umem_sz = umem_dcache_data_sz - ((ulong)umem_frame0 - (ulong)umem_dcache); + umem_sz = fd_ulong_align_dn( umem_sz, umem_frame_sz ); + + /* Derive chunk bounds */ + void * const umem_base = fd_wksp_containing( dcache_mem ); + ulong const umem_chunk0 = ( (ulong)umem_frame0 - (ulong)umem_base )>>FD_CHUNK_LG_SZ; + ulong const umem_wmark = umem_chunk0 + ( ( umem_sz-umem_frame_sz )>>FD_CHUNK_LG_SZ ); + if( FD_UNLIKELY( umem_chunk0>UINT_MAX || umem_wmark>UINT_MAX || umem_chunk0>umem_wmark ) ) { + FD_LOG_ERR(( "Calculated 
invalid UMEM bounds [%lu,%lu]", umem_chunk0, umem_wmark )); + } + if( FD_UNLIKELY( !umem_base ) ) FD_LOG_ERR(( "UMEM dcache is not in a workspace" )); + if( FD_UNLIKELY( !umem_dcache ) ) FD_LOG_ERR(( "Failed to join UMEM dcache" )); + + ctx->umem_base = (uchar *)umem_base; + ctx->umem_frame0 = umem_frame0; + ctx->umem_sz = umem_sz; + ctx->umem_chunk0 = (uint)umem_chunk0; + ctx->umem_wmark = (uint)umem_wmark; + + if( FD_UNLIKELY( tile->kind_id!=0 ) ) { + /* FIXME support receive side scaling using ibv_create_rwq_ind_table + and ibv_rx_hash_conf. */ + FD_LOG_ERR(( "Sorry, net.provider='ibeth' only supports layout.net_tile_count=1" )); + } + + struct ibv_context * ibv_context = fd_ibeth_dev_open( tile ); + ctx->ibv_ctx = ibv_context; + + /* Receive async events non-blocking */ + int async_fd = ibv_context->async_fd; + int async_flags = fcntl( async_fd, F_GETFL ); + if( FD_UNLIKELY( 0!=fcntl( async_fd, F_SETFL, async_flags|O_NONBLOCK) ) ) { + FD_LOG_ERR(( "Failed to make ibv_context->async_fd non-blocking (%i-%s)", + errno, fd_io_strerror( errno ) )); + } + + uint if_idx = if_nametoindex( tile->ibeth.if_name ); + if( FD_UNLIKELY( !if_idx ) ) { + FD_LOG_ERR(( "if_nametoindex(%s) failed (%i-%s)", + tile->ibeth.if_name, errno, fd_io_strerror( errno ) )); + } + ctx->main_if_idx = if_idx; + + /* Create protection domain */ + struct ibv_pd * pd = ibv_alloc_pd( ibv_context ); + if( FD_UNLIKELY( !pd ) ) { + FD_LOG_ERR(( "ibv_alloc_pd failed" )); + } + + /* Add buffer to protection domain */ + struct ibv_mr * mr = ibv_reg_mr( pd, umem_frame0, umem_sz, IBV_ACCESS_LOCAL_WRITE ); + if( FD_UNLIKELY( !mr ) ) { + FD_LOG_ERR(( "ibv_reg_mr failed" )); + } + ctx->mr_lkey = mr->lkey; + + /* Create completion queue */ + struct ibv_cq_init_attr_ex cq_attr = { + .cqe = tile->ibeth.rx_queue_size + tile->ibeth.tx_queue_size, + .wc_flags = IBV_WC_EX_WITH_BYTE_LEN, + .comp_mask = IBV_CQ_INIT_ATTR_MASK_FLAGS, + .flags = IBV_CREATE_CQ_ATTR_SINGLE_THREADED + }; + ctx->cq = ibv_create_cq_ex( 
ibv_context, &cq_attr ); + if( FD_UNLIKELY( !ctx->cq ) ) { + FD_LOG_ERR(( "ibv_create_cq failed" )); + } + + /* Create queue pair */ + struct ibv_qp_init_attr qp_init_attr = { + .qp_context = NULL, + .recv_cq = ibv_cq_ex_to_cq( ctx->cq ), + .send_cq = ibv_cq_ex_to_cq( ctx->cq ), + .cap = { + .max_recv_wr = tile->ibeth.rx_queue_size, + .max_recv_sge = 1, + .max_send_wr = tile->ibeth.tx_queue_size, + .max_send_sge = 1 + }, + .qp_type = IBV_QPT_RAW_PACKET + }; + ctx->qp = ibv_create_qp( pd, &qp_init_attr ); + if( FD_UNLIKELY( !ctx->qp ) ) { + FD_LOG_ERR(( "ibv_create_qp(.cap.max_recv_wr=%u) failed", + tile->ibeth.rx_queue_size )); + } + + /* Set QP to INIT state, assign port */ + struct ibv_qp_attr qp_attr; + memset( &qp_attr, 0, sizeof(qp_attr) ); + qp_attr.qp_state = IBV_QPS_INIT; + qp_attr.port_num = 1; /* FIXME support multi-port NICs */ + int modify_err; + if( FD_UNLIKELY( (modify_err = ibv_modify_qp( ctx->qp, &qp_attr, IBV_QP_STATE | IBV_QP_PORT )) ) ) { + FD_LOG_ERR(( "ibv_modify_qp(IBV_QP_INIT,port_num=1,IBV_QP_STATE|IBV_QP_PORT) failed (%i-%s)", + modify_err, fd_io_strerror( modify_err ) )); + } + + /* Set QP to "Ready to Receive" state */ + memset( &qp_attr, 0, sizeof(qp_attr) ); + qp_attr.qp_state = IBV_QPS_RTR; + if( FD_UNLIKELY( (modify_err = ibv_modify_qp( ctx->qp, &qp_attr, IBV_QP_STATE )) ) ) { + FD_LOG_ERR(( "ibv_modify_qp(IBV_QPS_RTR,IBV_QP_STATE) failed (%i-%s)", modify_err, fd_io_strerror( modify_err ) )); + } + + /* Set QP to "Ready to Send" state */ + memset( &qp_attr, 0, sizeof(qp_attr) ); + qp_attr.qp_state = IBV_QPS_RTS; + if( FD_UNLIKELY( (modify_err = ibv_modify_qp( ctx->qp, &qp_attr, IBV_QP_STATE )) ) ) { + FD_LOG_ERR(( "ibv_modify_qp(IBV_QPS_RTS,IBV_QP_STATE) failed (%i-%s)", modify_err, fd_io_strerror( modify_err ) )); + } + + /* Setup flow steering */ + rxq_assign_all( ctx, topo, tile ); + struct __attribute__((packed,aligned(8))) { + struct ibv_flow_attr attr; + struct ibv_flow_spec_eth eth; + struct ibv_flow_spec_ipv4 ipv4; + struct 
ibv_flow_spec_tcp_udp udp; + } flow_rule; + for( ulong i=0UL; i<(ctx->dst_port_cnt); i++ ) { + flow_rule.attr = (struct ibv_flow_attr) { + .comp_mask = 0, + .type = IBV_FLOW_ATTR_NORMAL, + .size = sizeof flow_rule, + .priority = 0, + .num_of_specs = 3, + .port = 1, + .flags = 0 + }; + flow_rule.eth = (struct ibv_flow_spec_eth) { + .type = IBV_FLOW_SPEC_ETH, + .size = sizeof(struct ibv_flow_spec_eth), + .val = { + .ether_type = fd_ushort_bswap( FD_ETH_HDR_TYPE_IP ) + }, + .mask = { + .ether_type = USHORT_MAX + } + }; + flow_rule.ipv4 = (struct ibv_flow_spec_ipv4) { + .type = IBV_FLOW_SPEC_IPV4, + .size = sizeof(struct ibv_flow_spec_ipv4), + .val = { + .dst_ip = tile->ibeth.net.bind_address + }, + .mask = { + .dst_ip = tile->ibeth.net.bind_address ? UINT_MAX : 0U + } + }; + flow_rule.udp = (struct ibv_flow_spec_tcp_udp) { + .type = IBV_FLOW_SPEC_UDP, + .size = sizeof(struct ibv_flow_spec_tcp_udp), + .val = { + .dst_port = fd_ushort_bswap( ctx->dst_ports[ i ] ) + }, + .mask = { + .dst_port = USHORT_MAX + } + }; + + struct ibv_flow * flow = ibv_create_flow( ctx->qp, fd_type_pun( &flow_rule ) ); + if( FD_UNLIKELY( !flow ) ) { + FD_LOG_ERR(( "ibv_create_flow failed (%i-%s)", errno, fd_io_strerror( errno ) )); + } + FD_LOG_DEBUG(( "Created flow rule for ip4.dst_ip=" FD_IP4_ADDR_FMT " udp.dst_port:%hu", + FD_IP4_ADDR_FMT_ARGS( tile->ibeth.net.bind_address ), + ctx->dst_ports[ i ] )); + } + FD_LOG_NOTICE(( "Installed %u ibv_flow rules", ctx->dst_port_cnt )); +} + +FD_FN_UNUSED static void +unprivileged_init( fd_topo_t * topo, + fd_topo_tile_t * tile ) { + FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) ); + fd_ibeth_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_ibeth_tile_t), sizeof(fd_ibeth_tile_t) ); + void * deque_mem = FD_SCRATCH_ALLOC_APPEND( l, tx_free_align(), tx_free_footprint( tile->ibeth.tx_queue_size ) ); + + /* chunk 0 is used as a sentinel value, so ensure actual chunk indices + do not use that value. 
*/ + FD_TEST( ctx->umem_chunk0 > 0 ); + + + /* Prepare RX WR batching */ + for( uint i=0U; irx_pending[ i ]; + memset( verb, 0, sizeof(fd_ibeth_recv_wr_t) ); + verb->wr->next = (i<(FD_IBETH_PENDING_MAX-1)) ? ctx->rx_pending[i+1].wr : NULL; + verb->wr->sg_list = verb->sge; + verb->wr->num_sge = 1; + } + ctx->rx_pending_rem = FD_IBETH_PENDING_MAX; + + /* Post RX descriptors */ + ulong frame_chunks = FD_NET_MTU>>FD_CHUNK_LG_SZ; + ulong next_chunk = ctx->umem_chunk0; + ctx->rx_chunk0 = (uint)next_chunk; + ulong const rx_fill_cnt = tile->ibeth.rx_queue_size; + for( ulong i=0UL; iout_cnt); i++ ) { + fd_frag_meta_t * mcache = topo->links[ tile->out_link_id[ i ] ].mcache; + ulong const depth = fd_mcache_depth( mcache ); + for( ulong j=0UL; jrx_chunk1 = (uint)next_chunk; + + /* Init TX free list */ + ctx->tx_chunk0 = (uint)next_chunk; + ctx->tx_free = tx_free_join( tx_free_new( deque_mem, tile->ibeth.tx_queue_size ) ); + while( !tx_free_full( ctx->tx_free ) ) { + tx_free_push_tail( ctx->tx_free, (uint)next_chunk ); + next_chunk += frame_chunks; + } + ctx->tx_chunk1 = (uint)next_chunk; + + /* Init TX */ + for( ulong i=0UL; i<(tile->in_cnt); i++ ) { + fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ]; + if( FD_UNLIKELY( link->mtu!=FD_NET_MTU ) ) FD_LOG_ERR(( "ibeth tile in link does not have a normal MTU" )); + + ctx->txq[ i ].base = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp; + ctx->txq[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->txq[ i ].base, link->dcache ); + ctx->txq[ i ].wmark = fd_dcache_compact_wmark( ctx->txq[ i ].base, link->dcache, link->mtu ); + } + + /* Join netbase objects */ + FD_TEST( fd_fib4_join( ctx->fib_local, fd_topo_obj_laddr( topo, tile->xdp.fib4_local_obj_id ) ) ); + FD_TEST( fd_fib4_join( ctx->fib_main, fd_topo_obj_laddr( topo, tile->xdp.fib4_main_obj_id ) ) ); + + ulong neigh4_obj_id = tile->xdp.neigh4_obj_id; + ulong ele_max = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.ele_max", neigh4_obj_id ); + 
ulong probe_max = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.probe_max", neigh4_obj_id ); + ulong seed = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.seed", neigh4_obj_id ); + if( FD_UNLIKELY( (ele_max==ULONG_MAX) | (probe_max==ULONG_MAX) | (seed==ULONG_MAX) ) ) + FD_LOG_ERR(( "neigh4 hmap properties not set" )); + if( FD_UNLIKELY( !fd_neigh4_hmap_join( + ctx->neigh4, + fd_topo_obj_laddr( topo, neigh4_obj_id ), + ele_max, + probe_max, + seed ) ) ) { + FD_LOG_ERR(( "fd_neigh4_hmap_join failed" )); + } + + ulong net_netlnk_id = fd_topo_find_link( topo, "net_netlnk", 0UL ); + if( FD_UNLIKELY( net_netlnk_id!=ULONG_MAX ) ) { + fd_topo_link_t * net_netlnk = &topo->links[ net_netlnk_id ]; + ctx->r.neigh4_solicit->mcache = net_netlnk->mcache; + ctx->r.neigh4_solicit->depth = fd_mcache_depth( ctx->r.neigh4_solicit->mcache ); + ctx->r.neigh4_solicit->seq = fd_mcache_seq_query( fd_mcache_seq_laddr( ctx->r.neigh4_solicit->mcache ) ); + } + + /* Check if all chunks are in bound */ + if( FD_UNLIKELY( next_chunk > ctx->umem_wmark ) ) { + FD_LOG_ERR(( "dcache is too small (topology bug)" )); + } +} + +static inline void +metrics_write( fd_ibeth_tile_t * ctx ) { + FD_MCNT_SET( IBETH, RX_PKT_CNT, ctx->metrics.rx_pkt_cnt ); + FD_MCNT_SET( IBETH, RX_BYTES_TOTAL, ctx->metrics.rx_bytes_total ); + FD_MCNT_SET( IBETH, TX_PKT_CNT, ctx->metrics.tx_pkt_cnt ); + FD_MCNT_SET( IBETH, TX_BYTES_TOTAL, ctx->metrics.tx_bytes_total ); +} + +static void +handle_async_event( struct ibv_async_event * event ) { + FD_LOG_NOTICE(( "Async event: %u-%s", event->event_type, ibv_event_type_str( event->event_type ) )); + switch( event->event_type ) { + case IBV_EVENT_CQ_ERR: + FD_LOG_ERR(( "CQ error" )); + break; + case IBV_EVENT_QP_FATAL: + FD_LOG_ERR(( "QP fatal error" )); + break; + default: + break; + } + ibv_ack_async_event( event ); +} + +static void +poll_async_events( fd_ibeth_tile_t * ctx ) { + for(;;) { + struct pollfd pfd[1] = {{ + .fd = ctx->ibv_ctx->async_fd, + .events = POLLIN 
+ }}; + int ret = poll( pfd, 1, 0 ); + if( ret<0 || !( pfd->revents & POLLIN ) ) break; + struct ibv_async_event event; + if( 0==ibv_get_async_event( ctx->ibv_ctx, &event ) ) { + handle_async_event( &event ); + } + } +} + +static inline void +during_housekeeping( fd_ibeth_tile_t * ctx ) { + poll_async_events( ctx ); +} + +/* fd_ibeth_rx_pkt handles an ibverbs RX completion. If the completion + frees a frame, returns the chunk index. Returns zero if no frame can + be freed. + + The completion can either fail (immediately returns the chunk of the + failed WQE for freeing), or succeed (posts a frag to tango, returns + the shadowed chunk index for freeing). */ + +static inline ulong +fd_ibeth_rx_pkt( fd_ibeth_tile_t * ctx, + fd_stem_context_t * stem, + ulong wr_id, + ulong byte_len, + enum ibv_wc_status status ) { + if( FD_UNLIKELY( status!=IBV_WC_SUCCESS ) ) return wr_id; + + ulong const chunk = wr_id; + ulong const sz = byte_len; + + if( FD_UNLIKELY( chunk<ctx->umem_chunk0 || chunk>ctx->umem_wmark ) ) { + FD_LOG_CRIT(( "ibv_wc wr_id %lu out of bounds [%u,%u]", chunk, ctx->umem_chunk0, ctx->umem_wmark )); + } + fd_eth_hdr_t const * l2 = fd_chunk_to_laddr_const( ctx->umem_base, chunk ); + fd_ip4_hdr_t const * l3 = (fd_ip4_hdr_t const *)(l2+1); + fd_udp_hdr_t const * l4 = (fd_udp_hdr_t const *)( (uchar *)l3 + FD_IP4_GET_LEN( *l3 ) ); + ulong const dgram_off = (ulong)(l4+1) - (ulong)l2; + + /* Even though these are reads of uninitialized / untrusted data, this + never actually goes beyond the bounds of a frame (FD_NET_MTU).
*/ + int const sz_ok = dgram_off<=sz; + int const hdr_ok = + ( fd_ushort_bswap( l2->net_type )==FD_ETH_HDR_TYPE_IP ) & + ( FD_IP4_GET_VERSION( *l3 )==4 ) & + ( l3->protocol==FD_IP4_HDR_PROTOCOL_UDP ); + + ushort const net_dport = fd_ushort_bswap( l4->net_dport ); + int out_idx = -1; + for( ulong i=0UL; i<ctx->dst_port_cnt; i++ ) { + if( ctx->dst_ports[ i ]==net_dport ) { + out_idx = ctx->dst_out_idx[ i ]; + break; + } + } + int const match_ok = + (out_idx >= 0) & + (out_idx < (int)ctx->dst_port_cnt); + + int const filter = sz_ok & hdr_ok & match_ok; + if( FD_UNLIKELY( !filter ) ) return wr_id; + + /* FIXME: Since the order of wr_ids in CQEs mirrors those posted in + WQEs, we could recover the shadowed wr_id/chunk without + touching memory here ... */ + fd_frag_meta_t * mcache = stem->mcaches[ out_idx ]; + ulong const depth = stem->depths [ out_idx ]; + ulong * seqp = &stem->seqs [ out_idx ]; + ulong const seq = *seqp; + + ulong freed_chunk = mcache[ fd_mcache_line_idx( seq, depth ) ].chunk; + + ulong const proto = ctx->dst_protos[ out_idx ]; + ulong const sig = fd_disco_netmux_sig( l3->saddr, l4->net_sport, 0U, proto, dgram_off ); + ulong const ctl = 0UL; + ulong const tsorig = 0UL; + ulong const tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() ); + + fd_mcache_publish_sse( mcache, depth, seq, sig, chunk, sz, ctl, tsorig, tspub ); + ctx->metrics.rx_pkt_cnt++; + ctx->metrics.rx_bytes_total += sz; + + return freed_chunk; +} + +/* fd_ibeth_tx_recycle recycles the TX frame of a completed TX operation.
*/ + +static void +fd_ibeth_tx_recycle( fd_ibeth_tile_t * ctx, + ulong chunk ) { + if( FD_UNLIKELY( (chunk<ctx->umem_chunk0) | (chunk>ctx->umem_wmark) ) ) { + FD_LOG_ERR(( "TX completion chunk %lu out of bounds [%u,%u]", chunk, ctx->umem_chunk0, ctx->umem_wmark )); + return; + } + if( FD_UNLIKELY( !tx_free_push_head( ctx->tx_free, (uint)chunk ) ) ) { + FD_LOG_ERR(( "TX free list full" )); + } +} + +/* after_credit is called every run loop iteration, provided there is + sufficient downstream credit for forwarding on all output links. + Receives up to one packet. */ + +static inline void +after_credit( fd_ibeth_tile_t * ctx, + fd_stem_context_t * stem, + int * poll_in, + int * charge_busy ) { + (void)poll_in; + + /* Poll for new event */ + struct ibv_cq_ex * cq = ctx->cq; + struct ibv_poll_cq_attr cq_attr = {0}; + int poll_err = ibv_start_poll( cq, &cq_attr ); + if( poll_err ) goto poll_err; + *charge_busy = 1; + uint cqe_avail = 1024u; /* FIXME */ + do { + ulong wr_id = cq->wr_id; + enum ibv_wc_status status = cq->status; + ulong byte_len = ibv_wc_read_byte_len( cq ); + enum ibv_wc_opcode opcode = ibv_wc_read_opcode( cq ); + + if( FD_LIKELY( opcode==IBV_WC_RECV ) ) { + ulong freed_chunk = fd_ibeth_rx_pkt( ctx, stem, wr_id, byte_len, status ); + if( FD_UNLIKELY( !freed_chunk ) ) FD_LOG_CRIT(( "invalid chunk in mcache" )); + fd_ibeth_rx_recycle( ctx, freed_chunk, 0 ); + } else if( FD_LIKELY( opcode==IBV_WC_SEND ) ) { + if( FD_LIKELY( status==IBV_WC_SUCCESS ) ) { + ctx->metrics.tx_pkt_cnt++; + ctx->metrics.tx_bytes_total += byte_len; + } + fd_ibeth_tx_recycle( ctx, wr_id ); + } else { + FD_LOG_WARNING(( "ibv_wc opcode %u status %u not supported", opcode, status )); + } + + poll_err = ibv_next_poll( cq ); + } while( !poll_err && --cqe_avail ); + ibv_end_poll( cq ); +poll_err: + if( FD_UNLIKELY( poll_err && poll_err!=ENOENT ) ) { + FD_LOG_ERR(( "ibv_cq_ex poll failed (%i)", poll_err )); + } +} + +/* {before,during,after}_frag copy a packet received from an input link out + to
an ibverbs queue pair for TX. */ + +static inline int +before_frag( fd_ibeth_tile_t * ctx, + ulong in_idx, + ulong seq, + ulong sig ) { + (void)in_idx; (void)seq; + + /* Find interface index of next packet */ + + ulong proto = fd_disco_netmux_sig_proto( sig ); + if( FD_UNLIKELY( proto!=DST_PROTO_OUTGOING ) ) return 1; + + uint dst_ip = fd_disco_netmux_sig_dst_ip( sig ); + if( FD_UNLIKELY( !fd_net_tx_route( &ctx->r, dst_ip ) ) ) return 1; + + uint const next_hop_if_idx = ctx->r.tx_op.if_idx; + if( FD_UNLIKELY( next_hop_if_idx==1 ) ) { + /* Sorry, loopback not supported yet */ + return 1; + } else { + /* "Real" interface */ + uint const main_if_idx = ctx->main_if_idx; + if( FD_UNLIKELY( main_if_idx != next_hop_if_idx ) ) { + /* Unreachable for now, since only the main_if_idx has a neighbor + table, therefore fd_net_tx_route would abort before this is + reached */ + return 1; /* ignore */ + } + } + + /* Skip if TX is blocked */ + + if( FD_UNLIKELY( tx_free_empty( ctx->tx_free ) ) ) { + /* FIXME metric */ + return 1; /* ignore */ + } + + return 0; /* continue */ +} + +static inline void +during_frag( fd_ibeth_tile_t * ctx, + ulong in_idx, + ulong seq, + ulong sig, + ulong chunk, + ulong sz, + ulong ctl ) { + (void)seq; (void)sig; (void)ctl; + + fd_ibeth_txq_t * txq = &ctx->txq[ in_idx ]; + if( FD_UNLIKELY( chunk < txq->chunk0 || chunk > txq->wmark || sz>FD_NET_MTU ) ) { + FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, txq->chunk0, txq->wmark )); + } + if( FD_UNLIKELY( sz<34UL ) ) { + FD_LOG_ERR(( "packet too small %lu (in_idx=%lu)", sz, in_idx )); + } + + /* Speculatively copy frame into buffer */ + ulong dst_chunk = *tx_free_peek_head( ctx->tx_free ); + void * dst = fd_chunk_to_laddr( ctx->umem_base, dst_chunk ); + void const * src = fd_chunk_to_laddr_const( txq->base, chunk ); + fd_memcpy( dst, src, sz ); +} + +static void +after_frag( fd_ibeth_tile_t * ctx, + ulong in_idx, + ulong seq, + ulong sig, + ulong sz, + ulong tsorig, + ulong tspub, 
+ fd_stem_context_t * stem ) { + (void)in_idx; (void)seq; (void)sig; (void)tsorig; (void)tspub; (void)stem; + + /* Set Ethernet src and dst MAC addrs, optionally mangle IPv4 header to + fill in source address (if it's missing). */ + ulong chunk = *tx_free_peek_head( ctx->tx_free ); + void * frame = fd_chunk_to_laddr( ctx->umem_base, chunk ); + if( FD_UNLIKELY( !fd_net_tx_fill_addrs( &ctx->r, frame, sz ) ) ) return; + + /* Submit TX job */ + struct ibv_sge sge = { + .addr = (ulong)frame, + .length = (uint)sz, + .lkey = ctx->mr_lkey + }; + struct ibv_send_wr wr = { + .wr_id = chunk, + .sg_list = &sge, + .num_sge = 1, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED + }; + + errno = 0; + struct ibv_send_wr * bad_wr; + int send_err = ibv_post_send( ctx->qp, &wr, &bad_wr ); + if( FD_UNLIKELY( send_err ) ) { + return; /* send failed, recycle frame */ + } + + /* Consume frame */ + tx_free_pop_head( ctx->tx_free ); +} + +#define STEM_CALLBACK_CONTEXT_TYPE fd_ibeth_tile_t +#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_ibeth_tile_t) +#define STEM_CALLBACK_AFTER_CREDIT after_credit +#define STEM_CALLBACK_BEFORE_FRAG before_frag +#define STEM_CALLBACK_DURING_FRAG during_frag +#define STEM_CALLBACK_AFTER_FRAG after_frag +#define STEM_CALLBACK_METRICS_WRITE metrics_write +#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping +#define STEM_BURST 1UL /* ignored */ +#define STEM_LAZY 130000UL /* 130us */ +#include "../../stem/fd_stem.c" + +#ifndef FD_TILE_TEST +fd_topo_run_tile_t fd_tile_ibeth = { + .name = "ibeth", + //.populate_allowed_seccomp = populate_allowed_seccomp, + //.populate_allowed_fds = populate_allowed_fds, + .scratch_align = scratch_align, + .scratch_footprint = scratch_footprint, + .privileged_init = privileged_init, + .unprivileged_init = unprivileged_init, + .run = stem_run, +}; +#endif diff --git a/src/disco/net/ibeth/test_ibeth_tile.c b/src/disco/net/ibeth/test_ibeth_tile.c new file mode 100644 index 00000000000..46dcd68e9ff --- /dev/null +++ 
b/src/disco/net/ibeth/test_ibeth_tile.c @@ -0,0 +1,615 @@ +/* test_ibeth_tile.c runs parts of the ibeth tile against a mock ibverbs + queue pair. */ + +#define FD_TILE_TEST 1 +#include "fd_ibeth_tile.c" +#include "../../../disco/topo/fd_topob.h" +#include "../../../waltz/ibverbs/fd_ibverbs_mock.h" + +#define SET_NAME frame_track +#include "../../../util/tmpl/fd_set_dynamic.c" + +#define WKSP_TAG 1UL +#define MR_LKEY 42UL +#define SHRED_PORT ((ushort)4242) + +#define IF_IDX_LO 1U +#define IF_IDX_ETH0 7U +#define IF_IDX_ETH1 8U + +/* chunk_to_frame_idx converts a tango chunk index (64 byte stride) to a + frame index (MTU multiple of 64 bytes). */ + +static inline ulong +chunk_to_frame_idx( ulong chunk ) { + return chunk / (FD_NET_MTU / FD_CHUNK_SZ); +} + +/* verify_rx_balance verifies that: + - no frame is allocated twice + - no frame is allocated out-of-bounds + - no frame disappeared (memory leak) */ + +static void +verify_rx_balance( fd_ibeth_tile_t const * tile, + fd_stem_context_t const * stem, + fd_ibverbs_mock_qp_t const * mock, + frame_track_t * frame_track ) { + ulong const frame_max = frame_track_max( frame_track ); + ulong const rx_frame0 = chunk_to_frame_idx( tile->rx_chunk0 ); + ulong const rx_frame1 = chunk_to_frame_idx( tile->rx_chunk1 ); + FD_TEST( rx_frame0=rx_frame0 && frame_idxrx_pending_rem <= FD_IBETH_PENDING_MAX ); + ulong const pending_cnt = FD_IBETH_PENDING_MAX - tile->rx_pending_rem; + for( ulong i=0UL; irx_pending[ FD_IBETH_PENDING_MAX-i-1 ].wr; + CHECK( wr->wr_id ); + void const * frame = fd_chunk_to_laddr_const( tile->umem_base, wr->wr_id ); + FD_TEST( wr->num_sge==1 ); + FD_TEST( wr->sg_list[0].addr==(ulong)frame ); + FD_TEST( wr->sg_list[0].length==FD_NET_MTU ); + } + + /* RX work queue entries */ + struct ibv_recv_wr * rx_q = mock->rx_q; + for( fd_ibv_recv_wr_q_iter_t iter = fd_ibv_recv_wr_q_iter_init( rx_q ); + !fd_ibv_recv_wr_q_iter_done( rx_q, iter ); + iter = fd_ibv_recv_wr_q_iter_next( rx_q, iter ) ) { + struct ibv_recv_wr const * 
wr = fd_ibv_recv_wr_q_iter_ele_const( rx_q, iter ); + FD_TEST( wr ); + CHECK( wr->wr_id ); + void const * frame = fd_chunk_to_laddr_const( tile->umem_base, wr->wr_id ); + FD_TEST( wr->num_sge==1 ); + FD_TEST( wr->sg_list[0].addr==(ulong)frame ); + FD_TEST( wr->sg_list[0].length==FD_NET_MTU ); + } + + /* Completion queue entries */ + struct ibv_wc * wc_q = mock->wc_q; + for( fd_ibv_wc_q_iter_t iter = fd_ibv_wc_q_iter_init( wc_q ); + !fd_ibv_wc_q_iter_done( wc_q, iter ); + iter = fd_ibv_wc_q_iter_next( wc_q, iter ) ) { + struct ibv_wc const * wc = fd_ibv_wc_q_iter_ele_const( wc_q, iter ); + FD_TEST( wc ); + if( wc->opcode == IBV_WC_RECV ) { + CHECK( wc->wr_id ); + } + } + + /* Out links */ + ulong const out_cnt = tile->rx_link_cnt; + for( ulong out_idx=0UL; out_idxmcaches[ tile->rx_link_out_idx[ out_idx ] ]; + FD_TEST( mcache ); + ulong const depth = fd_mcache_depth( mcache ); + for( ulong j=0UL; jchunk ); + } + } + + /* Check for memory leaks */ + FD_TEST( frame_track_is_null( frame_track ) ); + +#undef CHECK +} + +/* verify_tx_balance is like verify_rx_balance, just for TX. 
+ TX frames are distributed across: + - TX work queue entries + - Completion queue entries */ + +static void +verify_tx_balance( fd_ibeth_tile_t const * tile, + fd_ibverbs_mock_qp_t const * mock, + ulong * frame_track ) { + ulong const frame_max = frame_track_max( frame_track ); + ulong const tx_frame0 = chunk_to_frame_idx( tile->tx_chunk0 ); + ulong const tx_frame1 = chunk_to_frame_idx( tile->tx_chunk1 ); + FD_TEST( tx_frame0=tx_frame0 && frame_idxtx_free ); + !tx_free_iter_done( tile->tx_free, iter ); + iter = tx_free_iter_next( tile->tx_free, iter ) ) { + uint const chunk = *tx_free_iter_ele_const( tile->tx_free, iter ); + CHECK( chunk ); + } + + /* TX work queue entries */ + struct ibv_send_wr * tx_q = mock->tx_q; + for( fd_ibv_send_wr_q_iter_t iter = fd_ibv_send_wr_q_iter_init( tx_q ); + !fd_ibv_send_wr_q_iter_done( tx_q, iter ); + iter = fd_ibv_send_wr_q_iter_next( tx_q, iter ) ) { + struct ibv_send_wr const * wr = fd_ibv_send_wr_q_iter_ele_const( tx_q, iter ); + FD_TEST( wr ); + CHECK( wr->wr_id ); + } + + /* Completion queue entries */ + struct ibv_wc * wc_q = mock->wc_q; + for( fd_ibv_wc_q_iter_t iter = fd_ibv_wc_q_iter_init( wc_q ); + !fd_ibv_wc_q_iter_done( wc_q, iter ); + iter = fd_ibv_wc_q_iter_next( wc_q, iter ) ) { + struct ibv_wc const * wc = fd_ibv_wc_q_iter_ele_const( wc_q, iter ); + FD_TEST( wc ); + if( wc->opcode == IBV_WC_SEND ) { + CHECK( wc->wr_id ); + } + } + + /* Check for memory leaks */ + FD_TEST( frame_track_is_null( frame_track ) ); + +#undef CHECK +} + +/* verify_balances ensures that packet frames are correctly allocated + across rings. */ + +static void +verify_balances( fd_ibeth_tile_t const * tile, + fd_stem_context_t const * stem, + fd_ibverbs_mock_qp_t const * mock, + frame_track_t * frame_track ) { + verify_rx_balance( tile, stem, mock, frame_track ); + verify_tx_balance( tile, mock, frame_track ); +} + +/* rx_complete_one moves one RX work request to a completion. 
*/ + +static ulong +rx_complete_one( fd_ibverbs_mock_qp_t * mock, + enum ibv_wc_status status, + ulong sz ) { + FD_TEST( fd_ibv_recv_wr_q_cnt( mock->rx_q ) ); + FD_TEST( fd_ibv_wc_q_avail ( mock->wc_q ) ); + struct ibv_recv_wr const * wr = fd_ibv_recv_wr_q_pop_head_nocopy( mock->rx_q ); + struct ibv_wc * wc = fd_ibv_wc_q_push_tail_nocopy ( mock->wc_q ); + wc->wr_id = wr->wr_id; + wc->opcode = IBV_WC_RECV; + wc->status = status; + wc->byte_len = (uint)sz; + return wr->wr_id; +} + +/* tx_complete_one moves one TX work request to a completion. */ + +static ulong +tx_complete_one( fd_ibverbs_mock_qp_t * mock, + enum ibv_wc_status status, + ulong wr_id, + ulong sz ) { + FD_TEST( fd_ibv_wc_q_avail( mock->wc_q ) ); + struct ibv_wc * wc = fd_ibv_wc_q_push_tail_nocopy( mock->wc_q ); + wc->wr_id = wr_id; + wc->opcode = IBV_WC_SEND; + wc->status = status; + wc->byte_len = (uint)sz; + return wr_id; +} + +static void +add_neighbor( fd_neigh4_hmap_t * join, + uint ip4_addr, + uchar mac0, uchar mac1, uchar mac2, + uchar mac3, uchar mac4, uchar mac5 ) { + fd_neigh4_hmap_query_t query[1]; + int prepare_res = fd_neigh4_hmap_prepare( join, &ip4_addr, NULL, query, FD_MAP_FLAG_BLOCKING ); + FD_TEST( prepare_res==FD_MAP_SUCCESS ); + fd_neigh4_entry_t * ele = fd_neigh4_hmap_query_ele( query ); + ele->state = FD_NEIGH4_STATE_ACTIVE; + ele->ip4_addr = ip4_addr; + ele->mac_addr[0] = mac0; ele->mac_addr[1] = mac1; ele->mac_addr[2] = mac2; + ele->mac_addr[3] = mac3; ele->mac_addr[4] = mac4; ele->mac_addr[5] = mac5; + fd_neigh4_hmap_publish( query ); +} + +int +main( int argc, + char ** argv ) { + fd_boot( &argc, &argv ); + + ulong cpu_idx = fd_tile_cpu_id( fd_tile_idx() ); + if( cpu_idx>fd_shmem_cpu_cnt() ) cpu_idx = 0UL; + + char const * _page_sz = fd_env_strip_cmdline_cstr ( &argc, &argv, "--page-sz", NULL, "gigantic" ); + ulong const page_cnt = fd_env_strip_cmdline_ulong( &argc, &argv, "--page-cnt", NULL, 1UL ); + ulong const numa_idx = fd_env_strip_cmdline_ulong( &argc, &argv, 
"--numa-idx", NULL, fd_shmem_numa_idx( cpu_idx ) ); + ulong const rxq_depth = 1024UL; + ulong const txq_depth = 1024UL; + ulong const link_depth = 128UL; + ulong const cq_depth = rxq_depth + txq_depth; + + ulong const sge_max = 1UL; /* ibeth tile only uses 1 SGE */ + + fd_wksp_t * wksp = fd_wksp_new_anonymous( fd_cstr_to_shmem_page_sz( _page_sz ), page_cnt, fd_shmem_cpu_idx( numa_idx ), "wksp", 0UL ); + FD_TEST( wksp ); + + /* Mock ibverbs queue pair */ + void * mock_mem = fd_wksp_alloc_laddr( wksp, fd_ibverbs_mock_qp_align(), fd_ibverbs_mock_qp_footprint( rxq_depth, txq_depth, cq_depth, sge_max ), WKSP_TAG ); + fd_ibverbs_mock_qp_t * mock = fd_ibverbs_mock_qp_new( mock_mem, rxq_depth, txq_depth, cq_depth, sge_max ); + FD_TEST( mock ); + + /* Mock a topology */ + static fd_topo_t topo[1]; + fd_topo_wksp_t * topo_wksp = fd_topob_wksp( topo, "wksp" ); + topo_wksp->wksp = wksp; + fd_topo_tile_t * topo_tile = fd_topob_tile( topo, "ibeth", "wksp", "wksp", cpu_idx, 0, 0 ); + topo_tile->ibeth.rx_queue_size = (uint)rxq_depth; + topo_tile->ibeth.tx_queue_size = (uint)txq_depth; + topo_tile->ibeth.net.shred_listen_port = SHRED_PORT; + + /* Mock an RX output link */ + fd_topo_link_t * rx_link = fd_topob_link( topo, "net_shred", "wksp", link_depth, 0UL, 1UL ); + void * rx_mcache_mem = fd_wksp_alloc_laddr( wksp, fd_mcache_align(), fd_mcache_footprint( 128UL, 0UL ), WKSP_TAG ); + rx_link->mcache = fd_mcache_join( fd_mcache_new( rx_mcache_mem, 128UL, 0UL, 0UL ) ); + FD_TEST( rx_link->mcache ); + topo->objs[ rx_link->mcache_obj_id ].offset = (ulong)rx_mcache_mem - (ulong)wksp; + + /* Allocate tile memory */ + fd_ibeth_tile_t * tile = fd_wksp_alloc_laddr( wksp, scratch_align(), scratch_footprint( topo_tile ), WKSP_TAG ); + FD_TEST( tile ); + memset( tile, 0, sizeof(fd_ibeth_tile_t) ); + topo->objs[ topo_tile->tile_obj_id ].offset = (ulong)tile - (ulong)wksp; + FD_TEST( fd_topo_obj_laddr( topo, topo_tile->tile_obj_id )==tile ); + FD_TEST( rx_link->mcache ); + + /* UMEM */ + ulong 
const dcache_depth = rxq_depth+txq_depth+link_depth; + ulong const dcache_data_sz = fd_dcache_req_data_sz( FD_NET_MTU, dcache_depth, 1UL, 1 ); + FD_TEST( dcache_data_sz ); + void * rx_dcache_mem = fd_wksp_alloc_laddr( wksp, fd_dcache_align(), fd_dcache_footprint( dcache_data_sz, 0UL ), WKSP_TAG ); + uchar * rx_dcache = fd_dcache_join( fd_dcache_new( rx_dcache_mem, dcache_data_sz, 0UL ) ); + fd_topo_obj_t * dcache_obj = fd_topob_obj( topo, "dcache", "wksp" ); + topo->objs[ dcache_obj->id ].offset = (ulong)rx_dcache_mem - (ulong)wksp; + topo_tile->ibeth.umem_dcache_obj_id = dcache_obj->id; + tile->umem_base = (uchar *)rx_dcache_mem; + tile->umem_frame0 = rx_dcache; + tile->umem_sz = dcache_data_sz; + tile->umem_chunk0 = (uint)fd_laddr_to_chunk( wksp, rx_dcache ); + tile->umem_wmark = (uint)fd_dcache_compact_wmark( wksp, rx_dcache, FD_NET_MTU ); + + /* Mock a TX input link */ + fd_topo_link_t * tx_link = fd_topob_link( topo, "shred_net", "wksp", link_depth, FD_NET_MTU, 1UL ); + void * tx_mcache_mem = fd_wksp_alloc_laddr( wksp, fd_mcache_align(), fd_mcache_footprint( 128UL, 0UL ), WKSP_TAG ); + tx_link->mcache = fd_mcache_join( fd_mcache_new( tx_mcache_mem, 128UL, 0UL, 0UL ) ); + FD_TEST( tx_link->mcache ); + void * tx_dcache_mem = fd_wksp_alloc_laddr( wksp, fd_dcache_align(), fd_dcache_footprint( dcache_data_sz, 0UL ), WKSP_TAG ); + tx_link->dcache = fd_dcache_join( fd_dcache_new( tx_dcache_mem, dcache_data_sz, 0UL ) ); + FD_TEST( tx_link->dcache ); + + /* Inject mock ibverbs QP into tile state */ + tile->cq = fd_ibverbs_mock_qp_get_cq_ex( mock ); + tile->qp = fd_ibverbs_mock_qp_get_qp ( mock ); + tile->mr_lkey = MR_LKEY; + + /* Netbase objects */ + ulong const fib4_max = 8UL; + void * fib4_local_mem = fd_wksp_alloc_laddr( wksp, fd_fib4_align(), fd_fib4_footprint( fib4_max ), WKSP_TAG ); + void * fib4_main_mem = fd_wksp_alloc_laddr( wksp, fd_fib4_align(), fd_fib4_footprint( fib4_max ), WKSP_TAG ); + FD_TEST( fd_fib4_new( fib4_local_mem, fib4_max ) ); + FD_TEST( 
fd_fib4_new( fib4_main_mem, fib4_max ) ); + fd_topo_obj_t * topo_fib4_local = fd_topob_obj( topo, "fib4", "wksp" ); + fd_topo_obj_t * topo_fib4_main = fd_topob_obj( topo, "fib4", "wksp" ); + topo_fib4_local->offset = (ulong)fib4_local_mem - (ulong)wksp; + topo_fib4_main->offset = (ulong)fib4_main_mem - (ulong)wksp; + topo_tile->ibeth.fib4_local_obj_id = topo_fib4_local->id; + topo_tile->ibeth.fib4_main_obj_id = topo_fib4_main->id; + ulong const neigh4_ele_max = 16UL; + ulong const neigh4_probe_max = 8UL; + ulong const neigh4_lock_max = 4UL; + void * neigh4_hmap_mem = fd_wksp_alloc_laddr( wksp, fd_neigh4_hmap_align(), fd_neigh4_hmap_footprint( neigh4_ele_max, neigh4_lock_max, neigh4_probe_max ), WKSP_TAG ); + void * neigh4_ele_mem = fd_wksp_alloc_laddr( wksp, alignof(fd_neigh4_entry_t), neigh4_ele_max*sizeof(fd_neigh4_entry_t), WKSP_TAG ); + FD_TEST( fd_neigh4_hmap_new( neigh4_hmap_mem, neigh4_ele_max, neigh4_lock_max, neigh4_probe_max, 1UL ) ); + fd_topo_obj_t * topo_neigh4_hmap = fd_topob_obj( topo, "neigh4_hmap", "wksp" ); + fd_topo_obj_t * topo_neigh4_ele = fd_topob_obj( topo, "opaque", "wksp" ); + topo_neigh4_hmap->offset = (ulong)neigh4_hmap_mem - (ulong)wksp; + topo_neigh4_ele->offset = (ulong)neigh4_ele_mem - (ulong)wksp; + topo_tile->ibeth.neigh4_obj_id = topo_neigh4_hmap->id; + topo_tile->ibeth.neigh4_ele_obj_id = topo_neigh4_ele->id; + fd_neigh4_hmap_t neigh4_hmap_[1]; + fd_neigh4_hmap_t * neigh4_hmap = fd_neigh4_hmap_join( neigh4_hmap_, neigh4_hmap_mem, neigh4_ele_mem ); + FD_TEST( neigh4_hmap ); + + /* Network configuration */ + tile->main_if_idx = IF_IDX_ETH0; + uint const public_ip4_addr = FD_IP4_ADDR( 203,0,113,88 ); /* our default source address */ + uint const site_ip4_addr = FD_IP4_ADDR( 203,0,113,89 ); /* our site address */ + uint const banned_ip4_addr = FD_IP4_ADDR( 7,0,0,1 ); /* blackholed at the route table */ + uint const path2_ip4_addr = FD_IP4_ADDR( 7,20,0,1 ); /* routed via a different interface */ + uint const neigh1_ip4_addr = 
FD_IP4_ADDR( 192,168,1,11 ); /* missing a neighbor table entry */ + uint const neigh2_ip4_addr = FD_IP4_ADDR( 192,168,1,12 ); /* can send packets via this guy */ + uint const gw_ip4_addr = FD_IP4_ADDR( 192,168,1,1 ); /* gateway */ + tile->r.default_address = public_ip4_addr; + + /* Basic routing tables */ + fd_fib4_t * fib_local = fd_fib4_join( fib4_local_mem ); FD_TEST( fib_local ); + fd_fib4_t * fib_main = fd_fib4_join( fib4_main_mem ); FD_TEST( fib_main ); + *fd_fib4_append( fib_local, FD_IP4_ADDR( 127,0,0,1 ), 32, 0U ) = (fd_fib4_hop_t) { + .if_idx = IF_IDX_LO, + .ip4_src = FD_IP4_ADDR( 127,0,0,1 ), + .rtype = FD_FIB4_RTYPE_LOCAL + }; + *fd_fib4_append( fib_main, FD_IP4_ADDR( 0,0,0,0 ), 0, 0U ) = (fd_fib4_hop_t) { + .if_idx = IF_IDX_ETH0, + .rtype = FD_FIB4_RTYPE_UNICAST, + .ip4_gw = gw_ip4_addr + }; + *fd_fib4_append( fib_main, banned_ip4_addr, 32, 0U ) = (fd_fib4_hop_t) { + .if_idx = IF_IDX_ETH0, + .rtype = FD_FIB4_RTYPE_BLACKHOLE + }; + *fd_fib4_append( fib_main, path2_ip4_addr, 32, 0U ) = (fd_fib4_hop_t) { + .if_idx = IF_IDX_ETH1, + .rtype = FD_FIB4_RTYPE_UNICAST + }; + *fd_fib4_append( fib_main, FD_IP4_ADDR( 192,168,1,0 ), 24, 0U ) = (fd_fib4_hop_t) { + .if_idx = IF_IDX_ETH0, + .rtype = FD_FIB4_RTYPE_UNICAST, + .ip4_src = site_ip4_addr + }; + + /* Neighbor table */ + add_neighbor( neigh4_hmap, neigh2_ip4_addr, 0x01,0x23,0x45,0x67,0x89,0xab ); + add_neighbor( neigh4_hmap, gw_ip4_addr, 0xff,0x23,0x45,0x67,0x89,0xab ); + + /* Stem publish context for RX */ + ulong stem_seq[1] = {0}; + ulong cr_avail = ULONG_MAX; + fd_stem_context_t stem[1] = {{ + .mcaches = &rx_link->mcache, + .seqs = stem_seq, + .depths = &link_depth, + .cr_avail = &cr_avail, + .cr_decrement_amount = 0UL + }}; + + /* Attach links to tile */ + fd_topob_tile_out( topo, "ibeth", 0UL, "net_shred", 0UL ); + fd_topob_tile_in( topo, "ibeth", 0UL, "wksp", "shred_net", 0UL, 0, 1 ); + + /* Initialize tile state (assigns frames) */ + rxq_assign_all( tile, topo, topo_tile ); + unprivileged_init( topo, 
topo_tile ); + FD_TEST( fd_ibv_recv_wr_q_cnt( mock->rx_q )==rxq_depth ); + + /* Allocate bit set tracking frames */ + ulong const chunk_max = fd_ulong_max( tile->rx_chunk1, tile->tx_chunk1 ); + ulong const frame_max = chunk_to_frame_idx( chunk_max ); + void * frame_track_mem = fd_wksp_alloc_laddr( wksp, frame_track_align(), frame_track_footprint( frame_max ), 1UL ); + frame_track_t * frame_track = frame_track_join( frame_track_new( frame_track_mem, frame_max ) ); + FD_TEST( frame_track ); + + /* Verify initial assignment */ + verify_balances( tile, stem, mock, frame_track ); + FD_TEST( fd_ibv_recv_wr_q_cnt( mock->rx_q )==rxq_depth ); + FD_TEST( fd_ibv_send_wr_q_cnt( mock->tx_q )==0UL ); + FD_TEST( fd_ibv_wc_q_cnt ( mock->wc_q )==0UL ); + FD_TEST( tile->rx_pending_rem==FD_IBETH_PENDING_MAX ); + FD_TEST( stem_seq[0]==0UL ); + + /* Trickle a few failed CQEs (should fill pending batch) */ + for( ulong i=1UL; irx_q )==rxq_depth-FD_IBETH_PENDING_MAX+1 ); + FD_TEST( fd_ibv_wc_q_cnt( mock->wc_q )==FD_IBETH_PENDING_MAX-1 ); + verify_balances( tile, stem, mock, frame_track ); + int poll_in = 1; + int charge_busy = 0; + after_credit( tile, stem, &poll_in, &charge_busy ); + FD_TEST( charge_busy==1 ); + FD_TEST( tile->rx_pending_rem==1 ); + verify_balances( tile, stem, mock, frame_track ); + + /* No op */ + charge_busy = 0; + after_credit( tile, stem, &poll_in, &charge_busy ); + FD_TEST( charge_busy==0 ); + + /* Flush pending batch */ + rx_complete_one( mock, IBV_WC_GENERAL_ERR, 0UL ); /* flushes batch */ + after_credit( tile, stem, &poll_in, &charge_busy ); + FD_TEST( tile->rx_pending_rem==FD_IBETH_PENDING_MAX ); + FD_TEST( fd_ibv_recv_wr_q_cnt( mock->rx_q )==rxq_depth ); + FD_TEST( fd_ibv_wc_q_cnt ( mock->wc_q )==0UL ); + FD_TEST( stem_seq[0]==0UL ); + + /* Poll a couple times, empty CQ */ + for( ulong i=0UL; i<1024UL; i++ ) { + int poll_in = 1; + int charge_busy = 0; + after_credit( tile, stem, &poll_in, &charge_busy ); + FD_TEST( !charge_busy ); + } + + /* RX packet undersz 
*/ + ulong rx_seq = 0UL; + FD_TEST( fd_seq_ne( fd_frag_meta_seq_query( rx_link->mcache+rx_seq ), rx_seq ) ); + rx_complete_one( mock, IBV_WC_SUCCESS, 0UL ); + after_credit( tile, stem, &poll_in, &charge_busy ); + FD_TEST( fd_seq_ne( fd_frag_meta_seq_query( rx_link->mcache+rx_seq ), rx_seq ) ); + + /* RX packet valid */ + struct { + fd_eth_hdr_t eth; + fd_ip4_hdr_t ip4; + fd_udp_hdr_t udp; + } const rx_pkt_templ = { + .eth = { + .net_type = fd_ushort_bswap( FD_ETH_HDR_TYPE_IP ), + }, + .ip4 = { + .verihl = FD_IP4_VERIHL( 4, 5 ), + .protocol = FD_IP4_HDR_PROTOCOL_UDP, + .net_tot_len = fd_ushort_bswap( 28 ) + }, + .udp = { + .net_len = fd_ushort_bswap( 8 ), + .net_dport = fd_ushort_bswap( SHRED_PORT ) + } + }; + FD_TEST( fd_seq_ne( fd_frag_meta_seq_query( rx_link->mcache+rx_seq ), rx_seq ) ); + ulong rx_chunk = rx_complete_one( mock, IBV_WC_SUCCESS, sizeof(rx_pkt_templ) ); + uchar * rx_packet = fd_chunk_to_laddr( tile->umem_base, rx_chunk ); + fd_memcpy( rx_packet, &rx_pkt_templ, sizeof(rx_pkt_templ) ); + after_credit( tile, stem, &poll_in, &charge_busy ); + verify_balances( tile, stem, mock, frame_track ); + FD_TEST( fd_seq_eq( fd_frag_meta_seq_query( rx_link->mcache+rx_seq ), rx_seq ) ); + rx_seq++; + + /* RX packet with different dst port */ + FD_TEST( fd_seq_ne( fd_frag_meta_seq_query( rx_link->mcache+rx_seq ), rx_seq ) ); + rx_chunk = rx_complete_one( mock, IBV_WC_SUCCESS, sizeof(rx_pkt_templ) ); + rx_packet = fd_chunk_to_laddr( tile->umem_base, rx_chunk ); + fd_memcpy( rx_packet, &rx_pkt_templ, sizeof(rx_pkt_templ) ); + FD_STORE( ushort, rx_packet+offsetof( __typeof__(rx_pkt_templ), udp.net_dport ), + fd_ushort_bswap( 9999 ) ); + after_credit( tile, stem, &poll_in, &charge_busy ); + verify_balances( tile, stem, mock, frame_track ); + FD_TEST( fd_seq_ne( fd_frag_meta_seq_query( rx_link->mcache+rx_seq ), rx_seq ) ); + + /* RX packet with unsupported IP version */ + FD_TEST( fd_seq_ne( fd_frag_meta_seq_query( rx_link->mcache+rx_seq ), rx_seq ) ); + rx_chunk = 
rx_complete_one( mock, IBV_WC_SUCCESS, sizeof(rx_pkt_templ) ); + rx_packet = fd_chunk_to_laddr( tile->umem_base, rx_chunk ); + fd_memcpy( rx_packet, &rx_pkt_templ, sizeof(rx_pkt_templ) ); + FD_STORE( uchar, rx_packet+offsetof( __typeof__(rx_pkt_templ), ip4.verihl ), + FD_IP4_VERIHL( 6,5 ) ); + after_credit( tile, stem, &poll_in, &charge_busy ); + verify_balances( tile, stem, mock, frame_track ); + FD_TEST( fd_seq_ne( fd_frag_meta_seq_query( rx_link->mcache+rx_seq ), rx_seq ) ); + + /* RX packet with invalid Ethertype */ + FD_TEST( fd_seq_ne( fd_frag_meta_seq_query( rx_link->mcache+rx_seq ), rx_seq ) ); + rx_packet = fd_chunk_to_laddr( tile->umem_base, rx_complete_one( mock, IBV_WC_SUCCESS, 64UL ) ); + fd_memset( rx_packet, 0, FD_NET_MTU ); + fd_eth_hdr_t eth_hdr = { .net_type = fd_ushort_bswap( FD_ETH_HDR_TYPE_ARP ) }; + FD_STORE( fd_eth_hdr_t, rx_packet, eth_hdr ); + after_credit( tile, stem, &poll_in, &charge_busy ); + verify_balances( tile, stem, mock, frame_track ); + FD_TEST( fd_seq_ne( fd_frag_meta_seq_query( rx_link->mcache+rx_seq ), rx_seq ) ); + + ulong const tx_chunk0 = fd_dcache_compact_chunk0( wksp, tx_link->dcache ); + ulong const tx_wmark = fd_dcache_compact_wmark( wksp, tx_link->dcache, FD_NET_MTU ); + ulong tx_seq = 0UL; + ulong tx_chunk = tx_chunk0; + + /* TX packet with invalid sig */ + FD_TEST( 1==before_frag( tile, 0UL, tx_seq, + fd_disco_netmux_sig( 0U, 0, 0U, DST_PROTO_SHRED, 0UL ) ) ); + + /* TX packet with non-routable IP */ + FD_TEST( 1==before_frag( tile, 0UL, tx_seq, + fd_disco_netmux_sig( 0U, 0, banned_ip4_addr, DST_PROTO_OUTGOING, 0UL ) ) ); + + /* TX packet with loopback destination */ + FD_TEST( 1==before_frag( tile, 0UL, tx_seq, + fd_disco_netmux_sig( 0U, 0, FD_IP4_ADDR( 127,0,0,1 ), DST_PROTO_OUTGOING, 0UL ) ) ); + + /* TX packet targeting unsupported interface */ + FD_TEST( 1==before_frag( tile, 0UL, tx_seq, + fd_disco_netmux_sig( 0U, 0, path2_ip4_addr, DST_PROTO_OUTGOING, 0UL ) ) ); + + /* TX packet targeting unknown neighbor */ + 
FD_TEST( 1==before_frag( tile, 0UL, tx_seq, + fd_disco_netmux_sig( 0U, 0, neigh1_ip4_addr, DST_PROTO_OUTGOING, 0UL ) ) ); + verify_balances( tile, stem, mock, frame_track ); + + /* TX packet targeting resolved neighbor */ + memset( &tile->r.tx_op, 0, sizeof(tile->r.tx_op) ); + ulong tx_sig = fd_disco_netmux_sig( 0U, 0, neigh2_ip4_addr, DST_PROTO_OUTGOING, 0UL ); + FD_TEST( 0==before_frag( tile, 0UL, tx_seq, tx_sig ) ); + FD_TEST( tile->r.tx_op.if_idx==IF_IDX_ETH0 ); + FD_TEST( tile->r.tx_op.src_ip==site_ip4_addr ); + FD_TEST( 0==memcmp( tile->r.tx_op.mac_addrs+0, "\x01\x23\x45\x67\x89\xab", 6 ) ); + verify_balances( tile, stem, mock, frame_track ); + + /* TX packet targeting default gateway */ + memset( &tile->r.tx_op, 0, sizeof(tile->r.tx_op) ); + tx_sig = fd_disco_netmux_sig( 0U, 0, FD_IP4_ADDR( 1,1,1,1 ), DST_PROTO_OUTGOING, 0UL ); + FD_TEST( 0==before_frag( tile, 0UL, tx_seq, tx_sig ) ); + FD_TEST( tile->r.tx_op.if_idx==IF_IDX_ETH0 ); + FD_TEST( tile->r.tx_op.src_ip==public_ip4_addr ); + FD_TEST( 0==memcmp( tile->r.tx_op.mac_addrs+0, "\xff\x23\x45\x67\x89\xab", 6 ) ); + uchar * tx_packet = fd_chunk_to_laddr( wksp, tx_chunk ); + struct { + fd_eth_hdr_t eth; + fd_ip4_hdr_t ip4; + fd_udp_hdr_t udp; + uchar data[2]; + } const tx_pkt_templ = { + .eth = { + .net_type = fd_ushort_bswap( FD_ETH_HDR_TYPE_IP ), + }, + .ip4 = { + .verihl = FD_IP4_VERIHL( 4, 5 ), + .protocol = FD_IP4_HDR_PROTOCOL_UDP, + .net_tot_len = fd_ushort_bswap( 30 ), + .daddr = FD_IP4_ADDR( 1,1,1,1 ) + }, + .udp = { + .net_len = fd_ushort_bswap( 10 ), + .net_sport = fd_ushort_bswap( 1 ), + .net_dport = fd_ushort_bswap( 2 ) + }, + .data = { 0x11, 0x22 } + }; + fd_memcpy( tx_packet, &tx_pkt_templ, sizeof(tx_pkt_templ) ); + during_frag( tile, 0UL, tx_seq, tx_sig, tx_chunk, sizeof(tx_pkt_templ), 1UL ); + after_frag( tile, 0UL, tx_seq, tx_sig, sizeof(tx_pkt_templ), 0UL, 0UL, stem ); + verify_balances( tile, stem, mock, frame_track ); + FD_TEST( fd_ibv_send_wr_q_cnt( mock->tx_q )==1UL ); + struct 
ibv_send_wr tx_wr = fd_ibv_send_wr_q_pop_head( mock->tx_q ); + FD_TEST( tx_wr.wr_id >= tile->tx_chunk0 && tx_wr.wr_id < tile->tx_chunk1 ); + uchar * tx_frame = fd_chunk_to_laddr( tile->umem_base, tx_wr.wr_id ); + FD_TEST( 0==memcmp( tx_frame+0, "\xff\x23\x45\x67\x89\xab", 6 ) ); // eth.dst + FD_TEST( 0==memcmp( tx_frame+6, "\x00\x00\x00\x00\x00\x00", 6 ) ); // eth.src + FD_TEST( fd_ushort_bswap( FD_LOAD( ushort, tx_frame+12 ) )==FD_ETH_HDR_TYPE_IP ); // eth.net_type + FD_TEST( FD_LOAD( uchar, tx_frame+14 )==FD_IP4_VERIHL( 4, 5 ) ); // ip4.verihl + FD_TEST( FD_LOAD( uchar, tx_frame+23 )==FD_IP4_HDR_PROTOCOL_UDP ); // ip4.protocol + FD_TEST( FD_LOAD( uint, tx_frame+26 )==public_ip4_addr ); // ip4.saddr + FD_TEST( FD_LOAD( uint, tx_frame+30 )==FD_IP4_ADDR( 1,1,1,1 ) ); // ip4.daddr + FD_TEST( fd_ip4_hdr_check( tx_frame+14 )==0 ); + FD_TEST( tx_wr.num_sge==1 ); + FD_TEST( tx_wr.opcode==IBV_WR_SEND ); + FD_TEST( tx_wr.sg_list[0].addr==(ulong)tx_frame ); + FD_TEST( tx_wr.sg_list[0].length==sizeof(tx_pkt_templ) ); + FD_TEST( tx_wr.sg_list[0].lkey==tile->mr_lkey ); + fd_ibv_sge_p_ele_release( mock->sge_pool, (fd_ibv_mock_sge_t *)tx_wr.sg_list ); + tx_chunk = fd_dcache_compact_next( tx_chunk, sizeof(tx_pkt_templ), tx_chunk0, tx_wmark ); + tx_complete_one( mock, IBV_WC_SUCCESS, tx_wr.wr_id, sizeof(tx_pkt_templ) ); + verify_balances( tile, stem, mock, frame_track ); + charge_busy = 0; + after_credit( tile, stem, &poll_in, &charge_busy ); + verify_balances( tile, stem, mock, frame_track ); + + /* Clean up */ + fd_wksp_free_laddr( frame_track_delete( frame_track_leave( frame_track ) ) ); + fd_wksp_free_laddr( tile ); + fd_wksp_free_laddr( fd_ibverbs_mock_qp_delete( mock ) ); + fd_wksp_delete_anonymous( wksp ); + + FD_LOG_NOTICE(( "pass" )); + fd_halt(); + return 0; +} diff --git a/src/disco/net/xdp/fd_xdp_tile.c b/src/disco/net/xdp/fd_xdp_tile.c index 21852aa9ca1..1c64f4bb011 100644 --- a/src/disco/net/xdp/fd_xdp_tile.c +++ b/src/disco/net/xdp/fd_xdp_tile.c @@ -201,9 +201,7 @@ 
typedef struct { /* Ring tracking free packet buffers */ fd_net_free_ring_t free_tx; - uchar src_mac_addr[6]; uint default_address; - uint bind_address; ushort shred_listen_port; ushort quic_transaction_listen_port; @@ -600,9 +598,7 @@ net_tx_route( fd_net_ctx_t * ctx, return 1; } - if( FD_UNLIKELY( netdev->dev_type!=ARPHRD_ETHER ) ) return 0; // drop - - if( FD_UNLIKELY( if_idx!=ctx->if_virt ) ) { + if( FD_UNLIKELY( netdev->dev_type!=ARPHRD_ETHER || if_idx!=ctx->if_virt ) ) { ctx->metrics.tx_no_xdp_cnt++; return 0; } @@ -626,8 +622,8 @@ net_tx_route( fd_net_ctx_t * ctx, } ip4_src = fd_uint_if( !ip4_src, ctx->default_address, ip4_src ); ctx->tx_op.src_ip = ip4_src; - memcpy( ctx->tx_op.mac_addrs+0, neigh->mac_addr, 6 ); - memcpy( ctx->tx_op.mac_addrs+6, netdev->mac_addr, 6 ); + memcpy( ctx->tx_op.mac_addrs+0, neigh->mac_addr, 6 ); + memcpy( ctx->tx_op.mac_addrs+6, netdev->mac_addr, 6 ); return 1; } @@ -1215,17 +1211,11 @@ net_xsk_bootstrap( fd_net_ctx_t * ctx, static void interface_addrs( const char * interface, - uchar * mac, uint * ip4_addr ) { int fd = socket( AF_INET, SOCK_DGRAM, 0 ); struct ifreq ifr; ifr.ifr_addr.sa_family = AF_INET; - strncpy( ifr.ifr_name, interface, IFNAMSIZ ); - if( FD_UNLIKELY( ioctl( fd, SIOCGIFHWADDR, &ifr ) ) ) - FD_LOG_ERR(( "could not get MAC address of interface `%s`: (%i-%s)", interface, errno, fd_io_strerror( errno ) )); - fd_memcpy( mac, ifr.ifr_hwaddr.sa_data, 6 ); - if( FD_UNLIKELY( ioctl( fd, SIOCGIFADDR, &ifr ) ) ) FD_LOG_ERR(( "could not get IP address of interface `%s`: (%i-%s)", interface, errno, fd_io_strerror( errno ) )); *ip4_addr = ((struct sockaddr_in *)fd_type_pun( &ifr.ifr_addr ))->sin_addr.s_addr; @@ -1265,7 +1255,7 @@ privileged_init( fd_topo_t * topo, fd_memset( ctx, 0, sizeof(fd_net_ctx_t) ); - interface_addrs( tile->xdp.if_virt, ctx->src_mac_addr, &ctx->default_address ); + interface_addrs( tile->xdp.if_virt, &ctx->default_address ); ctx->if_virt = if_nametoindex( tile->xdp.if_virt ); FD_TEST( ctx->if_virt ); 
/* Load up dcache containing UMEM */ diff --git a/src/disco/stem/fd_stem.h b/src/disco/stem/fd_stem.h index d8e9777fab8..460299c8094 100644 --- a/src/disco/stem/fd_stem.h +++ b/src/disco/stem/fd_stem.h @@ -6,9 +6,9 @@ #define FD_STEM_SCRATCH_ALIGN (128UL) struct fd_stem_context { - fd_frag_meta_t ** mcaches; - ulong * seqs; - ulong * depths; + fd_frag_meta_t ** mcaches; + ulong * seqs; + ulong const * restrict depths; ulong * cr_avail; ulong * min_cr_avail; diff --git a/src/disco/topo/fd_topo.h b/src/disco/topo/fd_topo.h index f82be68a62c..297bf1a50a5 100644 --- a/src/disco/topo/fd_topo.h +++ b/src/disco/topo/fd_topo.h @@ -189,6 +189,21 @@ struct fd_topo_tile { int so_rcvbuf; } sock; + struct { + fd_topo_net_tile_t net; + + ulong umem_dcache_obj_id; /* dcache for XDP UMEM frames */ + char if_name[ 16 ]; + uint rx_queue_size; + uint tx_queue_size; + + ulong netdev_dbl_buf_obj_id; /* dbl_buf containing netdev_tbl */ + ulong fib4_main_obj_id; /* fib4 containing main route table */ + ulong fib4_local_obj_id; /* fib4 containing local route table */ + ulong neigh4_obj_id; /* neigh4 hash map header */ + ulong neigh4_ele_obj_id; /* neigh4 hash map slots */ + } ibeth; + struct { ulong netdev_dbl_buf_obj_id; /* dbl_buf containing netdev_tbl */ ulong fib4_main_obj_id; /* fib4 containing main route table */ diff --git a/src/disco/topo/fd_topob.c b/src/disco/topo/fd_topob.c index 956a802c430..f4376c43075 100644 --- a/src/disco/topo/fd_topob.c +++ b/src/disco/topo/fd_topob.c @@ -356,6 +356,7 @@ fd_topob_auto_layout( fd_topo_t * topo, "benchs", "net", "sock", + "ibeth", "quic", "bundle", "verify", diff --git a/src/waltz/ibverbs/Local.mk b/src/waltz/ibverbs/Local.mk new file mode 100644 index 00000000000..af7e6cd2355 --- /dev/null +++ b/src/waltz/ibverbs/Local.mk @@ -0,0 +1,4 @@ +ifdef FD_HAS_IBVERBS +$(call add-hdrs,fd_ibverbs_mock.h fd_ibverbs_mock_ds.h) +$(call add-objs,fd_ibverbs_mock,fd_waltz) +endif diff --git a/src/waltz/ibverbs/fd_ibverbs_mock.c 
b/src/waltz/ibverbs/fd_ibverbs_mock.c new file mode 100644 index 00000000000..1fc209bafde --- /dev/null +++ b/src/waltz/ibverbs/fd_ibverbs_mock.c @@ -0,0 +1,319 @@ +#include "fd_ibverbs_mock.h" +#include "fd_ibverbs_mock_ds.h" +#include "../../util/log/fd_log.h" + +FD_FN_CONST ulong +fd_ibverbs_mock_qp_align( void ) { + return alignof(fd_ibverbs_mock_qp_t); +} + +FD_FN_CONST static ulong +fd_ibverbs_mock_qp_sge_pool_max( + ulong const rx_depth, + ulong const tx_depth, + ulong const sge_max +) { + ulong desc_max; + ulong sge_pool_max; + if( FD_UNLIKELY( __builtin_uaddl_overflow( rx_depth, tx_depth, &desc_max ) ) ) { + return 0UL; + } + if( FD_UNLIKELY( __builtin_umull_overflow( desc_max, sge_max, &sge_pool_max ) ) ) { + return 0UL; + } + return sge_pool_max; +} + +FD_FN_CONST ulong +fd_ibverbs_mock_qp_footprint( ulong const rx_depth, + ulong const tx_depth, + ulong const cq_depth, + ulong const sge_max ) { + ulong const sge_pool_max = fd_ibverbs_mock_qp_sge_pool_max( rx_depth, tx_depth, sge_max ); + if( FD_UNLIKELY( !sge_pool_max ) ) return 0UL; + + if( FD_UNLIKELY( rx_depth>UINT_MAX || + tx_depth>UINT_MAX || + cq_depth>UINT_MAX || + sge_pool_max>UINT_MAX ) ) { + return 0UL; /* overflow */ + } + + ulong l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, alignof(fd_ibverbs_mock_qp_t), sizeof(fd_ibverbs_mock_qp_t) ); + l = FD_LAYOUT_APPEND( l, fd_ibv_recv_wr_q_align(), fd_ibv_recv_wr_q_footprint( rx_depth ) ); + l = FD_LAYOUT_APPEND( l, fd_ibv_send_wr_q_align(), fd_ibv_send_wr_q_footprint( tx_depth ) ); + l = FD_LAYOUT_APPEND( l, fd_ibv_wc_q_align(), fd_ibv_wc_q_footprint ( cq_depth ) ); + l = FD_LAYOUT_APPEND( l, fd_ibv_sge_p_align(), fd_ibv_sge_p_footprint ( sge_pool_max ) ); + return FD_LAYOUT_FINI( l, fd_ibverbs_mock_qp_align() ); +} + +fd_ibverbs_mock_qp_t * +fd_ibverbs_mock_qp_new( void * const mem, + ulong const rx_depth, + ulong const tx_depth, + ulong const cq_depth, + ulong const sge_max ) { + ulong const sge_pool_max = fd_ibverbs_mock_qp_sge_pool_max( rx_depth, 
tx_depth, sge_max ); + ulong const footprint = fd_ibverbs_mock_qp_footprint( rx_depth, tx_depth, cq_depth, sge_max ); + if( FD_UNLIKELY( !footprint ) ) { + FD_LOG_WARNING(( "invalid config for ibverbs_mock_qp" )); + return NULL; + } + if( FD_UNLIKELY( !mem ) ) { + FD_LOG_WARNING(( "NULL mem" )); + return NULL; + } + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_ibverbs_mock_qp_align() ) ) ) { + FD_LOG_WARNING(( "misaligned mem" )); + return NULL; + } + + FD_SCRATCH_ALLOC_INIT( l, mem ); + void * mock_mem = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_ibverbs_mock_qp_t), sizeof(fd_ibverbs_mock_qp_t) ); + void * rxq_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_ibv_recv_wr_q_align(), fd_ibv_recv_wr_q_footprint( rx_depth ) ); + void * txq_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_ibv_send_wr_q_align(), fd_ibv_send_wr_q_footprint( tx_depth ) ); + void * wcq_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_ibv_wc_q_align(), fd_ibv_wc_q_footprint ( cq_depth ) ); + void * sge_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_ibv_sge_p_align(), fd_ibv_sge_p_footprint ( sge_pool_max ) ); + ulong end = FD_SCRATCH_ALLOC_FINI( l, fd_ibverbs_mock_qp_align() ); + if( FD_UNLIKELY( end-(ulong)mem!=footprint ) ) { + FD_LOG_CRIT(( "memory corruption" )); + return NULL; + } + + fd_ibverbs_mock_qp_t * mock = mock_mem; + memset( mock, 0, sizeof(fd_ibverbs_mock_qp_t) ); + mock->sge_max = (uint)sge_max; + + mock->rx_q = fd_ibv_recv_wr_q_join( fd_ibv_recv_wr_q_new( rxq_mem, rx_depth ) ); + mock->tx_q = fd_ibv_send_wr_q_join( fd_ibv_send_wr_q_new( txq_mem, tx_depth ) ); + mock->wc_q = fd_ibv_wc_q_join ( fd_ibv_wc_q_new ( wcq_mem, cq_depth ) ); + if( FD_UNLIKELY( !mock->rx_q || !mock->tx_q || !mock->wc_q ) ) { + FD_LOG_WARNING(( "fd_deque_dynamic_new failed" )); + return NULL; + } + mock->sge_pool = fd_ibv_sge_p_join( fd_ibv_sge_p_new( sge_mem, sge_pool_max ) ); + if( FD_UNLIKELY( !mock->sge_pool ) ) { + FD_LOG_WARNING(( "fd_pool_new failed" )); + return NULL; + } + + struct ibv_context * ctx = mock->ctx; + ctx->ops.poll_cq = 
fd_ibv_mock_poll_cq; + ctx->ops.post_send = fd_ibv_mock_post_send; + ctx->ops.post_recv = fd_ibv_mock_post_recv; + + struct ibv_cq * cq = mock->cq; + cq->cq_context = mock; + cq->context = ctx; + + struct ibv_cq_ex * cq_ex = mock->cq_ex; + cq_ex->cq_context = mock; + cq_ex->context = ctx; + cq_ex->cqe = (int)cq_depth; + cq_ex->start_poll = fd_ibv_mock_start_poll; + cq_ex->next_poll = fd_ibv_mock_next_poll; + cq_ex->end_poll = fd_ibv_mock_end_poll; + cq_ex->read_opcode = fd_ibv_mock_wc_read_opcode; + cq_ex->read_byte_len = fd_ibv_mock_wc_read_byte_len; + + struct ibv_qp * qp = mock->qp; + qp->qp_context = mock; + qp->context = ctx; + qp->send_cq = cq; + qp->recv_cq = cq; + qp->qp_type = IBV_QPT_RAW_PACKET; + + FD_COMPILER_MFENCE(); + mock->magic = FD_IBVERBS_MOCK_QP_MAGIC; + FD_COMPILER_MFENCE(); + return mock; +} + +void * +fd_ibverbs_mock_qp_delete( fd_ibverbs_mock_qp_t * qp ) { + + if( FD_UNLIKELY( !qp ) ) { + FD_LOG_WARNING(( "NULL qp" )); + return NULL; + } + if( FD_UNLIKELY( qp->magic!=FD_IBVERBS_MOCK_QP_MAGIC ) ) { + FD_LOG_WARNING(( "invalid magic" )); + return NULL; + } + + FD_COMPILER_MFENCE(); + qp->magic = 0UL; + FD_COMPILER_MFENCE(); + + fd_ibv_recv_wr_q_delete( fd_ibv_recv_wr_q_leave( qp->rx_q ) ); + fd_ibv_send_wr_q_delete( fd_ibv_send_wr_q_leave( qp->tx_q ) ); + fd_ibv_wc_q_delete ( fd_ibv_wc_q_leave ( qp->wc_q ) ); + fd_ibv_sge_p_delete ( fd_ibv_sge_p_leave ( qp->sge_pool ) ); + memset( qp, 0, sizeof(fd_ibverbs_mock_qp_t) ); + + return qp; +} + +/* FIXME Double check the errnos returned below against real mlx5 behavior. + For example, if the NIC can't accept any more WRs, does it throw ENOMEM + or ENOSPC? 
*/ + +#define IBV_INJECT_ERR( mock ) \ + do { \ + if( FD_UNLIKELY( (mock)->err_delay ) ) { \ + (mock)->err_delay--; \ + } else if( FD_UNLIKELY( (mock)->err ) ) { \ + return (mock)->err; \ + } \ + } while(0) + +int +fd_ibv_mock_poll_cq( struct ibv_cq * cq, + int num_entries, + struct ibv_wc * wc ) { + fd_ibverbs_mock_qp_t * mock = cq->cq_context; + FD_TEST( mock->magic==FD_IBVERBS_MOCK_QP_MAGIC ); + if( FD_UNLIKELY( num_entries<0 ) ) return EINVAL; + + ulong complete_cnt = 0UL; + for(;;) { + if( FD_UNLIKELY( fd_ibv_wc_q_empty( mock->wc_q ) ) ) break; + if( FD_UNLIKELY( num_entries<=0 ) ) break; + if( FD_UNLIKELY( (!mock->err_delay) & (!!mock->err) ) ) { + if( FD_LIKELY( !complete_cnt ) ) return -mock->err; + break; + } + *wc = *fd_ibv_wc_q_pop_head_nocopy( mock->wc_q ); + wc++; + num_entries--; + complete_cnt++; + } + return (int)complete_cnt; +} + +int +fd_ibv_mock_post_send( struct ibv_qp * qp, + struct ibv_send_wr * wr, + struct ibv_send_wr ** bad_wr ) { + fd_ibverbs_mock_qp_t * mock = qp->qp_context; + FD_TEST( mock->magic==FD_IBVERBS_MOCK_QP_MAGIC ); + while( wr ) { + IBV_INJECT_ERR( mock ); + ulong const sge_cnt = (ulong)wr->num_sge; + if( FD_UNLIKELY( fd_ibv_send_wr_q_full( mock->tx_q ) || + fd_ibv_sge_p_free( mock->sge_pool )tx_q ); + *next = *wr; + /* Copy SGEs */ + next->next = NULL; + next->sg_list = NULL; + for( long j=((long)sge_cnt-1L); j>=0L; j-- ) { + fd_ibv_mock_sge_t * sge = fd_ibv_sge_p_ele_acquire( mock->sge_pool ); + FD_TEST( sge ); /* never fails */ + sge->sge = wr->sg_list[j]; + sge->next = next->sg_list; + next->sg_list = &sge->sge; + } + /* Next WR */ + wr = wr->next; + } + return 0; +} + +int +fd_ibv_mock_post_recv( struct ibv_qp * qp, + struct ibv_recv_wr * wr, + struct ibv_recv_wr ** bad_wr ) { + fd_ibverbs_mock_qp_t * mock = qp->qp_context; + FD_TEST( mock->magic==FD_IBVERBS_MOCK_QP_MAGIC ); + while( wr ) { + FD_TEST( wr->num_sge ); + IBV_INJECT_ERR( mock ); + ulong const sge_cnt = (ulong)wr->num_sge; + if( FD_UNLIKELY( 
fd_ibv_recv_wr_q_full( mock->rx_q ) || + fd_ibv_sge_p_free( mock->sge_pool )rx_q ); + *next = *wr; + /* Copy SGEs */ + next->next = NULL; + next->sg_list = NULL; + for( long j=((long)sge_cnt-1L); j>=0L; j-- ) { + fd_ibv_mock_sge_t * sge = fd_ibv_sge_p_ele_acquire( mock->sge_pool ); + FD_TEST( sge ); /* never fails */ + sge->sge = wr->sg_list[j]; + sge->next = next->sg_list; + next->sg_list = &sge->sge; + } + /* Next WR */ + wr = wr->next; + } + return 0; +} + +static void +fd_ibv_mock_cq_ex_pop( fd_ibverbs_mock_qp_t * mock, + struct ibv_cq_ex * cq_ex ) { + FD_TEST( !fd_ibv_wc_q_empty( mock->wc_q ) ); + struct ibv_wc * wc = fd_ibv_wc_q_pop_head_nocopy( mock->wc_q ); + cq_ex->wr_id = wc->wr_id; + cq_ex->status = wc->status; + mock->poll_opt.byte_len = wc->byte_len; + mock->poll_opt.opcode = wc->opcode; +} + +/* FIXME Add error injection for cq_ex polling */ + +int +fd_ibv_mock_start_poll( struct ibv_cq_ex * cq_ex, + struct ibv_poll_cq_attr * attr ) { + fd_ibverbs_mock_qp_t * mock = cq_ex->cq_context; + FD_TEST( mock->magic==FD_IBVERBS_MOCK_QP_MAGIC ); + FD_TEST( !mock->cq_ex_polling ); + FD_TEST( attr ); + if( FD_UNLIKELY( attr->comp_mask ) ) { + FD_LOG_ERR(( "Sorry, ibv_mock doesn't support ibv_start_poll attr comp_mask" )); + } + if( fd_ibv_wc_q_empty( mock->wc_q ) ) return ENOENT; + mock->cq_ex_polling = 1; + fd_ibv_mock_cq_ex_pop( mock, cq_ex ); + return 0; +} + +int +fd_ibv_mock_next_poll( struct ibv_cq_ex * cq_ex ) { + fd_ibverbs_mock_qp_t * mock = cq_ex->cq_context; + FD_TEST( mock->cq_ex_polling ); + if( fd_ibv_wc_q_empty( mock->wc_q ) ) return ENOENT; + fd_ibv_mock_cq_ex_pop( mock, cq_ex ); + return 0; +} + +void +fd_ibv_mock_end_poll( struct ibv_cq_ex * cq_ex ) { + fd_ibverbs_mock_qp_t * mock = cq_ex->cq_context; + FD_TEST( mock->cq_ex_polling ); + mock->cq_ex_polling = 0; +} + +enum ibv_wc_opcode +fd_ibv_mock_wc_read_opcode( struct ibv_cq_ex * cq_ex ) { + fd_ibverbs_mock_qp_t * mock = cq_ex->cq_context; + FD_TEST( mock->cq_ex_polling ); + return 
mock->poll_opt.opcode; +} + +uint32_t +fd_ibv_mock_wc_read_byte_len( struct ibv_cq_ex * cq_ex ) { + fd_ibverbs_mock_qp_t * mock = cq_ex->cq_context; + FD_TEST( mock->cq_ex_polling ); + return mock->poll_opt.byte_len; +} diff --git a/src/waltz/ibverbs/fd_ibverbs_mock.h b/src/waltz/ibverbs/fd_ibverbs_mock.h new file mode 100644 index 00000000000..b7cab3c9d6f --- /dev/null +++ b/src/waltz/ibverbs/fd_ibverbs_mock.h @@ -0,0 +1,164 @@ +#ifndef HEADER_fd_src_waltz_ibverbs_fd_ibverbs_mock_h +#define HEADER_fd_src_waltz_ibverbs_fd_ibverbs_mock_h + +/* fd_ibverbs_mock.h provides APIs for mocking ibverbs objects. */ + +#include +#include "../../util/fd_util_base.h" +#include "fd_ibverbs_mock_ds.h" + +#define FD_IBVERBS_MOCK_QP_MAGIC 0xde28091e733ec21fUL /* random */ + +/* fd_ibverbs_mock_qp_t allows test code ("tester") to exercise ibverbs + interactions of production code ("target"). Mock ibv_qp and ibv_cq + objects are provided to the target. Internally, ibverbs_mock_qp is a + dumb buffer that holds work requets and CQEs. Work requests are + provided by the target, and consumed by tester. CQEs are provided by + the tester, and consumed by the target. Basic error injection is + supported. + + The target interacts using the ibverbs API (currently only basic + IBV_QPT_RAW_PACKET support is provided). + + The tester directly accesses the underlying data structures provided by + fd_ibverbs_mock_ds.h. + + For an example, see src/disco/net/ibeth/test_ibeth_tile.c in the + Firedancer repo. 
*/ + +struct __attribute__((aligned(16))) fd_ibverbs_mock_qp { + + ulong magic; /* ==FD_IBVERBS_MOCK_QP_MAGIC */ + uint sge_max; + + /* Verbs */ + + struct ibv_context ctx[1]; + struct ibv_qp qp[1]; + struct ibv_cq cq[1]; + struct ibv_cq_ex cq_ex[1]; + + /* Internal buffer */ + + struct ibv_recv_wr * rx_q; /* fd_deque_dynamic */ + struct ibv_send_wr * tx_q; /* fd_deque_dynamic */ + struct ibv_wc * wc_q; /* fd_deque_dynamic */ + fd_ibv_mock_sge_t * sge_pool; /* fd_pool */ + + /* Polling */ + + uint cq_ex_polling : 1; + struct { + uint byte_len; + enum ibv_wc_opcode opcode; + } poll_opt; + + /* Error injection */ + + uint err_delay; /* Suppress error while non-zero, decrements every op */ + int err; /* Inject this errno */ + +}; +typedef struct fd_ibverbs_mock_qp fd_ibverbs_mock_qp_t; + +FD_PROTOTYPES_BEGIN + +/* Constructors */ + +FD_FN_CONST ulong +fd_ibverbs_mock_qp_align( void ); + +FD_FN_CONST ulong +fd_ibverbs_mock_qp_footprint( ulong rx_depth, + ulong tx_depth, + ulong cq_depth, + ulong sge_max ); + +fd_ibverbs_mock_qp_t * +fd_ibverbs_mock_qp_new( void * mem, + ulong rx_depth, + ulong tx_depth, + ulong cq_depth, + ulong sge_max ); + +void * +fd_ibverbs_mock_qp_delete( fd_ibverbs_mock_qp_t * mock ); + +/* fd_ibverbs_mock_qp_get_context returns a pointer to the embedded + ibv_context. Mostly useless for now. */ + +static inline struct ibv_context * +fd_ibverbs_mock_qp_get_context( fd_ibverbs_mock_qp_t * mock ) { + return mock->ctx; +} + +/* fd_ibverbs_mock_qp_get_qp returns a pointer to the embedded ibv_qp. + Supports the following ibverbs methods: + - ibv_post_send + - ibv_post_recv */ + +static inline struct ibv_qp * +fd_ibverbs_mock_qp_get_qp( fd_ibverbs_mock_qp_t * mock ) { + return mock->qp; +} + +/* fd_ibverbs_mock_qp_get_cq returns a pointer to the embedded ibv_cq. 
+ Supports the following ibverbs methods: + - ibv_poll_cq */ + +static inline struct ibv_cq * +fd_ibverbs_mock_qp_get_cq( fd_ibverbs_mock_qp_t * mock ) { + return mock->cq; +} + +/* fd_ibverbs_mock_qp_get_cq_ex returns a pointer to the embedded + ibv_cq_ex. Supports the following ibverbs methods: + - ibv_start_poll + - ibv_next_poll + - ibv_end_poll + - ibv_wc_read_byte_len + - ibv_wc_read_opcode */ + +static inline struct ibv_cq_ex * +fd_ibverbs_mock_qp_get_cq_ex( fd_ibverbs_mock_qp_t * mock ) { + return mock->cq_ex; +} + +/* Begin ibv_context_ops mocks */ + +int +fd_ibv_mock_poll_cq( struct ibv_cq * cq, + int num_entries, + struct ibv_wc * wc ); + +int +fd_ibv_mock_post_send( struct ibv_qp * qp, + struct ibv_send_wr * wr, + struct ibv_send_wr ** bad_wr ); + +int +fd_ibv_mock_post_recv( struct ibv_qp * qp, + struct ibv_recv_wr * wr, + struct ibv_recv_wr ** bad_wr ); + +int +fd_ibv_mock_start_poll( struct ibv_cq_ex * cq, + struct ibv_poll_cq_attr * attr ); + +int +fd_ibv_mock_next_poll( struct ibv_cq_ex * cq ); + +void +fd_ibv_mock_end_poll( struct ibv_cq_ex * cq ); + +enum ibv_wc_opcode +fd_ibv_mock_wc_read_opcode( struct ibv_cq_ex * cq ); + +uint32_t +fd_ibv_mock_wc_read_byte_len( struct ibv_cq_ex * cq ); + +/* End ibv_context_ops mocks */ + +FD_PROTOTYPES_END + +#endif /* HEADER_fd_src_waltz_ibverbs_fd_ibverbs_mock_h */ diff --git a/src/waltz/ibverbs/fd_ibverbs_mock_ds.h b/src/waltz/ibverbs/fd_ibverbs_mock_ds.h new file mode 100644 index 00000000000..7cfce6cc504 --- /dev/null +++ b/src/waltz/ibverbs/fd_ibverbs_mock_ds.h @@ -0,0 +1,39 @@ +#ifndef HEADER_fd_src_waltz_ibverbs_fd_ibverbs_mock_ds_h +#define HEADER_fd_src_waltz_ibverbs_fd_ibverbs_mock_ds_h + +/* fd_ibverbs_mock_ds.h provides the data structures powering + fd_ibverbs_mock. */ + +#include "fd_ibverbs_mock.h" + +/* Provide deques for buffering ibv_recv_wr, ibv_send_wr, and ibv_wc. 
*/ + +#define DEQUE_NAME fd_ibv_recv_wr_q +#define DEQUE_T struct ibv_recv_wr +#include "../../util/tmpl/fd_deque_dynamic.c" + +#define DEQUE_NAME fd_ibv_send_wr_q +#define DEQUE_T struct ibv_send_wr +#include "../../util/tmpl/fd_deque_dynamic.c" + +#define DEQUE_NAME fd_ibv_wc_q +#define DEQUE_T struct ibv_wc +#include "../../util/tmpl/fd_deque_dynamic.c" + +/* Provide an object pool for scatter-gather entries. */ + +typedef struct fd_ibv_mock_sge fd_ibv_mock_sge_t; +struct fd_ibv_mock_sge { + struct ibv_sge sge; + union { + ulong pool_next; + void * next; + }; +}; + +#define POOL_NAME fd_ibv_sge_p +#define POOL_T fd_ibv_mock_sge_t +#define POOL_NEXT pool_next +#include "../../util/tmpl/fd_pool.c" + +#endif /* HEADER_fd_src_waltz_ibverbs_fd_ibverbs_mock_ds_h */