Skip to content

Commit 51d166e

Browse files
author
Devdutt Shenoi
committed
refactor: event construction and processing
1 parent a86fc47 commit 51d166e

File tree

6 files changed

+137
-243
lines changed

6 files changed

+137
-243
lines changed

src/connectors/kafka/processor.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use tokio_stream::wrappers::ReceiverStream;
2626
use tracing::{debug, error};
2727

2828
use crate::{
29-
connectors::common::processor::Processor, event::format::LogSource, parseable::PARSEABLE,
29+
connectors::common::processor::Processor,
30+
event::format::{json, EventFormat, LogSource},
31+
parseable::PARSEABLE,
3032
storage::StreamType,
3133
};
3234

@@ -56,14 +58,15 @@ impl ParseableSinkProcessor {
5658
}
5759
}
5860

59-
PARSEABLE
60-
.get_or_create_stream(stream_name)
61-
.push_logs(
62-
Value::Array(json_vec),
63-
total_payload_size,
64-
&LogSource::Custom("Kafka".to_owned()),
65-
)
66-
.await?;
61+
let stream = PARSEABLE.get_or_create_stream(stream_name);
62+
63+
json::Event::new(
64+
Value::Array(json_vec),
65+
total_payload_size,
66+
LogSource::Custom("Kafka".to_owned()),
67+
)
68+
.into_event(&stream)?
69+
.process(&stream)?;
6770

6871
Ok(total_payload_size)
6972
}

0 commit comments

Comments
 (0)