diff --git a/backend/common/src/assign_id.rs b/backend/common/src/assign_id.rs index dfcbb20b6..d9e139fad 100644 --- a/backend/common/src/assign_id.rs +++ b/backend/common/src/assign_id.rs @@ -31,6 +31,17 @@ pub struct AssignId { _id_type: std::marker::PhantomData, } +impl Default for AssignId +where + Details: Eq + Hash, + Id: From + Copy, + usize: From, +{ + fn default() -> Self { + Self::new() + } +} + impl AssignId where Details: Eq + Hash, @@ -68,7 +79,7 @@ where pub fn remove_by_details(&mut self, details: &Details) -> Option { self.mapping - .remove_by_right(&details) + .remove_by_right(details) .map(|(id, _)| id.into()) } diff --git a/backend/common/src/byte_size.rs b/backend/common/src/byte_size.rs index f9b1d9b22..973ce8ae6 100644 --- a/backend/common/src/byte_size.rs +++ b/backend/common/src/byte_size.rs @@ -84,20 +84,20 @@ mod test { ("20 kB", 20 * 1000), ("20K", 20 * 1000), (" 20k", 20 * 1000), - ("1MB", 1 * 1000 * 1000), - ("1M", 1 * 1000 * 1000), - ("1m", 1 * 1000 * 1000), - ("1 m", 1 * 1000 * 1000), - ("1GB", 1 * 1000 * 1000 * 1000), - ("1G", 1 * 1000 * 1000 * 1000), - ("1g", 1 * 1000 * 1000 * 1000), - ("1KiB", 1 * 1024), - ("1Ki", 1 * 1024), - ("1MiB", 1 * 1024 * 1024), - ("1Mi", 1 * 1024 * 1024), - ("1GiB", 1 * 1024 * 1024 * 1024), - ("1Gi", 1 * 1024 * 1024 * 1024), - (" 1 Gi ", 1 * 1024 * 1024 * 1024), + ("1MB", 1000 * 1000), + ("1M", 1000 * 1000), + ("1m", 1000 * 1000), + ("1 m", 1000 * 1000), + ("1GB", 1000 * 1000 * 1000), + ("1G", 1000 * 1000 * 1000), + ("1g", 1000 * 1000 * 1000), + ("1KiB", 1024), + ("1Ki", 1024), + ("1MiB", 1024 * 1024), + ("1Mi", 1024 * 1024), + ("1GiB", 1024 * 1024 * 1024), + ("1Gi", 1024 * 1024 * 1024), + (" 1 Gi ", 1024 * 1024 * 1024), ]; for (s, expected) in cases { diff --git a/backend/common/src/dense_map.rs b/backend/common/src/dense_map.rs index 24bc60d53..224a4fc37 100644 --- a/backend/common/src/dense_map.rs +++ b/backend/common/src/dense_map.rs @@ -23,6 +23,7 @@ /// Items are keyed by an Id, which can be any type you wish, but /// must be convertible to/from a `usize`. This promotes using a /// custom Id type to talk about items in the map. +#[derive(Default)] pub struct DenseMap { /// List of retired indexes that can be re-used retired: Vec, @@ -108,6 +109,7 @@ where .filter_map(|(id, item)| Some((id.into(), item.as_mut()?))) } + #[allow(clippy::should_implement_trait)] pub fn into_iter(self) -> impl Iterator { self.items .into_iter() diff --git a/backend/common/src/http_utils.rs b/backend/common/src/http_utils.rs index 76812b008..deb9c6240 100644 --- a/backend/common/src/http_utils.rs +++ b/backend/common/src/http_utils.rs @@ -134,7 +134,7 @@ fn generate_websocket_accept_key<'a>(key: &[u8], buf: &'a mut [u8; 32]) -> &'a [ digest.update(KEY); let d = digest.finalize(); - let n = base64::encode_config_slice(&d, base64::STANDARD, buf); + let n = base64::encode_config_slice(d, base64::STANDARD, buf); &buf[..n] } diff --git a/backend/common/src/multi_map_unique.rs b/backend/common/src/multi_map_unique.rs index 51e8e9ac1..c25f3f807 100644 --- a/backend/common/src/multi_map_unique.rs +++ b/backend/common/src/multi_map_unique.rs @@ -19,6 +19,7 @@ use std::hash::Hash; /// A map where each key can contain multiple values. We enforce that a value /// only ever belongs to one key at a time (the latest key it was inserted /// against). +#[derive(Default)] pub struct MultiMapUnique { value_to_key: HashMap, key_to_values: HashMap>, diff --git a/backend/common/src/node_message.rs b/backend/common/src/node_message.rs index 9b63e8b41..35647b1df 100644 --- a/backend/common/src/node_message.rs +++ b/backend/common/src/node_message.rs @@ -134,7 +134,7 @@ mod tests { // know whether things can (de)serialize to bincode or not at runtime without failing unless // we test the different types we want to (de)serialize ourselves. We just need to test each // type, not each variant. - fn bincode_can_serialize_and_deserialize<'de, T>(item: T) + fn bincode_can_serialize_and_deserialize(item: T) where T: Serialize + serde::de::DeserializeOwned, { diff --git a/backend/common/src/node_types.rs b/backend/common/src/node_types.rs index 1fdc5d1fd..c788da7d2 100644 --- a/backend/common/src/node_types.rs +++ b/backend/common/src/node_types.rs @@ -132,7 +132,7 @@ impl Serialize for NodeIO { } /// Concise block details -#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq)] +#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Eq)] pub struct Block { pub hash: BlockHash, pub height: BlockNumber, @@ -208,7 +208,7 @@ impl<'de> Deserialize<'de> for NodeLocation { } /// Verbose block details -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct BlockDetails { pub block: Block, pub block_time: u64, diff --git a/backend/common/src/rolling_total.rs b/backend/common/src/rolling_total.rs index b9caf9f5a..00738d060 100644 --- a/backend/common/src/rolling_total.rs +++ b/backend/common/src/rolling_total.rs @@ -27,11 +27,17 @@ pub struct RollingTotalBuilder { time_source: Time, } +impl Default for RollingTotalBuilder { + fn default() -> Self { + Self::new() + } +} + impl RollingTotalBuilder { /// Build a [`RollingTotal`] struct. By default, /// the window_size is 10s, the granularity is 1s, /// and system time is used. - pub fn new() -> RollingTotalBuilder { + pub fn new() -> Self { Self { window_size_multiple: 10, granularity: Duration::from_secs(1), @@ -253,11 +259,7 @@ mod test { // Regardless of the exact time that's elapsed, we'll end up with buckets that // are exactly granularity spacing (or multiples of) apart. assert_eq!( - rolling_total - .averages() - .into_iter() - .copied() - .collect::>(), + rolling_total.averages().iter().copied().collect::>(), vec![ (start_time, 1), (start_time + granularity, 2), diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index 616469c55..7301e1f2d 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -115,7 +115,7 @@ impl Connection { let msg = match message_data { soketto::Data::Binary(_) => Ok(RecvMessage::Binary(data)), soketto::Data::Text(_) => String::from_utf8(data) - .map(|s| RecvMessage::Text(s)) + .map(RecvMessage::Text) .map_err(|e| e.into()), }; @@ -249,7 +249,7 @@ pub async fn connect(uri: &http::Uri) -> Result { let socket = may_connect_tls(socket, host, scheme == "https" || scheme == "wss").await?; // Establish a WS connection: - let mut client = Client::new(socket.compat(), host, &path); + let mut client = Client::new(socket.compat(), host, path); let (ws_to_connection, ws_from_connection) = match client.handshake().await? { ServerResponse::Accepted { .. } => client.into_builder().finish(), ServerResponse::Redirect { status_code, .. } => { diff --git a/backend/common/src/ws_client/receiver.rs b/backend/common/src/ws_client/receiver.rs index 2937fdd01..02dc62b57 100644 --- a/backend/common/src/ws_client/receiver.rs +++ b/backend/common/src/ws_client/receiver.rs @@ -48,7 +48,7 @@ impl Stream for Receiver { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.inner.poll_next_unpin(cx).map_err(|e| e.into()) + self.inner.poll_next_unpin(cx) } } @@ -68,4 +68,8 @@ impl RecvMessage { RecvMessage::Text(s) => s.len(), } } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } diff --git a/backend/telemetry_core/benches/subscribe.rs b/backend/telemetry_core/benches/subscribe.rs index 390efa3b0..91f8cda6c 100644 --- a/backend/telemetry_core/benches/subscribe.rs +++ b/backend/telemetry_core/benches/subscribe.rs @@ -103,7 +103,7 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) { let start = Instant::now(); loop { let msgs = feeds[0].1.recv_feed_messages_once().await.unwrap(); - if msgs.iter().find(|&m| m == &finished).is_some() { + if msgs.iter().any(|m| m == &finished) { break; } } diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 0b42082a7..32cd055a2 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -55,13 +55,13 @@ pub enum FromShardWebsocket { Add { local_id: ShardNodeId, ip: std::net::IpAddr, - node: common::node_types::NodeDetails, + node: Box, genesis_hash: common::node_types::BlockHash, }, /// Update/pass through details about a node. Update { local_id: ShardNodeId, - payload: node_message::Payload, + payload: Box, }, /// Tell the aggregator that a node has been removed when it disconnects. Remove { local_id: ShardNodeId }, @@ -139,7 +139,7 @@ impl FromStr for FromFeedWebsocket { "subscribe" => Ok(FromFeedWebsocket::Subscribe { chain: value.parse()?, }), - _ => return Err(anyhow::anyhow!("Command {} not recognised", cmd)), + _ => Err(anyhow::anyhow!("Command {} not recognised", cmd)), } } } @@ -235,16 +235,16 @@ impl InnerLoop { // ignore node updates if we have too many messages to handle, in an attempt // to reduce the queue length back to something reasonable, lest it get out of // control and start consuming a load of memory. - if metered_tx.len() > max_queue_len { - if matches!( + if metered_tx.len() > max_queue_len + && matches!( msg, ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. }) - ) { - // Note: this wraps on overflow (which is probably the best - // behaviour for graphing it anyway) - dropped_messages.fetch_add(1, Ordering::Relaxed); - continue; - } + ) + { + // Note: this wraps on overflow (which is probably the best + // behaviour for graphing it anyway) + dropped_messages.fetch_add(1, Ordering::Relaxed); + continue; } if let Err(e) = metered_tx.send(msg) { @@ -327,7 +327,7 @@ impl InnerLoop { } => { // Conditionally modify the node's details to include the IP address. node.ip = self.expose_node_ips.then_some(ip.to_string().into()); - match self.node_state.add_node(genesis_hash, node) { + match self.node_state.add_node(genesis_hash, *node) { state::AddNodeResult::ChainOnDenyList => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { let _ = shard_conn.send(ToShardWebsocket::Mute { @@ -359,7 +359,7 @@ impl InnerLoop { let mut feed_messages_for_chain = FeedMessageSerializer::new(); feed_messages_for_chain.push(feed_message::AddedNode( node_id.get_chain_node_id().into(), - &details.node, + details.node, )); self.finalize_and_broadcast_to_chain_feeds( &genesis_hash, @@ -411,7 +411,7 @@ impl InnerLoop { let mut feed_message_serializer = FeedMessageSerializer::new(); self.node_state - .update_node(node_id, payload, &mut feed_message_serializer); + .update_node(node_id, *payload, &mut feed_message_serializer); if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) { let genesis_hash = chain.genesis_hash(); diff --git a/backend/telemetry_core/src/aggregator/mod.rs b/backend/telemetry_core/src/aggregator/mod.rs index 9caab5124..d012e900e 100644 --- a/backend/telemetry_core/src/aggregator/mod.rs +++ b/backend/telemetry_core/src/aggregator/mod.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +#[allow(clippy::module_inception)] mod aggregator; mod aggregator_set; mod inner_loop; diff --git a/backend/telemetry_core/src/find_location.rs b/backend/telemetry_core/src/find_location.rs index afc6e8a90..38a013c5b 100644 --- a/backend/telemetry_core/src/find_location.rs +++ b/backend/telemetry_core/src/find_location.rs @@ -43,7 +43,7 @@ where cache.insert( Ipv4Addr::new(127, 0, 0, 1).into(), Arc::new(NodeLocation { - latitude: 52.516_6667, + latitude: 52.516_666, longitude: 13.4, city: "Berlin".into(), }), @@ -103,7 +103,7 @@ impl Locator { return cached_loc; } - let City { city, location, .. } = self.city.lookup(ip.into()).ok()?; + let City { city, location, .. } = self.city.lookup(ip).ok()?; let city = city .as_ref()? .names diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 051bcb0bb..00e8a3d2d 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -279,12 +279,15 @@ where genesis_hash, } => FromShardWebsocket::Add { ip, - node, + node: Box::new(node), genesis_hash, local_id, }, internal_messages::FromShardAggregator::UpdateNode { payload, local_id } => { - FromShardWebsocket::Update { local_id, payload } + FromShardWebsocket::Update { + local_id, + payload: Box::new(payload), + } } internal_messages::FromShardAggregator::RemoveNode { local_id } => { FromShardWebsocket::Remove { local_id } @@ -525,34 +528,34 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response { // If our node validator address (and thus details) change, send an // updated "add node" feed message: - if node.set_validator_address(authority.authority_id.clone()) { - feed.push(feed_message::AddedNode(nid.into(), &node)); + if node.set_validator_address(authority.authority_id) { + feed.push(feed_message::AddedNode(nid.into(), node)); } return; } @@ -252,30 +253,34 @@ impl Chain { }; if node.update_block(*block) { - if block.height > self.best.height { - self.best = *block; - log::debug!( - "[{}] [nodes={}] new best block={}/{:?}", - self.labels.best(), - nodes_len, - self.best.height, - self.best.hash, - ); - if let Some(timestamp) = self.timestamp { - self.block_times.push(now.saturating_sub(timestamp)); - self.average_block_time = Some(self.block_times.average()); + match block.height.cmp(&self.best.height) { + Ordering::Greater => { + self.best = *block; + log::debug!( + "[{}] [nodes={}] new best block={}/{:?}", + self.labels.best(), + nodes_len, + self.best.height, + self.best.hash, + ); + if let Some(timestamp) = self.timestamp { + self.block_times.push(now.saturating_sub(timestamp)); + self.average_block_time = Some(self.block_times.average()); + } + self.timestamp = Some(now); + feed.push(feed_message::BestBlock( + self.best.height, + now, + self.average_block_time, + )); + propagation_time = Some(0); } - self.timestamp = Some(now); - feed.push(feed_message::BestBlock( - self.best.height, - now, - self.average_block_time, - )); - propagation_time = Some(0); - } else if block.height == self.best.height { - if let Some(timestamp) = self.timestamp { - propagation_time = Some(now.saturating_sub(timestamp)); + Ordering::Equal => { + if let Some(timestamp) = self.timestamp { + propagation_time = Some(now.saturating_sub(timestamp)) + } } + Ordering::Less => (), } if let Some(details) = node.update_details(now, propagation_time) { @@ -370,7 +375,7 @@ impl Chain { self.nodes.as_slice() } pub fn label(&self) -> &str { - &self.labels.best() + self.labels.best() } pub fn node_count(&self) -> usize { self.nodes.len() diff --git a/backend/telemetry_core/src/state/chain_stats.rs b/backend/telemetry_core/src/state/chain_stats.rs index 95bca7895..af731fef5 100644 --- a/backend/telemetry_core/src/state/chain_stats.rs +++ b/backend/telemetry_core/src/state/chain_stats.rs @@ -125,11 +125,9 @@ impl ChainStatsCollator { ) { self.version.modify(Some(&*details.version), op); - self.target_os - .modify(details.target_os.as_ref().map(|value| &**value), op); + self.target_os.modify(details.target_os.as_deref(), op); - self.target_arch - .modify(details.target_arch.as_ref().map(|value| &**value), op); + self.target_arch.modify(details.target_arch.as_deref(), op); let sysinfo = details.sysinfo.as_ref(); self.cpu.modify( diff --git a/backend/telemetry_core/src/state/counter.rs b/backend/telemetry_core/src/state/counter.rs index d3e12679e..01fd23c57 100644 --- a/backend/telemetry_core/src/state/counter.rs +++ b/backend/telemetry_core/src/state/counter.rs @@ -40,7 +40,7 @@ where K: Sized + std::hash::Hash + Eq, { /// Either adds or removes a single occurence of a given `key`. - pub fn modify<'a, Q>(&mut self, key: Option<&'a Q>, op: CounterValue) + pub fn modify(&mut self, key: Option<&'_ Q>, op: CounterValue) where Q: ?Sized + std::hash::Hash + Eq, K: std::borrow::Borrow, diff --git a/backend/telemetry_core/src/state/mod.rs b/backend/telemetry_core/src/state/mod.rs index 018412284..cd17f5675 100644 --- a/backend/telemetry_core/src/state/mod.rs +++ b/backend/telemetry_core/src/state/mod.rs @@ -19,6 +19,7 @@ mod chain_stats; mod counter; mod node; +#[allow(clippy::module_inception)] mod state; pub use node::Node; diff --git a/backend/telemetry_core/src/state/state.rs b/backend/telemetry_core/src/state/state.rs index 5385e247d..c5e9ef317 100644 --- a/backend/telemetry_core/src/state/state.rs +++ b/backend/telemetry_core/src/state/state.rs @@ -320,9 +320,9 @@ mod test { assert_eq!(add_node_result.id, NodeId(0.into(), 0.into())); assert_eq!(&*add_node_result.old_chain_label, ""); - assert_eq!(&*add_node_result.new_chain_label, "Chain One"); + assert_eq!(add_node_result.new_chain_label, "Chain One"); assert_eq!(add_node_result.chain_node_count, 1); - assert_eq!(add_node_result.has_chain_label_changed, true); + assert!(add_node_result.has_chain_label_changed); let add_result = state.add_node(chain1_genesis, node("A", "Chain One")); @@ -334,9 +334,9 @@ mod test { assert_eq!(add_node_result.id, NodeId(0.into(), 1.into())); assert_eq!(&*add_node_result.old_chain_label, "Chain One"); - assert_eq!(&*add_node_result.new_chain_label, "Chain One"); + assert_eq!(add_node_result.new_chain_label, "Chain One"); assert_eq!(add_node_result.chain_node_count, 2); - assert_eq!(add_node_result.has_chain_label_changed, false); + assert!(!add_node_result.has_chain_label_changed); } #[test] diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index a9e27caa7..94d2c66c5 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -592,14 +592,12 @@ async fn e2e_node_banned_if_it_sends_too_much_data() { node_tx.is_closed() } - assert_eq!( - try_send_data(1000, 10, 1000).await, - false, + assert!( + !try_send_data(1000, 10, 1000).await, "shouldn't be closed; we didn't exceed 10x threshold" ); - assert_eq!( + assert!( try_send_data(999, 10, 1000).await, - true, "should be closed; we sent just over 10x the block threshold" ); } diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index b11ae95db..ccfdbc8a5 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -83,7 +83,6 @@ async fn run_soak_test(opts: SoakTestOpts) { ServerOpts { release_mode: true, log_output: opts.log_output, - ..Default::default() }, CoreOpts { worker_threads: opts.core_worker_threads, @@ -229,8 +228,8 @@ async fn run_soak_test(opts: SoakTestOpts) { /// Return an iterator of `total` unique chain names. fn chain_names(total: usize) -> impl Iterator { - static CHAIN_STARTS: [&'static str; 5] = ["Polkadot", "Kusama", "Khala", "Wibble", "Moonbase"]; - static CHAIN_ENDS: [&'static str; 6] = ["", " Testnet", " Main", "-Dev", "Alpha", "Beta"]; + static CHAIN_STARTS: [&str; 5] = ["Polkadot", "Kusama", "Khala", "Wibble", "Moonbase"]; + static CHAIN_ENDS: [&str; 6] = ["", " Testnet", " Main", "-Dev", "Alpha", "Beta"]; let mut count = 0; let mut s_n = 0; diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index d40807fed..74eb0947f 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -65,13 +65,13 @@ pub enum FromWebsocket { Add { message_id: node_message::NodeMessageId, ip: std::net::IpAddr, - node: common::node_types::NodeDetails, + node: Box, genesis_hash: BlockHash, }, /// Update/pass through details about a node. Update { message_id: node_message::NodeMessageId, - payload: node_message::Payload, + payload: Box, }, /// remove a node with the given message ID Remove { @@ -118,7 +118,11 @@ impl Aggregator { Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore, Message::Data(data) => ToAggregator::FromTelemetryCore(data), }; - if let Err(_) = tx_to_aggregator2.send_async(msg_to_aggregator).await { + if tx_to_aggregator2 + .send_async(msg_to_aggregator) + .await + .is_err() + { // This will close the ws channels, which themselves log messages. break; } @@ -217,7 +221,7 @@ impl Aggregator { let _ = tx_to_telemetry_core .send_async(FromShardAggregator::AddNode { ip, - node, + node: *node, genesis_hash, local_id, }) @@ -248,7 +252,10 @@ impl Aggregator { // Send the message to the telemetry core with this local ID: let _ = tx_to_telemetry_core - .send_async(FromShardAggregator::UpdateNode { local_id, payload }) + .send_async(FromShardAggregator::UpdateNode { + local_id, + payload: *payload, + }) .await; } ToAggregator::FromWebsocket(conn_id, FromWebsocket::Remove { message_id }) => { diff --git a/backend/telemetry_shard/src/connection.rs b/backend/telemetry_shard/src/connection.rs index 4b24beef9..c6f8f27f8 100644 --- a/backend/telemetry_shard/src/connection.rs +++ b/backend/telemetry_shard/src/connection.rs @@ -50,7 +50,7 @@ where // Throw away any pending messages from the incoming channel so that it // doesn't get filled up and begin blocking while we're looping and waiting // for a reconnection. - while let Ok(_) = rx_in.try_recv() {} + while rx_in.try_recv().is_ok() {} // Try to connect. If connection established, we serialize and forward messages // to/from the core. If the external channels break, we end for good. If the internal diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index a95142a39..896a35369 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -205,6 +205,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { } /// This takes care of handling messages from an established socket connection. +#[allow(clippy::too_many_arguments)] async fn handle_node_websocket_connection( real_addr: IpAddr, ws_send: http_utils::WsSender, @@ -359,19 +360,18 @@ where let _ = tx_to_aggregator.send(FromWebsocket::Add { message_id, ip: real_addr, - node: info.node, + node: Box::new(info.node), genesis_hash: info.genesis_hash, }).await; } // Anything that's not an "Add" is an Update. The aggregator will ignore // updates against a message_id that hasn't first been Added, above. - else { - if let Some(last_seen) = allowed_message_ids.get_mut(&message_id) { - *last_seen = Instant::now(); - if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await { - log::error!("Failed to send node message to aggregator: {}", e); - continue; - } + else if let Some(last_seen) = allowed_message_ids.get_mut(&message_id) { + *last_seen = Instant::now(); + let payload = Box::new(payload); + if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await { + log::error!("Failed to send node message to aggregator: {}", e); + continue; } } } diff --git a/backend/telemetry_shard/src/real_ip.rs b/backend/telemetry_shard/src/real_ip.rs index 416673bc6..f55b26a01 100644 --- a/backend/telemetry_shard/src/real_ip.rs +++ b/backend/telemetry_shard/src/real_ip.rs @@ -90,9 +90,9 @@ fn pick_best_ip_from_options( }) .or_else(|| { // fall back to X-Real-IP - real_ip.as_ref().and_then(|val| { + real_ip.as_ref().map(|val| { let addr = val.trim(); - Some((addr, Source::XRealIpHeader)) + (addr, Source::XRealIpHeader) }) }) .and_then(|(ip, source)| { @@ -129,9 +129,7 @@ fn get_first_addr_from_forwarded_header(value: &str) -> Option<&str> { let first_values = value.split(',').next()?; for pair in first_values.split(';') { - let mut parts = pair.trim().splitn(2, '='); - let key = parts.next()?; - let value = parts.next()?; + let (key, value) = pair.trim().split_once('=')?; if key.to_lowercase() == "for" { // trim double quotes if they surround the value: @@ -148,7 +146,7 @@ fn get_first_addr_from_forwarded_header(value: &str) -> Option<&str> { } fn get_first_addr_from_x_forwarded_for_header(value: &str) -> Option<&str> { - value.split(",").map(|val| val.trim()).next() + value.split(',').map(|val| val.trim()).next() } #[cfg(test)] diff --git a/backend/test_utils/src/feed_message_de.rs b/backend/test_utils/src/feed_message_de.rs index b46e6147e..90b3ca413 100644 --- a/backend/test_utils/src/feed_message_de.rs +++ b/backend/test_utils/src/feed_message_de.rs @@ -127,7 +127,7 @@ pub enum FeedMessage { }, } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub struct NodeDetails { pub name: String, pub implementation: String, diff --git a/backend/test_utils/src/server/channels.rs b/backend/test_utils/src/server/channels.rs index ff68b0fe2..2c708bc25 100644 --- a/backend/test_utils/src/server/channels.rs +++ b/backend/test_utils/src/server/channels.rs @@ -147,7 +147,7 @@ impl Stream for FeedReceiver { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.0.poll_next_unpin(cx).map_err(|e| e.into()) + self.0.poll_next_unpin(cx) } } diff --git a/backend/test_utils/src/server/mod.rs b/backend/test_utils/src/server/mod.rs index eb3e3731d..641eac08d 100644 --- a/backend/test_utils/src/server/mod.rs +++ b/backend/test_utils/src/server/mod.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +#[allow(clippy::module_inception)] mod server; mod utils; diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index 293422487..8bfe13f40 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -191,7 +191,7 @@ impl Server { let pid = shards.add_with(|id| Process { id, - host: format!("{}", host), + host: host.to_string(), handle: None, _channel_type: PhantomData, }); @@ -222,7 +222,7 @@ impl Server { let mut child_stdout = shard_process.stdout.take().expect("shard stdout"); let shard_port = utils::get_port(&mut child_stdout) .await - .map_err(|e| Error::ErrorObtainingPort(e))?; + .map_err(Error::ErrorObtainingPort)?; // Attempt to wait until we've received word that the shard is connected to the // core before continuing. If we don't wait for this, the connection may happen @@ -332,7 +332,7 @@ impl Server { let mut child_stdout = child.stdout.take().expect("core stdout"); let core_port = utils::get_port(&mut child_stdout) .await - .map_err(|e| Error::ErrorObtainingPort(e))?; + .map_err(Error::ErrorObtainingPort)?; // Since we're piping stdout from the child process, we need somewhere for it to go // else the process will get stuck when it tries to produce output: @@ -502,10 +502,10 @@ impl Command { } } -impl Into for Command { - fn into(self) -> TokioCommand { - let mut cmd = TokioCommand::new(self.command); - cmd.args(self.args); +impl From for TokioCommand { + fn from(Command { command, args }: Command) -> Self { + let mut cmd = Self::new(command); + cmd.args(args); cmd } } diff --git a/backend/test_utils/src/workspace/start_server.rs b/backend/test_utils/src/workspace/start_server.rs index f94d0e2e3..81088181d 100644 --- a/backend/test_utils/src/workspace/start_server.rs +++ b/backend/test_utils/src/workspace/start_server.rs @@ -18,38 +18,22 @@ use super::commands; use crate::server::{self, Command, Server}; /// Options for the server +#[derive(Default)] pub struct ServerOpts { pub release_mode: bool, pub log_output: bool, } -impl Default for ServerOpts { - fn default() -> Self { - Self { - release_mode: false, - log_output: false, - } - } -} - /// Additional options to pass to the core command. +#[derive(Default)] pub struct CoreOpts { pub feed_timeout: Option, pub worker_threads: Option, pub num_aggregators: Option, } -impl Default for CoreOpts { - fn default() -> Self { - Self { - feed_timeout: None, - worker_threads: None, - num_aggregators: None, - } - } -} - /// Additional options to pass to the shard command. +#[derive(Default)] pub struct ShardOpts { pub max_nodes_per_connection: Option, pub max_node_data_per_second: Option, @@ -57,17 +41,6 @@ pub struct ShardOpts { pub worker_threads: Option, } -impl Default for ShardOpts { - fn default() -> Self { - Self { - max_nodes_per_connection: None, - max_node_data_per_second: None, - node_block_seconds: None, - worker_threads: None, - } - } -} - /// Start a telemetry server. We'll use `cargo run` by default, but you can also provide /// env vars to configure the binary that runs for the shard and core process. Either: /// @@ -104,8 +77,8 @@ pub async fn start_server( if let Ok(feed_host) = std::env::var("TELEMETRY_FEED_HOST") { let feed_host = feed_host.trim().into(); let submit_hosts: Vec<_> = std::env::var("TELEMETRY_SUBMIT_HOSTS") - .map(|var| var.split(",").map(|var| var.trim().into()).collect()) - .unwrap_or(Vec::new()); + .map(|var| var.split(',').map(|var| var.trim().into()).collect()) + .unwrap_or_default(); return Server::start(server::StartOpts::ConnectToExisting { feed_host, submit_hosts, @@ -117,7 +90,7 @@ pub async fn start_server( // Build the shard command let mut shard_command = std::env::var("TELEMETRY_SHARD_BIN") - .map(|val| Command::new(val)) + .map(Command::new) .unwrap_or_else(|_| { commands::cargo_run_telemetry_shard(server_opts.release_mode) .expect("must be in rust workspace to run shard command") @@ -145,7 +118,7 @@ pub async fn start_server( // Build the core command let mut core_command = std::env::var("TELEMETRY_CORE_BIN") - .map(|val| Command::new(val)) + .map(Command::new) .unwrap_or_else(|_| { commands::cargo_run_telemetry_core(server_opts.release_mode) .expect("must be in rust workspace to run core command")