Skip to content

Commit 38f238a

Browse files
committed
Apply nightly rustfmt formatting
1 parent 084dff3 commit 38f238a

File tree

8 files changed

+91
-183
lines changed

8 files changed

+91
-183
lines changed

crates/metering/src/annotator.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
use crate::{MeteredTransaction, MeteringCache};
1+
use std::sync::Arc;
2+
23
use alloy_primitives::TxHash;
34
use parking_lot::RwLock;
4-
use std::sync::Arc;
55
use tokio::sync::mpsc::UnboundedReceiver;
66
use tracing::{debug, info, warn};
77

8+
use crate::{MeteredTransaction, MeteringCache};
9+
810
/// Message received from the flashblocks websocket feed indicating which
911
/// transactions were included in a specific flashblock.
1012
#[derive(Debug)]

crates/metering/src/cache.rs

Lines changed: 22 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use alloy_primitives::B256;
2-
use indexmap::IndexMap;
31
use std::collections::{BTreeMap, HashMap, VecDeque};
42

5-
use alloy_primitives::U256;
3+
use alloy_primitives::{B256, U256};
4+
use indexmap::IndexMap;
65

76
#[derive(Debug, Clone)]
87
pub struct MeteredTransaction {
@@ -39,23 +38,17 @@ impl ResourceTotals {
3938
fn accumulate(&mut self, tx: &MeteredTransaction) {
4039
self.gas_used = self.gas_used.saturating_add(tx.gas_used);
4140
self.execution_time_us = self.execution_time_us.saturating_add(tx.execution_time_us);
42-
self.state_root_time_us = self
43-
.state_root_time_us
44-
.saturating_add(tx.state_root_time_us);
45-
self.data_availability_bytes = self
46-
.data_availability_bytes
47-
.saturating_add(tx.data_availability_bytes);
41+
self.state_root_time_us = self.state_root_time_us.saturating_add(tx.state_root_time_us);
42+
self.data_availability_bytes =
43+
self.data_availability_bytes.saturating_add(tx.data_availability_bytes);
4844
}
4945

5046
fn subtract(&mut self, tx: &MeteredTransaction) {
5147
self.gas_used = self.gas_used.saturating_sub(tx.gas_used);
5248
self.execution_time_us = self.execution_time_us.saturating_sub(tx.execution_time_us);
53-
self.state_root_time_us = self
54-
.state_root_time_us
55-
.saturating_sub(tx.state_root_time_us);
56-
self.data_availability_bytes = self
57-
.data_availability_bytes
58-
.saturating_sub(tx.data_availability_bytes);
49+
self.state_root_time_us = self.state_root_time_us.saturating_sub(tx.state_root_time_us);
50+
self.data_availability_bytes =
51+
self.data_availability_bytes.saturating_sub(tx.data_availability_bytes);
5952
}
6053
}
6154

@@ -129,11 +122,7 @@ pub struct BlockMetrics {
129122

130123
impl BlockMetrics {
131124
pub fn new(block_number: u64) -> Self {
132-
Self {
133-
block_number,
134-
flashblocks: BTreeMap::new(),
135-
totals: ResourceTotals::default(),
136-
}
125+
Self { block_number, flashblocks: BTreeMap::new(), totals: ResourceTotals::default() }
137126
}
138127

139128
pub fn flashblock_count(&self) -> usize {
@@ -166,18 +155,12 @@ impl BlockMetrics {
166155
for flashblock in self.flashblocks.values() {
167156
let totals = flashblock.totals();
168157
self.totals.gas_used = self.totals.gas_used.saturating_add(totals.gas_used);
169-
self.totals.execution_time_us = self
170-
.totals
171-
.execution_time_us
172-
.saturating_add(totals.execution_time_us);
173-
self.totals.state_root_time_us = self
174-
.totals
175-
.state_root_time_us
176-
.saturating_add(totals.state_root_time_us);
177-
self.totals.data_availability_bytes = self
178-
.totals
179-
.data_availability_bytes
180-
.saturating_add(totals.data_availability_bytes);
158+
self.totals.execution_time_us =
159+
self.totals.execution_time_us.saturating_add(totals.execution_time_us);
160+
self.totals.state_root_time_us =
161+
self.totals.state_root_time_us.saturating_add(totals.state_root_time_us);
162+
self.totals.data_availability_bytes =
163+
self.totals.data_availability_bytes.saturating_add(totals.data_availability_bytes);
181164
}
182165
}
183166
}
@@ -193,21 +176,15 @@ pub struct MeteringCache {
193176
impl MeteringCache {
194177
/// Creates a new cache retaining at most `max_blocks` recent blocks.
195178
pub fn new(max_blocks: usize) -> Self {
196-
Self {
197-
max_blocks,
198-
blocks: VecDeque::new(),
199-
block_index: HashMap::new(),
200-
}
179+
Self { max_blocks, blocks: VecDeque::new(), block_index: HashMap::new() }
201180
}
202181

203182
pub fn max_blocks(&self) -> usize {
204183
self.max_blocks
205184
}
206185

207186
pub fn block(&self, block_number: u64) -> Option<&BlockMetrics> {
208-
self.block_index
209-
.get(&block_number)
210-
.and_then(|&idx| self.blocks.get(idx))
187+
self.block_index.get(&block_number).and_then(|&idx| self.blocks.get(idx))
211188
}
212189

213190
pub fn block_mut(&mut self, block_number: u64) -> &mut BlockMetrics {
@@ -221,18 +198,15 @@ impl MeteringCache {
221198
self.block_index.insert(block_number, idx);
222199

223200
self.evict_if_needed();
224-
self.blocks
225-
.get_mut(*self.block_index.get(&block_number).unwrap())
226-
.unwrap()
201+
self.blocks.get_mut(*self.block_index.get(&block_number).unwrap()).unwrap()
227202
}
228203

229204
pub fn flashblock(
230205
&self,
231206
block_number: u64,
232207
flashblock_index: u64,
233208
) -> Option<&FlashblockMetrics> {
234-
self.block(block_number)
235-
.and_then(|block| block.flashblock(flashblock_index))
209+
self.block(block_number).and_then(|block| block.flashblock(flashblock_index))
236210
}
237211

238212
pub fn upsert_transaction(
@@ -320,10 +294,7 @@ mod tests {
320294
let block = cache.block(100).unwrap();
321295
let flashblock = block.flashblocks().next().unwrap();
322296
assert_eq!(flashblock.len(), 1);
323-
assert_eq!(
324-
flashblock.transactions().next().unwrap().tx_hash,
325-
tx1.tx_hash
326-
);
297+
assert_eq!(flashblock.transactions().next().unwrap().tx_hash, tx1.tx_hash);
327298
}
328299

329300
#[test]
@@ -337,10 +308,7 @@ mod tests {
337308
let block = cache.block(100).unwrap();
338309
let flashblock = block.flashblocks().next().unwrap();
339310
assert_eq!(flashblock.len(), 1);
340-
assert_eq!(
341-
flashblock.transactions().next().unwrap().gas_used,
342-
tx1.gas_used
343-
);
311+
assert_eq!(flashblock.transactions().next().unwrap().gas_used, tx1.gas_used);
344312
}
345313

346314
#[test]
@@ -368,9 +336,6 @@ mod tests {
368336
.iter()
369337
.map(|tx| tx.priority_fee_per_gas)
370338
.collect();
371-
assert_eq!(
372-
sorted,
373-
vec![U256::from(10u64), U256::from(20u64), U256::from(30u64)]
374-
);
339+
assert_eq!(sorted, vec![U256::from(10u64), U256::from(20u64), U256::from(30u64)]);
375340
}
376341
}

crates/metering/src/estimator.rs

Lines changed: 21 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,22 @@
1-
use crate::{MeteredTransaction, MeteringCache};
1+
use std::sync::Arc;
2+
23
use alloy_primitives::U256;
34
use parking_lot::RwLock;
45
use reth_optimism_payload_builder::config::OpDAConfig;
5-
use std::sync::Arc;
6+
7+
use crate::{MeteredTransaction, MeteringCache};
68

79
/// Errors that can occur during priority fee estimation.
810
#[derive(Debug, Clone, PartialEq, Eq)]
911
pub enum EstimateError {
1012
/// The bundle's resource demand exceeds the configured capacity limit.
11-
DemandExceedsCapacity {
12-
resource: ResourceKind,
13-
demand: u128,
14-
limit: u128,
15-
},
13+
DemandExceedsCapacity { resource: ResourceKind, demand: u128, limit: u128 },
1614
}
1715

1816
impl std::fmt::Display for EstimateError {
1917
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2018
match self {
21-
EstimateError::DemandExceedsCapacity {
22-
resource,
23-
demand,
24-
limit,
25-
} => {
19+
EstimateError::DemandExceedsCapacity { resource, demand, limit } => {
2620
write!(
2721
f,
2822
"bundle {} demand ({}) exceeds capacity limit ({})",
@@ -256,13 +250,7 @@ impl PriorityFeeEstimator {
256250
default_priority_fee: U256,
257251
da_config: Option<OpDAConfig>,
258252
) -> Self {
259-
Self {
260-
cache,
261-
percentile,
262-
limits,
263-
default_priority_fee,
264-
da_config,
265-
}
253+
Self { cache, percentile, limits, default_priority_fee, da_config }
266254
}
267255

268256
/// Returns the current DA block size limit, preferring the dynamic `OpDAConfig` value
@@ -310,11 +298,8 @@ impl PriorityFeeEstimator {
310298
let mut flashblock_transactions = Vec::new();
311299
let mut total_tx_count = 0usize;
312300
for flashblock in block_metrics.flashblocks() {
313-
let sorted: Vec<MeteredTransaction> = flashblock
314-
.transactions_sorted_by_priority_fee()
315-
.into_iter()
316-
.cloned()
317-
.collect();
301+
let sorted: Vec<MeteredTransaction> =
302+
flashblock.transactions_sorted_by_priority_fee().into_iter().cloned().collect();
318303
if sorted.is_empty() {
319304
continue;
320305
}
@@ -350,11 +335,8 @@ impl PriorityFeeEstimator {
350335
continue;
351336
};
352337

353-
let transactions: &[&MeteredTransaction] = if resource.use_it_or_lose_it() {
354-
&aggregate_refs
355-
} else {
356-
&txs_refs
357-
};
338+
let transactions: &[&MeteredTransaction] =
339+
if resource.use_it_or_lose_it() { &aggregate_refs } else { &txs_refs };
358340
let estimate = compute_estimate(
359341
resource,
360342
transactions,
@@ -505,11 +487,7 @@ fn compute_estimate(
505487
) -> Result<ResourceEstimate, EstimateError> {
506488
// Bundle demand exceeds the resource limit entirely.
507489
if demand > limit {
508-
return Err(EstimateError::DemandExceedsCapacity {
509-
resource,
510-
demand,
511-
limit,
512-
});
490+
return Err(EstimateError::DemandExceedsCapacity { resource, demand, limit });
513491
}
514492

515493
// No transactions or zero demand means no competition for this resource.
@@ -646,10 +624,12 @@ fn compute_min_max_estimates(
646624

647625
#[cfg(test)]
648626
mod tests {
649-
use super::*;
627+
use std::sync::Arc;
628+
650629
use alloy_primitives::{B256, U256};
651630
use parking_lot::RwLock;
652-
use std::sync::Arc;
631+
632+
use super::*;
653633

654634
const DEFAULT_FEE: U256 = U256::from_limbs([1, 0, 0, 0]); // 1 wei
655635
const DEFAULT_LIMITS: ResourceLimits = ResourceLimits {
@@ -847,16 +827,11 @@ mod tests {
847827
let mut demand = ResourceDemand::default();
848828
demand.gas_used = Some(15);
849829

850-
let estimates = estimator
851-
.estimate_for_block(Some(1), demand)
852-
.expect("no error")
853-
.expect("cached block");
830+
let estimates =
831+
estimator.estimate_for_block(Some(1), demand).expect("no error").expect("cached block");
854832

855833
assert_eq!(estimates.block_number, 1);
856-
let gas_estimate = estimates
857-
.max_across_flashblocks
858-
.gas_used
859-
.expect("gas estimate present");
834+
let gas_estimate = estimates.max_across_flashblocks.gas_used.expect("gas estimate present");
860835
assert_eq!(gas_estimate.threshold_priority_fee, U256::from(10));
861836
}
862837

@@ -902,10 +877,8 @@ mod tests {
902877
let mut demand = ResourceDemand::default();
903878
demand.gas_used = Some(15);
904879

905-
let rolling = estimator
906-
.estimate_rolling(demand)
907-
.expect("no error")
908-
.expect("estimates available");
880+
let rolling =
881+
estimator.estimate_rolling(demand).expect("no error").expect("estimates available");
909882

910883
assert_eq!(rolling.blocks_sampled, 2);
911884
let gas_estimate = rolling.estimates.gas_used.expect("gas estimate present");

crates/metering/src/kafka.rs

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
1-
use crate::MeteredTransaction;
2-
use alloy_consensus::Transaction;
3-
use alloy_consensus::transaction::Recovered;
1+
use std::time::Duration;
2+
3+
use alloy_consensus::{Transaction, transaction::Recovered};
44
use alloy_eips::Encodable2718;
55
use alloy_primitives::U256;
66
use chrono::Utc;
77
use eyre::Result;
88
use op_alloy_consensus::OpTxEnvelope;
99
use op_alloy_flz::tx_estimated_size_fjord_bytes;
10-
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
11-
use rdkafka::{ClientConfig, Message};
12-
use std::time::Duration;
10+
use rdkafka::{
11+
ClientConfig, Message,
12+
consumer::{CommitMode, Consumer, StreamConsumer},
13+
};
1314
use tips_core::types::AcceptedBundle;
14-
use tokio::sync::mpsc::UnboundedSender;
15-
use tokio::time::sleep;
15+
use tokio::{sync::mpsc::UnboundedSender, time::sleep};
1616
use tracing::{debug, error, info, trace, warn};
1717

18+
use crate::MeteredTransaction;
19+
1820
/// Configuration required to connect to the Kafka topic publishing accepted bundles.
1921
pub struct KafkaBundleConsumerConfig {
2022
pub client_config: ClientConfig,
@@ -36,19 +38,12 @@ impl KafkaBundleConsumer {
3638
config: KafkaBundleConsumerConfig,
3739
tx_sender: UnboundedSender<MeteredTransaction>,
3840
) -> Result<Self> {
39-
let KafkaBundleConsumerConfig {
40-
client_config,
41-
topic,
42-
} = config;
41+
let KafkaBundleConsumerConfig { client_config, topic } = config;
4342

4443
let consumer: StreamConsumer = client_config.create()?;
4544
consumer.subscribe(&[topic.as_str()])?;
4645

47-
Ok(Self {
48-
consumer,
49-
tx_sender,
50-
topic,
51-
})
46+
Ok(Self { consumer, tx_sender, topic })
5247
}
5348

5449
/// Starts listening for Kafka messages until the task is cancelled.
@@ -88,9 +83,8 @@ impl KafkaBundleConsumer {
8883
}
8984

9085
async fn handle_message(&self, message: rdkafka::message::BorrowedMessage<'_>) -> Result<()> {
91-
let payload = message
92-
.payload()
93-
.ok_or_else(|| eyre::eyre!("Kafka message missing payload"))?;
86+
let payload =
87+
message.payload().ok_or_else(|| eyre::eyre!("Kafka message missing payload"))?;
9488

9589
let bundle: AcceptedBundle = serde_json::from_slice(payload)?;
9690
metrics::counter!("metering.kafka.messages_total").increment(1);
@@ -137,11 +131,7 @@ impl KafkaBundleConsumer {
137131
return Ok(());
138132
}
139133

140-
for (tx, result) in bundle
141-
.txs
142-
.iter()
143-
.zip(bundle.meter_bundle_response.results.iter())
144-
{
134+
for (tx, result) in bundle.txs.iter().zip(bundle.meter_bundle_response.results.iter()) {
145135
let priority_fee_per_gas = calculate_priority_fee(tx);
146136
let data_availability_bytes = tx_estimated_size_fjord_bytes(&tx.encoded_2718());
147137

0 commit comments

Comments
 (0)