Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 93 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,4 @@ tracing-subscriber = { version = "0.3.20", default-features = false }
testcontainers-modules = { version = "0.11.2", default-features = false }
metrics-exporter-prometheus = { version = "0.17.0", default-features = false }
futures = { version = "0.3.31", default-features = false }
moka = { version = "0.12.12", default-features = false }
4 changes: 4 additions & 0 deletions bin/tips-audit/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ struct Args {

#[arg(long, env = "TIPS_AUDIT_CHANNEL_BUFFER_SIZE", default_value = "500")]
channel_buffer_size: usize,

#[arg(long, env = "TIPS_AUDIT_NOOP_ARCHIVE", default_value = "false")]
noop_archive: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -93,6 +96,7 @@ async fn main() -> Result<()> {
writer,
args.worker_pool_size,
args.channel_buffer_size,
args.noop_archive,
);

info!("Audit archiver initialized, starting main loop");
Expand Down
2 changes: 1 addition & 1 deletion crates/audit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ metrics-derive.workspace = true
tips-core = { workspace = true, features = ["test-utils"] }
serde = { workspace = true, features = ["std", "derive"] }
tokio = { workspace = true, features = ["full"] }
uuid = { workspace = true, features = ["v4", "serde"] }
uuid = { workspace = true, features = ["v5", "serde"] }
tracing = { workspace = true, features = ["std"] }
anyhow = { workspace = true, features = ["std"] }
serde_json = { workspace = true, features = ["std"] }
Expand Down
31 changes: 29 additions & 2 deletions crates/audit/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,23 @@ where
W: EventWriter + Clone + Send + 'static,
{
/// Creates a new archiver with the given reader and writer.
pub fn new(reader: R, writer: W, worker_pool_size: usize, channel_buffer_size: usize) -> Self {
pub fn new(
reader: R,
writer: W,
worker_pool_size: usize,
channel_buffer_size: usize,
noop_archive: bool,
) -> Self {
let (event_tx, event_rx) = mpsc::channel(channel_buffer_size);
let metrics = Metrics::default();

Self::spawn_workers(writer, event_rx, metrics.clone(), worker_pool_size);
Self::spawn_workers(
writer,
event_rx,
metrics.clone(),
worker_pool_size,
noop_archive,
);

Self {
reader,
Expand All @@ -57,6 +69,7 @@ where
event_rx: mpsc::Receiver<Event>,
metrics: Metrics,
worker_pool_size: usize,
noop_archive: bool,
) {
let event_rx = Arc::new(Mutex::new(event_rx));

Expand All @@ -75,6 +88,20 @@ where
match event {
Some(event) => {
let archive_start = Instant::now();
// Temporary: only used to drain/clear the Kafka consumer offset without archiving.
// TODO: downgrade this log from info! to debug! once no longer needed.
if noop_archive {
info!(
worker_id,
bundle_id = %event.event.bundle_id(),
tx_ids = ?event.event.transaction_ids(),
timestamp = event.timestamp,
"Noop archive - skipping event"
);
metrics.events_processed.increment(1);
metrics.in_flight_archive_tasks.decrement(1.0);
continue;
}
if let Err(e) = writer.archive_event(event).await {
error!(worker_id, error = %e, "Failed to write event");
} else {
Expand Down
24 changes: 15 additions & 9 deletions crates/audit/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ mod tests {
use crate::reader::Event;
use crate::types::{BundleEvent, DropReason, UserOpDropReason, UserOpEvent};
use alloy_primitives::{Address, B256, TxHash, U256};
use tips_core::test_utils::create_bundle_from_txn_data;
use tips_core::{BundleExtensions, test_utils::create_bundle_from_txn_data};
use uuid::Uuid;

fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event {
Expand All @@ -616,7 +616,7 @@ mod tests {
fn test_update_bundle_history_transform_adds_new_event() {
let bundle_history = BundleHistory { history: vec![] };
let bundle = create_bundle_from_txn_data();
let bundle_id = Uuid::new_v4();
let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
let bundle_event = BundleEvent::Received {
bundle_id,
bundle: Box::new(bundle.clone()),
Expand Down Expand Up @@ -655,7 +655,7 @@ mod tests {
};

let bundle = create_bundle_from_txn_data();
let bundle_id = Uuid::new_v4();
let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
let bundle_event = BundleEvent::Received {
bundle_id,
bundle: Box::new(bundle),
Expand All @@ -670,9 +670,9 @@ mod tests {
#[test]
fn test_update_bundle_history_transform_handles_all_event_types() {
let bundle_history = BundleHistory { history: vec![] };
let bundle_id = Uuid::new_v4();

let bundle = create_bundle_from_txn_data();
let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());

let bundle_event = BundleEvent::Received {
bundle_id,
bundle: Box::new(bundle),
Expand Down Expand Up @@ -717,7 +717,8 @@ mod tests {
#[test]
fn test_update_transaction_metadata_transform_adds_new_bundle() {
let metadata = TransactionMetadata { bundle_ids: vec![] };
let bundle_id = Uuid::new_v4();
let bundle = create_bundle_from_txn_data();
let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());

let result = update_transaction_metadata_transform(metadata, bundle_id);

Expand All @@ -729,7 +730,8 @@ mod tests {

#[test]
fn test_update_transaction_metadata_transform_skips_existing_bundle() {
let bundle_id = Uuid::new_v4();
let bundle = create_bundle_from_txn_data();
let bundle_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice());
let metadata = TransactionMetadata {
bundle_ids: vec![bundle_id],
};
Expand All @@ -741,8 +743,12 @@ mod tests {

#[test]
fn test_update_transaction_metadata_transform_adds_to_existing_bundles() {
let existing_bundle_id = Uuid::new_v4();
let new_bundle_id = Uuid::new_v4();
// Use two distinct, hard-coded bundle IDs here: create_bundle_from_txn_data() always
// returns the same bundle, so a v5 UUID derived from its hash would be identical for both.
// In practice, bundles that share a transaction still get different IDs, because each
// UUID is derived from that bundle's own hash.
let existing_bundle_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();
let new_bundle_id = Uuid::parse_str("6ba7b810-9dad-11d1-80b4-00c04fd430c8").unwrap();

let metadata = TransactionMetadata {
bundle_ids: vec![existing_bundle_id],
};
Expand Down
12 changes: 10 additions & 2 deletions crates/audit/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ impl BundleEvent {
format!("{bundle_id}-{block_hash}")
}
_ => {
format!("{}-{}", self.bundle_id(), Uuid::new_v4())
format!(
"{}-{}",
self.bundle_id(),
Uuid::new_v5(&Uuid::NAMESPACE_OID, self.bundle_id().as_bytes())
)
}
}
}
Expand Down Expand Up @@ -200,7 +204,11 @@ impl UserOpEvent {
format!("{user_op_hash}-{tx_hash}")
}
_ => {
format!("{}-{}", self.user_op_hash(), Uuid::new_v4())
format!(
"{}-{}",
self.user_op_hash(),
Uuid::new_v5(&Uuid::NAMESPACE_OID, self.user_op_hash().as_slice())
)
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/audit/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ impl TestHarness {
.await;

let s3_client = aws_sdk_s3::Client::new(&config);
let bucket_name = format!("test-bucket-{}", Uuid::new_v4());
let bucket_name = format!(
"test-bucket-{}",
Uuid::new_v5(&Uuid::NAMESPACE_OID, bundle.bundle_hash().as_slice())
);

s3_client
.create_bucket()
Expand Down
Loading