diff --git a/book/api/metrics-generated.md b/book/api/metrics-generated.md index 451830dd95..d63d4eae3b 100644 --- a/book/api/metrics-generated.md +++ b/book/api/metrics-generated.md @@ -447,6 +447,8 @@ | shred_​force_​complete_​request | counter | The number of times we received a FEC force complete message | | shred_​force_​complete_​failure | counter | The number of times we failed to force complete a FEC set on request | | shred_​force_​complete_​success | counter | The number of times we successfully forced completed a FEC set on request | +| shred_​shred_​repair_​rcv | counter | The number of times we received a repair shred | +| shred_​shred_​turbine_​rcv | counter | The number of times we received a turbine shred | | shred_​store_​insert_​wait | histogram | Time in seconds spent waiting for the store to insert a new FEC set | | shred_​store_​insert_​work | histogram | Time in seconds spent on inserting a new FEC set | @@ -744,12 +746,13 @@ | repair_​recv_​serv_​pkt_​types
{repair_​serv_​pkt_​types="orphan"} | counter | Server messages received (Orphan) | | repair_​recv_​serv_​pkt_​types
{repair_​serv_​pkt_​types="unknown"} | counter | Server messages received (Unknown) | | repair_​recv_​pkt_​corrupted_​msg | counter | How many corrupt messages have we received | -| repair_​send_​pkt_​cnt | counter | How many packets have sent | +| repair_​shred_​repair_​req | counter | How many repair requests have we sent | | repair_​sent_​pkt_​types
{repair_​sent_​request_​types="needed_​window"} | counter | What types of client messages are we sending (Need Window) | | repair_​sent_​pkt_​types
{repair_​sent_​request_​types="needed_​highest_​window"} | counter | What types of client messages are we sending (Need Highest Window) | | repair_​sent_​pkt_​types
{repair_​sent_​request_​types="needed_​orphan"} | counter | What types of client messages are we sending (Need Orphans) | | repair_​store_​link_​wait | histogram | Time in seconds spent waiting for the store to link a new FEC set | | repair_​store_​link_​work | histogram | Time in seconds spent on linking a new FEC set | +| repair_​slot_​complete_​time | histogram | Time in seconds it took to complete a slot | | repair_​sign_​duration_​seconds | histogram | Duration of signing a message | diff --git a/contrib/repair-analysis/report.py b/contrib/repair-analysis/report.py index cf3963e6a8..7e8141df68 100644 --- a/contrib/repair-analysis/report.py +++ b/contrib/repair-analysis/report.py @@ -375,7 +375,6 @@ def completion_times( fec_stats, shred_data, first_turbine, pdf ): # Batch completion times (ref_tick) # We get this by keeping the first shred of fec0, and the completion time of fec1 - batch_stats = fec_stats.groupby(['slot', 'ref_tick']).agg({'first_shred_ts': 'min', 'timestamp': 'max'}).reset_index() batch_stats['time_to_complete'] = batch_stats['timestamp'] - batch_stats['first_shred_ts'] batch_stats['time_to_complete(ms)'] = batch_stats['time_to_complete'] / 1_000_000 # Convert to milliseconds @@ -459,7 +458,48 @@ def completion_times( fec_stats, shred_data, first_turbine, pdf ): pdf.savefig(fig, bbox_inches='tight') plt.close(fig) + # Time in between slot completions + + time_between_slots_live = slot_cmpl_live['timestamp_fec1'].sort_values().diff().fillna(0) + time_between_slots_catchup = slot_cmpl_catchup['timestamp_fec1'].sort_values().diff().fillna(0) + time_between_slots_live = time_between_slots_live / 1_000_000 # Convert to milliseconds + time_between_slots_catchup = time_between_slots_catchup / 1_000_000 # Convert to milliseconds + + # plot the time between slots + fig = plt.figure(figsize=(12, 6)) + sns.histplot(time_between_slots_live, bins=50, kde=True) + plt.title('Time Between Completing Live Slots') + plt.xlabel('Time Between Slots (ms)') + 
plt.ylabel('Frequency') + + pdf.savefig(fig, bbox_inches='tight') + plt.close(fig) + print("\n\033[1mTime Between Completing Live Slots\033[0m\n") + print(time_between_slots_live.describe()) + print("\n\033[1mTime Between Completing Catchup Slots\033[0m\n") + print(time_between_slots_catchup.describe()) + + +def show_turbine_arrivals(live, pdf): + # plot the turbine arrivals + fig = plt.figure(figsize=(12, 6)) + live_turbine = live[live['is_turbine']] + live_turbine = live_turbine[live_turbine['slot'] >= 348905600] + # bucket it by every 10 ms, flooring to an int + live_turbine['timestamp'] = (live_turbine['timestamp'] // 10_000_000).astype(int) + live_turbine = live_turbine.groupby('timestamp').size().reset_index(name='count') + sns.barplot(data=live_turbine, x='timestamp', y='count') + plt.title('Turbine Arrivals') + plt.xlabel('Timestamp') + plt.ylabel('count') + # show labels for every 5th tick only + plt.setp(plt.gca().get_xticklabels(), visible=False) + plt.setp(plt.gca().get_xticklabels()[::5], visible=True) + + plt.tight_layout() + pdf.savefig(fig, bbox_inches='tight') + plt.close(fig) def turbine_stats(catchup, live): print('\n\033[1mTurbine Statistics\033[0m\n') @@ -577,9 +617,13 @@ def generate_report( log_path, request_data_path, shred_data_path, peers_data_pa skipfooter=1 ) # because of the buffered writer the last row is probably incomplete if request_data_path: - repair_requests = pd.read_csv( request_data_path, - dtype={'dst_ip': str, 'dst_port': int, 'timestamp': int, 'slot': int, 'idx': int, 'nonce': int }, - skipfooter=1 ) + try: + repair_requests = pd.read_csv( request_data_path, + dtype={'dst_ip': str, 'dst_port': int, 'timestamp': int, 'slot': int, 'idx': int, 'nonce': int }, + skipfooter=1 ) + except Exception as e: + print(f'Error reading repair requests: {e}') + request_data_path = None if peers_data_path: peers_data = pd.read_csv( peers_data_path, @@ -610,6 +654,7 @@ def generate_report( log_path, request_data_path, 
shred_data_path, peers_data_pa catchup = shreds_data[shreds_data['slot'].between(snapshot_slot, first_turbine - 1)] live = shreds_data[shreds_data['slot'].between(first_turbine, last_executed)] + show_turbine_arrivals(live, pdf) if request_data_path: catchup_rq = repair_requests[repair_requests['slot'].between(snapshot_slot, first_turbine - 1)] diff --git a/src/ballet/shred/fd_shred.h b/src/ballet/shred/fd_shred.h index b7c69d2d27..1d6b5eb931 100644 --- a/src/ballet/shred/fd_shred.h +++ b/src/ballet/shred/fd_shred.h @@ -259,7 +259,7 @@ struct __attribute__((packed)) fd_shred { /* Total number of coding shreds in FEC set. Must be positive <= FD_REEDSOL_CODE_SHREDS_MAX. */ /* 0x55 */ ushort code_cnt; - /* Index within the vector of coding shreds in slot. In [0, + /* Index within the vector of coding shreds in FEC set. In [0, code_cnt). Also, shred.code.idx <= shred.idx. */ /* 0x57 */ ushort idx; } code; diff --git a/src/disco/fd_disco_base.h b/src/disco/fd_disco_base.h index 603e45f21a..2510ad851f 100644 --- a/src/disco/fd_disco_base.h +++ b/src/disco/fd_disco_base.h @@ -155,8 +155,8 @@ FD_FN_CONST static inline ulong fd_disco_replay_old_sig_slot( ulong sig ) { retu The encoded fields vary depending on the type of the sig. The diagram below describes the encoding. - completes (1) | slot (32) | fec_set_idx (15) | is_code (1) | shred_idx or data_cnt (15) - [63] | [31, 62] | [16, 30] | [15] | [0, 14] + is_turbine (1) | slot (32) | fec_set_idx (15) | is_code (1) | shred_idx or data_cnt (15) + [63] | [31, 62] | [16, 30] | [15] | [0, 14] There are two types of messages on the shred_repair link. The first type is a generic shred message. The second is a FEC set completion @@ -165,8 +165,8 @@ FD_FN_CONST static inline ulong fd_disco_replay_old_sig_slot( ulong sig ) { retu For the first message type (SHRED): - The first bit [63] describes whether this shred marks the end of a - batch and/or a slot, i.e. 
shred.flags & (DATA_COMPLETE | SLOT_COMPLETE) + The first bit [63] is set if this shred arrived via turbine and + clear if it arrived via repair. The next 32 bits [31, 62] describe the slot number. Note: if the slot number is >= UINT_MAX, the sender will store the value UINT_MAX in @@ -194,7 +194,7 @@ FD_FN_CONST static inline ulong fd_disco_replay_old_sig_slot( ulong sig ) { retu are uniformly coding shreds and fixed size. */ FD_FN_CONST static inline ulong -fd_disco_shred_repair_shred_sig( int completes, +fd_disco_shred_repair_shred_sig( int is_turbine, ulong slot, uint fec_set_idx, int is_code, @@ -202,16 +202,16 @@ fd_disco_shred_repair_shred_sig( int completes, ulong slot_ul = fd_ulong_min( slot, (ulong)UINT_MAX ); ulong shred_idx_or_data_cnt_ul = fd_ulong_min( (ulong)shred_idx_or_data_cnt, (ulong)FD_SHRED_BLK_MAX ); ulong fec_set_idx_ul = fd_ulong_min( (ulong)fec_set_idx, (ulong)FD_SHRED_BLK_MAX ); - ulong completes_ul = !!completes; + ulong is_turbine_ul = !!is_turbine; ulong is_code_ul = !!is_code; - return completes_ul << 63 | slot_ul << 31 | fec_set_idx_ul << 16 | is_code_ul << 15 | shred_idx_or_data_cnt_ul; + return is_turbine_ul << 63 | slot_ul << 31 | fec_set_idx_ul << 16 | is_code_ul << 15 | shred_idx_or_data_cnt_ul; } /* fd_disco_shred_repair_shred_sig_{...} are accessors for the fields encoded in the sig described above. 
*/ -FD_FN_CONST static inline int fd_disco_shred_repair_shred_sig_completes ( ulong sig ) { return fd_ulong_extract_bit( sig, 63 ); } +FD_FN_CONST static inline int fd_disco_shred_repair_shred_sig_is_turbine ( ulong sig ) { return fd_ulong_extract_bit( sig, 63 ); } FD_FN_CONST static inline ulong fd_disco_shred_repair_shred_sig_slot ( ulong sig ) { return fd_ulong_extract ( sig, 31, 62 ); } FD_FN_CONST static inline uint fd_disco_shred_repair_shred_sig_fec_set_idx( ulong sig ) { return (uint)fd_ulong_extract ( sig, 16, 30 ); } FD_FN_CONST static inline int fd_disco_shred_repair_shred_sig_is_code ( ulong sig ) { return fd_ulong_extract_bit( sig, 15 ); } diff --git a/src/disco/metrics/fd_metrics_base.h b/src/disco/metrics/fd_metrics_base.h index 8e9ae12b9b..c7021b2c1a 100644 --- a/src/disco/metrics/fd_metrics_base.h +++ b/src/disco/metrics/fd_metrics_base.h @@ -78,7 +78,7 @@ typedef struct { ulong min; ulong max; } none; - + struct { double min; double max; diff --git a/src/disco/metrics/generated/fd_metrics_repair.c b/src/disco/metrics/generated/fd_metrics_repair.c index 6a51cf710d..cd1e17dbb5 100644 --- a/src/disco/metrics/generated/fd_metrics_repair.c +++ b/src/disco/metrics/generated/fd_metrics_repair.c @@ -13,11 +13,12 @@ const fd_metrics_meta_t FD_METRICS_REPAIR[FD_METRICS_REPAIR_TOTAL] = { DECLARE_METRIC_ENUM( REPAIR_RECV_SERV_PKT_TYPES, COUNTER, REPAIR_SERV_PKT_TYPES, ORPHAN ), DECLARE_METRIC_ENUM( REPAIR_RECV_SERV_PKT_TYPES, COUNTER, REPAIR_SERV_PKT_TYPES, UNKNOWN ), DECLARE_METRIC( REPAIR_RECV_PKT_CORRUPTED_MSG, COUNTER ), - DECLARE_METRIC( REPAIR_SEND_PKT_CNT, COUNTER ), + DECLARE_METRIC( REPAIR_SHRED_REPAIR_REQ, COUNTER ), DECLARE_METRIC_ENUM( REPAIR_SENT_PKT_TYPES, COUNTER, REPAIR_SENT_REQUEST_TYPES, NEEDED_WINDOW ), DECLARE_METRIC_ENUM( REPAIR_SENT_PKT_TYPES, COUNTER, REPAIR_SENT_REQUEST_TYPES, NEEDED_HIGHEST_WINDOW ), DECLARE_METRIC_ENUM( REPAIR_SENT_PKT_TYPES, COUNTER, REPAIR_SENT_REQUEST_TYPES, NEEDED_ORPHAN ), DECLARE_METRIC_HISTOGRAM_SECONDS( 
REPAIR_STORE_LINK_WAIT ), DECLARE_METRIC_HISTOGRAM_SECONDS( REPAIR_STORE_LINK_WORK ), + DECLARE_METRIC_HISTOGRAM_SECONDS( REPAIR_SLOT_COMPLETE_TIME ), DECLARE_METRIC_HISTOGRAM_SECONDS( REPAIR_SIGN_DURATION_SECONDS ), }; diff --git a/src/disco/metrics/generated/fd_metrics_repair.h b/src/disco/metrics/generated/fd_metrics_repair.h index da05dbf0bd..c3172528ad 100644 --- a/src/disco/metrics/generated/fd_metrics_repair.h +++ b/src/disco/metrics/generated/fd_metrics_repair.h @@ -52,11 +52,11 @@ #define FD_METRICS_COUNTER_REPAIR_RECV_PKT_CORRUPTED_MSG_DESC "How many corrupt messages have we received" #define FD_METRICS_COUNTER_REPAIR_RECV_PKT_CORRUPTED_MSG_CVT (FD_METRICS_CONVERTER_NONE) -#define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_OFF (27UL) -#define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_NAME "repair_send_pkt_cnt" -#define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_TYPE (FD_METRICS_TYPE_COUNTER) -#define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_DESC "How many packets have sent" -#define FD_METRICS_COUNTER_REPAIR_SEND_PKT_CNT_CVT (FD_METRICS_CONVERTER_NONE) +#define FD_METRICS_COUNTER_REPAIR_SHRED_REPAIR_REQ_OFF (27UL) +#define FD_METRICS_COUNTER_REPAIR_SHRED_REPAIR_REQ_NAME "repair_shred_repair_req" +#define FD_METRICS_COUNTER_REPAIR_SHRED_REPAIR_REQ_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_REPAIR_SHRED_REPAIR_REQ_DESC "How many repair requests have we sent" +#define FD_METRICS_COUNTER_REPAIR_SHRED_REPAIR_REQ_CVT (FD_METRICS_CONVERTER_NONE) #define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_OFF (28UL) #define FD_METRICS_COUNTER_REPAIR_SENT_PKT_TYPES_NAME "repair_sent_pkt_types" @@ -85,7 +85,15 @@ #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WORK_MIN (1e-08) #define FD_METRICS_HISTOGRAM_REPAIR_STORE_LINK_WORK_MAX (0.0005) -#define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_OFF (65UL) +#define FD_METRICS_HISTOGRAM_REPAIR_SLOT_COMPLETE_TIME_OFF (65UL) +#define FD_METRICS_HISTOGRAM_REPAIR_SLOT_COMPLETE_TIME_NAME "repair_slot_complete_time" +#define 
FD_METRICS_HISTOGRAM_REPAIR_SLOT_COMPLETE_TIME_TYPE (FD_METRICS_TYPE_HISTOGRAM) +#define FD_METRICS_HISTOGRAM_REPAIR_SLOT_COMPLETE_TIME_DESC "Time in seconds it took to complete a slot" +#define FD_METRICS_HISTOGRAM_REPAIR_SLOT_COMPLETE_TIME_CVT (FD_METRICS_CONVERTER_SECONDS) +#define FD_METRICS_HISTOGRAM_REPAIR_SLOT_COMPLETE_TIME_MIN (0.2) +#define FD_METRICS_HISTOGRAM_REPAIR_SLOT_COMPLETE_TIME_MAX (2.0) + +#define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_OFF (82UL) #define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_NAME "repair_sign_duration_seconds" #define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_TYPE (FD_METRICS_TYPE_HISTOGRAM) #define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_DESC "Duration of signing a message" @@ -93,5 +101,5 @@ #define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_MIN (1e-08) #define FD_METRICS_HISTOGRAM_REPAIR_SIGN_DURATION_SECONDS_MAX (0.001) -#define FD_METRICS_REPAIR_TOTAL (18UL) +#define FD_METRICS_REPAIR_TOTAL (19UL) extern const fd_metrics_meta_t FD_METRICS_REPAIR[FD_METRICS_REPAIR_TOTAL]; diff --git a/src/disco/metrics/generated/fd_metrics_shred.c b/src/disco/metrics/generated/fd_metrics_shred.c index 3444b563ca..944c6fcff7 100644 --- a/src/disco/metrics/generated/fd_metrics_shred.c +++ b/src/disco/metrics/generated/fd_metrics_shred.c @@ -22,6 +22,8 @@ const fd_metrics_meta_t FD_METRICS_SHRED[FD_METRICS_SHRED_TOTAL] = { DECLARE_METRIC( SHRED_FORCE_COMPLETE_REQUEST, COUNTER ), DECLARE_METRIC( SHRED_FORCE_COMPLETE_FAILURE, COUNTER ), DECLARE_METRIC( SHRED_FORCE_COMPLETE_SUCCESS, COUNTER ), + DECLARE_METRIC( SHRED_SHRED_REPAIR_RCV, COUNTER ), + DECLARE_METRIC( SHRED_SHRED_TURBINE_RCV, COUNTER ), DECLARE_METRIC_HISTOGRAM_SECONDS( SHRED_STORE_INSERT_WAIT ), DECLARE_METRIC_HISTOGRAM_SECONDS( SHRED_STORE_INSERT_WORK ), }; diff --git a/src/disco/metrics/generated/fd_metrics_shred.h b/src/disco/metrics/generated/fd_metrics_shred.h index 006f62142d..d6b2e64cef 100644 --- 
a/src/disco/metrics/generated/fd_metrics_shred.h +++ b/src/disco/metrics/generated/fd_metrics_shred.h @@ -111,7 +111,19 @@ #define FD_METRICS_COUNTER_SHRED_FORCE_COMPLETE_SUCCESS_DESC "The number of times we successfully forced completed a FEC set on request" #define FD_METRICS_COUNTER_SHRED_FORCE_COMPLETE_SUCCESS_CVT (FD_METRICS_CONVERTER_NONE) -#define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WAIT_OFF (116UL) +#define FD_METRICS_COUNTER_SHRED_SHRED_REPAIR_RCV_OFF (116UL) +#define FD_METRICS_COUNTER_SHRED_SHRED_REPAIR_RCV_NAME "shred_shred_repair_rcv" +#define FD_METRICS_COUNTER_SHRED_SHRED_REPAIR_RCV_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_SHRED_SHRED_REPAIR_RCV_DESC "The number of times we received a repair shred" +#define FD_METRICS_COUNTER_SHRED_SHRED_REPAIR_RCV_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_COUNTER_SHRED_SHRED_TURBINE_RCV_OFF (117UL) +#define FD_METRICS_COUNTER_SHRED_SHRED_TURBINE_RCV_NAME "shred_shred_turbine_rcv" +#define FD_METRICS_COUNTER_SHRED_SHRED_TURBINE_RCV_TYPE (FD_METRICS_TYPE_COUNTER) +#define FD_METRICS_COUNTER_SHRED_SHRED_TURBINE_RCV_DESC "The number of times we received a turbine shred" +#define FD_METRICS_COUNTER_SHRED_SHRED_TURBINE_RCV_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WAIT_OFF (118UL) #define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WAIT_NAME "shred_store_insert_wait" #define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WAIT_TYPE (FD_METRICS_TYPE_HISTOGRAM) #define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WAIT_DESC "Time in seconds spent waiting for the store to insert a new FEC set" @@ -119,7 +131,7 @@ #define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WAIT_MIN (1e-08) #define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WAIT_MAX (0.0005) -#define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WORK_OFF (133UL) +#define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WORK_OFF (135UL) #define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WORK_NAME "shred_store_insert_work" #define 
FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WORK_TYPE (FD_METRICS_TYPE_HISTOGRAM) #define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WORK_DESC "Time in seconds spent on inserting a new FEC set" @@ -127,5 +139,5 @@ #define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WORK_MIN (1e-08) #define FD_METRICS_HISTOGRAM_SHRED_STORE_INSERT_WORK_MAX (0.0005) -#define FD_METRICS_SHRED_TOTAL (22UL) +#define FD_METRICS_SHRED_TOTAL (24UL) extern const fd_metrics_meta_t FD_METRICS_SHRED[FD_METRICS_SHRED_TOTAL]; diff --git a/src/disco/metrics/metrics.xml b/src/disco/metrics/metrics.xml index 0de555fb6a..2505087e72 100644 --- a/src/disco/metrics/metrics.xml +++ b/src/disco/metrics/metrics.xml @@ -657,6 +657,8 @@ metric introduced. + + Time in seconds spent waiting for the store to insert a new FEC set @@ -803,7 +805,7 @@ metric introduced. - + Time in seconds spent waiting for the store to link a new FEC set @@ -811,6 +813,9 @@ metric introduced. Time in seconds spent on linking a new FEC set + + Time in seconds it took to complete a slot + diff --git a/src/disco/shred/fd_shred_tile.c b/src/disco/shred/fd_shred_tile.c index b991dfca18..24d472260c 100644 --- a/src/disco/shred/fd_shred_tile.c +++ b/src/disco/shred/fd_shred_tile.c @@ -225,6 +225,8 @@ typedef struct { ulong shred_processing_result[ FD_FEC_RESOLVER_ADD_SHRED_RETVAL_CNT+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ]; ulong invalid_block_id_cnt; ulong shred_rejected_unchained_cnt; + ulong repair_rcv_cnt; + ulong turbine_rcv_cnt; fd_histf_t store_insert_wait[ 1 ]; fd_histf_t store_insert_work[ 1 ]; } metrics[ 1 ]; @@ -296,6 +298,8 @@ metrics_write( fd_shred_ctx_t * ctx ) { FD_MHIST_COPY( SHRED, BATCH_MICROBLOCK_CNT, ctx->metrics->batch_microblock_cnt ); FD_MHIST_COPY( SHRED, SHREDDING_DURATION_SECONDS, ctx->metrics->shredding_timing ); FD_MHIST_COPY( SHRED, ADD_SHRED_DURATION_SECONDS, ctx->metrics->add_shred_timing ); + FD_MCNT_SET ( SHRED, SHRED_REPAIR_RCV, ctx->metrics->repair_rcv_cnt ); + FD_MCNT_SET ( SHRED, SHRED_TURBINE_RCV, 
ctx->metrics->turbine_rcv_cnt ); FD_MCNT_SET ( SHRED, INVALID_BLOCK_ID, ctx->metrics->invalid_block_id_cnt ); FD_MCNT_SET ( SHRED, SHRED_REJECTED_UNCHAINED, ctx->metrics->shred_rejected_unchained_cnt ); @@ -641,6 +645,9 @@ during_frag( fd_shred_ctx_t * ctx, return; }; + if( fd_disco_netmux_sig_proto( sig )==DST_PROTO_REPAIR ) ctx->metrics->repair_rcv_cnt++; + else ctx->metrics->turbine_rcv_cnt++; + /* Drop unchained merkle shreds (if feature is active) */ int is_unchained = !fd_shred_is_chained( fd_shred_type( shred->variant ) ); if( FD_UNLIKELY( is_unchained && shred->slot >= ctx->features_activation->drop_unchained_merkle_shreds ) ) { @@ -888,17 +895,15 @@ after_frag( fd_shred_ctx_t * ctx, int is_code = fd_shred_is_code( fd_shred_type( shred->variant ) ); uint shred_idx_or_data_cnt = shred->idx; - int completes = 0; if( FD_LIKELY( is_code ) ) shred_idx_or_data_cnt = shred->code.data_cnt; /* optimize for code_cnt >= data_cnt */ - else completes = shred->data.flags & ( FD_SHRED_DATA_FLAG_SLOT_COMPLETE | FD_SHRED_DATA_FLAG_DATA_COMPLETE ); - ulong sig = fd_disco_shred_repair_shred_sig( !!completes, shred->slot, shred->fec_set_idx, is_code, shred_idx_or_data_cnt ); + ulong _sig = fd_disco_shred_repair_shred_sig( fd_disco_netmux_sig_proto(sig)==DST_PROTO_SHRED, shred->slot, shred->fec_set_idx, is_code, shred_idx_or_data_cnt ); /* Copy the shred header into the frag and publish. 
*/ ulong sz = fd_shred_header_sz( shred->variant ); fd_memcpy( fd_chunk_to_laddr( ctx->repair_out_mem, ctx->repair_out_chunk ), shred, sz ); ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() ); - fd_stem_publish( stem, ctx->repair_out_idx, sig, ctx->repair_out_chunk, sz, 0UL, ctx->tsorig, tspub ); + fd_stem_publish( stem, ctx->repair_out_idx, _sig, ctx->repair_out_chunk, sz, 0UL, ctx->tsorig, tspub ); ctx->repair_out_chunk = fd_dcache_compact_next( ctx->repair_out_chunk, sz, ctx->repair_out_chunk0, ctx->repair_out_wmark ); } } @@ -1331,6 +1336,8 @@ unprivileged_init( fd_topo_t * topo, memset( ctx->metrics->shred_processing_result, '\0', sizeof(ctx->metrics->shred_processing_result) ); ctx->metrics->invalid_block_id_cnt = 0UL; ctx->metrics->shred_rejected_unchained_cnt = 0UL; + ctx->metrics->repair_rcv_cnt = 0UL; + ctx->metrics->turbine_rcv_cnt = 0UL; ctx->pending_batch.microblock_cnt = 0UL; ctx->pending_batch.txn_cnt = 0UL; diff --git a/src/discof/forest/fd_forest.c b/src/discof/forest/fd_forest.c index 003ca133a7..9098143f67 100644 --- a/src/discof/forest/fd_forest.c +++ b/src/discof/forest/fd_forest.c @@ -364,6 +364,11 @@ acquire( fd_forest_t * forest, ulong slot ) { fd_forest_ele_idxs_null( ele->fecs ); /* FIXME expensive */ fd_forest_ele_idxs_null( ele->idxs ); /* FIXME expensive */ + fd_forest_ele_idxs_null( ele->code ); /* FIXME expensive */ + ele->first_ts = 0; + ele->turbine_cnt = 0; + ele->repair_cnt = 0; + return ele; } @@ -408,7 +413,7 @@ fd_forest_query( fd_forest_t * forest, ulong slot ) { } fd_forest_ele_t * -fd_forest_data_shred_insert( fd_forest_t * forest, ulong slot, ushort parent_off, uint shred_idx, uint fec_set_idx, FD_PARAM_UNUSED int data_complete, int slot_complete ) { +fd_forest_data_shred_insert( fd_forest_t * forest, ulong slot, ushort parent_off, uint shred_idx, uint fec_set_idx, int slot_complete, shred_src_t src ) { # if FD_FOREST_USE_HANDHOLDING FD_TEST( slot > fd_forest_root_slot( forest ) ); /* caller error - inval */ # endif @@ 
-436,6 +441,13 @@ fd_forest_data_shred_insert( fd_forest_t * forest, ulong slot, ushort parent_off fd_forest_ancestry_ele_insert( fd_forest_ancestry( forest ), ele, pool ); link( forest, parent, ele ); } + + if( !fd_forest_ele_idxs_test( ele->idxs, shred_idx ) ) { /* newly seen shred */ + ele->turbine_cnt += (src==TURBINE); + ele->repair_cnt += (src==REPAIR); + } + if( FD_UNLIKELY( !slot_complete && ele->first_ts == 0 ) ) ele->first_ts = fd_tickcount(); + fd_forest_ele_idxs_insert( ele->fecs, fec_set_idx ); fd_forest_ele_idxs_insert( ele->idxs, shred_idx ); while( fd_forest_ele_idxs_test( ele->idxs, ele->buffered_idx + 1U ) ) ele->buffered_idx++; @@ -444,6 +456,24 @@ fd_forest_data_shred_insert( fd_forest_t * forest, ulong slot, ushort parent_off return ele; } +fd_forest_ele_t * +fd_forest_code_shred_insert( fd_forest_t * forest, ulong slot, uint shred_idx ) { + //fd_forest_ele_t * pool = fd_forest_pool( forest ); + fd_forest_ele_t * ele = query( forest, slot ); + if( FD_UNLIKELY( !ele ) ) { + /* very annoying, but if coding shred is the first shred in a slot, + we can't create its ele without the parent_off. 
But slight + in-accuracies in metrics are ok for now */ + return NULL; + } + + if( FD_LIKELY( !fd_forest_ele_idxs_test( ele->code, shred_idx ) ) ) { /* newly seen shred */ + ele->turbine_cnt += 1; + fd_forest_ele_idxs_insert( ele->code, shred_idx ); + } + return ele; +} + fd_forest_ele_t const * fd_forest_publish( fd_forest_t * forest, ulong new_root_slot ) { FD_LOG_DEBUG(( "[%s] slot %lu", __func__, new_root_slot )); @@ -894,6 +924,7 @@ void fd_forest_ancestry_print( fd_forest_t const * forest ) { printf(("\n\n[Ancestry]\n" ) ); ancestry_print3( forest, fd_forest_pool_ele_const( fd_forest_pool_const( forest ), forest->root ), 0, "[", NULL, 0 ); + fflush(stdout); /* Ensure ancestry printf output is flushed */ } void @@ -908,6 +939,7 @@ fd_forest_frontier_print( fd_forest_t const * forest ) { printf("%lu (%u/%u)\n", ele->slot, ele->buffered_idx + 1, ele->complete_idx + 1 ); //ancestry_print( forest, fd_forest_pool_ele_const( fd_forest_pool_const( forest ), fd_forest_pool_idx( pool, ele ) ), 0, "" ); } + fflush(stdout); } void @@ -921,6 +953,7 @@ fd_forest_orphaned_print( fd_forest_t const * forest ) { fd_forest_ele_t const * ele = fd_forest_orphaned_iter_ele_const( iter, orphaned, pool ); ancestry_print2( forest, fd_forest_pool_ele_const( fd_forest_pool_const( forest ), fd_forest_pool_idx( pool, ele ) ), NULL, 0, 0, "" ); } + fflush(stdout); } void @@ -932,6 +965,7 @@ fd_forest_print( fd_forest_t const * forest ) { fd_forest_frontier_print( forest ); fd_forest_orphaned_print( forest ); printf("\n"); + fflush(stdout); # endif } diff --git a/src/discof/forest/fd_forest.h b/src/discof/forest/fd_forest.h index 0d80191b3d..98ff86272d 100644 --- a/src/discof/forest/fd_forest.h +++ b/src/discof/forest/fd_forest.h @@ -56,6 +56,13 @@ struct __attribute__((aligned(128UL))) fd_forest_ele { fd_forest_ele_idxs_t cmpl[fd_forest_ele_idxs_word_cnt]; /* fec complete idxs */ fd_forest_ele_idxs_t fecs[fd_forest_ele_idxs_word_cnt]; /* fec set idxs */ fd_forest_ele_idxs_t 
idxs[fd_forest_ele_idxs_word_cnt]; /* data shred idxs */ + + /* Metrics */ + + fd_forest_ele_idxs_t code[fd_forest_ele_idxs_word_cnt]; /* code shred idxs */ + long first_ts; /* timestamp of first shred rcved in slot != complete_idx */ + uint turbine_cnt; /* number of shreds received from turbine */ + uint repair_cnt; /* number of data shreds received from repair */ }; typedef struct fd_forest_ele fd_forest_ele_t; @@ -301,6 +308,13 @@ fd_forest_query( fd_forest_t * forest, ulong slot ); /* Operations */ +enum shred_src { + TURBINE = 0, + REPAIR = 1, + RECOVERED = 2, +}; +typedef enum shred_src shred_src_t; + /* fd_forest_shred_insert inserts a new shred into the forest. Assumes slot >= forest->smr, slot is not already in forest, parent_slot is already in forest, and the ele pool has a free @@ -308,7 +322,10 @@ fd_forest_query( fd_forest_t * forest, ulong slot ); Returns the inserted forest ele. */ fd_forest_ele_t * -fd_forest_data_shred_insert( fd_forest_t * forest, ulong slot, ushort parent_off, uint shred_idx, uint fec_set_idx, int data_complete, int slot_complete ); +fd_forest_data_shred_insert( fd_forest_t * forest, ulong slot, ushort parent_off, uint shred_idx, uint fec_set_idx, int slot_complete, shred_src_t src ); + +fd_forest_ele_t * +fd_forest_code_shred_insert( fd_forest_t * forest, ulong slot, uint shred_idx ); /* fd_forest_publish publishes slot as the new forest root, setting the subtree beginning from slot as the new forest tree (ie. 
slot diff --git a/src/discof/forest/test_forest.c b/src/discof/forest/test_forest.c index 74d30cbae5..f68a6a73ec 100644 --- a/src/discof/forest/test_forest.c +++ b/src/discof/forest/test_forest.c @@ -18,12 +18,12 @@ fd_forest_t * setup_preorder( fd_forest_t * forest ) { fd_forest_init( forest, 0 ); - fd_forest_data_shred_insert( forest, 1, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 2, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 4, 2, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 3, 2, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 5, 2, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 6, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 1, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 2, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 4, 2, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 3, 2, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 5, 2, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 6, 1, 0, 0, 0, REPAIR ); FD_TEST( !fd_forest_verify( forest ) ); fd_forest_print( forest ); return forest; @@ -96,7 +96,7 @@ test_publish_incremental( fd_wksp_t * wksp ){ */ fd_forest_init( forest, 0 ); - fd_forest_data_shred_insert( forest, 11, 1, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 11, 1, 0, 0, 1, REPAIR ); ulong new_root = 1; fd_forest_publish( forest, new_root ); @@ -110,8 +110,8 @@ test_publish_incremental( fd_wksp_t * wksp ){ */ - fd_forest_data_shred_insert( forest, 2, 1, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 3, 1, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 2, 1, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 3, 1, 0, 0, 1, REPAIR ); ulong frontier = 3; FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &frontier, NULL, fd_forest_pool( forest ) ) ); @@ -130,10 +130,10 @@ test_publish_incremental( fd_wksp_t * wksp ){ */ - fd_forest_data_shred_insert( forest, 4, 1, 0, 0, 0, 0 ); - 
fd_forest_data_shred_insert( forest, 5, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 6, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 7, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 4, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 5, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 6, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 7, 1, 0, 0, 0, REPAIR ); frontier = 4; new_root = 6; @@ -152,7 +152,7 @@ test_publish_incremental( fd_wksp_t * wksp ){ 8 -> 9 (should get pruned) */ - fd_forest_data_shred_insert( forest, 9, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 9, 1, 0, 0, 0, REPAIR ); new_root = 10; frontier = 11; @@ -174,9 +174,9 @@ test_publish_incremental( fd_wksp_t * wksp ){ */ - fd_forest_data_shred_insert( forest, 14, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 15, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 16, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 14, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 15, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 16, 1, 0, 0, 0, REPAIR ); new_root = 15; frontier = 16; @@ -219,11 +219,11 @@ void test_out_of_order( fd_wksp_t * wksp ) { fd_forest_t * forest = fd_forest_join( fd_forest_new( mem, ele_max, 42UL /* seed */ ) ); fd_forest_init( forest, 0 ); - fd_forest_data_shred_insert( forest, 6, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 5, 2, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 2, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 1, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 3, 2, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 6, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 5, 2, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 2, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 1, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 3, 2, 0, 0, 0, REPAIR ); fd_forest_print( 
forest ); ulong * arr = frontier_arr( wksp, forest ); @@ -232,7 +232,7 @@ void test_out_of_order( fd_wksp_t * wksp ) { FD_TEST( !fd_forest_verify( forest ) ); fd_wksp_free_laddr( arr ); - fd_forest_data_shred_insert( forest, 1, 1, 1, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 1, 1, 1, 0, 1, REPAIR ); fd_forest_print( forest ); arr = frontier_arr( wksp, forest ); FD_TEST( arr[0] == 2 ); @@ -241,7 +241,7 @@ void test_out_of_order( fd_wksp_t * wksp ) { FD_TEST( !fd_forest_verify( forest ) ); fd_wksp_free_laddr( arr ); - fd_forest_data_shred_insert( forest, 3, 2, 1, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 3, 2, 1, 0, 1, REPAIR ); fd_forest_print( forest ); arr = frontier_arr( wksp, forest ); FD_TEST( arr[0] == 2 ); @@ -250,7 +250,7 @@ void test_out_of_order( fd_wksp_t * wksp ) { FD_TEST( !fd_forest_verify( forest ) ); fd_wksp_free_laddr( arr ); - fd_forest_data_shred_insert( forest, 5, 2, 1, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 5, 2, 1, 0, 1, REPAIR ); fd_forest_print( forest ); arr = frontier_arr( wksp, forest ); FD_TEST( arr[0] == 2 ); @@ -259,8 +259,8 @@ void test_out_of_order( fd_wksp_t * wksp ) { FD_TEST( !fd_forest_verify( forest ) ); fd_wksp_free_laddr( arr ); - fd_forest_data_shred_insert( forest, 4, 2, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 2, 1, 1, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 4, 2, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 2, 1, 1, 0, 1, REPAIR ); fd_forest_print( forest ); arr = frontier_arr( wksp, forest ); FD_TEST( arr[0] == 4 ); @@ -269,7 +269,7 @@ void test_out_of_order( fd_wksp_t * wksp ) { FD_TEST( !fd_forest_verify( forest ) ); fd_wksp_free_laddr( arr ); - fd_forest_data_shred_insert( forest, 6, 1, 1, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 6, 1, 1, 0, 1, REPAIR ); fd_forest_print( forest ); arr = frontier_arr( wksp, forest ); FD_TEST( arr[0] == 4 ); @@ -278,8 +278,8 @@ void test_out_of_order( fd_wksp_t * wksp ) { FD_TEST( !fd_forest_verify( forest ) ); 
fd_wksp_free_laddr( arr ); - fd_forest_data_shred_insert( forest, 4, 2, 1, 0, 0, 0 ); /* shred complete arrives before */ - fd_forest_data_shred_insert( forest, 4, 2, 2, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 4, 2, 1, 0, 0, REPAIR ); /* shred complete arrives before */ + fd_forest_data_shred_insert( forest, 4, 2, 2, 0, 1, REPAIR ); fd_forest_print( forest ); arr = frontier_arr( wksp, forest ); FD_TEST( arr[0] == 4 ); @@ -306,11 +306,11 @@ test_forks( fd_wksp_t * wksp ){ // these slots all have 2 shreds, 0,1 fd_forest_init( forest, 0 ); - fd_forest_data_shred_insert( forest, 1, 1, 1, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 2, 1, 1, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 3, 1, 1, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 4, 1, 1, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 10, 1, 1, 0, 1, 1 ); /* orphan */ + fd_forest_data_shred_insert( forest, 1, 1, 1, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 2, 1, 1, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 3, 1, 1, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 4, 1, 1, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 10, 1, 1, 0, 1, REPAIR ); /* orphan */ /* Frontier should be slot 1. 
*/ ulong key = 1 ; @@ -327,14 +327,14 @@ test_forks( fd_wksp_t * wksp ){ FD_TEST( cnt == 1 ); // advance frontier to slot 3 - fd_forest_data_shred_insert( forest, 1, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 2, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 1, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 2, 1, 0, 0, 0, REPAIR ); key = 3; FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &key, NULL, fd_forest_pool( forest ) ) ); // add a new fork off slot 1 - fd_forest_data_shred_insert( forest, 5, 4, 1, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 5, 4, 1, 0, 1, REPAIR ); fd_forest_print( forest ); @@ -352,8 +352,8 @@ test_forks( fd_wksp_t * wksp ){ FD_TEST( cnt == 2 ); // add a fork off of the orphan - fd_forest_data_shred_insert( forest, 11, 1, 1, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 12, 4, 1, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 11, 1, 1, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 12, 4, 1, 0, 1, REPAIR ); cnt = 0; for( fd_forest_frontier_iter_t iter = fd_forest_frontier_iter_init( fd_forest_frontier( forest ), fd_forest_pool( forest ) ); @@ -377,19 +377,19 @@ test_print_tree( fd_wksp_t *wksp ){ fd_forest_t * forest = fd_forest_join( fd_forest_new( mem, ele_max, 42UL /* seed */ ) ); fd_forest_init( forest, 1568376 ); - fd_forest_data_shred_insert( forest, 1568377, 1, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 1568378, 1, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 1568379, 1, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 1568380, 1, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 1568381, 2, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 1568382, 1, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 1568383, 4, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 1568384, 5, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 1568385, 5, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 1568386, 6, 0, 0, 1, 1 ); + 
fd_forest_data_shred_insert( forest, 1568377, 1, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 1568378, 1, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 1568379, 1, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 1568380, 1, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 1568381, 2, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 1568382, 1, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 1568383, 4, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 1568384, 5, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 1568385, 5, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 1568386, 6, 0, 0, 1, REPAIR ); for( ulong i = 1568387; i < 1568400; i++ ){ - FD_TEST( fd_forest_data_shred_insert( forest, i, 1, 0, 0, 1, 1) ); + FD_TEST( fd_forest_data_shred_insert( forest, i, 1, 0, 0, 1, REPAIR) ); } FD_TEST( !fd_forest_verify( forest ) ); @@ -414,32 +414,32 @@ test_large_print_tree( fd_wksp_t * wksp ){ fd_forest_init( forest, 330090532 ); for( ulong slot = 330090533; slot <= 330090539; slot++ ){ - fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, REPAIR ); } - fd_forest_data_shred_insert( forest, 330090544, 5, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 330090544, 5, 0, 0, 1, REPAIR ); for( ulong slot = 330090545; slot <= 330090583; slot++ ){ - fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, REPAIR ); } - fd_forest_data_shred_insert( forest, 330090588, 5, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 330090588, 5, 0, 0, 1, REPAIR ); for( ulong slot = 330090589; slot <= 330090855; slot++ ){ - fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, REPAIR ); } - fd_forest_data_shred_insert( forest, 330090856, 5, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 330090856, 5, 0, 0, 
1, REPAIR ); for( ulong slot = 330090857; slot <= 330090859; slot++ ){ - fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, REPAIR ); } - fd_forest_data_shred_insert( forest, 330090864, 5, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 330090864, 5, 0, 0, 1, REPAIR ); for( ulong slot = 330090865; slot <= 330091007; slot++ ){ - fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, REPAIR ); } - fd_forest_data_shred_insert( forest, 330091008, 5, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 330091008, 5, 0, 0, 1, REPAIR ); - fd_forest_data_shred_insert( forest, 330091010, 3, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 330091010, 3, 0, 0, 1, REPAIR ); for( ulong slot = 330091011; slot <= 330091048; slot++ ){ - fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, slot, 1, 0, 0, 1, REPAIR ); } FD_TEST( !fd_forest_verify( forest ) ); @@ -476,11 +476,11 @@ test_linear_forest_iterator( fd_wksp_t * wksp ) { (slot 5, idx 0), (slot 5, idx 1), (slot 5, idx 2), (slot 5, idx 3), (slot 5, idx 4) */ fd_forest_init( forest, 0 ); - fd_forest_data_shred_insert( forest, 1, 1, 1, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 2, 1, 2, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 3, 1, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 4, 1, 3, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 5, 1, 5, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 1, 1, 1, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 2, 1, 2, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 3, 1, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 4, 1, 3, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 5, 1, 5, 0, 1, REPAIR ); iter_order_t expected[10] = { { 1, 0 }, { 2, 0 }, { 2, 1 }, { 3, UINT_MAX }, { 4, UINT_MAX }, @@ -533,11 +533,11 @@ test_branched_forest_iterator( fd_wksp_t * 
wksp ) { (slot 5, idx 0), (slot 5, idx 1), (slot 5, idx 2), (slot 5, idx 3), (slot 5, idx 4) */ fd_forest_init( forest, 0 ); - fd_forest_data_shred_insert( forest, 1, 1, 1, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 2, 1, 2, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 3, 2, 0, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 4, 2, 3, 0, 0, 0 ); - fd_forest_data_shred_insert( forest, 5, 2, 5, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 1, 1, 1, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 2, 1, 2, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 3, 2, 0, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 4, 2, 3, 0, 0, REPAIR ); + fd_forest_data_shred_insert( forest, 5, 2, 5, 0, 1, REPAIR ); /* This is deterministic. With only one frontier, we will try to DFS the left most fork */ @@ -558,7 +558,7 @@ test_branched_forest_iterator( fd_wksp_t * wksp ) { /* Now frontier advances to the point where we have two things in the frontier */ ulong curr_ver = fd_fseq_query( fd_forest_ver_const( forest ) ); - fd_forest_data_shred_insert( forest, 1, 1, 0, 0, 0, 0 ); + fd_forest_data_shred_insert( forest, 1, 1, 0, 0, 0, REPAIR ); // slot one is complete, so we should now have two things in the frontier FD_TEST( curr_ver < fd_fseq_query( fd_forest_ver_const( forest ) ) ); @@ -591,7 +591,7 @@ test_branched_forest_iterator( fd_wksp_t * wksp ) { i++; if( i == 2 ) { /* insert a data shred in the middle of the iteration */ - fd_forest_data_shred_insert( forest, 3, 2, 3, 0, 1, 1 ); + fd_forest_data_shred_insert( forest, 3, 2, 3, 0, 1, REPAIR ); FD_TEST( curr_ver < fd_fseq_query( fd_forest_ver_const( forest ) ) ); curr_ver = fd_fseq_query( fd_forest_ver_const( forest ) ); } @@ -609,16 +609,16 @@ test_frontier( fd_wksp_t * wksp ) { fd_forest_t * forest = fd_forest_join( fd_forest_new( mem, ele_max, 42UL /* seed */ ) ); fd_forest_init( forest, 0 ); - fd_forest_data_shred_insert( forest, 1, 1, 0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 2, 1, 
0, 0, 1, 1 ); - fd_forest_data_shred_insert( forest, 3, 1, 0, 0, 0, 0 ); /* new frontier */ + fd_forest_data_shred_insert( forest, 1, 1, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 2, 1, 0, 0, 1, REPAIR ); + fd_forest_data_shred_insert( forest, 3, 1, 0, 0, 0, REPAIR ); /* new frontier */ ulong frontier_slot = 3; FD_TEST( !fd_forest_verify( forest ) ); FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &frontier_slot, NULL, fd_forest_pool( forest ) ) ); /* frontier chaining from slot 1 */ - fd_forest_data_shred_insert( forest, 4, 3, 0, 0, 0, 0 ); /* new frontier */ + fd_forest_data_shred_insert( forest, 4, 3, 0, 0, 0, REPAIR ); /* new frontier */ frontier_slot = 4; FD_TEST( !fd_forest_verify( forest ) ); FD_TEST( fd_forest_frontier_ele_query( fd_forest_frontier( forest ), &frontier_slot, NULL, fd_forest_pool( forest ) ) ); diff --git a/src/discof/reasm/fd_reasm.c b/src/discof/reasm/fd_reasm.c index e16adaf8b6..6077f60dd9 100644 --- a/src/discof/reasm/fd_reasm.c +++ b/src/discof/reasm/fd_reasm.c @@ -451,6 +451,7 @@ print( fd_reasm_t const * reasm, fd_reasm_fec_t const * fec, int space, const ch } curr = pool_ele_const( pool, curr->sibling ); } + } void diff --git a/src/discof/repair/fd_repair_tile.c b/src/discof/repair/fd_repair_tile.c index e283d55dc9..2b0b70d512 100644 --- a/src/discof/repair/fd_repair_tile.c +++ b/src/discof/repair/fd_repair_tile.c @@ -115,18 +115,6 @@ typedef struct fd_fec_sig fd_fec_sig_t; #define MAP_MEMOIZE 0 #include "../../util/tmpl/fd_map_dynamic.c" -struct fd_sreasm { - ulong slot; - uint cnt; -}; -typedef struct fd_sreasm fd_sreasm_t; - -#define MAP_NAME fd_sreasm -#define MAP_T fd_sreasm_t -#define MAP_KEY slot -#define MAP_MEMOIZE 0 -#include "../../util/tmpl/fd_map_dynamic.c" - struct fd_repair_tile_ctx { long tsprint; /* timestamp for printing */ long tsrepair; /* timestamp for repair */ @@ -145,7 +133,6 @@ struct fd_repair_tile_ctx { fd_forest_t * forest; fd_fec_sig_t * fec_sigs; - fd_sreasm_t * 
sreasm; fd_reasm_t * reasm; fd_forest_iter_t repair_iter; fd_store_t * store; @@ -224,6 +211,21 @@ struct fd_repair_tile_ctx { /* Pending sign requests */ fd_repair_pending_sign_req_t * pending_sign_req_pool; fd_repair_pending_sign_req_map_t * pending_sign_req_map; + + struct { + ulong recv_clnt_pkt; + ulong recv_serv_pkt; + ulong recv_serv_corrupt_pkt; + ulong recv_serv_invalid_signature; + ulong recv_serv_full_ping_table; + ulong recv_serv_pkt_types[FD_METRICS_ENUM_REPAIR_SERV_PKT_TYPES_CNT]; + ulong recv_pkt_corrupted_msg; + ulong send_pkt_cnt; + ulong sent_pkt_types[FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_CNT]; + fd_histf_t store_link_wait[ 1 ]; + fd_histf_t store_link_work[ 1 ]; + fd_histf_t slot_compl_time[ 1 ]; + } metrics[ 1 ]; }; typedef struct fd_repair_tile_ctx fd_repair_tile_ctx_t; @@ -248,9 +250,7 @@ scratch_footprint( fd_topo_tile_t const * tile ) { l = FD_LAYOUT_APPEND( l, fd_repair_align(), fd_repair_footprint() ); l = FD_LAYOUT_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) ); l = FD_LAYOUT_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) ); - l = FD_LAYOUT_APPEND( l, fd_sreasm_align(), fd_sreasm_footprint( 20 ) ); l = FD_LAYOUT_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 1 << 20 ) ); -//l = FD_LAYOUT_APPEND( l, fd_fec_repair_align(), fd_fec_repair_footprint( ( 1<<20 ), tile->repair.shred_tile_cnt ) ); l = FD_LAYOUT_APPEND( l, fd_repair_pending_sign_req_pool_align(), fd_repair_pending_sign_req_pool_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); l = FD_LAYOUT_APPEND( l, fd_repair_pending_sign_req_map_align(), fd_repair_pending_sign_req_map_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) ); @@ -300,6 +300,7 @@ send_packet( fd_repair_tile_ctx_t * ctx, uchar const * payload, ulong payload_sz, ulong tsorig ) { + uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk ); fd_ip4_udp_hdrs_t * hdr = 
(fd_ip4_udp_hdrs_t *)packet; *hdr = *(is_intake ? ctx->intake_hdr : ctx->serve_hdr); @@ -361,14 +362,14 @@ fd_repair_handle_ping( fd_repair_tile_ctx_t * repair_tile_ctx, /* Pass a raw client response packet into the protocol. addr is the address of the sender */ static int -fd_repair_recv_clnt_packet( fd_repair_tile_ctx_t * repair_tile_ctx, +fd_repair_recv_clnt_packet( fd_repair_tile_ctx_t * ctx, fd_stem_context_t * stem, fd_repair_t * glob, uchar const * msg, ulong msglen, fd_repair_peer_addr_t const * src_addr, uint dst_ip4_addr ) { - glob->metrics.recv_clnt_pkt++; + ctx->metrics->recv_clnt_pkt++; FD_SCRATCH_SCOPE_BEGIN { while( 1 ) { @@ -388,9 +389,9 @@ fd_repair_recv_clnt_packet( fd_repair_tile_ctx_t * repair_tile_ctx, case fd_repair_response_enum_ping: { uchar buf[FD_REPAIR_MAX_SIGN_BUF_SIZE]; - ulong buflen = fd_repair_handle_ping( repair_tile_ctx, glob, &gmsg->inner.ping, src_addr, dst_ip4_addr, buf, sizeof(buf) ); + ulong buflen = fd_repair_handle_ping( ctx, glob, &gmsg->inner.ping, src_addr, dst_ip4_addr, buf, sizeof(buf) ); ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() ); - send_packet( repair_tile_ctx, stem, 1, src_addr->addr, src_addr->port, dst_ip4_addr, buf, buflen, tsorig ); + send_packet( ctx, stem, 1, src_addr->addr, src_addr->port, dst_ip4_addr, buf, buflen, tsorig ); break; } } @@ -489,42 +490,23 @@ sign_avail_credits( fd_repair_tile_ctx_t * ctx ) { } static void -fd_repair_send_request( fd_repair_tile_ctx_t * repair_tile_ctx, - fd_stem_context_t * stem, - fd_repair_t * glob, - enum fd_needed_elem_type type, - ulong slot, - uint shred_index, - fd_pubkey_t const * recipient, - long now ) { +fd_repair_send_initial_request( fd_repair_tile_ctx_t * ctx, + fd_stem_context_t * stem, + fd_repair_t * glob, + fd_pubkey_t const * recipient, + long now ) { fd_repair_protocol_t protocol; - fd_repair_construct_request_protocol( glob, &protocol, type, slot, shred_index, recipient, 0, now ); + fd_repair_construct_request_protocol( glob, &protocol, 
fd_needed_window_index, 0, 0, recipient, 0, now ); fd_active_elem_t * active = fd_active_table_query( glob->actives, recipient, NULL ); + ctx->metrics->sent_pkt_types[fd_needed_window_index]++; active->avg_reqs++; - glob->metrics.send_pkt_cnt++; uchar buf[FD_REPAIR_MAX_SIGN_BUF_SIZE]; - ulong buflen = fd_repair_sign_and_send( repair_tile_ctx, &protocol, &active->addr, buf, sizeof(buf), 0, 1, NULL ); + ulong buflen = fd_repair_sign_and_send( ctx, &protocol, &active->addr, buf, sizeof(buf), 0, 1, NULL ); ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() ); uint src_ip4_addr = 0U; /* unknown */ - send_packet( repair_tile_ctx, stem, 1, active->addr.addr, active->addr.port, src_ip4_addr, buf, buflen, tsorig ); -} - -static void FD_FN_UNUSED -fd_repair_send_requests( fd_repair_tile_ctx_t * ctx, - fd_stem_context_t * stem, - enum fd_needed_elem_type type, - ulong slot, - uint shred_index, - long now ){ - fd_repair_t * glob = ctx->repair; - - for( uint i=0; ipeers[ glob->peer_idx++ ].key; - fd_repair_send_request( ctx, stem, glob, type, slot, shred_index, id, now ); - if( FD_UNLIKELY( glob->peer_idx >= glob->peer_cnt ) ) glob->peer_idx = 0; /* wrap around */ - } + send_packet( ctx, stem, 1, active->addr.addr, active->addr.port, src_ip4_addr, buf, buflen, tsorig ); } /* Sends a request asynchronously. 
If successful, adds it to the @@ -546,12 +528,13 @@ fd_repair_send_request_async( fd_repair_tile_ctx_t * ctx, /* Acquire and add a pending request from the pool */ fd_repair_protocol_t protocol; - fd_repair_pending_sign_req_t * pending = fd_repair_insert_pending_request( glob, &protocol, peer->addr.addr, peer->addr.port, type, slot, shred_index, now, recipient ); + fd_repair_pending_sign_req_t * pending = fd_repair_insert_pending_request( ctx->repair, &protocol, peer->addr.addr, peer->addr.port, type, slot, shred_index, now, recipient ); + if( FD_UNLIKELY( !pending ) ) { FD_LOG_WARNING(( "No free pending sign reqs" )); return; } - + ctx->metrics->sent_pkt_types[type]++; /* Sign and prepare the message directly into the pending buffer */ pending->buflen = fd_repair_sign_and_send( ctx, &protocol, &peer->addr, pending->buf, sizeof(pending->buf), 1, pending->nonce, sign_out ); @@ -603,7 +586,7 @@ handle_new_cluster_contact_info( fd_repair_tile_ctx_t * ctx, receive a peer's contact information for the first time, effectively prepaying the RTT cost. 
*/ if( FD_LIKELY( ctx->repair_sign_cnt > 0 ) ) { - fd_repair_send_request(ctx, ctx->stem, ctx->repair, fd_needed_window_index, 0, 0, in_dests[i].pubkey, fd_log_wallclock()); + fd_repair_send_initial_request(ctx, ctx->stem, ctx->repair, in_dests[i].pubkey, fd_log_wallclock()); } ulong hash_src = 0xfffffUL & fd_ulong_hash( (ulong)in_dests[i].ip4_addr | ((ulong)repair_peer.port<<32) ); FD_LOG_INFO(( "Added repair peer: pubkey %s hash_src %lu", FD_BASE58_ENC_32_ALLOCA(in_dests[i].pubkey), hash_src )); @@ -795,6 +778,7 @@ fd_repair_handle_sign_response( fd_repair_tile_ctx_t * ctx, fd_memcpy( pending->buf + pending->sig_offset, ctx->buffer, 64UL ); ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() ); uint src_ip4_addr = 0U; + ctx->metrics->send_pkt_cnt++; send_packet( ctx, stem, 1, pending->dst_ip_addr, pending->dst_port, src_ip4_addr, pending->buf, pending->buflen, tsorig ); fd_repair_remove_pending_request( ctx->repair, response_nonce ); @@ -870,7 +854,7 @@ after_frag( fd_repair_tile_ctx_t * ctx, if( FD_UNLIKELY( sz == FD_SHRED_DATA_HEADER_SZ + sizeof(fd_hash_t) + sizeof(fd_hash_t) ) ) { fd_forest_ele_t * ele = NULL; for( uint idx = shred->fec_set_idx; idx <= shred->idx; idx++ ) { - ele = fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->data.parent_off, idx, shred->fec_set_idx, 0, 0 ); + ele = fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->data.parent_off, idx, shred->fec_set_idx, 0, RECOVERED ); } FD_TEST( ele ); /* must be non-empty */ fd_forest_ele_idxs_insert( ele->cmpl, shred->fec_set_idx ); @@ -887,17 +871,21 @@ after_frag( fd_repair_tile_ctx_t * ctx, cmr = &fd_reasm_root( ctx->reasm )->key; } FD_TEST( fd_reasm_insert( ctx->reasm, merkle_root, cmr, shred->slot, shred->fec_set_idx, shred->data.parent_off, (ushort)(shred->idx - shred->fec_set_idx + 1), data_complete, slot_complete ) ); + + if( FD_UNLIKELY( ele->complete_idx != UINT_MAX && ele->buffered_idx == ele->complete_idx ) ) { + fd_histf_sample( ctx->metrics->slot_compl_time, 
(ulong)(fd_tickcount() - ele->first_ts) ); + FD_LOG_INFO(( "slot is complete %lu. num_data_shreds: %u, num_repaired: %u, num_turbine: %u", ele->slot, ele->complete_idx + 1, ele->repair_cnt, ele->turbine_cnt )); + } } /* Insert the shred into the map. */ int is_code = fd_shred_is_code( fd_shred_type( shred->variant ) ); + shred_src_t src = fd_disco_shred_repair_shred_sig_is_turbine( sig ) ? TURBINE : REPAIR; if( FD_LIKELY( !is_code ) ) { fd_repair_inflight_remove( ctx->repair, shred->slot, shred->idx ); - - int data_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE); int slot_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE); - fd_forest_ele_t * ele = fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->data.parent_off, shred->idx, shred->fec_set_idx, data_complete, slot_complete ); + fd_forest_ele_t * ele = fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->data.parent_off, shred->idx, shred->fec_set_idx, slot_complete, src ); /* Check if there are FECs to force complete. Algorithm: window through the idxs in interval [i, j). 
If j = next fec_set_idx @@ -929,6 +917,8 @@ after_frag( fd_repair_tile_ctx_t * ctx, // FD_LOG_NOTICE(( "not a fec boundary %lu %u", ele->slot, j )); } } + } else { + fd_forest_code_shred_insert( ctx->forest, shred->slot, shred->idx ); } return; } @@ -991,8 +981,8 @@ after_credit( fd_repair_tile_ctx_t * ctx, fd_stem_publish( stem, REPLAY_OUT_IDX, sig, ctx->replay_out_chunk, sizeof(fd_reasm_fec_t), 0, 0, tspub ); ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, sizeof(fd_reasm_fec_t), ctx->replay_out_chunk0, ctx->replay_out_wmark ); - fd_histf_sample( ctx->repair->metrics.store_link_wait, (ulong)fd_long_max(shacq_end - shacq_start, 0) ); - fd_histf_sample( ctx->repair->metrics.store_link_work, (ulong)fd_long_max(shrel_end - shacq_end, 0) ); + fd_histf_sample( ctx->metrics->store_link_wait, (ulong)fd_long_max(shacq_end - shacq_start, 0) ); + fd_histf_sample( ctx->metrics->store_link_work, (ulong)fd_long_max(shrel_end - shacq_end, 0) ); } /* We might have more reassembled FEC sets to deliver to the @@ -1272,7 +1262,6 @@ unprivileged_init( fd_topo_t * topo, ctx->repair = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint() ); ctx->forest = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) ); ctx->fec_sigs = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) ); - ctx->sreasm = FD_SCRATCH_ALLOC_APPEND( l, fd_sreasm_align(), fd_sreasm_footprint( 20 ) ); ctx->reasm = FD_SCRATCH_ALLOC_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 1 << 20 ) ); ctx->pending_sign_req_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_pending_sign_req_pool_align(), fd_repair_pending_sign_req_pool_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); ctx->pending_sign_req_map = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_pending_sign_req_map_align(), fd_repair_pending_sign_req_map_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); @@ -1325,7 +1314,6 @@ unprivileged_init( fd_topo_t * topo, ctx->forest = 
fd_forest_join ( fd_forest_new ( ctx->forest, tile->repair.slot_max, ctx->repair_seed ) ); // ctx->fec_repair = fd_fec_repair_join( fd_fec_repair_new( ctx->fec_repair, ( tile->repair.max_pending_shred_sets + 2 ), tile->repair.shred_tile_cnt, 0 ) ); ctx->fec_sigs = fd_fec_sig_join ( fd_fec_sig_new ( ctx->fec_sigs, 20 ) ); - ctx->sreasm = fd_sreasm_join( fd_sreasm_new( ctx->sreasm, 20 ) ); ctx->reasm = fd_reasm_join( fd_reasm_new( ctx->reasm, 1 << 20, 0 ) ); ctx->pending_sign_req_pool = fd_repair_pending_sign_req_pool_join ( fd_repair_pending_sign_req_pool_new ( ctx->pending_sign_req_pool, FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); ctx->pending_sign_req_map = fd_repair_pending_sign_req_map_join ( fd_repair_pending_sign_req_map_new ( ctx->pending_sign_req_map, FD_REPAIR_PENDING_SIGN_REQ_MAX, ctx->repair_seed ) ); @@ -1364,10 +1352,12 @@ unprivileged_init( fd_topo_t * topo, fd_repair_update_addr( ctx->repair, &ctx->repair_intake_addr, &ctx->repair_serve_addr ); - fd_histf_join( fd_histf_new( ctx->repair->metrics.store_link_wait, FD_MHIST_SECONDS_MIN( REPAIR, STORE_LINK_WAIT ), - FD_MHIST_SECONDS_MAX( REPAIR, STORE_LINK_WAIT ) ) ); - fd_histf_join( fd_histf_new( ctx->repair->metrics.store_link_work, FD_MHIST_SECONDS_MIN( REPAIR, STORE_LINK_WORK ), - FD_MHIST_SECONDS_MAX( REPAIR, STORE_LINK_WORK ) ) ); + fd_histf_join( fd_histf_new( ctx->metrics->store_link_wait, FD_MHIST_SECONDS_MIN( REPAIR, STORE_LINK_WAIT ), + FD_MHIST_SECONDS_MAX( REPAIR, STORE_LINK_WAIT ) ) ); + fd_histf_join( fd_histf_new( ctx->metrics->store_link_work, FD_MHIST_SECONDS_MIN( REPAIR, STORE_LINK_WORK ), + FD_MHIST_SECONDS_MAX( REPAIR, STORE_LINK_WORK ) ) ); + fd_histf_join( fd_histf_new( ctx->metrics->slot_compl_time, FD_MHIST_SECONDS_MIN( REPAIR, SLOT_COMPLETE_TIME ), + FD_MHIST_SECONDS_MAX( REPAIR, SLOT_COMPLETE_TIME ) ) ); fd_repair_settime( ctx->repair, fd_log_wallclock() ); fd_repair_start( ctx->repair ); @@ -1406,18 +1396,20 @@ populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED, static 
inline void metrics_write( fd_repair_tile_ctx_t * ctx ) { /* Repair-protocol-specific metrics */ - fd_repair_metrics_t * metrics = fd_repair_get_metrics( ctx->repair ); - FD_MCNT_SET( REPAIR, RECV_CLNT_PKT, metrics->recv_clnt_pkt ); - FD_MCNT_SET( REPAIR, RECV_SERV_PKT, metrics->recv_serv_pkt ); - FD_MCNT_SET( REPAIR, RECV_SERV_CORRUPT_PKT, metrics->recv_serv_corrupt_pkt ); - FD_MCNT_SET( REPAIR, RECV_SERV_INVALID_SIGNATURE, metrics->recv_serv_invalid_signature ); - FD_MCNT_SET( REPAIR, RECV_SERV_FULL_PING_TABLE, metrics->recv_serv_full_ping_table ); - FD_MCNT_ENUM_COPY( REPAIR, RECV_SERV_PKT_TYPES, metrics->recv_serv_pkt_types ); - FD_MCNT_SET( REPAIR, RECV_PKT_CORRUPTED_MSG, metrics->recv_pkt_corrupted_msg ); - FD_MCNT_SET( REPAIR, SEND_PKT_CNT, metrics->send_pkt_cnt ); - FD_MCNT_ENUM_COPY( REPAIR, SENT_PKT_TYPES, metrics->sent_pkt_types ); - FD_MHIST_COPY( REPAIR, STORE_LINK_WAIT, metrics->store_link_wait ); - FD_MHIST_COPY( REPAIR, STORE_LINK_WORK, metrics->store_link_work ); + FD_MCNT_SET( REPAIR, RECV_CLNT_PKT, ctx->metrics->recv_clnt_pkt ); + FD_MCNT_SET( REPAIR, RECV_SERV_PKT, ctx->metrics->recv_serv_pkt ); + FD_MCNT_SET( REPAIR, RECV_SERV_CORRUPT_PKT, ctx->metrics->recv_serv_corrupt_pkt ); + FD_MCNT_SET( REPAIR, RECV_SERV_INVALID_SIGNATURE, ctx->metrics->recv_serv_invalid_signature ); + FD_MCNT_SET( REPAIR, RECV_SERV_FULL_PING_TABLE, ctx->metrics->recv_serv_full_ping_table ); + FD_MCNT_SET( REPAIR, RECV_PKT_CORRUPTED_MSG, ctx->metrics->recv_pkt_corrupted_msg ); + + FD_MCNT_SET ( REPAIR, SHRED_REPAIR_REQ, ctx->metrics->send_pkt_cnt ); + FD_MCNT_ENUM_COPY( REPAIR, RECV_SERV_PKT_TYPES, ctx->metrics->recv_serv_pkt_types ); + FD_MCNT_ENUM_COPY( REPAIR, SENT_PKT_TYPES, ctx->metrics->sent_pkt_types ); + + FD_MHIST_COPY( REPAIR, STORE_LINK_WAIT, ctx->metrics->store_link_wait ); + FD_MHIST_COPY( REPAIR, STORE_LINK_WORK, ctx->metrics->store_link_work ); + FD_MHIST_COPY( REPAIR, SLOT_COMPLETE_TIME, ctx->metrics->slot_compl_time ); } /* TODO: This is not correct, but 
is temporary and will be fixed diff --git a/src/discof/shredcap/fd_shredcap_tile.c b/src/discof/shredcap/fd_shredcap_tile.c index 230e38bc5f..98e5ada63f 100644 --- a/src/discof/shredcap/fd_shredcap_tile.c +++ b/src/discof/shredcap/fd_shredcap_tile.c @@ -286,7 +286,7 @@ handle_new_turbine_contact_info( fd_capture_tile_ctx_t * ctx, static int is_fec_completes_msg( ulong sz ) { - return sz == FD_SHRED_DATA_HEADER_SZ + 2*FD_SHRED_MERKLE_ROOT_SZ; + return sz == FD_SHRED_DATA_HEADER_SZ + 2 * FD_SHRED_MERKLE_ROOT_SZ; } static inline void diff --git a/src/flamenco/repair/fd_repair.c b/src/flamenco/repair/fd_repair.c index 3feacbfb39..5d4187fd24 100644 --- a/src/flamenco/repair/fd_repair.c +++ b/src/flamenco/repair/fd_repair.c @@ -308,7 +308,6 @@ fd_repair_construct_request_protocol( fd_repair_t * glob, long now ) { switch( type ) { case fd_needed_window_index: { - glob->metrics.sent_pkt_types[FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_WINDOW_IDX]++; fd_repair_protocol_new_disc(protocol, fd_repair_protocol_enum_window_index); fd_repair_window_index_t * wi = &protocol->inner.window_index; wi->header.sender = *glob->public_key; @@ -317,12 +316,10 @@ fd_repair_construct_request_protocol( fd_repair_t * glob, wi->header.nonce = nonce; wi->slot = slot; wi->shred_index = shred_index; - //FD_LOG_INFO(( "repair request for %lu, %lu", wi->slot, wi->shred_index )); return 1; } case fd_needed_highest_window_index: { - glob->metrics.sent_pkt_types[FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_HIGHEST_WINDOW_IDX]++; fd_repair_protocol_new_disc( protocol, fd_repair_protocol_enum_highest_window_index ); fd_repair_highest_window_index_t * wi = &protocol->inner.highest_window_index; wi->header.sender = *glob->public_key; @@ -331,12 +328,10 @@ fd_repair_construct_request_protocol( fd_repair_t * glob, wi->header.nonce = nonce; wi->slot = slot; wi->shred_index = shred_index; - //FD_LOG_INFO(( "repair request for %lu, %lu", wi->slot, wi->shred_index )); return 1; } case 
fd_needed_orphan: { - glob->metrics.sent_pkt_types[FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_ORPHAN_IDX]++; fd_repair_protocol_new_disc( protocol, fd_repair_protocol_enum_orphan ); fd_repair_orphan_t * wi = &protocol->inner.orphan; wi->header.sender = *glob->public_key; @@ -344,7 +339,6 @@ fd_repair_construct_request_protocol( fd_repair_t * glob, wi->header.timestamp = (ulong)now/1000000L; wi->header.nonce = nonce; wi->slot = slot; - //FD_LOG_INFO(( "repair request for %lu", ele->dupkey.slot)); return 1; } } @@ -528,12 +522,6 @@ fd_repair_set_stake_weights_fini( fd_repair_t * repair ) { repair->stake_weights_cnt = repair->stake_weights_temp_cnt; } - -fd_repair_metrics_t * -fd_repair_get_metrics( fd_repair_t * repair ) { - return &repair->metrics; -} - /* Pending Sign Request API These functions manage the pool and map of pending sign requests in @@ -580,18 +568,16 @@ fd_repair_insert_pending_request( fd_repair_t * repair, return NULL; } - pending->nonce = repair->next_nonce; + pending->nonce = repair->next_nonce; fd_repair_pending_sign_req_map_ele_insert( repair->pending_sign_req_map, pending, repair->pending_sign_req_pool ); - fd_repair_construct_request_protocol( repair, protocol, type, slot, shred_index, recipient, repair->next_nonce, now ); - pending->sig_offset = 4; + pending->sig_offset = 4; pending->dst_ip_addr = dst_ip_addr; - pending->dst_port = dst_port; - pending->recipient = *recipient; + pending->dst_port = dst_port; + pending->recipient = *recipient; - repair->metrics.send_pkt_cnt++; repair->next_nonce++; return pending; } diff --git a/src/flamenco/repair/fd_repair.h b/src/flamenco/repair/fd_repair.h index d667343506..a8445c865b 100644 --- a/src/flamenco/repair/fd_repair.h +++ b/src/flamenco/repair/fd_repair.h @@ -3,8 +3,6 @@ #include "../gossip/fd_gossip.h" #include "../../ballet/shred/fd_shred.h" -#include "../../disco/metrics/generated/fd_metrics_repair.h" -#include "../../disco/metrics/fd_metrics.h" #define FD_REPAIR_DELIVER_FAIL_TIMEOUT 
-1 @@ -97,6 +95,9 @@ typedef struct fd_active_elem fd_active_elem_t; enum fd_needed_elem_type { fd_needed_window_index, fd_needed_highest_window_index, fd_needed_orphan }; +FD_STATIC_ASSERT( fd_needed_window_index==FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_WINDOW_IDX, update repair metrics enums ); +FD_STATIC_ASSERT( fd_needed_highest_window_index==FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_HIGHEST_WINDOW_IDX, update repair metrics enums ); +FD_STATIC_ASSERT( fd_needed_orphan==FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_ORPHAN_IDX, update repair metrics enums ); struct fd_inflight_key { enum fd_needed_elem_type type; @@ -161,6 +162,7 @@ struct fd_pinged_elem { int good; }; typedef struct fd_pinged_elem fd_pinged_elem_t; + #define MAP_NAME fd_pinged_table #define MAP_KEY_T fd_repair_peer_addr_t #define MAP_KEY_EQ fd_repair_peer_addr_eq @@ -196,22 +198,7 @@ struct fd_peer { fd_ip4_port_t ip4; }; typedef struct fd_peer fd_peer_t; -/* Repair Metrics */ -struct fd_repair_metrics { - ulong recv_clnt_pkt; - ulong recv_serv_pkt; - ulong recv_serv_corrupt_pkt; - ulong recv_serv_invalid_signature; - ulong recv_serv_full_ping_table; - ulong recv_serv_pkt_types[FD_METRICS_ENUM_REPAIR_SERV_PKT_TYPES_CNT]; - ulong recv_pkt_corrupted_msg; - ulong send_pkt_cnt; - ulong sent_pkt_types[FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_CNT]; - fd_histf_t store_link_wait[ 1 ]; - fd_histf_t store_link_work[ 1 ]; -}; -typedef struct fd_repair_metrics fd_repair_metrics_t; -#define FD_REPAIR_METRICS_FOOTPRINT ( sizeof( fd_repair_metrics_t ) ) + /* Global data for repair service */ struct fd_repair { /* Current time in nanosecs */ @@ -267,8 +254,6 @@ struct fd_repair { /* Pending sign requests for async operations */ fd_repair_pending_sign_req_t * pending_sign_req_pool; fd_repair_pending_sign_req_map_t * pending_sign_req_map; - /* Metrics */ - fd_repair_metrics_t metrics; }; typedef struct fd_repair fd_repair_t; @@ -359,9 +344,6 @@ void fd_repair_set_stake_weights_init( 
fd_repair_t * repair, void fd_repair_set_stake_weights_fini( fd_repair_t * repair ); -fd_repair_metrics_t * -fd_repair_get_metrics( fd_repair_t * repair ); - /* Pending sign request operations */ fd_repair_pending_sign_req_t * fd_repair_insert_pending_request( fd_repair_t * repair, diff --git a/src/flamenco/repair/test_repair.c b/src/flamenco/repair/test_repair.c index f1bff6c67a..8c1e991a54 100644 --- a/src/flamenco/repair/test_repair.c +++ b/src/flamenco/repair/test_repair.c @@ -3,6 +3,7 @@ #include "../../util/fd_util.h" #include #include +#include /* init repair test */ static fd_repair_t *