Skip to content

Commit 0d37dab

Browse files
authored
chore: Add latency measuring code in dummy sink (#2318)
1 parent b02b98e commit 0d37dab

File tree

1 file changed

+29
-4
lines changed

1 file changed

+29
-4
lines changed

dozer-cli/src/pipeline/dummy_sink.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ use dozer_core::{
88
};
99
use dozer_recordstore::ProcessorRecordStore;
1010
use dozer_types::{
11+
chrono::Local,
1112
errors::internal::BoxedError,
1213
log::{info, warn},
13-
types::{Operation, Schema},
14+
types::{FieldType, Operation, Schema},
1415
};
1516

1617
#[derive(Debug)]
@@ -27,24 +28,48 @@ impl SinkFactory for DummySinkFactory {
2728

2829
fn build(
2930
&self,
30-
_input_schemas: HashMap<PortHandle, Schema>,
31+
input_schemas: HashMap<PortHandle, Schema>,
3132
) -> Result<Box<dyn Sink>, BoxedError> {
32-
Ok(Box::<DummySink>::default())
33+
let inserted_at_index = input_schemas
34+
.into_values()
35+
.next()
36+
.and_then(|schema| {
37+
schema.fields.into_iter().enumerate().find(|(_, field)| {
38+
field.name == "inserted_at" && field.typ == FieldType::Timestamp
39+
})
40+
})
41+
.map(|(index, _)| index);
42+
Ok(Box::new(DummySink {
43+
inserted_at_index,
44+
..Default::default()
45+
}))
3346
}
3447
}
3548

3649
#[derive(Debug, Default)]
3750
struct DummySink {
3851
snapshotting_started_instant: HashMap<String, Instant>,
52+
inserted_at_index: Option<usize>,
3953
}
4054

4155
impl Sink for DummySink {
4256
fn process(
4357
&mut self,
4458
_from_port: PortHandle,
4559
_record_store: &ProcessorRecordStore,
46-
_op: Operation,
60+
op: Operation,
4761
) -> Result<(), BoxedError> {
62+
if let Some(inserted_at_index) = self.inserted_at_index {
63+
if let Operation::Insert { new } = op {
64+
let value = &new.values[inserted_at_index];
65+
if let Some(inserted_at) = value.to_timestamp() {
66+
let latency = Local::now().naive_utc() - inserted_at.naive_utc();
67+
info!("Latency: {}ms", latency.num_milliseconds());
68+
} else {
69+
warn!("expecting timestamp, got {:?}", value);
70+
}
71+
}
72+
}
4873
Ok(())
4974
}
5075

0 commit comments

Comments
 (0)