diff --git a/book/api/metrics-generated.md b/book/api/metrics-generated.md
index 53319efe4f..14035083fc 100644
--- a/book/api/metrics-generated.md
+++ b/book/api/metrics-generated.md
@@ -494,17 +494,6 @@
-## Storei Tile
-
-| Metric | Type | Description |
-|--------|------|-------------|
-| storei_first_turbine_slot | gauge | |
-| storei_current_turbine_slot | gauge | |
-
 ## Gossip Tile
@@ -730,6 +719,9 @@
 | Metric | Type | Description |
 |--------|------|-------------|
+| repair_first_turbine_slot | gauge | |
+| repair_latest_turbine_slot | gauge | |
+| repair_latest_repair_slot | gauge | |
 | repair_recv_clnt_pkt | counter | Now many client packets have we received |
 | repair_recv_serv_pkt | counter | How many server packets have we received |
 | repair_recv_serv_corrupt_pkt | counter | How many corrupt server packets have we received |
diff --git a/book/api/websocket.md b/book/api/websocket.md
index 09b3c8e46e..141ac7e663 100644
--- a/book/api/websocket.md
+++ b/book/api/websocket.md
@@ -130,6 +130,35 @@ will be republished to make sure it reflects the new fork choice.
 ### summary
 A set of high level informational fields about the validator.
+#### `summary.client`
+| frequency | type | example |
+|-------------|----------------|-----------------|
+| *Once* | `string` | "frankendancer" |
+
+The client of the running validator. This can be either `frankendancer`
+or `firedancer`. Certain websocket messages are exclusive to one client
+or the other. Eventually, firedancer will replace frankendancer
+permanently and those messages may be phased out.
+
+##### frankendancer messages
+- summary.startup_progress
+
+##### firedancer messages
+- summary.boot_progress
+- gossip.network_stats
+
+::: details Example
+
+```json
+{
+  "topic": "summary",
+  "key": "client",
+  "value": "frankendancer"
+}
+```
+
+:::
+
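Since the two clients publish different message sets, a dashboard will typically branch on this value once, right after connecting. The sketch below is illustrative only; the helper name is hypothetical and not part of the API, while the client strings and message keys are the ones listed above.

```c
#include <string.h>

/* Hypothetical helper: map the value of the `summary.client` message to
   the startup message key this client will publish.  Per the lists above,
   frankendancer publishes summary.startup_progress and firedancer
   publishes summary.boot_progress. */
static char const *
expected_startup_key( char const * client ) {
  if( 0==strcmp( client, "frankendancer" ) ) return "startup_progress";
  if( 0==strcmp( client, "firedancer"    ) ) return "boot_progress";
  return NULL; /* unknown client value */
}
```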
 #### `summary.ping`
 | frequency | type | example |
 |-------------|----------------|---------|
@@ -325,6 +354,120 @@ The phases are,
 | waiting_for_supermajority_slot | `number\|null` | If the phase is at least `waiting_for_supermajority` or later, and we are stopped waiting for supermajority, this is the slot that we are stopped at. Otherwise it is `null` |
 | waiting_for_supermajority_stake_percent | `number\|null` | If the phase is at least `waiting_for_supermajority` or later, and we are stopped waiting for supermajority, this is the percentage of stake that is currently online and gossiping to our node. Otherwise it is `null`. The validator will proceed with starting up once the stake percent reaches 80 |
+#### `summary.boot_progress`
+| frequency | type | example |
+|-----------------|-------------------|---------|
+| *Once* + *Live* | `BootProgress` | below |
+
+Information about the validator's progress in starting up. There are
+various stages of starting up which the validator goes through in order
+before it is ready.
+
+The phases are,
+
+| Phase | Description |
+|------------------------------------|-------------|
+| joining_gossip | The validator has just booted and has started looking for RPC services to download snapshots from |
+| loading_full_snapshot | The validator has found an RPC peer to download a full snapshot. The snapshot is being downloaded, decompressed, and inserted into the client database |
+| loading_incremental_snapshot | The validator has found an RPC peer to download an incremental snapshot. The snapshot is being downloaded, decompressed, and inserted into the client database |
+| catching_up | The validator is replaying / repairing any missing slots up to the tip of the chain |
+| running | The validator is fully booted and running normally |
+
+::: details Example
+
+```json
+{
+  "topic": "summary",
+  "key": "boot_progress",
+  "value": {
+    "phase": "loading_full_snapshot",
+    "total_elapsed": "12123",
+    "joining_gossip_elapsed": "9321",
+    "loading_full_snapshot_slot": "291059318",
+    "loading_full_snapshot_peer": "145.40.125.99:8899",
+    "loading_full_snapshot_peer_identity": "Fe4StcZSQ228dKK2hni7aCP7ZprNhj8QKWzFe5usGFYF",
+    "loading_full_snapshot_elapsed": 3213,
+    "loading_full_snapshot_total_bytes": 123456,
+    "loading_full_snapshot_current_bytes": 12345,
+    "loading_full_snapshot_read_bytes": 12345,
+    "loading_full_snapshot_read_throughput": 17193374.123,
+    "loading_full_snapshot_read_remaining": 54321,
+    "loading_full_snapshot_read_elapsed": 12345,
+    "loading_full_snapshot_read_path": "https://192.168.0.1/snapshot-full-123456.tar.zst",
+    "loading_full_snapshot_decompress_bytes": 12345,
+    "loading_full_snapshot_decompress_throughput": 17193374.123,
+    "loading_full_snapshot_decompress_remaining": 54321,
+    "loading_full_snapshot_decompress_elapsed": 12345,
+    "loading_full_snapshot_insert_bytes": 12345,
+    "loading_full_snapshot_insert_throughput": 17193374.123,
+    "loading_full_snapshot_insert_remaining": 54321,
+    "loading_full_snapshot_insert_elapsed": 12345,
+    "loading_full_snapshot_insert_accounts_throughput": 1300000,
+    "loading_full_snapshot_insert_accounts_current": 412000000,
+    "loading_incr_snapshot_slot": null,
+    "loading_incr_snapshot_peer": null,
+    "loading_incr_snapshot_peer_identity": null,
+    "loading_incr_snapshot_elapsed": null,
+    "loading_incr_snapshot_total_bytes": null,
+    "loading_incr_snapshot_current_bytes": null,
+    "loading_incr_snapshot_read_bytes": null,
+    "loading_incr_snapshot_read_throughput": null,
+    "loading_incr_snapshot_read_remaining": null,
+    "loading_incr_snapshot_read_elapsed": null,
+    "loading_incr_snapshot_read_path": null,
+    "loading_incr_snapshot_decompress_bytes": null,
+    "loading_incr_snapshot_decompress_throughput": null,
+    "loading_incr_snapshot_decompress_remaining": null,
+    "loading_incr_snapshot_decompress_elapsed": null,
+    "loading_incr_snapshot_insert_bytes": null,
+    "loading_incr_snapshot_insert_throughput": null,
+    "loading_incr_snapshot_insert_remaining": null,
+    "loading_incr_snapshot_insert_elapsed": null,
+    "loading_incr_snapshot_insert_accounts_throughput": null,
+    "loading_incr_snapshot_insert_accounts_current": null,
+    "catching_up_elapsed": null,
+    "catching_up_first_turbine_slot": null,
+    "catching_up_max_turbine_slot": null,
+    "catching_up_latest_turbine_slot": null,
+    "catching_up_latest_repair_slot": null,
+    "catching_up_latest_replay_slot": null
+  }
+}
+```
+
+:::
+
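The phase is not reported by any single tile; `fd_gui_run_boot_progress` later in this diff derives it from snaprd, replay, and repair metrics. A simplified, hedged sketch of that derivation follows (phase names as documented above; the snaprd state values 2 and 8 and the 5-slot catch-up threshold are the ones used in the diff's code):

```c
/* Simplified sketch of the phase derivation in fd_gui_run_boot_progress().
   snaprd_state 2 corresponds to FD_SNAPRD_STATE_READING_FULL_FILE and 8 to
   FD_SNAPRD_STATE_READING_INCREMENTAL_FILE, as noted in the diff. */
static char const *
boot_phase( unsigned long snaprd_state,
            unsigned long latest_turbine_slot,
            unsigned long latest_replay_slot ) {
  char const * phase = "joining_gossip";
  if( snaprd_state>=2UL ) phase = "loading_full_snapshot";
  if( snaprd_state>=8UL ) phase = "loading_incremental_snapshot";
  if( latest_replay_slot>0UL ) phase = "catching_up";
  /* Considered caught up once replay is within 5 slots of the latest turbine slot. */
  if( latest_turbine_slot && latest_replay_slot &&
      (long)latest_turbine_slot-(long)latest_replay_slot<5L ) phase = "running";
  return phase;
}
```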
+**`BootProgress`**
+| Field | Type | Description |
+|-------------------------------------------------------------------|-----------------|-------------|
+| phase | `string` | One of `joining_gossip`, `loading_full_snapshot`, `loading_incremental_snapshot`, `catching_up`, or `running`. This indicates the current phase of the boot process. |
+| total_elapsed | `number` | The time, in seconds, that has elapsed since the boot process began. |
+| joining_gossip_elapsed | `number\|null` | If the phase is `joining_gossip`, this is the duration, in seconds, spent joining the gossip network. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_reset_cnt | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the number of times the load for the snapshot failed and the phase was restarted from scratch. A snapshot load may fail due to an unreliable or underperforming network connection. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_slot | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the slot of the snapshot being loaded. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_peer | `string\|null` | If the phase is at least `loading_{full\|incremental}_snapshot` and the snapshot is being downloaded over HTTP, this is the peer RPC IP address + port from which the snapshot is being loaded. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_total_bytes | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the (compressed) total size of the snapshot being loaded, in bytes. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_read_elapsed | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the elapsed time, in seconds, spent reading (either downloading or reading from disk) the snapshot since the last reset. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_read_bytes | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the (compressed) total number of bytes read so far for the snapshot. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_read_throughput | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the (compressed) current read throughput for the snapshot in bytes per second. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_read_remaining | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the estimated time remaining, in seconds, to complete reading the snapshot. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_read_path | `string\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is either the remote URL or local file path from which the snapshot is being read. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_decompress_decompressed_bytes | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the (decompressed) number of bytes processed by decompress from the snapshot so far. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_decompress_compressed_bytes | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the (compressed) number of bytes processed by decompress from the snapshot so far. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_decompress_throughput | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the (compressed) throughput of decompress from the snapshot in bytes per second. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_decompress_remaining | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the estimated time remaining, in seconds, to complete decompressing the snapshot. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_insert_bytes | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the (compressed) number of bytes processed from the snapshot by the snapshot insert stage so far. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_insert_throughput | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the (compressed) insertion throughput in bytes per second for the snapshot. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_insert_remaining | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the estimated time remaining, in seconds, to complete inserting data from the {full\|incremental} snapshot. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_insert_accounts_throughput | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the throughput, in accounts per second, for inserting accounts from the snapshot into the validator's accounts database. Otherwise, `null`. |
+| loading_{full\|incremental}_snapshot_insert_accounts_current | `number\|null` | If the phase is at least `loading_{full\|incremental}_snapshot`, this is the number of accounts from the snapshot inserted into the validator's accounts database so far. Otherwise, `null`. |
+| catching_up_elapsed | `number\|null` | If the phase is `catching_up`, this is the duration, in seconds, the validator has spent catching up to the current slot. Otherwise, `null`. |
+| catching_up_first_turbine_slot | `number\|null` | If the phase is `catching_up`, this is the first slot received via Turbine. Otherwise, `null`. |
+| catching_up_latest_turbine_slot | `number\|null` | If the phase is `catching_up`, this is the most recent slot received via Turbine. Otherwise, `null`. |
+| catching_up_latest_repair_slot | `number\|null` | If the phase is `catching_up`, this is the slot associated with the most recent repair request. Otherwise, `null`. |
+| catching_up_latest_replay_slot | `number\|null` | If the phase is `catching_up`, this is the latest slot replayed by this node. Otherwise, `null`. |
+
+
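The `*_throughput` and `*_remaining` pairs above follow a single pattern: throughput is an exponentially weighted moving average of bytes processed per second, and remaining time is the bytes left divided by that throughput. A minimal sketch of the bookkeeping follows; the 0.05 smoothing factor mirrors `FD_GUI_EMA_FILTER_ALPHA` used by the GUI code later in this diff, and all names here are illustrative rather than part of the API.

```c
#define EMA_ALPHA (0.05) /* mirrors FD_GUI_EMA_FILTER_ALPHA */

/* Per-stage progress tracker (read, decompress or insert).  Called once per
   sampling interval with the cumulative byte count reported by that stage. */
typedef struct {
  unsigned long last_bytes;  /* cumulative bytes at the previous sample */
  double        throughput;  /* EMA of bytes per second                 */
} stage_progress_t;

static double                                          /* estimated seconds remaining */
stage_progress_sample( stage_progress_t * p,
                       unsigned long      cum_bytes,   /* bytes processed so far  */
                       unsigned long      total_bytes, /* expected total bytes    */
                       double             dt_secs ) {  /* seconds since last call */
  double rate   = (double)( cum_bytes - p->last_bytes ) / dt_secs;
  p->throughput = EMA_ALPHA*rate + (1.0-EMA_ALPHA)*p->throughput; /* same filter shape as fd_gui.c */
  p->last_bytes = cum_bytes;
  if( p->throughput<=0.0 ) return 0.0;
  return (double)( total_bytes - cum_bytes ) / p->throughput;
}
```

For the insert stage, the diff additionally scales the expected total by the observed compression ratio before computing the remaining time, since the insert stage consumes decompressed data.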
 #### `summary.schedule_strategy`
 | frequency | type | example |
 |------------|----------|---------|
@@ -823,6 +966,108 @@ epoch is speculatively known as soon as `end_slot` in epoch T-2 is
 completed, rather than rooted, but no speculative epoch information is
 published until the epoch is finalized by rooting the slot.
+### gossip
+Information about the validator's connection to the gossip network.
+
+#### `gossip.network_stats`
+| frequency | type | example |
+|-----------------|----------------------|-------------|
+| *Once* + *Live* | `GossipNetworkStats` | below |
+
+::: details Example
+
+```json
+{
+  "health": {
+    "rx_push_pct": 98,
+    "duplicate_pct": 24,
+    "bad_pct": 2,
+    "pull_already_known_pct": 10,
+    "total_stake": "12345923874",
+    "total_peers": "1001",
+    "connected_stake": "1234000",
+    "connected_peers": 1432
+  },
+  "ingress": {
+    "total_throughput": 1234000,
+    "peer_names": [
+      "Coinbase 02",
+      "Figment",
+      "Jupiter"
+    ],
+    "peer_throughputs": [
+      12349,
+      9294,
+      7134989
+    ]
+  },
+  "egress": {
+    "total_throughput": 1234000,
+    "peer_names": [
+      "Coinbase 02",
+      "Figment",
+      "Jupiter"
+    ],
+    "peer_throughputs": [
+      12349,
+      9294,
+      7134989
+    ]
+  },
+  "storage": {
+    "total_bytes": 1234000,
+    "peer_names": [
+      "Coinbase 02",
+      "Figment",
+      "Jupiter"
+    ],
+    "peer_bytes": [
+      12349,
+      9294,
+      7134989
+    ]
+  }
+}
+```
+
+:::
+
+**`GossipNetworkStats`**
+| Field | Type | Description |
+|------------|---------|-------------|
+| health | `GossipNetworkHealth` | Aggregate statistics related to the health of the gossip network and the number of connected peers / amount of connected stake |
+| ingress | `GossipNetworkTraffic` | Network statistics showing the total amount of ingress network traffic as well as per-peer traffic |
+| egress | `GossipNetworkTraffic` | Network statistics showing the total amount of egress network traffic as well as per-peer traffic |
+| storage | `GossipStorageUtil` | Storage statistics showing the total storage utilization as well as per-peer utilization required for our copy of the Cluster Replicated Data Store (crds), which we refer to as the "gossip table" below |
+
+
+**`GossipNetworkHealth`**
+| Field | Type | Description |
+|------------|--------------|-------------|
+| rx_push_pct | `number` | The percentage of all received and deserialized gossip messages that are push messages, taken over the previous 5 second window |
+| duplicate_pct | `number` | The percentage of all received and deserialized gossip messages that are duplicates, taken over the previous 5 second window. A duplicate message will contain an origin peer/timestamp that we have already seen |
+| bad_pct | `number` | The percentage of all received gossip messages that failed to successfully deserialize and insert into the gossip table, taken over the previous 5 second window |
+| pull_already_known_pct | `number` | The percentage of all gossip table entries that we received from push messages (as opposed to pull response messages), taken over the last 5 second window |
+| total_stake | `number` | The total active stake on the Solana network for the current epoch. The information is derived from the `getLeaderSchedule` RPC call at startup |
+| total_peers | `number` | The total number of peers on the Solana network. This information is derived from a `getClusterNodes` RPC call at startup |
+| connected_stake | `number` | The sum of active stake across all peers with a ContactInfo entry in the gossip table. The stake quantity is taken from the leader schedule, and reflects the active stake at the start of the current epoch |
+| connected_peers | `number` | The number of currently connected peers |
+
+**`GossipNetworkTraffic`**
+| Field | Type | Description |
+|------------|---------|-------------|
+| total_throughput | `number` | The network throughput in bytes per second |
+| peer_names | `string[]` | Represents a subset of peers on the gossip network which have a large contribution to our ingress/egress traffic. A "large" peer will have an average throughput (taken over the previous 5 second window) larger than some fixed threshold |
+| peer_throughputs | `number[]` | A list of network ingress/egress throughputs in bytes per second. The peer name for each entry is the corresponding entry in `peer_names` |
+
+**`GossipStorageUtil`**
+| Field | Type | Description |
+|------------|---------|-------------|
+| total_bytes | `number` | The total size, in bytes, of our instance of the Cluster Replicated Data Store (crds) |
+| peer_names | `string[]` | Represents a subset of peers on the gossip network which have a large contribution to the gossip table, either through push or pull response messages. A "large" contribution will have stored bytes larger than some fixed threshold |
+| peer_bytes | `number[]` | A list of storage sizes in bytes where each entry has a corresponding entry in `peer_names` |
+
+
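Two derived quantities cover most dashboard uses of this message: the connected-stake percentage from the `health` block, and the per-peer view built by walking `peer_names` together with `peer_throughputs` (or `peer_bytes`), which are parallel arrays. A small illustrative sketch; the helper names are hypothetical.

```c
#include <stdio.h>

/* Percentage of the cluster's active stake that is visible in our gossip
   table: health.connected_stake / health.total_stake. */
static double
connected_stake_pct( unsigned long connected_stake,
                     unsigned long total_stake ) {
  if( !total_stake ) return 0.0;
  return 100.0*(double)connected_stake/(double)total_stake;
}

/* peer_names[ i ] and peer_throughputs[ i ] describe the same peer, so the
   parallel arrays are always walked in lockstep. */
static void
print_peer_traffic( char const * const * peer_names,
                    double const *       peer_throughputs, /* bytes per second */
                    unsigned long        peer_cnt ) {
  for( unsigned long i=0UL; i<peer_cnt; i++ )
    printf( "%-16s %.0f B/s\n", peer_names[ i ], peer_throughputs[ i ] );
}
```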
 ### peers
 Information about validator peers from the cluster. Peer data is
 sourced from gossip, the accounts database, and the on-chain
 configuration
diff --git a/src/app/firedancer/topology.c b/src/app/firedancer/topology.c
index a9fe7a0bd0..00d3897406 100644
--- a/src/app/firedancer/topology.c
+++ b/src/app/firedancer/topology.c
@@ -788,22 +788,22 @@ fd_topo_initialize( config_t * config ) {
     fd_topob_wksp( topo, "plugin_out" );
     fd_topob_wksp( topo, "plugin" );
-    /**/ fd_topob_link( topo, "plugin_out", "plugin_out", 128UL, 8UL+40200UL*(58UL+12UL*34UL), 1UL );
-    /**/ fd_topob_link( topo, "replay_plugi", "plugin_in", 128UL, 4098*8UL, 1UL );
-    /**/ fd_topob_link( topo, "gossip_plugi", "plugin_in", 128UL, 8UL+40200UL*(58UL+12UL*34UL), 1UL );
-    /**/ fd_topob_link( topo, "votes_plugin", "plugin_in", 128UL, 8UL+40200UL*(58UL+12UL*34UL), 1UL );
+    /**/ fd_topob_link( topo, "plugin_out", "plugin_out", 128UL, 8UL+40200UL*(58UL+12UL*34UL), 1UL );
+    /**/ fd_topob_link( topo, "replay_plugi", "plugin_in", 128UL, 4098*8UL, 1UL );
+    /**/ fd_topob_link( topo, "gossip_plugi", "plugin_in", 128UL, 8UL+40200UL*(58UL+12UL*34UL), 1UL );
+    /**/ fd_topob_link( topo, "snaprd_plugi", "plugin_in", 128UL, sizeof(fd_restore_snapshot_update_t), 1UL );
     /**/ fd_topob_tile( topo, "plugin", "plugin", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
     /**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_plugi", 0UL );
-    /**/ fd_topob_tile_out( topo, "replay", 0UL, "votes_plugin", 0UL );
     /**/ fd_topob_tile_out( topo, "gossip", 0UL, "gossip_plugi", 0UL );
-    /**/ fd_topob_tile_out( topo, "plugin", 0UL, "plugin_out", 0UL );
+    /**/ fd_topob_tile_out( topo, "plugin", 0UL, "plugin_out", 0UL );
+    /**/ fd_topob_tile_out( topo, "snaprd", 0UL, "snaprd_plugi", 0UL );
     /**/ fd_topob_tile_in( topo, "plugin", 0UL, "metric_in", "replay_plugi", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
     /**/ fd_topob_tile_in( topo, "plugin", 0UL, "metric_in", "gossip_plugi", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
     /**/ fd_topob_tile_in( topo, "plugin", 0UL, "metric_in", "stake_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
-    /**/ fd_topob_tile_in( topo, "plugin", 0UL,
"metric_in", "votes_plugin", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + /**/ fd_topob_tile_in( topo, "plugin", 0UL, "metric_in", "snaprd_plugi", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); } /* Link from writer tile to replay tile, to send solcap account updates so that they are serialized. diff --git a/src/disco/gui/fd_gui.c b/src/disco/gui/fd_gui.c index 0e98a0015b..f019299526 100644 --- a/src/disco/gui/fd_gui.c +++ b/src/disco/gui/fd_gui.c @@ -10,6 +10,7 @@ #include "../../disco/pack/fd_pack.h" #include "../../disco/pack/fd_pack_cost.h" #include "../../disco/shred/fd_stake_ci.h" +#include "../../disco/plugin/fd_plugin.h" FD_FN_CONST ulong fd_gui_align( void ) { @@ -29,6 +30,7 @@ fd_gui_new( void * shmem, uchar const * identity_key, int has_vote_key, uchar const * vote_key, + int is_full_client, int is_voting, int schedule_strategy, fd_topo_t * topo ) { @@ -75,15 +77,26 @@ fd_gui_new( void * shmem, memset( gui->summary.vote_key_base58, 0, sizeof(gui->summary.vote_key_base58) ); } + gui->summary.is_full_client = is_full_client; gui->summary.version = version; gui->summary.cluster = cluster; gui->summary.startup_time_nanos = gui->next_sample_400millis; - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_INITIALIZING; - gui->summary.startup_got_full_snapshot = 0; - gui->summary.startup_full_snapshot_slot = 0; - gui->summary.startup_incremental_snapshot_slot = 0; - gui->summary.startup_waiting_for_supermajority_slot = ULONG_MAX; + if( FD_UNLIKELY( gui->summary.is_full_client ) ) { + gui->summary.boot_progress.phase = FD_GUI_BOOT_PROGRESS_TYPE_JOINING_GOSSIP; + gui->summary.boot_progress.joining_gossip_time_nanos = gui->next_sample_400millis; + for( ulong i=0UL; i<2UL; i++ ) { + gui->summary.boot_progress.loading_snapshot[ i ].reset_cnt = ULONG_MAX; /* ensures other fields are reset initially */ + gui->summary.boot_progress.loading_snapshot[ i ].read_path[ 0 ] = '\0'; + gui->summary.boot_progress.loading_snapshot[ i ].insert_path[ 0 ] = '\0'; + } + } else { + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_INITIALIZING; + gui->summary.startup_progress.startup_got_full_snapshot = 0; + gui->summary.startup_progress.startup_full_snapshot_slot = 0; + gui->summary.startup_progress.startup_incremental_snapshot_slot = 0; + gui->summary.startup_progress.startup_waiting_for_supermajority_slot = ULONG_MAX; + } gui->summary.identity_account_balance = 0UL; gui->summary.vote_account_balance = 0UL; @@ -120,6 +133,8 @@ fd_gui_new( void * shmem, gui->summary.tile_timers_history_idx = 0UL; for( ulong i=0UL; isummary.tile_timers_leader_history_slot[ i ] = ULONG_MAX; + memset( &gui->summary.gossip_network_stats, 0, sizeof(gui->summary.gossip_network_stats) ); + gui->block_engine.has_block_engine = 0; gui->epoch.has_epoch[ 0 ] = 0; @@ -161,7 +176,8 @@ void fd_gui_ws_open( fd_gui_t * gui, ulong ws_conn_id ) { void (* printers[] )( fd_gui_t * gui ) = { - fd_gui_printf_startup_progress, + fd_gui_printf_client, + gui->summary.is_full_client ? 
fd_gui_printf_boot_progress : fd_gui_printf_startup_progress, fd_gui_printf_version, fd_gui_printf_cluster, fd_gui_printf_commit_hash, @@ -536,6 +552,124 @@ fd_gui_tile_stats_snap( fd_gui_t * gui, stats->bank_txn_exec_cnt = waterfall->out.block_fail + waterfall->out.block_success; } +static void +fd_gui_run_boot_progress( fd_gui_t * gui, long now_nanos ) { + fd_topo_tile_t const * snaprd = &gui->topo->tiles[ fd_topo_find_tile( gui->topo, "snaprd", 0UL ) ]; + volatile ulong * snaprd_metrics = fd_metrics_tile( snaprd->metrics ); + + fd_topo_tile_t const * snapdc = &gui->topo->tiles[ fd_topo_find_tile( gui->topo, "snapdc", 0UL ) ]; + volatile ulong * snapdc_metrics = fd_metrics_tile( snapdc->metrics ); + + fd_topo_tile_t const * snapin = &gui->topo->tiles[ fd_topo_find_tile( gui->topo, "snapin", 0UL ) ]; + volatile ulong * snapin_metrics = fd_metrics_tile( snapin->metrics ); + + fd_topo_tile_t const * replay = &gui->topo->tiles[ fd_topo_find_tile( gui->topo, "replay", 0UL ) ]; + volatile ulong * replay_metrics = fd_metrics_tile( replay->metrics ); + + fd_topo_tile_t const * repair = &gui->topo->tiles[ fd_topo_find_tile( gui->topo, "repair", 0UL ) ]; + volatile ulong * repair_metrics = fd_metrics_tile( repair->metrics ); + + ulong phase = snaprd_metrics[ MIDX( GAUGE, SNAPRD, STATE ) ]; + ulong snapshot_idx = ULONG_MAX; + + if( FD_UNLIKELY( phase >= 2UL /* FD_SNAPRD_STATE_READING_FULL_FILE */ ) ) { + gui->summary.boot_progress.phase = FD_GUI_BOOT_PROGRESS_TYPE_LOADING_FULL_SNAPSHOT; + snapshot_idx = FD_GUI_BOOT_PROGRESS_FULL_SNAPSHOT_IDX; + } + + if( FD_UNLIKELY( phase >= 8UL /* FD_SNAPRD_STATE_READING_INCREMENTAL_FILE */ ) ) { + gui->summary.boot_progress.phase = FD_GUI_BOOT_PROGRESS_TYPE_LOADING_INCREMENTAL_SNAPSHOT; + snapshot_idx = FD_GUI_BOOT_PROGRESS_INCREMENTAL_SNAPSHOT_IDX; + } + + ulong _latest_turbine_slot = repair_metrics[ MIDX( GAUGE, REPAIR, LATEST_TURBINE_SLOT ) ]; + ulong _first_turbine_slot = repair_metrics[ MIDX( GAUGE, REPAIR, FIRST_TURBINE_SLOT ) ]; + ulong _latest_replay_slot = replay_metrics[ MIDX( GAUGE, REPLAY, SLOT ) ]; + ulong _latest_repair_slot = repair_metrics[ MIDX( GAUGE, REPAIR, LATEST_REPAIR_SLOT ) ]; + + if( FD_UNLIKELY( _latest_replay_slot > 0UL ) ) { + gui->summary.boot_progress.phase = FD_GUI_BOOT_PROGRESS_TYPE_CATCHING_UP; + } + + if( FD_UNLIKELY( _latest_turbine_slot && _latest_replay_slot && ((long)_latest_turbine_slot - (long)_latest_replay_slot < 5L) ) ) { + gui->summary.boot_progress.phase = FD_GUI_BOOT_PROGRESS_TYPE_RUNNING; + } + + switch ( gui->summary.boot_progress.phase ) { + case FD_GUI_BOOT_PROGRESS_TYPE_JOINING_GOSSIP: { + gui->summary.boot_progress.joining_gossip_time_nanos = now_nanos; + break; + } + case FD_GUI_BOOT_PROGRESS_TYPE_LOADING_FULL_SNAPSHOT: + case FD_GUI_BOOT_PROGRESS_TYPE_LOADING_INCREMENTAL_SNAPSHOT: { + ulong _retry_cnt = fd_ulong_if( snapshot_idx==FD_GUI_BOOT_PROGRESS_FULL_SNAPSHOT_IDX, snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_DOWNLOAD_RETRIES ) ], snaprd_metrics[ MIDX( GAUGE, SNAPRD, INCREMENTAL_DOWNLOAD_RETRIES ) ]); + + /* reset boot state if necessary */ + if( FD_UNLIKELY( gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].reset_cnt!=_retry_cnt ) ) { + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].reset_time_nanos = now_nanos; + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].reset_cnt = _retry_cnt; + + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_throughput_ema = 0.; + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_throughput_ema = 0.; + 
gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_throughput_ema = 0.; + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_accounts_throughput_ema = 0.; + } + +#define EMA_FILTER(filt, sample) filt = (FD_GUI_EMA_FILTER_ALPHA * (sample)) + (( 1. - FD_GUI_EMA_FILTER_ALPHA ) * (filt)) + + ulong _total_bytes = fd_ulong_if( snapshot_idx==FD_GUI_BOOT_PROGRESS_FULL_SNAPSHOT_IDX, snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_BYTES_TOTAL ) ], snaprd_metrics[ MIDX( GAUGE, SNAPRD, INCREMENTAL_BYTES_TOTAL ) ]); + ulong _read_bytes = fd_ulong_if( snapshot_idx==FD_GUI_BOOT_PROGRESS_FULL_SNAPSHOT_IDX, snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_BYTES_READ ) ], snaprd_metrics[ MIDX( GAUGE, SNAPRD, INCREMENTAL_BYTES_READ ) ]); + ulong _decompress_decompressed_bytes = fd_ulong_if( snapshot_idx==FD_GUI_BOOT_PROGRESS_FULL_SNAPSHOT_IDX, snapdc_metrics[ MIDX( GAUGE, SNAPDC, FULL_DECOMPRESSED_BYTES_READ ) ], snapdc_metrics[ MIDX( GAUGE, SNAPDC, INCREMENTAL_DECOMPRESSED_BYTES_READ ) ]); + ulong _decompress_compressed_bytes = fd_ulong_if( snapshot_idx==FD_GUI_BOOT_PROGRESS_FULL_SNAPSHOT_IDX, snapdc_metrics[ MIDX( GAUGE, SNAPDC, FULL_COMPRESSED_BYTES_READ ) ], snapdc_metrics[ MIDX( GAUGE, SNAPDC, INCREMENTAL_COMPRESSED_BYTES_READ ) ]); + ulong _insert_bytes = fd_ulong_if( snapshot_idx==FD_GUI_BOOT_PROGRESS_FULL_SNAPSHOT_IDX, snapin_metrics[ MIDX( GAUGE, SNAPIN, FULL_BYTES_READ ) ], snapin_metrics[ MIDX( GAUGE, SNAPIN, INCREMENTAL_BYTES_READ ) ]); + ulong _insert_accounts = snapin_metrics[ MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED ) ]; + + /* metadata */ + ulong _nanos_elapsed_since_last_sample = (ulong)fd_long_max(now_nanos - gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].sample_time_nanos, 1L); + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].total_bytes = _total_bytes; + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].sample_time_nanos = now_nanos; + + /* read stage */ + ulong _read_throughput_ema_sample = (ulong)fd_long_max( (long)_read_bytes - (long)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_bytes, 0L ); + EMA_FILTER( gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_throughput_ema, (double)_read_throughput_ema_sample / (double)_nanos_elapsed_since_last_sample ); + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_bytes = _read_bytes; + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_remaining_nanos = (long)((double)(gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].total_bytes - _read_bytes) / gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_throughput_ema); + + /* decompress stage */ + ulong _decompress_bytes_since_last_sample = (ulong)fd_long_max( (long)_decompress_compressed_bytes - (long)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_compressed_bytes, 0L ); + EMA_FILTER( gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_throughput_ema, (double)_decompress_bytes_since_last_sample / (double)_nanos_elapsed_since_last_sample ); + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_compressed_bytes = _decompress_compressed_bytes; + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_decompressed_bytes = _decompress_decompressed_bytes; + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_remaining_nanos = (long)((double)(gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].total_bytes - _decompress_compressed_bytes) / 
gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_throughput_ema); + + /* insert stage */ + ulong _insert_bytes_since_last_sample = (ulong)fd_long_max( (long)_insert_bytes - (long)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_bytes, 0L); + EMA_FILTER( gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_throughput_ema, (double)_insert_bytes_since_last_sample / (double)_nanos_elapsed_since_last_sample ); + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_bytes = _insert_bytes; + + /* Use the latest compression ratio to estimate decompressed size */ + double _compression_ratio_estimate = (double)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_decompressed_bytes/(double)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_compressed_bytes; + ulong _total_size_estimate = (ulong)((double)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].total_bytes * _compression_ratio_estimate); + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_remaining_nanos = (long)((double)(_total_size_estimate - _insert_bytes) / gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_throughput_ema); + ulong _insert_accounts_since_last_sample = (ulong)fd_long_max( (long)_insert_accounts - (long)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_accounts_current, 0L); + EMA_FILTER( gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_accounts_throughput_ema, (double)_insert_accounts_since_last_sample / (double)_nanos_elapsed_since_last_sample ); + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_accounts_current = _insert_accounts; + +#undef EMA_FILTER + break; + } + case FD_GUI_BOOT_PROGRESS_TYPE_CATCHING_UP: { + gui->summary.boot_progress.catching_up_time_nanos = now_nanos; + gui->summary.boot_progress.catching_up_first_turbine_slot = _first_turbine_slot; + gui->summary.boot_progress.catching_up_latest_turbine_slot = _latest_turbine_slot; + gui->summary.boot_progress.catching_up_latest_repair_slot = _latest_repair_slot; + gui->summary.boot_progress.catching_up_latest_replay_slot = _latest_replay_slot; + break; + } + case FD_GUI_BOOT_PROGRESS_TYPE_RUNNING: break; + default: FD_LOG_ERR(( "unknown boot progress phase: %d", gui->summary.boot_progress.phase )); + } +} int fd_gui_poll( fd_gui_t * gui ) { long now = fd_log_wallclock(); @@ -543,6 +677,10 @@ fd_gui_poll( fd_gui_t * gui ) { int did_work = 0; if( FD_LIKELY( now>gui->next_sample_400millis ) ) { + // fd_gui_run_gossip_network_stats( gui, now ); + fd_gui_printf_gossip_network_stats( gui ); + fd_http_server_ws_broadcast( gui->http ); + fd_gui_estimated_tps_snap( gui ); fd_gui_printf_estimated_tps( gui ); fd_http_server_ws_broadcast( gui->http ); @@ -561,6 +699,12 @@ fd_gui_poll( fd_gui_t * gui ) { fd_gui_printf_live_tile_stats( gui, gui->summary.tile_stats_reference, gui->summary.tile_stats_current ); fd_http_server_ws_broadcast( gui->http ); + if( FD_UNLIKELY( gui->summary.is_full_client && gui->summary.boot_progress.phase!=FD_GUI_BOOT_PROGRESS_TYPE_RUNNING ) ) { + fd_gui_run_boot_progress( gui, now ); + fd_gui_printf_boot_progress( gui ); + fd_http_server_ws_broadcast( gui->http ); + } + gui->next_sample_100millis += 100L*1000L*1000L; did_work = 1; } @@ -1521,16 +1665,16 @@ fd_gui_handle_start_progress( fd_gui_t * gui, switch (type) { case 0: - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_INITIALIZING; + gui->summary.startup_progress.phase = 
FD_GUI_START_PROGRESS_TYPE_INITIALIZING; FD_LOG_INFO(( "progress: initializing" )); break; case 1: { char const * snapshot_type; - if( FD_UNLIKELY( gui->summary.startup_got_full_snapshot ) ) { - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_SEARCHING_FOR_INCREMENTAL_SNAPSHOT; + if( FD_UNLIKELY( gui->summary.startup_progress.startup_got_full_snapshot ) ) { + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_SEARCHING_FOR_INCREMENTAL_SNAPSHOT; snapshot_type = "incremental"; } else { - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_SEARCHING_FOR_FULL_SNAPSHOT; + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_SEARCHING_FOR_FULL_SNAPSHOT; snapshot_type = "full"; } FD_LOG_INFO(( "progress: searching for %s snapshot", snapshot_type )); @@ -1539,70 +1683,70 @@ fd_gui_handle_start_progress( fd_gui_t * gui, case 2: { uchar is_full_snapshot = msg[ 1 ]; if( FD_LIKELY( is_full_snapshot ) ) { - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_DOWNLOADING_FULL_SNAPSHOT; - gui->summary.startup_full_snapshot_slot = *((ulong *)(msg + 2)); - gui->summary.startup_full_snapshot_peer_ip_addr = *((uint *)(msg + 10)); - gui->summary.startup_full_snapshot_peer_port = *((ushort *)(msg + 14)); - gui->summary.startup_full_snapshot_total_bytes = *((ulong *)(msg + 16)); - gui->summary.startup_full_snapshot_current_bytes = *((ulong *)(msg + 24)); - gui->summary.startup_full_snapshot_elapsed_secs = *((double *)(msg + 32)); - gui->summary.startup_full_snapshot_remaining_secs = *((double *)(msg + 40)); - gui->summary.startup_full_snapshot_throughput = *((double *)(msg + 48)); - FD_LOG_INFO(( "progress: downloading full snapshot: slot=%lu", gui->summary.startup_full_snapshot_slot )); + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_DOWNLOADING_FULL_SNAPSHOT; + gui->summary.startup_progress.startup_full_snapshot_slot = *((ulong *)(msg + 2)); + gui->summary.startup_progress.startup_full_snapshot_peer_ip_addr = *((uint *)(msg + 10)); + gui->summary.startup_progress.startup_full_snapshot_peer_port = *((ushort *)(msg + 14)); + gui->summary.startup_progress.startup_full_snapshot_total_bytes = *((ulong *)(msg + 16)); + gui->summary.startup_progress.startup_full_snapshot_current_bytes = *((ulong *)(msg + 24)); + gui->summary.startup_progress.startup_full_snapshot_elapsed_secs = *((double *)(msg + 32)); + gui->summary.startup_progress.startup_full_snapshot_remaining_secs = *((double *)(msg + 40)); + gui->summary.startup_progress.startup_full_snapshot_throughput = *((double *)(msg + 48)); + FD_LOG_INFO(( "progress: downloading full snapshot: slot=%lu", gui->summary.startup_progress.startup_full_snapshot_slot )); } else { - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_DOWNLOADING_INCREMENTAL_SNAPSHOT; - gui->summary.startup_incremental_snapshot_slot = *((ulong *)(msg + 2)); - gui->summary.startup_incremental_snapshot_peer_ip_addr = *((uint *)(msg + 10)); - gui->summary.startup_incremental_snapshot_peer_port = *((ushort *)(msg + 14)); - gui->summary.startup_incremental_snapshot_total_bytes = *((ulong *)(msg + 16)); - gui->summary.startup_incremental_snapshot_current_bytes = *((ulong *)(msg + 24)); - gui->summary.startup_incremental_snapshot_elapsed_secs = *((double *)(msg + 32)); - gui->summary.startup_incremental_snapshot_remaining_secs = *((double *)(msg + 40)); - gui->summary.startup_incremental_snapshot_throughput = *((double *)(msg + 48)); - FD_LOG_INFO(( "progress: downloading incremental snapshot: slot=%lu", 
gui->summary.startup_incremental_snapshot_slot )); + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_DOWNLOADING_INCREMENTAL_SNAPSHOT; + gui->summary.startup_progress.startup_incremental_snapshot_slot = *((ulong *)(msg + 2)); + gui->summary.startup_progress.startup_incremental_snapshot_peer_ip_addr = *((uint *)(msg + 10)); + gui->summary.startup_progress.startup_incremental_snapshot_peer_port = *((ushort *)(msg + 14)); + gui->summary.startup_progress.startup_incremental_snapshot_total_bytes = *((ulong *)(msg + 16)); + gui->summary.startup_progress.startup_incremental_snapshot_current_bytes = *((ulong *)(msg + 24)); + gui->summary.startup_progress.startup_incremental_snapshot_elapsed_secs = *((double *)(msg + 32)); + gui->summary.startup_progress.startup_incremental_snapshot_remaining_secs = *((double *)(msg + 40)); + gui->summary.startup_progress.startup_incremental_snapshot_throughput = *((double *)(msg + 48)); + FD_LOG_INFO(( "progress: downloading incremental snapshot: slot=%lu", gui->summary.startup_progress.startup_incremental_snapshot_slot )); } break; } case 3: { - gui->summary.startup_got_full_snapshot = 1; + gui->summary.startup_progress.startup_got_full_snapshot = 1; break; } case 4: - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_CLEANING_BLOCK_STORE; + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_CLEANING_BLOCK_STORE; FD_LOG_INFO(( "progress: cleaning block store" )); break; case 5: - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_CLEANING_ACCOUNTS; + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_CLEANING_ACCOUNTS; FD_LOG_INFO(( "progress: cleaning accounts" )); break; case 6: - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_LOADING_LEDGER; + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_LOADING_LEDGER; FD_LOG_INFO(( "progress: loading ledger" )); break; case 7: { - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_PROCESSING_LEDGER; - gui->summary.startup_ledger_slot = fd_ulong_load_8( msg + 1 ); - gui->summary.startup_ledger_max_slot = fd_ulong_load_8( msg + 9 ); - FD_LOG_INFO(( "progress: processing ledger: slot=%lu, max_slot=%lu", gui->summary.startup_ledger_slot, gui->summary.startup_ledger_max_slot )); + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_PROCESSING_LEDGER; + gui->summary.startup_progress.startup_ledger_slot = fd_ulong_load_8( msg + 1 ); + gui->summary.startup_progress.startup_ledger_max_slot = fd_ulong_load_8( msg + 9 ); + FD_LOG_INFO(( "progress: processing ledger: slot=%lu, max_slot=%lu", gui->summary.startup_progress.startup_ledger_slot, gui->summary.startup_progress.startup_ledger_max_slot )); break; } case 8: - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_STARTING_SERVICES; + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_STARTING_SERVICES; FD_LOG_INFO(( "progress: starting services" )); break; case 9: - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_HALTED; + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_HALTED; FD_LOG_INFO(( "progress: halted" )); break; case 10: { - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_WAITING_FOR_SUPERMAJORITY; - gui->summary.startup_waiting_for_supermajority_slot = fd_ulong_load_8( msg + 1 ); - gui->summary.startup_waiting_for_supermajority_stake_pct = fd_ulong_load_8( msg + 9 ); - FD_LOG_INFO(( "progress: waiting for supermajority: slot=%lu, gossip_stake_percent=%lu", 
gui->summary.startup_waiting_for_supermajority_slot, gui->summary.startup_waiting_for_supermajority_stake_pct )); + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_WAITING_FOR_SUPERMAJORITY; + gui->summary.startup_progress.startup_waiting_for_supermajority_slot = fd_ulong_load_8( msg + 1 ); + gui->summary.startup_progress.startup_waiting_for_supermajority_stake_pct = fd_ulong_load_8( msg + 9 ); + FD_LOG_INFO(( "progress: waiting for supermajority: slot=%lu, gossip_stake_percent=%lu", gui->summary.startup_progress.startup_waiting_for_supermajority_slot, gui->summary.startup_progress.startup_waiting_for_supermajority_stake_pct )); break; } case 11: - gui->summary.startup_progress = FD_GUI_START_PROGRESS_TYPE_RUNNING; + gui->summary.startup_progress.phase = FD_GUI_START_PROGRESS_TYPE_RUNNING; FD_LOG_INFO(( "progress: running" )); break; default: @@ -1642,6 +1786,16 @@ fd_gui_handle_block_engine_update( fd_gui_t * gui, fd_http_server_ws_broadcast( gui->http ); } +static void +fd_gui_handle_snapshot_update( fd_gui_t * gui, + fd_restore_snapshot_update_t * msg ) { + ulong snapshot_idx = fd_ulong_if( msg->type==FD_PLUGIN_MSG_SNAPSHOT_TYPE_FULL, FD_GUI_BOOT_PROGRESS_FULL_SNAPSHOT_IDX, FD_GUI_BOOT_PROGRESS_INCREMENTAL_SNAPSHOT_IDX ); + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].slot = msg->slot; + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].peer_addr = msg->peer_addr; + gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].peer_port = msg->peer_port; + fd_cstr_printf_check( gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_path, sizeof(gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_path), NULL, "%s", msg->read_path ); +} + void fd_gui_plugin_message( fd_gui_t * gui, ulong plugin_msg, @@ -1706,6 +1860,10 @@ fd_gui_plugin_message( fd_gui_t * gui, fd_gui_handle_block_engine_update( gui, msg ); break; } + case FD_PLUGIN_MSG_SNAPSHOT_UPDATE: { + fd_gui_handle_snapshot_update( gui, (fd_restore_snapshot_update_t *)msg ); + break; + } default: FD_LOG_ERR(( "Unhandled plugin msg: 0x%lx", plugin_msg )); break; diff --git a/src/disco/gui/fd_gui.h b/src/disco/gui/fd_gui.h index 08b48facd3..5c86221af2 100644 --- a/src/disco/gui/fd_gui.h +++ b/src/disco/gui/fd_gui.h @@ -43,6 +43,19 @@ #define FD_GUI_START_PROGRESS_TYPE_WAITING_FOR_SUPERMAJORITY (11) #define FD_GUI_START_PROGRESS_TYPE_RUNNING (12) +#define FD_GUI_BOOT_PROGRESS_TYPE_JOINING_GOSSIP ( 0) +#define FD_GUI_BOOT_PROGRESS_TYPE_LOADING_FULL_SNAPSHOT ( 1) +#define FD_GUI_BOOT_PROGRESS_TYPE_LOADING_INCREMENTAL_SNAPSHOT ( 2) +#define FD_GUI_BOOT_PROGRESS_TYPE_CATCHING_UP ( 3) +#define FD_GUI_BOOT_PROGRESS_TYPE_RUNNING ( 4) + +#define FD_GUI_BOOT_PROGRESS_FULL_SNAPSHOT_IDX (0UL) +#define FD_GUI_BOOT_PROGRESS_INCREMENTAL_SNAPSHOT_IDX (1UL) + +#define FD_GUI_EMA_FILTER_ALPHA ((double)0.05) + +#define FD_GUI_GOSSIP_NETWORK_STATS_PEER_CNT (64UL) + /* Ideally, we would store an entire epoch's worth of transactions. 
If we assume any given validator will have at most 5% stake, and average transactions per slot is around 10_000, then an epoch will have about @@ -291,6 +304,7 @@ struct fd_gui { char vote_key_base58[ FD_BASE58_ENCODED_32_SZ ]; char identity_key_base58[ FD_BASE58_ENCODED_32_SZ ]; + int is_full_client; char const * version; char const * cluster; @@ -299,32 +313,96 @@ struct fd_gui { long startup_time_nanos; - uchar startup_progress; - int startup_got_full_snapshot; - - ulong startup_incremental_snapshot_slot; - uint startup_incremental_snapshot_peer_ip_addr; - ushort startup_incremental_snapshot_peer_port; - double startup_incremental_snapshot_elapsed_secs; - double startup_incremental_snapshot_remaining_secs; - double startup_incremental_snapshot_throughput; - ulong startup_incremental_snapshot_total_bytes; - ulong startup_incremental_snapshot_current_bytes; - - ulong startup_full_snapshot_slot; - uint startup_full_snapshot_peer_ip_addr; - ushort startup_full_snapshot_peer_port; - double startup_full_snapshot_elapsed_secs; - double startup_full_snapshot_remaining_secs; - double startup_full_snapshot_throughput; - ulong startup_full_snapshot_total_bytes; - ulong startup_full_snapshot_current_bytes; - - ulong startup_ledger_slot; - ulong startup_ledger_max_slot; - - ulong startup_waiting_for_supermajority_slot; - ulong startup_waiting_for_supermajority_stake_pct; + union { + struct { /* used in frankendancer */ + uchar phase; + int startup_got_full_snapshot; + + ulong startup_incremental_snapshot_slot; + uint startup_incremental_snapshot_peer_ip_addr; + ushort startup_incremental_snapshot_peer_port; + double startup_incremental_snapshot_elapsed_secs; + double startup_incremental_snapshot_remaining_secs; + double startup_incremental_snapshot_throughput; + ulong startup_incremental_snapshot_total_bytes; + ulong startup_incremental_snapshot_current_bytes; + + ulong startup_full_snapshot_slot; + uint startup_full_snapshot_peer_ip_addr; + ushort startup_full_snapshot_peer_port; + double startup_full_snapshot_elapsed_secs; + double startup_full_snapshot_remaining_secs; + double startup_full_snapshot_throughput; + ulong startup_full_snapshot_total_bytes; + ulong startup_full_snapshot_current_bytes; + + ulong startup_ledger_slot; + ulong startup_ledger_max_slot; + + ulong startup_waiting_for_supermajority_slot; + ulong startup_waiting_for_supermajority_stake_pct; + } startup_progress; + struct { /* used in the full client */ + uchar phase; + long joining_gossip_time_nanos; + struct { + ulong slot; + uint peer_addr; + ushort peer_port; + ulong total_bytes; /* compressed */ + long reset_time_nanos; /* since this phase can reset, we keep a reset timestamp for proper timekeeping */ + long sample_time_nanos; + ulong reset_cnt; + + ulong read_bytes; /* compressed */ + double read_throughput_ema; /* EMA filtered throughput, in compressed bytes / ns */ + long read_remaining_nanos; + char read_path[ PATH_MAX ]; + + ulong decompress_decompressed_bytes; /* decompressed */ + ulong decompress_compressed_bytes; /* compressed */ + double decompress_throughput_ema; /* compressed */ + long decompress_remaining_nanos; + + ulong insert_bytes; /* decompressed */ + double insert_throughput_ema; /* decompressed */ + long insert_remaining_nanos; + char insert_path[ PATH_MAX ]; + ulong insert_accounts_current; + double insert_accounts_throughput_ema; + } loading_snapshot[ 2UL ]; + + long catching_up_time_nanos; + ulong catching_up_first_turbine_slot; + ulong catching_up_latest_turbine_slot; + ulong 
catching_up_latest_repair_slot; + ulong catching_up_latest_replay_slot; + } boot_progress; + }; + + struct { + uchar health_rx_push_pct; + uchar health_duplicate_pct; + uchar health_bad_pct; + uchar health_pull_alread_known_pct; + ulong health_total_stake; /* lamports */ + ulong health_total_peers; + ulong health_connected_stake; /* lamports */ + ulong health_connected_peers; + + double ingress_total_throughput_ema; /* bytes per nanosecond */ + char ingress_peer_names[ FD_GUI_GOSSIP_NETWORK_STATS_PEER_CNT ][ 64 ]; /* maintain the top 64 peers */ + double ingress_peer_throughputs_ema[ 64 ]; /* bytes per nanosecond */ + + double egress_total_throughput_ema; /* bytes per nanosecond */ + char egress_peer_names[ FD_GUI_GOSSIP_NETWORK_STATS_PEER_CNT ][ 64 ]; /* maintain the top 64 peers */ + double egress_peer_throughputs_ema[ 64 ]; /* bytes per nanosecond */ + + ulong storage_total_bytes; + char storage_peer_names[ FD_GUI_GOSSIP_NETWORK_STATS_PEER_CNT ][ 64 ]; /* maintain the top 64 peers */ + ulong storage_peer_bytes[ 64 ]; + + } gossip_network_stats; int schedule_strategy; @@ -433,6 +511,7 @@ fd_gui_new( void * shmem, uchar const * identity_key, int has_vote_key, uchar const * vote_key, + int is_full_client, int is_voting, int schedule_strategy, fd_topo_t * topo ); diff --git a/src/disco/gui/fd_gui_printf.c b/src/disco/gui/fd_gui_printf.c index 84fd1f2d0b..15f2196a72 100644 --- a/src/disco/gui/fd_gui_printf.c +++ b/src/disco/gui/fd_gui_printf.c @@ -290,11 +290,21 @@ fd_gui_printf_tps_history( fd_gui_t * gui ) { jsonp_close_envelope( gui ); } +void +fd_gui_printf_client( fd_gui_t * gui ) { + jsonp_open_envelope( gui, "summary", "client" ); + jsonp_open_array( gui, "value" ); + if( FD_LIKELY( gui->summary.is_full_client ) ) jsonp_string( gui, NULL, "firedancer" ); + else jsonp_string( gui, NULL, "frankendancer" ); + jsonp_close_array( gui ); + jsonp_close_envelope( gui ); +} + void fd_gui_printf_startup_progress( fd_gui_t * gui ) { char const * phase; - switch( gui->summary.startup_progress ) { + switch( gui->summary.startup_progress.phase ) { case FD_GUI_START_PROGRESS_TYPE_INITIALIZING: phase = "initializing"; break; @@ -335,23 +345,23 @@ fd_gui_printf_startup_progress( fd_gui_t * gui ) { phase = "running"; break; default: - FD_LOG_ERR(( "unknown phase %d", gui->summary.startup_progress )); + FD_LOG_ERR(( "unknown phase %d", gui->summary.startup_progress.phase )); } jsonp_open_envelope( gui, "summary", "startup_progress" ); jsonp_open_object( gui, "value" ); jsonp_string( gui, "phase", phase ); - if( FD_LIKELY( gui->summary.startup_progress>=FD_GUI_START_PROGRESS_TYPE_DOWNLOADING_FULL_SNAPSHOT) ) { + if( FD_LIKELY( gui->summary.startup_progress.phase>=FD_GUI_START_PROGRESS_TYPE_DOWNLOADING_FULL_SNAPSHOT) ) { char peer_addr[ 64 ]; - FD_TEST( fd_cstr_printf_check( peer_addr, sizeof(peer_addr), NULL, FD_IP4_ADDR_FMT ":%u", FD_IP4_ADDR_FMT_ARGS(gui->summary.startup_full_snapshot_peer_ip_addr), gui->summary.startup_full_snapshot_peer_port ) ); + FD_TEST( fd_cstr_printf_check( peer_addr, sizeof(peer_addr), NULL, FD_IP4_ADDR_FMT ":%u", FD_IP4_ADDR_FMT_ARGS(gui->summary.startup_progress.startup_full_snapshot_peer_ip_addr), gui->summary.startup_progress.startup_full_snapshot_peer_port ) ); jsonp_string( gui, "downloading_full_snapshot_peer", peer_addr ); - jsonp_ulong( gui, "downloading_full_snapshot_slot", gui->summary.startup_full_snapshot_slot ); - jsonp_double( gui, "downloading_full_snapshot_elapsed_secs", gui->summary.startup_full_snapshot_elapsed_secs ); - jsonp_double( gui, 
"downloading_full_snapshot_remaining_secs", gui->summary.startup_full_snapshot_remaining_secs ); - jsonp_double( gui, "downloading_full_snapshot_throughput", gui->summary.startup_full_snapshot_throughput ); - jsonp_ulong( gui, "downloading_full_snapshot_total_bytes", gui->summary.startup_full_snapshot_total_bytes ); - jsonp_ulong( gui, "downloading_full_snapshot_current_bytes", gui->summary.startup_full_snapshot_current_bytes ); + jsonp_ulong( gui, "downloading_full_snapshot_slot", gui->summary.startup_progress.startup_full_snapshot_slot ); + jsonp_double( gui, "downloading_full_snapshot_elapsed_secs", gui->summary.startup_progress.startup_full_snapshot_elapsed_secs ); + jsonp_double( gui, "downloading_full_snapshot_remaining_secs", gui->summary.startup_progress.startup_full_snapshot_remaining_secs ); + jsonp_double( gui, "downloading_full_snapshot_throughput", gui->summary.startup_progress.startup_full_snapshot_throughput ); + jsonp_ulong( gui, "downloading_full_snapshot_total_bytes", gui->summary.startup_progress.startup_full_snapshot_total_bytes ); + jsonp_ulong( gui, "downloading_full_snapshot_current_bytes", gui->summary.startup_progress.startup_full_snapshot_current_bytes ); } else { jsonp_null( gui, "downloading_full_snapshot_peer" ); jsonp_null( gui, "downloading_full_snapshot_slot" ); @@ -362,17 +372,17 @@ fd_gui_printf_startup_progress( fd_gui_t * gui ) { jsonp_null( gui, "downloading_full_snapshot_current_bytes" ); } - if( FD_LIKELY( gui->summary.startup_progress>=FD_GUI_START_PROGRESS_TYPE_DOWNLOADING_INCREMENTAL_SNAPSHOT) ) { + if( FD_LIKELY( gui->summary.startup_progress.phase>=FD_GUI_START_PROGRESS_TYPE_DOWNLOADING_INCREMENTAL_SNAPSHOT) ) { char peer_addr[ 64 ]; - FD_TEST( fd_cstr_printf_check( peer_addr, sizeof(peer_addr), NULL, FD_IP4_ADDR_FMT ":%u", FD_IP4_ADDR_FMT_ARGS(gui->summary.startup_incremental_snapshot_peer_ip_addr), gui->summary.startup_incremental_snapshot_peer_port ) ); + FD_TEST( fd_cstr_printf_check( peer_addr, sizeof(peer_addr), NULL, FD_IP4_ADDR_FMT ":%u", FD_IP4_ADDR_FMT_ARGS(gui->summary.startup_progress.startup_incremental_snapshot_peer_ip_addr), gui->summary.startup_progress.startup_incremental_snapshot_peer_port ) ); jsonp_string( gui, "downloading_incremental_snapshot_peer", peer_addr ); - jsonp_ulong( gui, "downloading_incremental_snapshot_slot", gui->summary.startup_incremental_snapshot_slot ); - jsonp_double( gui, "downloading_incremental_snapshot_elapsed_secs", gui->summary.startup_incremental_snapshot_elapsed_secs ); - jsonp_double( gui, "downloading_incremental_snapshot_remaining_secs", gui->summary.startup_incremental_snapshot_remaining_secs ); - jsonp_double( gui, "downloading_incremental_snapshot_throughput", gui->summary.startup_incremental_snapshot_throughput ); - jsonp_ulong( gui, "downloading_incremental_snapshot_total_bytes", gui->summary.startup_incremental_snapshot_total_bytes ); - jsonp_ulong( gui, "downloading_incremental_snapshot_current_bytes", gui->summary.startup_incremental_snapshot_current_bytes ); + jsonp_ulong( gui, "downloading_incremental_snapshot_slot", gui->summary.startup_progress.startup_incremental_snapshot_slot ); + jsonp_double( gui, "downloading_incremental_snapshot_elapsed_secs", gui->summary.startup_progress.startup_incremental_snapshot_elapsed_secs ); + jsonp_double( gui, "downloading_incremental_snapshot_remaining_secs", gui->summary.startup_progress.startup_incremental_snapshot_remaining_secs ); + jsonp_double( gui, "downloading_incremental_snapshot_throughput", 
gui->summary.startup_progress.startup_incremental_snapshot_throughput ); + jsonp_ulong( gui, "downloading_incremental_snapshot_total_bytes", gui->summary.startup_progress.startup_incremental_snapshot_total_bytes ); + jsonp_ulong( gui, "downloading_incremental_snapshot_current_bytes", gui->summary.startup_progress.startup_incremental_snapshot_current_bytes ); } else { jsonp_null( gui, "downloading_incremental_snapshot_peer" ); jsonp_null( gui, "downloading_incremental_snapshot_slot" ); @@ -383,17 +393,17 @@ fd_gui_printf_startup_progress( fd_gui_t * gui ) { jsonp_null( gui, "downloading_incremental_snapshot_current_bytes" ); } - if( FD_LIKELY( gui->summary.startup_progress>=FD_GUI_START_PROGRESS_TYPE_PROCESSING_LEDGER) ) { - jsonp_ulong( gui, "ledger_slot", gui->summary.startup_ledger_slot ); - jsonp_ulong( gui, "ledger_max_slot", gui->summary.startup_ledger_max_slot ); + if( FD_LIKELY( gui->summary.startup_progress.phase>=FD_GUI_START_PROGRESS_TYPE_PROCESSING_LEDGER) ) { + jsonp_ulong( gui, "ledger_slot", gui->summary.startup_progress.startup_ledger_slot ); + jsonp_ulong( gui, "ledger_max_slot", gui->summary.startup_progress.startup_ledger_max_slot ); } else { jsonp_null( gui, "ledger_slot" ); jsonp_null( gui, "ledger_max_slot" ); } - if( FD_LIKELY( gui->summary.startup_progress>=FD_GUI_START_PROGRESS_TYPE_WAITING_FOR_SUPERMAJORITY ) && gui->summary.startup_waiting_for_supermajority_slot!=ULONG_MAX ) { - jsonp_ulong( gui, "waiting_for_supermajority_slot", gui->summary.startup_waiting_for_supermajority_slot ); - jsonp_ulong( gui, "waiting_for_supermajority_stake_percent", gui->summary.startup_waiting_for_supermajority_stake_pct ); + if( FD_LIKELY( gui->summary.startup_progress.phase>=FD_GUI_START_PROGRESS_TYPE_WAITING_FOR_SUPERMAJORITY ) && gui->summary.startup_progress.startup_waiting_for_supermajority_slot!=ULONG_MAX ) { + jsonp_ulong( gui, "waiting_for_supermajority_slot", gui->summary.startup_progress.startup_waiting_for_supermajority_slot ); + jsonp_ulong( gui, "waiting_for_supermajority_stake_percent", gui->summary.startup_progress.startup_waiting_for_supermajority_stake_pct ); } else { jsonp_null( gui, "waiting_for_supermajority_slot" ); jsonp_null( gui, "waiting_for_supermajority_stake_percent" ); @@ -402,6 +412,167 @@ fd_gui_printf_startup_progress( fd_gui_t * gui ) { jsonp_close_envelope( gui ); } +void +fd_gui_printf_boot_progress( fd_gui_t * gui ) { + const double _ns_per_sec = 1000000000.0; + + jsonp_open_envelope( gui, "summary", "boot_progress" ); + jsonp_open_object( gui, "value" ); + switch( gui->summary.boot_progress.phase ) { + case FD_GUI_BOOT_PROGRESS_TYPE_JOINING_GOSSIP: + jsonp_string( gui, "phase", "joining_gossip" ); + jsonp_double( gui, "total_elapsed", (double)(gui->summary.boot_progress.joining_gossip_time_nanos - gui->summary.startup_time_nanos) / _ns_per_sec ); + break; + case FD_GUI_BOOT_PROGRESS_TYPE_LOADING_FULL_SNAPSHOT: + jsonp_string( gui, "phase", "loading_full_snapshot" ); + jsonp_double( gui, "total_elapsed", (double)(gui->summary.boot_progress.loading_snapshot[ FD_GUI_BOOT_PROGRESS_FULL_SNAPSHOT_IDX ].sample_time_nanos - gui->summary.startup_time_nanos) / _ns_per_sec ); + break; + case FD_GUI_BOOT_PROGRESS_TYPE_LOADING_INCREMENTAL_SNAPSHOT: + jsonp_string( gui, "phase", "loading_incr_snapshot" ); + jsonp_double( gui, "total_elapsed", (double)(gui->summary.boot_progress.loading_snapshot[ FD_GUI_BOOT_PROGRESS_INCREMENTAL_SNAPSHOT_IDX ].sample_time_nanos - gui->summary.startup_time_nanos) / _ns_per_sec ); + break; + case 
FD_GUI_BOOT_PROGRESS_TYPE_CATCHING_UP: + jsonp_string( gui, "phase", "catching_up" ); + jsonp_double( gui, "total_elapsed", (double)(gui->summary.boot_progress.catching_up_time_nanos - gui->summary.startup_time_nanos) / _ns_per_sec ); + break; + case FD_GUI_BOOT_PROGRESS_TYPE_RUNNING: + jsonp_string( gui, "phase", "running" ); + jsonp_double( gui, "total_elapsed", (double)(gui->summary.boot_progress.catching_up_time_nanos - gui->summary.startup_time_nanos) / _ns_per_sec ); + break; + default: + FD_LOG_ERR(( "unknown phase %d", gui->summary.startup_progress.phase )); + } + + jsonp_long( gui, "joining_gossip_elapsed", (gui->summary.boot_progress.joining_gossip_time_nanos - gui->summary.startup_time_nanos) / 1000000L ); + +#define HANDLE_SNAPSHOT_STATE(snapshot_type, snapshot_type_upper) \ + if( FD_LIKELY( gui->summary.boot_progress.phase>=FD_GUI_BOOT_PROGRESS_TYPE_LOADING_##snapshot_type_upper##_SNAPSHOT )) { \ + char peer_addr[ 64UL ]; \ + ulong snapshot_idx = FD_GUI_BOOT_PROGRESS_##snapshot_type_upper##_SNAPSHOT_IDX; \ + FD_TEST( fd_cstr_printf_check( peer_addr, sizeof(peer_addr), NULL, FD_IP4_ADDR_FMT ":%u", FD_IP4_ADDR_FMT_ARGS(gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].peer_addr), gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].peer_port ) ); \ + if( FD_LIKELY( gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].peer_addr!=0U ) ) jsonp_string( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_peer", peer_addr ); \ + else jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_peer" ); /* local snapshots don't have a peer */ \ + jsonp_double ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_elapsed", (double)(gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].sample_time_nanos - gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].reset_time_nanos) / _ns_per_sec ); \ + jsonp_ulong ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_reset_cnt", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].reset_cnt ); \ + jsonp_ulong ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_slot", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].slot ); \ + jsonp_ulong_as_str( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_total_bytes", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].total_bytes ); \ + jsonp_double ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_read_elapsed", (double)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].sample_time_nanos / _ns_per_sec ); \ + jsonp_ulong_as_str( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_read_bytes", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_bytes ); \ + jsonp_double ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_read_throughput", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_throughput_ema * _ns_per_sec ); \ + jsonp_double ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_read_remaining", (double)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_remaining_nanos / _ns_per_sec ); \ + jsonp_string ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_read_path", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].read_path ); \ + jsonp_ulong_as_str( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_decompress_decompressed_bytes", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_decompressed_bytes ); \ + jsonp_ulong_as_str( gui, "loading_" FD_STRINGIFY(snapshot_type) 
"_snapshot_decompress_compressed_bytes", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_compressed_bytes ); \ + jsonp_double ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_decompress_throughput", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_throughput_ema * _ns_per_sec ); \ + jsonp_double ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_decompress_remaining", (double)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].decompress_remaining_nanos / _ns_per_sec ); \ + jsonp_ulong_as_str( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_insert_bytes", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_bytes ); \ + jsonp_double ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_insert_throughput", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_throughput_ema * _ns_per_sec ); \ + jsonp_double ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_insert_remaining", (double)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_remaining_nanos / _ns_per_sec ); \ + jsonp_double ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_insert_accounts_throughput", (double)gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_accounts_throughput_ema * _ns_per_sec ); \ + jsonp_ulong ( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_insert_accounts_current", gui->summary.boot_progress.loading_snapshot[ snapshot_idx ].insert_accounts_current ); \ + } else { \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_peer" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_elapsed" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_reset_cnt" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_slot" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_total_bytes" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_read_elapsed" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_read_bytes" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_read_throughput" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_read_remaining" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_read_path" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_decompress_decompressed_bytes" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_decompress_compressed_bytes" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_decompress_throughput" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_decompress_remaining" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_insert_bytes" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_insert_throughput" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_insert_remaining" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_insert_accounts_throughput" ); \ + jsonp_null( gui, "loading_" FD_STRINGIFY(snapshot_type) "_snapshot_insert_accounts_current" ); \ + } + + HANDLE_SNAPSHOT_STATE(full, FULL) + HANDLE_SNAPSHOT_STATE(incremental, INCREMENTAL) +#undef HANDLE_SNAPSHOT_STATE + + if( FD_LIKELY( gui->summary.boot_progress.phase>=FD_GUI_BOOT_PROGRESS_TYPE_CATCHING_UP )) { + jsonp_double( gui, "catching_up_elapsed", 
(double)(gui->summary.boot_progress.catching_up_time_nanos - gui->summary.boot_progress.loading_snapshot[ FD_GUI_BOOT_PROGRESS_INCREMENTAL_SNAPSHOT_IDX ].sample_time_nanos) / _ns_per_sec ); + jsonp_ulong ( gui, "catching_up_first_turbine_slot", gui->summary.boot_progress.catching_up_first_turbine_slot ); + jsonp_ulong ( gui, "catching_up_latest_turbine_slot", gui->summary.boot_progress.catching_up_latest_turbine_slot ); + jsonp_ulong ( gui, "catching_up_latest_repair_slot", gui->summary.boot_progress.catching_up_latest_repair_slot ); + jsonp_ulong ( gui, "catching_up_latest_replay_slot", gui->summary.boot_progress.catching_up_latest_replay_slot ); + } else { + jsonp_null( gui, "catching_up_elapsed" ); + jsonp_null( gui, "catching_up_first_turbine_slot" ); + jsonp_null( gui, "catching_up_latest_turbine_slot" ); + jsonp_null( gui, "catching_up_latest_repair_slot" ); + jsonp_null( gui, "catching_up_latest_replay_slot" ); + } + + jsonp_close_object( gui ); + jsonp_close_envelope( gui ); +} + +void +fd_gui_printf_gossip_network_stats( fd_gui_t * gui ) { + double _ns_per_sec = 1000000000.0; + jsonp_open_envelope( gui, "gossip", "network_stats" ); + jsonp_open_object( gui, "value" ); + + jsonp_open_object( gui, "health" ); + jsonp_ulong ( gui, "rx_push", (ulong)gui->summary.gossip_network_stats.health_rx_push_pct ); + jsonp_ulong ( gui, "duplicate", (ulong)gui->summary.gossip_network_stats.health_duplicate_pct ); + jsonp_ulong ( gui, "bad", (ulong)gui->summary.gossip_network_stats.health_bad_pct ); + jsonp_ulong ( gui, "pull_already_known", (ulong)gui->summary.gossip_network_stats.health_pull_alread_known_pct ); + jsonp_ulong_as_str( gui, "total_stake", (ulong)gui->summary.gossip_network_stats.health_total_stake ); + jsonp_ulong ( gui, "total_peers", (ulong)gui->summary.gossip_network_stats.health_total_peers ); + jsonp_ulong_as_str( gui, "connected_stake", (ulong)gui->summary.gossip_network_stats.health_connected_stake ); + jsonp_ulong ( gui, "connected_peers", (ulong)gui->summary.gossip_network_stats.health_connected_peers ); + jsonp_close_object( gui ); + + jsonp_open_object( gui, "ingress" ); + jsonp_double( gui, "total_throughput_ema", gui->summary.gossip_network_stats.ingress_total_throughput_ema * _ns_per_sec ); + jsonp_open_array( gui, "peer_names" ); + for( ulong i=0UL; isummary.gossip_network_stats.ingress_peer_names[ i ] ); + jsonp_close_array( gui ); + + jsonp_open_array( gui, "peer_throughputs_ema" ); + for( ulong i=0UL; isummary.gossip_network_stats.ingress_peer_throughputs_ema[ i ] * _ns_per_sec ); + jsonp_close_array( gui ); + jsonp_close_object( gui ); + + jsonp_open_object( gui, "egress" ); + jsonp_double( gui, "total_throughput_ema", gui->summary.gossip_network_stats.egress_total_throughput_ema * _ns_per_sec ); + jsonp_open_array( gui, "peer_names" ); + for( ulong i=0UL; isummary.gossip_network_stats.egress_peer_names[ i ] ); + jsonp_close_array( gui ); + + jsonp_open_array( gui, "peer_throughputs_ema" ); + for( ulong i=0UL; isummary.gossip_network_stats.egress_peer_throughputs_ema[ i ] * _ns_per_sec ); + jsonp_close_array( gui ); + jsonp_close_object( gui ); + + jsonp_open_object( gui, "storage" ); + jsonp_ulong( gui, "total_bytes", gui->summary.gossip_network_stats.storage_total_bytes ); + jsonp_open_array( gui, "peer_names" ); + for( ulong i=0UL; isummary.gossip_network_stats.egress_peer_names[ i ] ); + jsonp_close_array( gui ); + + jsonp_open_array( gui, "peer_bytes" ); + for( ulong i=0UL; isummary.gossip_network_stats.storage_peer_bytes[ i ] ); + jsonp_close_array( gui ); 
+ jsonp_close_object( gui ); + jsonp_close_object( gui ); + jsonp_close_envelope( gui ); +} + void fd_gui_printf_block_engine( fd_gui_t * gui ) { jsonp_open_envelope( gui, "block_engine", "update" ); diff --git a/src/disco/gui/fd_gui_printf.h b/src/disco/gui/fd_gui_printf.h index b0b177c936..e2b4dd161a 100644 --- a/src/disco/gui/fd_gui_printf.h +++ b/src/disco/gui/fd_gui_printf.h @@ -7,6 +7,7 @@ messages into the GUI outgoing message buffer, where they can be sent to a specific WebSocket client, or broadcast out to all clients. */ +void fd_gui_printf_client( fd_gui_t * gui ); void fd_gui_printf_version( fd_gui_t * gui ); void fd_gui_printf_cluster( fd_gui_t * gui ); void fd_gui_printf_commit_hash( fd_gui_t * gui ); @@ -18,6 +19,8 @@ void fd_gui_printf_vote_distance( fd_gui_t * gui ); void fd_gui_printf_skipped_history( fd_gui_t * gui ); void fd_gui_printf_tps_history( fd_gui_t * gui ); void fd_gui_printf_startup_progress( fd_gui_t * gui ); +void fd_gui_printf_boot_progress( fd_gui_t * gui ); +void fd_gui_printf_gossip_network_stats( fd_gui_t * gui ); void fd_gui_printf_block_engine( fd_gui_t * gui ); void fd_gui_printf_tiles( fd_gui_t * gui ); void fd_gui_printf_schedule_strategy( fd_gui_t * gui ); diff --git a/src/disco/gui/fd_gui_tile.c b/src/disco/gui/fd_gui_tile.c index a3a7d69ccf..0c0ce039cc 100644 --- a/src/disco/gui/fd_gui_tile.c +++ b/src/disco/gui/fd_gui_tile.c @@ -14,10 +14,15 @@ #include "generated/fd_gui_tile_seccomp.h" -extern ulong const fdctl_major_version; -extern ulong const fdctl_minor_version; -extern ulong const fdctl_patch_version; -extern uint const fdctl_commit_ref; +#ifdef __has_include +#if __has_include("../../app/firedancer/version.h") +#include "../../app/firedancer/version.h" +#endif +#endif + +#ifndef FDCTL_MAJOR_VERSION +#define FDCTL_MAJOR_VERSION 0 +#endif #include "../../disco/tiles.h" #include "../../disco/keyguard/fd_keyload.h" @@ -501,7 +506,7 @@ unprivileged_init( fd_topo_t * topo, FD_TEST( fd_cstr_printf_check( ctx->version_string, sizeof( ctx->version_string ), NULL, "%s", fdctl_version_string ) ); ctx->topo = topo; - ctx->gui = fd_gui_join( fd_gui_new( _gui, ctx->gui_server, ctx->version_string, tile->gui.cluster, ctx->identity_key, ctx->has_vote_key, ctx->vote_key->uc, tile->gui.is_voting, tile->gui.schedule_strategy, ctx->topo ) ); + ctx->gui = fd_gui_join( fd_gui_new( _gui, ctx->gui_server, ctx->version_string, tile->gui.cluster, ctx->identity_key, ctx->has_vote_key, ctx->vote_key->uc, /* is_full_client */ FIREDANCER_MAJOR_VERSION >= 1,tile->gui.is_voting, tile->gui.schedule_strategy, ctx->topo ) ); FD_TEST( ctx->gui ); ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->keyswitch_obj_id ) ); diff --git a/src/disco/metrics/generated/fd_metrics_all.c b/src/disco/metrics/generated/fd_metrics_all.c index 94fd4d5b7d..26cf7dee40 100644 --- a/src/disco/metrics/generated/fd_metrics_all.c +++ b/src/disco/metrics/generated/fd_metrics_all.c @@ -50,7 +50,6 @@ const char * FD_METRICS_TILE_KIND_NAMES[FD_METRICS_TILE_KIND_CNT] = { "sign", "metric", "replay", - "storei", "gossip", "netlnk", "sock", @@ -76,7 +75,6 @@ const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT] = { FD_METRICS_SIGN_TOTAL, FD_METRICS_METRIC_TOTAL, FD_METRICS_REPLAY_TOTAL, - FD_METRICS_STOREI_TOTAL, FD_METRICS_GOSSIP_TOTAL, FD_METRICS_NETLNK_TOTAL, FD_METRICS_SOCK_TOTAL, @@ -101,7 +99,6 @@ const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT] FD_METRICS_SIGN, FD_METRICS_METRIC, FD_METRICS_REPLAY, - FD_METRICS_STOREI, FD_METRICS_GOSSIP, 
FD_METRICS_NETLNK, FD_METRICS_SOCK, diff --git a/src/disco/metrics/generated/fd_metrics_all.h b/src/disco/metrics/generated/fd_metrics_all.h index 3da99e2e15..5366298bdb 100644 --- a/src/disco/metrics/generated/fd_metrics_all.h +++ b/src/disco/metrics/generated/fd_metrics_all.h @@ -16,7 +16,6 @@ #include "fd_metrics_shred.h" #include "fd_metrics_store.h" #include "fd_metrics_replay.h" -#include "fd_metrics_storei.h" #include "fd_metrics_repair.h" #include "fd_metrics_gossip.h" #include "fd_metrics_sign.h" @@ -161,7 +160,7 @@ extern const fd_metrics_meta_t FD_METRICS_ALL_LINK_OUT[FD_METRICS_ALL_LINK_OUT_T #define FD_METRICS_TOTAL_SZ (8UL*253UL) -#define FD_METRICS_TILE_KIND_CNT 23 +#define FD_METRICS_TILE_KIND_CNT 22 extern const char * FD_METRICS_TILE_KIND_NAMES[FD_METRICS_TILE_KIND_CNT]; extern const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT]; extern const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT]; diff --git a/src/disco/metrics/generated/fd_metrics_repair.c b/src/disco/metrics/generated/fd_metrics_repair.c index 6a51cf710d..2dd615e0d5 100644 --- a/src/disco/metrics/generated/fd_metrics_repair.c +++ b/src/disco/metrics/generated/fd_metrics_repair.c @@ -2,6 +2,9 @@ #include "fd_metrics_repair.h" const fd_metrics_meta_t FD_METRICS_REPAIR[FD_METRICS_REPAIR_TOTAL] = { + DECLARE_METRIC( REPAIR_FIRST_TURBINE_SLOT, GAUGE ), + DECLARE_METRIC( REPAIR_LATEST_TURBINE_SLOT, GAUGE ), + DECLARE_METRIC( REPAIR_LATEST_REPAIR_SLOT, GAUGE ), DECLARE_METRIC( REPAIR_RECV_CLNT_PKT, COUNTER ), DECLARE_METRIC( REPAIR_RECV_SERV_PKT, COUNTER ), DECLARE_METRIC( REPAIR_RECV_SERV_CORRUPT_PKT, COUNTER ), diff --git a/src/disco/metrics/generated/fd_metrics_repair.h b/src/disco/metrics/generated/fd_metrics_repair.h index da05dbf0bd..9cd5a037cd 100644 --- a/src/disco/metrics/generated/fd_metrics_repair.h +++ b/src/disco/metrics/generated/fd_metrics_repair.h @@ -3,73 +3,91 @@ #include "../fd_metrics_base.h" #include "fd_metrics_enums.h" -#define FD_METRICS_COUNTER_REPAIR_RECV_CLNT_PKT_OFF (16UL) +#define FD_METRICS_GAUGE_REPAIR_FIRST_TURBINE_SLOT_OFF (16UL) +#define FD_METRICS_GAUGE_REPAIR_FIRST_TURBINE_SLOT_NAME "repair_first_turbine_slot" +#define FD_METRICS_GAUGE_REPAIR_FIRST_TURBINE_SLOT_TYPE (FD_METRICS_TYPE_GAUGE) +#define FD_METRICS_GAUGE_REPAIR_FIRST_TURBINE_SLOT_DESC "" +#define FD_METRICS_GAUGE_REPAIR_FIRST_TURBINE_SLOT_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_GAUGE_REPAIR_LATEST_TURBINE_SLOT_OFF (17UL) +#define FD_METRICS_GAUGE_REPAIR_LATEST_TURBINE_SLOT_NAME "repair_latest_turbine_slot" +#define FD_METRICS_GAUGE_REPAIR_LATEST_TURBINE_SLOT_TYPE (FD_METRICS_TYPE_GAUGE) +#define FD_METRICS_GAUGE_REPAIR_LATEST_TURBINE_SLOT_DESC "" +#define FD_METRICS_GAUGE_REPAIR_LATEST_TURBINE_SLOT_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_GAUGE_REPAIR_LATEST_REPAIR_SLOT_OFF (18UL) +#define FD_METRICS_GAUGE_REPAIR_LATEST_REPAIR_SLOT_NAME "repair_latest_repair_slot" +#define FD_METRICS_GAUGE_REPAIR_LATEST_REPAIR_SLOT_TYPE (FD_METRICS_TYPE_GAUGE) +#define FD_METRICS_GAUGE_REPAIR_LATEST_REPAIR_SLOT_DESC "" +#define FD_METRICS_GAUGE_REPAIR_LATEST_REPAIR_SLOT_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_COUNTER_REPAIR_RECV_CLNT_PKT_OFF (19UL) #define FD_METRICS_COUNTER_REPAIR_RECV_CLNT_PKT_NAME "repair_recv_clnt_pkt" #define FD_METRICS_COUNTER_REPAIR_RECV_CLNT_PKT_TYPE (FD_METRICS_TYPE_COUNTER) #define FD_METRICS_COUNTER_REPAIR_RECV_CLNT_PKT_DESC "Now many client packets have we received" #define FD_METRICS_COUNTER_REPAIR_RECV_CLNT_PKT_CVT 
(FD_METRICS_CONVERTER_NONE) -#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_OFF (17UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_OFF (20UL) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_NAME "repair_recv_serv_pkt" #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPE (FD_METRICS_TYPE_COUNTER) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_DESC "How many server packets have we received" #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_CVT (FD_METRICS_CONVERTER_NONE) -#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_CORRUPT_PKT_OFF (18UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_CORRUPT_PKT_OFF (21UL) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_CORRUPT_PKT_NAME "repair_recv_serv_corrupt_pkt" #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_CORRUPT_PKT_TYPE (FD_METRICS_TYPE_COUNTER) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_CORRUPT_PKT_DESC "How many corrupt server packets have we received" #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_CORRUPT_PKT_CVT (FD_METRICS_CONVERTER_NONE) -#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_INVALID_SIGNATURE_OFF (19UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_INVALID_SIGNATURE_OFF (22UL) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_INVALID_SIGNATURE_NAME "repair_recv_serv_invalid_signature" #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_INVALID_SIGNATURE_TYPE (FD_METRICS_TYPE_COUNTER) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_INVALID_SIGNATURE_DESC "How many invalid signatures have we received" #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_INVALID_SIGNATURE_CVT (FD_METRICS_CONVERTER_NONE) -#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_FULL_PING_TABLE_OFF (20UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_FULL_PING_TABLE_OFF (23UL) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_FULL_PING_TABLE_NAME "repair_recv_serv_full_ping_table" #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_FULL_PING_TABLE_TYPE (FD_METRICS_TYPE_COUNTER) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_FULL_PING_TABLE_DESC "Is our ping table full and causing packet drops" #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_FULL_PING_TABLE_CVT (FD_METRICS_CONVERTER_NONE) -#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_OFF (21UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_OFF (24UL) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_NAME "repair_recv_serv_pkt_types" #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_TYPE (FD_METRICS_TYPE_COUNTER) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_DESC "Server messages received" #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_CVT (FD_METRICS_CONVERTER_NONE) #define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_CNT (5UL) -#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_PONG_OFF (21UL) -#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_WINDOW_OFF (22UL) -#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_HIGHEST_WINDOW_OFF (23UL) -#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_ORPHAN_OFF (24UL) -#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_UNKNOWN_OFF (25UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_PONG_OFF (24UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_WINDOW_OFF (25UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_HIGHEST_WINDOW_OFF (26UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_ORPHAN_OFF (27UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_SERV_PKT_TYPES_UNKNOWN_OFF (28UL) -#define FD_METRICS_COUNTER_REPAIR_RECV_PKT_CORRUPTED_MSG_OFF (26UL) +#define FD_METRICS_COUNTER_REPAIR_RECV_PKT_CORRUPTED_MSG_OFF (29UL) #define 
FD_METRICS_COUNTER_REPAIR_RECV_PKT_CORRUPTED_MSG_NAME "repair_recv_pkt_corrupted_msg" #define FD_METRICS_COUNTER_REPAIR_RECV_PKT_CORRUPTED_MSG_TYPE (FD_METRICS_TYPE_COUNTER) #define FD_METRICS_COUNTER_REPAIR_RECV_PKT_CORRUPTED_MSG_DESC "How many corrupt messages have we received" #define FD_METRICS_COUNTER_REPAIR_RECV_PKT_CORRUPTED_MSG_CVT (FD_METRICS_CONVERTER_NONE) -#define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_OFF (27UL) +#define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_OFF (30UL) #define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_NAME "repair_send_pkt_cnt" #define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_TYPE (FD_METRICS_TYPE_COUNTER) #define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_DESC "How many packets have sent" #define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_CVT (FD_METRICS_CONVERTER_NONE) -#define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_OFF (28UL) +#define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_OFF (31UL) #define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_NAME "repair_sent_pkt_types" #define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_TYPE (FD_METRICS_TYPE_COUNTER) #define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_DESC "What types of client messages are we sending" #define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_CVT (FD_METRICS_CONVERTER_NONE) #define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_CNT (3UL) -#define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_NEEDED_WINDOW_OFF (28UL) -#define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_NEEDED_HIGHEST_WINDOW_OFF (29UL) -#define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_NEEDED_ORPHAN_OFF (30UL) +#define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_NEEDED_WINDOW_OFF (31UL) +#define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_NEEDED_HIGHEST_WINDOW_OFF (32UL) +#define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_NEEDED_ORPHAN_OFF (33UL) -#define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WAIT_OFF (31UL) +#define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WAIT_OFF (34UL) #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WAIT_NAME "repair_store_link_wait" #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WAIT_TYPE (FD_METRICS_TYPE_HISTOGRAM) #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WAIT_DESC "Time in seconds spent waiting for the store to link a new FEC set" @@ -77,7 +95,7 @@ #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WAIT_MIN (1e-08) #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WAIT_MAX (0.0005) -#define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WORK_OFF (48UL) +#define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WORK_OFF (51UL) #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WORK_NAME "repair_store_link_work" #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WORK_TYPE (FD_METRICS_TYPE_HISTOGRAM) #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WORK_DESC "Time in seconds spent on linking a new FEC set" @@ -85,7 +103,7 @@ #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WORK_MIN (1e-08) #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WORK_MAX (0.0005) -#define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_OFF (65UL) +#define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_OFF (68UL) #define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_NAME "repair_sign_duration_seconds" #define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_TYPE (FD_METRICS_TYPE_HISTOGRAM) #define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_DESC "Duration of signing a message" @@ -93,5 +111,5 @@ #define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_MIN (1e-08) #define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_MAX (0.001) -#define FD_METRICS_REPAIR_TOTAL (18UL) +#define FD_METRICS_REPAIR_TOTAL 
(21UL) extern const fd_metrics_meta_t FD_METRICS_REPAIR[FD_METRICS_REPAIR_TOTAL]; diff --git a/src/disco/metrics/metrics.xml b/src/disco/metrics/metrics.xml index 77c146bfc1..10045efe57 100644 --- a/src/disco/metrics/metrics.xml +++ b/src/disco/metrics/metrics.xml @@ -680,10 +680,6 @@ metric introduced. Time in seconds spent on publishing a new FEC set - - - - @@ -791,6 +787,9 @@ metric introduced. + + + diff --git a/src/disco/plugin/fd_plugin.h b/src/disco/plugin/fd_plugin.h index 793cfa0d91..cab7c577f7 100644 --- a/src/disco/plugin/fd_plugin.h +++ b/src/disco/plugin/fd_plugin.h @@ -107,4 +107,16 @@ typedef struct { int status; } fd_plugin_msg_block_engine_update_t; +#define FD_PLUGIN_MSG_SNAPSHOT_UPDATE (15UL) + +#define FD_PLUGIN_MSG_SNAPSHOT_TYPE_FULL (0) +#define FD_PLUGIN_MSG_SNAPSHOT_TYPE_INCREMENTAL (1) +typedef struct __attribute__((packed)) { + int type; + ulong slot; + uint peer_addr; + ushort peer_port; + char read_path[ PATH_MAX ]; +} fd_restore_snapshot_update_t; + #endif /* HEADER_fd_src_disco_plugin_fd_plugin_h */ diff --git a/src/disco/plugin/fd_plugin_tile.c b/src/disco/plugin/fd_plugin_tile.c index e4ff3471e7..83ba749d93 100644 --- a/src/disco/plugin/fd_plugin_tile.c +++ b/src/disco/plugin/fd_plugin_tile.c @@ -13,6 +13,7 @@ #define IN_KIND_VOTEL (6) #define IN_KIND_BUNDLE (7) #define IN_KIND_VALCFG (8) +#define IN_KIND_SNAPRD (9) typedef struct { fd_wksp_t * mem; @@ -130,6 +131,10 @@ after_frag( fd_plugin_ctx_t * ctx, FD_TEST( sig==FD_PLUGIN_MSG_VALIDATOR_INFO ); break; } + case IN_KIND_SNAPRD: { + FD_TEST( sig==FD_PLUGIN_MSG_SNAPSHOT_UPDATE ); + break; + } default: FD_LOG_ERR(( "bad in_idx" )); } @@ -161,11 +166,11 @@ unprivileged_init( fd_topo_t * topo, else if( !strcmp( link->name, "gossip_plugi" ) ) ctx->in_kind[ i ] = IN_KIND_GOSSIP; else if( !strcmp( link->name, "stake_out" ) ) ctx->in_kind[ i ] = IN_KIND_STAKE; else if( !strcmp( link->name, "poh_plugin" ) ) ctx->in_kind[ i ] = IN_KIND_POH; - else if( !strcmp( link->name, "votes_plugin" ) ) ctx->in_kind[ i ] = IN_KIND_VOTE; else if( !strcmp( link->name, "startp_plugi" ) ) ctx->in_kind[ i ] = IN_KIND_STARTP; else if( !strcmp( link->name, "votel_plugin" ) ) ctx->in_kind[ i ] = IN_KIND_VOTEL; else if( !strcmp( link->name, "bundle_plugi" ) ) ctx->in_kind[ i ] = IN_KIND_BUNDLE; else if( !strcmp( link->name, "valcfg_plugi" ) ) ctx->in_kind[ i ] = IN_KIND_VALCFG; + else if( !strcmp( link->name, "snaprd_plugi" ) ) ctx->in_kind[ i ] = IN_KIND_SNAPRD; else FD_LOG_ERR(( "unexpected link name %s", link->name )); } diff --git a/src/discof/repair/fd_repair_tile.c b/src/discof/repair/fd_repair_tile.c index 0572d3b8fb..b6365287d2 100644 --- a/src/discof/repair/fd_repair_tile.c +++ b/src/discof/repair/fd_repair_tile.c @@ -104,6 +104,7 @@ struct fd_repair_tile_ctx { ulong * turbine_slot0; ulong * turbine_slot; + ulong latest_repair_slot; /* slot of most recent repair request, used as a gauge metric */ uchar identity_private_key[ 32 ]; fd_pubkey_t identity_public_key; @@ -412,6 +413,7 @@ fd_repair_send_requests( fd_repair_tile_ctx_t * ctx, fd_repair_send_request( ctx, stem, glob, type, slot, shred_index, id, now ); if( FD_UNLIKELY( glob->peer_idx >= glob->peer_cnt ) ) glob->peer_idx = 0; /* wrap around */ } + ctx->latest_repair_slot = slot; } @@ -1061,6 +1063,10 @@ static inline void metrics_write( fd_repair_tile_ctx_t * ctx ) { /* Repair-protocol-specific metrics */ fd_repair_metrics_t * metrics = fd_repair_get_metrics( ctx->repair ); + FD_MGAUGE_SET( REPAIR, FIRST_TURBINE_SLOT, fd_fseq_query( ctx->turbine_slot0 ) ); + 
FD_MGAUGE_SET( REPAIR, LATEST_TURBINE_SLOT, fd_fseq_query( ctx->turbine_slot ) ); + FD_MGAUGE_SET( REPAIR, LATEST_REPAIR_SLOT, ctx->latest_repair_slot ); + FD_MCNT_SET( REPAIR, RECV_CLNT_PKT, metrics->recv_clnt_pkt ); FD_MCNT_SET( REPAIR, RECV_SERV_PKT, metrics->recv_serv_pkt ); FD_MCNT_SET( REPAIR, RECV_SERV_CORRUPT_PKT, metrics->recv_serv_corrupt_pkt ); diff --git a/src/discof/replay/fd_replay_tile.c b/src/discof/replay/fd_replay_tile.c index 49947e9be7..843aea83eb 100644 --- a/src/discof/replay/fd_replay_tile.c +++ b/src/discof/replay/fd_replay_tile.c @@ -192,7 +192,6 @@ struct fd_replay_tile_ctx { // Inputs to plugin/gui fd_replay_out_link_t plugin_out[1]; - fd_replay_out_link_t votes_plugin_out[1]; long last_plugin_push_time; int tx_metadata_storage; @@ -1070,103 +1069,6 @@ init_poh( fd_replay_tile_ctx_t * ctx ) { ctx->poh_init_done = 1; } -static void -publish_votes_to_plugin( fd_replay_tile_ctx_t * ctx, - fd_stem_context_t * stem ) { - uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->votes_plugin_out->mem, ctx->votes_plugin_out->chunk ); - - ulong bank_slot = fd_bank_slot_get( ctx->slot_ctx->bank ); - fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &bank_slot, NULL, ctx->forks->pool ); - if( FD_UNLIKELY ( !fork ) ) return; - - fd_vote_accounts_global_t const * epoch_stakes = fd_bank_epoch_stakes_locking_query( ctx->slot_ctx->bank ); - fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_pool = fd_vote_accounts_vote_accounts_pool_join( epoch_stakes ); - fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_root = fd_vote_accounts_vote_accounts_root_join( epoch_stakes ); - - ulong i = 0; - FD_SPAD_FRAME_BEGIN( ctx->runtime_spad ) { - for( fd_vote_accounts_pair_global_t_mapnode_t const * n = fd_vote_accounts_pair_global_t_map_minimum_const( epoch_stakes_pool, epoch_stakes_root ); - n && i < FD_CLUSTER_NODE_CNT; - n = fd_vote_accounts_pair_global_t_map_successor_const( epoch_stakes_pool, n ) ) { - if( n->elem.stake == 0 ) continue; - - uchar * data = (uchar *)&n->elem.value + n->elem.value.data_offset; - ulong data_len = n->elem.value.data_len; - - int err; - fd_vote_state_versioned_t * vsv = fd_bincode_decode_spad( - vote_state_versioned, ctx->runtime_spad, - data, - data_len, - &err ); - if( FD_UNLIKELY( err ) ) { - FD_LOG_ERR(( "Unexpected failure in decoding vote state %d", err )); - } - - fd_pubkey_t node_pubkey; - ulong commission; - ulong epoch_credits; - fd_vote_epoch_credits_t const * _epoch_credits; - ulong root_slot; - - switch( vsv->discriminant ) { - case fd_vote_state_versioned_enum_v0_23_5: - node_pubkey = vsv->inner.v0_23_5.node_pubkey; - commission = vsv->inner.v0_23_5.commission; - _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.v0_23_5.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.v0_23_5.epoch_credits ); - epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits; - root_slot = vsv->inner.v0_23_5.root_slot; - break; - case fd_vote_state_versioned_enum_v1_14_11: - node_pubkey = vsv->inner.v1_14_11.node_pubkey; - commission = vsv->inner.v1_14_11.commission; - _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.v1_14_11.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.v1_14_11.epoch_credits ); - epoch_credits = _epoch_credits==NULL ? 
0UL : _epoch_credits->credits - _epoch_credits->prev_credits; - root_slot = vsv->inner.v1_14_11.root_slot; - break; - case fd_vote_state_versioned_enum_current: - node_pubkey = vsv->inner.current.node_pubkey; - commission = vsv->inner.current.commission; - _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.current.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.current.epoch_credits ); - epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits; - root_slot = vsv->inner.v0_23_5.root_slot; - break; - default: - __builtin_unreachable(); - } - - fd_clock_timestamp_vote_t_mapnode_t query; - memcpy( query.elem.pubkey.uc, n->elem.key.uc, 32UL ); - fd_clock_timestamp_votes_global_t const * clock_timestamp_votes = fd_bank_clock_timestamp_votes_locking_query( ctx->slot_ctx->bank ); - fd_clock_timestamp_vote_t_mapnode_t * timestamp_votes_root = fd_clock_timestamp_votes_votes_root_join( clock_timestamp_votes ); - fd_clock_timestamp_vote_t_mapnode_t * timestamp_votes_pool = fd_clock_timestamp_votes_votes_pool_join( clock_timestamp_votes ); - - fd_clock_timestamp_vote_t_mapnode_t * res = fd_clock_timestamp_vote_t_map_find( timestamp_votes_pool, timestamp_votes_root, &query ); - - fd_vote_update_msg_t * msg = (fd_vote_update_msg_t *)(dst + sizeof(ulong) + i*112U); - memset( msg, 0, 112U ); - memcpy( msg->vote_pubkey, n->elem.key.uc, sizeof(fd_pubkey_t) ); - memcpy( msg->node_pubkey, node_pubkey.uc, sizeof(fd_pubkey_t) ); - msg->activated_stake = n->elem.stake; - msg->last_vote = res == NULL ? 0UL : res->elem.slot; - msg->root_slot = root_slot; - msg->epoch_credits = epoch_credits; - msg->commission = (uchar)commission; - msg->is_delinquent = (uchar)fd_int_if(fd_bank_slot_get( ctx->slot_ctx->bank ) >= 128UL, msg->last_vote <= fd_bank_slot_get( ctx->slot_ctx->bank ) - 128UL, msg->last_vote == 0); - ++i; - fd_bank_clock_timestamp_votes_end_locking_query( ctx->slot_ctx->bank ); - } - } FD_SPAD_FRAME_END; - - fd_bank_epoch_stakes_end_locking_query( ctx->slot_ctx->bank ); - - *(ulong *)dst = i; - - ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() ); - fd_stem_publish( stem, ctx->votes_plugin_out->idx, FD_PLUGIN_MSG_VOTE_ACCOUNT_UPDATE, ctx->votes_plugin_out->chunk, 0, 0UL, 0UL, tspub ); - ctx->votes_plugin_out->chunk = fd_dcache_compact_next( ctx->votes_plugin_out->chunk, 8UL + 40200UL*(58UL+12UL*34UL), ctx->votes_plugin_out->chunk0, ctx->votes_plugin_out->wmark ); -} - static void handle_writer_state_updates( fd_replay_tile_ctx_t * ctx ) { @@ -1394,6 +1296,7 @@ handle_slot_change( fd_replay_tile_ctx_t * ctx, } ulong turbine_slot = fd_fseq_query( ctx->turbine_slot ); + ctx->metrics.slot = slot; FD_LOG_NOTICE(( "\n\n[Distance]\n" "slot: %lu\n" "current turbine: %lu\n" @@ -1681,19 +1584,19 @@ after_credit( fd_replay_tile_ctx_t * ctx, int * opt_poll_in FD_PARAM_UNUSED, int * charge_busy FD_PARAM_UNUSED ) { - if( !ctx->snapshot_init_done ) { - if( ctx->plugin_out->mem ) { - uchar msg[56]; - fd_memset( msg, 0, sizeof(msg) ); - msg[ 0 ] = 0; // ValidatorStartProgress::Initializing - replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) ); - } - - if( strlen( ctx->genesis )>0 ) { + if( FD_UNLIKELY( !ctx->snapshot_init_done ) ) { + if( FD_LIKELY( strlen( ctx->genesis )>0 ) ) { init_from_genesis( ctx, stem ); + + if( ctx->plugin_out->mem ) { + uchar msg[56]; + fd_memset( msg, 0, sizeof(msg) ); + msg[ 0 ] = 0; // ValidatorStartProgress::Initializing + replay_plugin_publish( ctx, stem, 
FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) ); + } + ctx->snapshot_init_done = 1; } - return; } @@ -1704,12 +1607,6 @@ after_credit( fd_replay_tile_ctx_t * ctx, exec_and_handle_slice( ctx, stem ); - long now = fd_log_wallclock(); - if( ctx->votes_plugin_out->mem && FD_UNLIKELY( ( now - ctx->last_plugin_push_time )>PLUGIN_PUBLISH_TIME_NS ) ) { - ctx->last_plugin_push_time = now; - publish_votes_to_plugin( ctx, stem ); - } - } static void @@ -2180,16 +2077,6 @@ unprivileged_init( fd_topo_t * topo, ctx->plugin_out->chunk0 = fd_dcache_compact_chunk0( ctx->plugin_out->mem, replay_plugin_out->dcache ); ctx->plugin_out->wmark = fd_dcache_compact_wmark ( ctx->plugin_out->mem, replay_plugin_out->dcache, replay_plugin_out->mtu ); ctx->plugin_out->chunk = ctx->plugin_out->chunk0; - - ctx->votes_plugin_out->idx = fd_topo_find_tile_out_link( topo, tile, "votes_plugin", 0 ); - fd_topo_link_t const * votes_plugin_out = &topo->links[ tile->out_link_id[ ctx->votes_plugin_out->idx] ]; - if( strcmp( votes_plugin_out->name, "votes_plugin" ) ) { - FD_LOG_ERR(("output link confusion for output %lu", ctx->votes_plugin_out->idx)); - } - ctx->votes_plugin_out->mem = topo->workspaces[ topo->objs[ votes_plugin_out->dcache_obj_id ].wksp_id ].wksp; - ctx->votes_plugin_out->chunk0 = fd_dcache_compact_chunk0( ctx->votes_plugin_out->mem, votes_plugin_out->dcache ); - ctx->votes_plugin_out->wmark = fd_dcache_compact_wmark ( ctx->votes_plugin_out->mem, votes_plugin_out->dcache, votes_plugin_out->mtu ); - ctx->votes_plugin_out->chunk = ctx->votes_plugin_out->chunk0; } if( strnlen( tile->replay.slots_replayed, sizeof(tile->replay.slots_replayed) )>0UL ) { diff --git a/src/discof/restore/fd_snapdc_tile.c b/src/discof/restore/fd_snapdc_tile.c index 3baf7d6bde..042d66bb39 100644 --- a/src/discof/restore/fd_snapdc_tile.c +++ b/src/discof/restore/fd_snapdc_tile.c @@ -147,6 +147,7 @@ handle_control_frag( fd_snapdc_tile_t * ctx, case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: FD_TEST( ctx->state==FD_SNAPDC_STATE_DONE ); ctx->state = FD_SNAPDC_STATE_SHUTDOWN; + metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */ break; default: FD_LOG_ERR(( "unexpected control sig %lu", sig )); diff --git a/src/discof/restore/fd_snapin_tile.c b/src/discof/restore/fd_snapin_tile.c index 85c2747030..c5b028ad69 100644 --- a/src/discof/restore/fd_snapin_tile.c +++ b/src/discof/restore/fd_snapin_tile.c @@ -256,6 +256,7 @@ handle_control_frag( fd_snapin_tile_t * ctx, break; case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: ctx->state = FD_SNAPIN_STATE_SHUTDOWN; + metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */ break; default: FD_LOG_ERR(( "unexpected control sig %lu", sig )); diff --git a/src/discof/restore/fd_snaprd_tile.c b/src/discof/restore/fd_snaprd_tile.c index 7465adcd0a..ca3846e0a3 100644 --- a/src/discof/restore/fd_snaprd_tile.c +++ b/src/discof/restore/fd_snaprd_tile.c @@ -5,10 +5,11 @@ #include "../../disco/topo/fd_topo.h" #include "../../disco/metrics/fd_metrics.h" - +#include "../../disco/plugin/fd_plugin.h" #include #include #include +#include #include #include @@ -28,17 +29,29 @@ #define FD_SNAPRD_STATE_READING_FULL_FILE ( 2) /* Full file looks better than peer, reading it from disk */ #define FD_SNAPRD_STATE_FLUSHING_FULL_FILE ( 3) /* Full file was read ok, confirm it decompressed and inserted ok */ #define FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET ( 4) /* Resetting to load full snapshot from file again, confirm 
decompress and inserter are reset too */ -#define FD_SNAPRD_STATE_READING_INCREMENTAL_FILE ( 5) /* Incremental file looks better than peer, reading it from disk */ -#define FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE ( 6) /* Incremental file was read ok, confirm it decompressed and inserted ok */ -#define FD_SNAPRD_STATE_READING_FULL_HTTP ( 7) /* Peer was selected, reading full snapshot from HTTP */ -#define FD_SNAPRD_STATE_FLUSHING_FULL_HTTP ( 8) /* Full snapshot was downloaded ok, confirm it decompressed and inserted ok */ -#define FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET ( 9) /* Resetting to load full snapshot from HTTP again, confirm decompress and inserter are reset too */ +#define FD_SNAPRD_STATE_READING_FULL_HTTP ( 5) /* Peer was selected, reading full snapshot from HTTP */ +#define FD_SNAPRD_STATE_FLUSHING_FULL_HTTP ( 6) /* Full snapshot was downloaded ok, confirm it decompressed and inserted ok */ +#define FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET ( 7) /* Resetting to load full snapshot from HTTP again, confirm decompress and inserter are reset too */ +#define FD_SNAPRD_STATE_READING_INCREMENTAL_FILE ( 8) /* Incremental file looks better than peer, reading it from disk */ +#define FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE ( 9) /* Incremental file was read ok, confirm it decompressed and inserted ok */ #define FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP (10) /* Peer was selected, reading incremental snapshot from HTTP */ #define FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP (11) /* Incremental snapshot was downloaded ok, confirm it decompressed and inserted ok */ #define FD_SNAPRD_STATE_SHUTDOWN (12) /* The tile is done, and has likely already exited */ #define SNAPRD_FILE_BUF_SZ (1024UL*1024UL) /* 1 MiB */ +#define FD_SNAPRD_CTL_OUT_IDX (0) +#define FD_SNAPRD_PLUGIN_OUT_IDX (1) + +typedef struct { + fd_wksp_t * wksp; + ulong chunk0; + ulong wmark; + ulong chunk; + ulong mtu; + ulong out_idx; +} fd_snaprd_out_ctx_t; + struct fd_snaprd_tile { fd_ssping_t * ssping; fd_sshttp_t * sshttp; @@ -98,13 +111,7 @@ struct fd_snaprd_tile { } incremental; } metrics; - struct { - fd_wksp_t * wksp; - ulong chunk0; - ulong wmark; - ulong chunk; - ulong mtu; - } out; + fd_snaprd_out_ctx_t out[ 2UL ]; }; typedef struct fd_snaprd_tile fd_snaprd_tile_t; @@ -147,11 +154,12 @@ metrics_write( fd_snaprd_tile_t * ctx ) { static void read_file_data( fd_snaprd_tile_t * ctx, fd_stem_context_t * stem ) { - uchar * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk ); + fd_snaprd_out_ctx_t * out_ctx = &ctx->out[ FD_SNAPRD_CTL_OUT_IDX ]; + uchar * out = fd_chunk_to_laddr( ctx->out[ FD_SNAPRD_CTL_OUT_IDX ].wksp, out_ctx->chunk ); FD_TEST( ctx->state==FD_SNAPRD_STATE_READING_INCREMENTAL_FILE || ctx->state==FD_SNAPRD_STATE_READING_FULL_FILE ); int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_FILE; - long result = read( full ? ctx->local_in.full_snapshot_fd : ctx->local_in.incremental_snapshot_fd , out, ctx->out.mtu ); + long result = read( full ? 
ctx->local_in.full_snapshot_fd : ctx->local_in.incremental_snapshot_fd , out, out_ctx->mtu ); if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) return; else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "read() failed (%i-%s)", errno, fd_io_strerror( errno ) )); @@ -169,14 +177,14 @@ read_file_data( fd_snaprd_tile_t * ctx, if( FD_UNLIKELY( !result ) ) { switch( ctx->state ) { case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE: - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL ); ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE; break; case FD_SNAPRD_STATE_READING_FULL_FILE: if( FD_LIKELY( ctx->config.incremental_snapshot_fetch ) ) { - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); } else { - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL ); } ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_FILE; break; @@ -186,18 +194,19 @@ read_file_data( fd_snaprd_tile_t * ctx, return; } - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, (ulong)result, 0UL, 0UL, 0UL ); - ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, (ulong)result, ctx->out.chunk0, ctx->out.wmark ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_DATA, out_ctx->chunk, (ulong)result, 0UL, 0UL, 0UL ); + out_ctx->chunk = fd_dcache_compact_next( out_ctx->chunk, (ulong)result, out_ctx->chunk0, out_ctx->wmark ); } static void read_http_data( fd_snaprd_tile_t * ctx, fd_stem_context_t * stem, long now ) { - uchar * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk ); + fd_snaprd_out_ctx_t * out_ctx = &ctx->out[ FD_SNAPRD_CTL_OUT_IDX ]; + uchar * out = fd_chunk_to_laddr( out_ctx->wksp, out_ctx->chunk ); ulong buffer_avail = fd_ulong_if( -1!=ctx->local_out.dir_fd, SNAPRD_FILE_BUF_SZ-ctx->local_out.write_buffer_len, ULONG_MAX ); - ulong data_len = fd_ulong_min( buffer_avail, ctx->out.mtu ); + ulong data_len = fd_ulong_min( buffer_avail, out_ctx->mtu ); int result = fd_sshttp_advance( ctx->sshttp, &data_len, out, now ); switch( result ) { @@ -206,7 +215,7 @@ read_http_data( fd_snaprd_tile_t * ctx, FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port )); fd_ssping_invalidate( ctx->ssping, ctx->addr, now ); - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET; ctx->deadline_nanos = now; break; @@ -214,14 +223,14 @@ read_http_data( fd_snaprd_tile_t * ctx, case FD_SSHTTP_ADVANCE_DONE: { switch( ctx->state ) { case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL ); ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP; break; case FD_SNAPRD_STATE_READING_FULL_HTTP: if( FD_LIKELY( ctx->config.incremental_snapshot_fetch ) ) { - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, out_ctx->out_idx, 
FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); } else { - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL ); } ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP; break; @@ -231,8 +240,8 @@ read_http_data( fd_snaprd_tile_t * ctx, break; } case FD_SSHTTP_ADVANCE_DATA: { - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, data_len, 0UL, 0UL, 0UL ); - ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, data_len, ctx->out.chunk0, ctx->out.wmark ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_DATA, out_ctx->chunk, data_len, 0UL, 0UL, 0UL ); + out_ctx->chunk = fd_dcache_compact_next( out_ctx->chunk, data_len, out_ctx->chunk0, out_ctx->wmark ); ulong written_sz = 0UL; if( FD_LIKELY( -1!=ctx->local_out.dir_fd && !ctx->local_out.write_buffer_len ) ) { @@ -257,12 +266,15 @@ read_http_data( fd_snaprd_tile_t * ctx, } ctx->local_out.write_buffer_len += data_len-written_sz; + ulong size; fd_sshttp_download_size( ctx->sshttp, &size ); switch( ctx->state ) { case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: + if( FD_LIKELY( size!=ULONG_MAX ) ) ctx->metrics.incremental.bytes_total = fd_ulong_max( size, ctx->metrics.incremental.bytes_total ); ctx->metrics.incremental.bytes_read += data_len; ctx->metrics.incremental.bytes_written += written_sz; break; case FD_SNAPRD_STATE_READING_FULL_HTTP: + if( FD_LIKELY( size!=ULONG_MAX ) ) ctx->metrics.full.bytes_total = fd_ulong_max( size, ctx->metrics.full.bytes_total ); ctx->metrics.full.bytes_read += data_len; ctx->metrics.full.bytes_written += written_sz; break; @@ -321,16 +333,40 @@ rename_snapshots( fd_snaprd_tile_t * ctx ) { if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return; char const * full_snapshot_name; char const * incremental_snapshot_name; - fd_sshttp_snapshot_names( ctx->sshttp, &full_snapshot_name, &incremental_snapshot_name ); + fd_sshttp_snapshot_names( ctx->sshttp, NULL, NULL, &full_snapshot_name, &incremental_snapshot_name ); if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) { if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, full_snapshot_name ) ) ) - FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) )); + FD_LOG_ERR(( "renameat() failed (%i-%s) when renaming snapshot.tar.bz2-partial to %s", errno, fd_io_strerror( errno ), full_snapshot_name )); } - if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) { + if( FD_LIKELY( ctx->config.incremental_snapshot_fetch && -1!=ctx->local_out.incremental_snapshot_fd ) ) { if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, incremental_snapshot_name ) ) ) - FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) )); + FD_LOG_ERR(( "renameat() failed (%i-%s) when renaming incremental-snapshot.tar.bz2-partial to %s", errno, fd_io_strerror( errno ), incremental_snapshot_name )); + } +} + +static void +fd_snaprd_publish_snapshot_update( fd_snaprd_tile_t * ctx, + fd_stem_context_t * stem, + int type, + ulong slot, + int remote, + const char * path ) { + fd_snaprd_out_ctx_t * out_plugin_ctx = &ctx->out[ FD_SNAPRD_PLUGIN_OUT_IDX ]; + fd_restore_snapshot_update_t * snapshot_update = fd_chunk_to_laddr( out_plugin_ctx->wksp , out_plugin_ctx->chunk ); + snapshot_update->type = type; + snapshot_update->slot = slot; + if( FD_UNLIKELY( remote ) ) { + FD_TEST( fd_cstr_printf_check( 
snapshot_update->read_path, sizeof(snapshot_update->read_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port, path ) ); + } else { + FD_TEST( fd_cstr_printf_check( snapshot_update->read_path, sizeof(snapshot_update->read_path), NULL, "%s", path ) ); } + snapshot_update->peer_addr = fd_uint_if( remote, ctx->addr.addr, 0U ); + snapshot_update->peer_port = fd_ushort_if( remote, ctx->addr.port, 0U ); + + ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() ); + fd_stem_publish( stem, out_plugin_ctx->out_idx, FD_PLUGIN_MSG_SNAPSHOT_UPDATE, out_plugin_ctx->chunk, sizeof(fd_restore_snapshot_update_t), 0UL, tspub, tspub ); + out_plugin_ctx->chunk = fd_dcache_compact_next( out_plugin_ctx->chunk, sizeof(fd_restore_snapshot_update_t), out_plugin_ctx->chunk0, out_plugin_ctx->wmark ); } static void @@ -347,6 +383,7 @@ after_credit( fd_snaprd_tile_t * ctx, } drain_buffer( ctx ); + fd_snaprd_out_ctx_t * out_ctx = &ctx->out[ FD_SNAPRD_CTL_OUT_IDX ]; /* All control fragments sent by the snaprd tile must be fully acknowledged by all downstream consumers before processing can @@ -376,12 +413,30 @@ after_credit( fd_snaprd_tile_t * ctx, ulong highest_cluster_slot = 0UL; /* TODO: Implement, using incremental snapshot slot for age */ if( FD_LIKELY( ctx->local_in.full_snapshot_slot!=ULONG_MAX && ctx->local_in.full_snapshot_slot>=fd_ulong_sat_sub( highest_cluster_slot, ctx->config.maximum_local_snapshot_age ) ) ) { FD_LOG_NOTICE(( "loading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path )); + { + struct stat st; + if( FD_UNLIKELY( -1==fstat( ctx->local_in.full_snapshot_fd, &st ) ) ) { + FD_LOG_ERR(( "fstat() failed (%i-%s)", errno, fd_io_strerror( errno ) )); + } + ctx->metrics.full.bytes_total = (ulong)st.st_size; + } ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE; + fd_snaprd_publish_snapshot_update( ctx, stem, FD_PLUGIN_MSG_SNAPSHOT_TYPE_FULL, ctx->local_in.full_snapshot_slot, 0, ctx->local_in.full_snapshot_path ); } else { FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr ), best.port )); ctx->addr = best; ctx->state = FD_SNAPRD_STATE_READING_FULL_HTTP; - fd_sshttp_init( ctx->sshttp, best, "/snapshot.tar.bz2", 17UL, now ); + fd_sshttp_init( ctx->sshttp, best, "/snapshot.tar.bz2", 17UL, 1, now ); + + ulong full_snapshot_slot[ 1 ]; + ulong incremental_snapshot_slot[ 1 ]; + char const * full_snapshot_name; + char const * incremental_snapshot_name; + do { /* run until redirect is fully resolved */ + read_http_data( ctx, stem, now ); + fd_sshttp_snapshot_names( ctx->sshttp, full_snapshot_slot, incremental_snapshot_slot, &full_snapshot_name, &incremental_snapshot_name ); + } while( full_snapshot_slot[ 0 ] == 0 ); + fd_snaprd_publish_snapshot_update( ctx, stem, FD_PLUGIN_MSG_SNAPSHOT_TYPE_FULL, full_snapshot_slot[ 0 ], 1, full_snapshot_name ); } break; } @@ -400,7 +455,7 @@ after_credit( fd_snaprd_tile_t * ctx, ctx->ack_cnt = 0UL; if( FD_UNLIKELY( ctx->malformed ) ) { - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET; ctx->malformed = 0; break; @@ -410,7 +465,8 @@ after_credit( fd_snaprd_tile_t * ctx, rename_snapshots( ctx ); ctx->state = FD_SNAPRD_STATE_SHUTDOWN; - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL ); + metrics_write( 
ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */ + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL ); break; case FD_SNAPRD_STATE_FLUSHING_FULL_FILE: if( FD_UNLIKELY( ctx->ack_cntconfig.incremental_snapshot_fetch ) ) { ctx->state = FD_SNAPRD_STATE_SHUTDOWN; - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL ); + metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */ + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL ); break; } FD_LOG_NOTICE(( "reading incremental snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path )); + + { + struct stat st; + if( FD_UNLIKELY( -1==fstat( ctx->local_in.incremental_snapshot_fd, &st ) ) ) { + FD_LOG_ERR(( "fstat() failed (%i-%s)", errno, fd_io_strerror( errno ) )); + } + ctx->metrics.incremental.bytes_total = (ulong)st.st_size; + } + ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_FILE; + fd_snaprd_publish_snapshot_update( ctx, stem, FD_PLUGIN_MSG_SNAPSHOT_TYPE_INCREMENTAL, ctx->local_in.incremental_snapshot_slot, 0, ctx->local_in.incremental_snapshot_path ); break; case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP: if( FD_UNLIKELY( ctx->ack_cntack_cnt = 0UL; if( FD_UNLIKELY( ctx->malformed ) ) { - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET; ctx->malformed = 0; break; @@ -441,13 +508,24 @@ after_credit( fd_snaprd_tile_t * ctx, if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) { rename_snapshots( ctx ); ctx->state = FD_SNAPRD_STATE_SHUTDOWN; - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL ); + metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */ + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL ); break; } FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port )); - fd_sshttp_init( ctx->sshttp, ctx->addr, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() ); + fd_sshttp_init( ctx->sshttp, ctx->addr, "/incremental-snapshot.tar.bz2", 29UL, 0, fd_log_wallclock() ); ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP; + + ulong full_snapshot_slot[ 1 ]; + ulong incremental_snapshot_slot[ 1 ]; + char const * full_snapshot_name; + char const * incremental_snapshot_name; + do { /* run until redirect is fully resolved */ + read_http_data( ctx, stem, now ); + fd_sshttp_snapshot_names( ctx->sshttp, full_snapshot_slot, incremental_snapshot_slot, &full_snapshot_name, &incremental_snapshot_name ); + } while( incremental_snapshot_slot[ 0 ] == 0 ); + fd_snaprd_publish_snapshot_update( ctx, stem, FD_PLUGIN_MSG_SNAPSHOT_TYPE_INCREMENTAL, incremental_snapshot_slot[ 0 ], 1, incremental_snapshot_name ); break; case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET: case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET: @@ -490,6 +568,7 @@ after_frag( fd_snaprd_tile_t * ctx, (void)sz; FD_TEST( sig==FD_SNAPSHOT_MSG_CTRL_ACK || sig==FD_SNAPSHOT_MSG_CTRL_MALFORMED ); + fd_snaprd_out_ctx_t * out_ctx = &ctx->out[ FD_SNAPRD_CTL_OUT_IDX ]; if( FD_LIKELY( 
sig==FD_SNAPSHOT_MSG_CTRL_ACK ) ) ctx->ack_cnt++; else { @@ -511,7 +590,7 @@ after_frag( fd_snaprd_tile_t * ctx, FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port )); fd_sshttp_cancel( ctx->sshttp ); fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() ); - fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); + fd_stem_publish( stem, out_ctx->out_idx, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL ); ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET; break; case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP: @@ -647,7 +726,8 @@ unprivileged_init( fd_topo_t * topo, FD_TEST( ctx->sshttp ); if( FD_LIKELY( !strcmp( tile->snaprd.cluster, "testnet" ) ) ) { - fd_ip4_port_t initial_peers[ 2UL ] = { + fd_ip4_port_t initial_peers[ 3UL ] = { + { .addr = FD_IP4_ADDR( 35 , 209, 131, 19 ), .port = 8899 }, { .addr = FD_IP4_ADDR( 35 , 214, 172, 227 ), .port = 8899 }, { .addr = FD_IP4_ADDR( 145, 40 , 95 , 69 ), .port = 8899 }, /* Solana testnet peer */ }; @@ -669,13 +749,22 @@ unprivileged_init( fd_topo_t * topo, FD_LOG_ERR(( "unexpected cluster %s", tile->snaprd.cluster )); } - if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt )); - - ctx->out.wksp = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ 0 ] ].dcache_obj_id ].wksp_id ].wksp; - ctx->out.chunk0 = fd_dcache_compact_chunk0( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache ); - ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache, topo->links[ tile->out_link_id[ 0 ] ].mtu ); - ctx->out.chunk = ctx->out.chunk0; - ctx->out.mtu = topo->links[ tile->out_link_id[ 0 ] ].mtu; + if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt )); + + for( ulong i=0; iout_cnt; i++ ) { + fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ]; + ulong out_kind; + if( !strcmp( link->name, "snap_zstd" ) ) out_kind = FD_SNAPRD_CTL_OUT_IDX; + else if( !strcmp( link->name, "snaprd_plugi" ) ) out_kind = FD_SNAPRD_PLUGIN_OUT_IDX; + else FD_LOG_ERR(( "unexpected link name %s", link->name )); + + ctx->out[ out_kind ].wksp = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ i ] ].dcache_obj_id ].wksp_id ].wksp; + ctx->out[ out_kind ].chunk0 = fd_dcache_compact_chunk0( ctx->out[ out_kind ].wksp, topo->links[ tile->out_link_id[ i ] ].dcache ); + ctx->out[ out_kind ].wmark = fd_dcache_compact_wmark ( ctx->out[ out_kind ].wksp, topo->links[ tile->out_link_id[ i ] ].dcache, topo->links[ tile->out_link_id[ i ] ].mtu ); + ctx->out[ out_kind ].chunk = ctx->out[ out_kind ].chunk0; + ctx->out[ out_kind ].mtu = topo->links[ tile->out_link_id[ i ] ].mtu; + ctx->out[ out_kind ].out_idx = i; + } } #define STEM_BURST 2UL /* One control message, and one data message */ diff --git a/src/discof/restore/utils/fd_sshttp.c b/src/discof/restore/utils/fd_sshttp.c index 4746e02c81..5218acb54c 100644 --- a/src/discof/restore/utils/fd_sshttp.c +++ b/src/discof/restore/utils/fd_sshttp.c @@ -22,7 +22,6 @@ struct fd_sshttp_private { int state; long deadline; - int full; int hops; @@ -36,10 +35,13 @@ struct fd_sshttp_private { ulong response_len; char response[ USHORT_MAX ]; - char full_snapshot_name[ PATH_MAX ]; - char incremental_snapshot_name[ PATH_MAX ]; + struct { + char name[ PATH_MAX ]; + ulong slot; + } full_snapshot, incremental_snapshot; ulong content_len; + ulong content_len_total; ulong magic; }; @@ -108,6 +110,7 @@ fd_sshttp_init( 
                 fd_ip4_port_t addr,
                 char const *  path,
                 ulong         path_len,
+                int           reset,
                 long          now ) {

   FD_TEST( http->state==FD_SSHTTP_STATE_INIT );
@@ -146,6 +149,14 @@ fd_sshttp_init( fd_sshttp_t * http,

   http->state = FD_SSHTTP_STATE_REQ;
   http->deadline = now + 500L*1000L*1000L;
+
+  if( FD_LIKELY( reset ) ) {
+    fd_memset( &http->incremental_snapshot, 0, sizeof(http->incremental_snapshot) );
+    fd_memset( &http->full_snapshot, 0, sizeof(http->full_snapshot) );
+    http->content_len = 0UL;
+    http->content_len_total = 0UL;
+    http->response_len = 0UL;
+  }
 }

 void
@@ -232,9 +243,11 @@ follow_redirect( fd_sshttp_t * http,
       fd_base58_encode_32( decoded_hash, NULL, encoded_hash );

       if( FD_LIKELY( incremental_entry_slot!=ULONG_MAX ) ) {
-        FD_TEST( fd_cstr_printf_check( http->incremental_snapshot_name, PATH_MAX, NULL, "incremental-snapshot-%lu-%lu-%s.tar.zst", full_entry_slot, incremental_entry_slot, encoded_hash ) );
+        FD_TEST( fd_cstr_printf_check( http->incremental_snapshot.name, PATH_MAX, NULL, "incremental-snapshot-%lu-%lu-%s.tar.zst", full_entry_slot, incremental_entry_slot, encoded_hash ) );
+        http->incremental_snapshot.slot = incremental_entry_slot;
       } else {
-        FD_TEST( fd_cstr_printf_check( http->full_snapshot_name, PATH_MAX, NULL, "snapshot-%lu-%s.tar.zst", full_entry_slot, encoded_hash ) );
+        FD_TEST( fd_cstr_printf_check( http->full_snapshot.name, PATH_MAX, NULL, "snapshot-%lu-%s.tar.zst", full_entry_slot, encoded_hash ) );
+        http->full_snapshot.slot = full_entry_slot;
       }
       break;
     }
@@ -263,7 +276,7 @@ follow_redirect( fd_sshttp_t * http,
                   (int)headers[ 0 ].value_len, headers[ 0 ].value ));

   fd_sshttp_cancel( http );
-  fd_sshttp_init( http, http->addr, location, location_len, now );
+  fd_sshttp_init( http, http->addr, location, location_len, 0, now );
   return FD_SSHTTP_ADVANCE_AGAIN;
 }
@@ -324,11 +337,13 @@ read_response( fd_sshttp_t * http,
   }

   http->content_len = ULONG_MAX;
+  http->content_len_total = ULONG_MAX;
   for( ulong i=0UL; icontent_len = strtoul( headers[i].value, NULL, 10 );
+      http->content_len_total = http->content_len;
       break;
     }
@@ -378,10 +393,20 @@ read_body( fd_sshttp_t * http,

 void
 fd_sshttp_snapshot_names( fd_sshttp_t * http,
-                          char const ** full_snapshot_name,
-                          char const ** incremental_snapshot_name ) {
-  *full_snapshot_name = http->full_snapshot_name;
-  *incremental_snapshot_name = http->incremental_snapshot_name;
+                          ulong *       opt_full_snapshot_slot,
+                          ulong *       opt_incremental_snapshot_slot,
+                          char const ** opt_full_snapshot_name,
+                          char const ** opt_incremental_snapshot_name ) {
+  if( FD_UNLIKELY( !!opt_full_snapshot_name )) *opt_full_snapshot_name = http->full_snapshot.name;
+  if( FD_UNLIKELY( !!opt_full_snapshot_slot )) *opt_full_snapshot_slot = http->full_snapshot.slot;
+  if( FD_UNLIKELY( !!opt_incremental_snapshot_name )) *opt_incremental_snapshot_name = http->incremental_snapshot.name;
+  if( FD_UNLIKELY( !!opt_incremental_snapshot_slot )) *opt_incremental_snapshot_slot = http->incremental_snapshot.slot;
+}
+
+void
+fd_sshttp_download_size( fd_sshttp_t * http,
+                         ulong *       size ) {
+  *size = http->content_len_total;
 }

 int
diff --git a/src/discof/restore/utils/fd_sshttp.h b/src/discof/restore/utils/fd_sshttp.h
index e718ff9e8b..216a58cc55 100644
--- a/src/discof/restore/utils/fd_sshttp.h
+++ b/src/discof/restore/utils/fd_sshttp.h
@@ -24,17 +24,34 @@ fd_sshttp_new( void * shmem );
 fd_sshttp_t *
 fd_sshttp_join( void * sshttp );

-/* Sets points to snapshot names */
+/* Optionally sets (non-NULL) pointers to snapshot names and slots. If
+   the name / slot is not available yet, then slots will be set to 0
+   and snapshot names will be set to "" (empty cstr). */
 void
 fd_sshttp_snapshot_names( fd_sshttp_t * http,
-                          char const ** full_snapshot_name,
-                          char const ** incremental_snapshot_name );
+                          ulong *       opt_full_snapshot_slot,
+                          ulong *       opt_incremental_snapshot_slot,
+                          char const ** opt_full_snapshot_name,
+                          char const ** opt_incremental_snapshot_name );

 void
+fd_sshttp_download_size( fd_sshttp_t * http, ulong * size );
+
+/* fd_sshttp_init prepares the http client for a snapshot download.
+   addr is the ipv4 address of the server to request the snapshot from.
+   path is the url path to the snapshot. path_len is the cstr length of
+   path. reset indicates that we are initializing a full snapshot
+   download and the http client should fully reset its state. A partial
+   re-initialization (i.e. reset==0) is used when resolving an http
+   redirect, or transitioning to an incremental snapshot download. This
+   retains state related to the processed full snapshot. now is the
+   current time as a unix nanosecond timestamp. */
+void
 fd_sshttp_init( fd_sshttp_t * http,
                 fd_ip4_port_t addr,
                 char const *  path,
                 ulong         path_len,
+                int           reset,
                 long          now );

 void