Skip to content

Commit 9b0ec57

Browse files
committed
perf: deduplicate bundles with the same contents (#137)
* chore: use uuid v5 for determinism * feat: add ttl * diffs * fix: in-flight archive task * Revert "fix: in-flight archive task" This reverts commit a759f2b. * tmp: clear backlog by nooping * chore: log out meterbundleres
1 parent 5f51708 commit 9b0ec57

File tree

16 files changed

+296
-64
lines changed

16 files changed

+296
-64
lines changed

Cargo.lock

Lines changed: 93 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,4 @@ tracing-subscriber = { version = "0.3.20", default-features = false }
9898
testcontainers-modules = { version = "0.11.2", default-features = false }
9999
metrics-exporter-prometheus = { version = "0.17.0", default-features = false }
100100
futures = { version = "0.3.31", default-features = false }
101+
moka = { version = "0.12.12", default-features = false }

bin/tips-audit/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ struct Args {
5959

6060
#[arg(long, env = "TIPS_AUDIT_CHANNEL_BUFFER_SIZE", default_value = "500")]
6161
channel_buffer_size: usize,
62+
63+
#[arg(long, env = "TIPS_AUDIT_NOOP_ARCHIVE", default_value = "false")]
64+
noop_archive: bool,
6265
}
6366

6467
#[tokio::main]
@@ -93,6 +96,7 @@ async fn main() -> Result<()> {
9396
writer,
9497
args.worker_pool_size,
9598
args.channel_buffer_size,
99+
args.noop_archive,
96100
);
97101

98102
info!("Audit archiver initialized, starting main loop");

crates/audit/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ metrics-derive.workspace = true
1818
tips-core = { workspace = true, features = ["test-utils"] }
1919
serde = { workspace = true, features = ["std", "derive"] }
2020
tokio = { workspace = true, features = ["full"] }
21-
uuid = { workspace = true, features = ["v4", "serde"] }
21+
uuid = { workspace = true, features = ["v5", "serde"] }
2222
tracing = { workspace = true, features = ["std"] }
2323
anyhow = { workspace = true, features = ["std"] }
2424
serde_json = { workspace = true, features = ["std"] }

crates/audit/src/archiver.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,23 @@ where
3838
W: EventWriter + Clone + Send + 'static,
3939
{
4040
/// Creates a new archiver with the given reader and writer.
41-
pub fn new(reader: R, writer: W, worker_pool_size: usize, channel_buffer_size: usize) -> Self {
41+
pub fn new(
42+
reader: R,
43+
writer: W,
44+
worker_pool_size: usize,
45+
channel_buffer_size: usize,
46+
noop_archive: bool,
47+
) -> Self {
4248
let (event_tx, event_rx) = mpsc::channel(channel_buffer_size);
4349
let metrics = Metrics::default();
4450

45-
Self::spawn_workers(writer, event_rx, metrics.clone(), worker_pool_size);
51+
Self::spawn_workers(
52+
writer,
53+
event_rx,
54+
metrics.clone(),
55+
worker_pool_size,
56+
noop_archive,
57+
);
4658

4759
Self {
4860
reader,
@@ -57,6 +69,7 @@ where
5769
event_rx: mpsc::Receiver<Event>,
5870
metrics: Metrics,
5971
worker_pool_size: usize,
72+
noop_archive: bool,
6073
) {
6174
let event_rx = Arc::new(Mutex::new(event_rx));
6275

@@ -75,6 +88,20 @@ where
7588
match event {
7689
Some(event) => {
7790
let archive_start = Instant::now();
91+
// tmp: only use this to clear kafka consumer offset
92+
// TODO: use debug! later
93+
if noop_archive {
94+
info!(
95+
worker_id,
96+
bundle_id = %event.event.bundle_id(),
97+
tx_ids = ?event.event.transaction_ids(),
98+
timestamp = event.timestamp,
99+
"Noop archive - skipping event"
100+
);
101+
metrics.events_processed.increment(1);
102+
metrics.in_flight_archive_tasks.decrement(1.0);
103+
continue;
104+
}
78105
if let Err(e) = writer.archive_event(event).await {
79106
error!(worker_id, error = %e, "Failed to write event");
80107
} else {

crates/audit/src/storage.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ mod tests {
601601
use crate::reader::Event;
602602
use crate::types::{BundleEvent, DropReason, UserOpDropReason, UserOpEvent};
603603
use alloy_primitives::{Address, B256, TxHash, U256};
604-
use tips_core::test_utils::create_bundle_from_txn_data;
604+
use tips_core::{BundleExtensions, test_utils::create_bundle_from_txn_data};
605605
use uuid::Uuid;
606606

607607
fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event {
@@ -616,7 +616,7 @@ mod tests {
616616
fn test_update_bundle_history_transform_adds_new_event() {
617617
let bundle_history = BundleHistory { history: vec![] };
618618
let bundle = create_bundle_from_txn_data();
619-
let bundle_id = Uuid::new_v4();
619+
let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
620620
let bundle_event = BundleEvent::Received {
621621
bundle_id,
622622
bundle: Box::new(bundle.clone()),
@@ -655,7 +655,7 @@ mod tests {
655655
};
656656

657657
let bundle = create_bundle_from_txn_data();
658-
let bundle_id = Uuid::new_v4();
658+
let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
659659
let bundle_event = BundleEvent::Received {
660660
bundle_id,
661661
bundle: Box::new(bundle),
@@ -670,9 +670,9 @@ mod tests {
670670
#[test]
671671
fn test_update_bundle_history_transform_handles_all_event_types() {
672672
let bundle_history = BundleHistory { history: vec![] };
673-
let bundle_id = Uuid::new_v4();
674-
675673
let bundle = create_bundle_from_txn_data();
674+
let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
675+
676676
let bundle_event = BundleEvent::Received {
677677
bundle_id,
678678
bundle: Box::new(bundle),
@@ -717,7 +717,8 @@ mod tests {
717717
#[test]
718718
fn test_update_transaction_metadata_transform_adds_new_bundle() {
719719
let metadata = TransactionMetadata { bundle_ids: vec![] };
720-
let bundle_id = Uuid::new_v4();
720+
let bundle = create_bundle_from_txn_data();
721+
let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
721722

722723
let result = update_transaction_metadata_transform(metadata, bundle_id);
723724

@@ -729,7 +730,8 @@ mod tests {
729730

730731
#[test]
731732
fn test_update_transaction_metadata_transform_skips_existing_bundle() {
732-
let bundle_id = Uuid::new_v4();
733+
let bundle = create_bundle_from_txn_data();
734+
let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
733735
let metadata = TransactionMetadata {
734736
bundle_ids: vec![bundle_id],
735737
};
@@ -741,8 +743,12 @@ mod tests {
741743

742744
#[test]
743745
fn test_update_transaction_metadata_transform_adds_to_existing_bundles() {
744-
let existing_bundle_id = Uuid::new_v4();
745-
let new_bundle_id = Uuid::new_v4();
746+
// Some different, dummy bundle IDs since create_bundle_from_txn_data() returns the same bundle ID
747+
// Even if the same txn is contained across multiple bundles, the bundle ID will be different since the
748+
// UUID is based on the bundle hash.
749+
let existing_bundle_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();
750+
let new_bundle_id = Uuid::parse_str("6ba7b810-9dad-11d1-80b4-00c04fd430c8").unwrap();
751+
746752
let metadata = TransactionMetadata {
747753
bundle_ids: vec![existing_bundle_id],
748754
};

crates/audit/src/types.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,11 @@ impl BundleEvent {
140140
format!("{bundle_id}-{block_hash}")
141141
}
142142
_ => {
143-
format!("{}-{}", self.bundle_id(), Uuid::new_v4())
143+
format!(
144+
"{}-{}",
145+
self.bundle_id(),
146+
Uuid::new_v5(&Uuid::NAMESPACE_OID, self.bundle_id().as_bytes())
147+
)
144148
}
145149
}
146150
}
@@ -200,7 +204,11 @@ impl UserOpEvent {
200204
format!("{user_op_hash}-{tx_hash}")
201205
}
202206
_ => {
203-
format!("{}-{}", self.user_op_hash(), Uuid::new_v4())
207+
format!(
208+
"{}-{}",
209+
self.user_op_hash(),
210+
Uuid::new_v5(&Uuid::NAMESPACE_OID, self.user_op_hash().as_slice())
211+
)
204212
}
205213
}
206214
}

crates/audit/tests/common/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ impl TestHarness {
3535
.await;
3636

3737
let s3_client = aws_sdk_s3::Client::new(&config);
38-
let bucket_name = format!("test-bucket-{}", Uuid::new_v4());
38+
let bucket_name = format!(
39+
"test-bucket-{}",
40+
Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice())
41+
);
3942

4043
s3_client
4144
.create_bucket()

0 commit comments

Comments
 (0)