Commit c2faefc

refactor: further streamline, associate w/ Parseable

Devdutt Shenoi committed · 1 parent 303ba35 · commit c2faefc

File tree

5 files changed: +73, -125 lines
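
In short: call sites that previously pulled per-stream metadata out of PARSEABLE and assembled a json::Event by hand now delegate to a single Stream::push_logs method on the stream handle. A minimal sketch of the resulting call shape, condensed from the diffs below (the stream name and the wrapper function are placeholders, not code from this commit):

use serde_json::Value;

use crate::{event::format::LogSource, parseable::PARSEABLE};

// After the refactor, an ingestion path reduces to: look up (or create) the
// stream, then hand it the JSON and a log source; the stream builds the event.
async fn ingest_sketch(json: Value) -> anyhow::Result<()> {
    PARSEABLE
        .get_or_create_stream("example-stream") // placeholder stream name
        .push_logs(json, &LogSource::Json)
        .await
}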

src/connectors/kafka/processor.rs

Lines changed: 13 additions & 34 deletions
@@ -26,12 +26,7 @@ use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, error};
 
 use crate::{
-    connectors::common::processor::Processor,
-    event::{
-        format::{json, EventFormat, LogSource},
-        Event as ParseableEvent,
-    },
-    parseable::PARSEABLE,
+    connectors::common::processor::Processor, event::format::LogSource, parseable::PARSEABLE,
     storage::StreamType,
 };
 
@@ -41,10 +36,7 @@ use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartition
 pub struct ParseableSinkProcessor;
 
 impl ParseableSinkProcessor {
-    async fn build_event_from_chunk(
-        &self,
-        records: &[ConsumerRecord],
-    ) -> anyhow::Result<ParseableEvent> {
+    async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result<u64> {
         let stream_name = records
             .first()
             .map(|r| r.topic.as_str())
@@ -54,14 +46,6 @@ impl ParseableSinkProcessor {
             .create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json)
             .await?;
 
-        let stream = PARSEABLE.get_stream(stream_name)?;
-        let schema = stream.get_schema_raw();
-        let time_partition = stream.get_time_partition();
-        let time_partition_limit = stream.get_time_partition_limit();
-        let custom_partition = stream.get_custom_partition();
-        let static_schema_flag = stream.get_static_schema_flag();
-        let schema_version = stream.get_schema_version();
-
         let mut json_vec = Vec::with_capacity(records.len());
         let mut total_payload_size = 0u64;
 
@@ -72,20 +56,15 @@ impl ParseableSinkProcessor {
             }
         }
 
-        let p_event = json::Event::new(Value::Array(json_vec)).into_event(
-            stream_name.to_string(),
-            total_payload_size,
-            &schema,
-            static_schema_flag,
-            custom_partition.as_ref(),
-            time_partition.as_ref(),
-            time_partition_limit,
-            schema_version,
-            &LogSource::Custom("Kafka".to_owned()),
-            StreamType::UserDefined,
-        )?;
-
-        Ok(p_event)
+        PARSEABLE
+            .get_or_create_stream(stream_name)
+            .push_logs(
+                Value::Array(json_vec),
+                &LogSource::Custom("Kafka".to_owned()),
+            )
+            .await?;
+
+        Ok(total_payload_size)
     }
 }
 
@@ -95,9 +74,9 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
         let len = records.len();
         debug!("Processing {len} records");
 
-        self.build_event_from_chunk(&records).await?.process()?;
+        let size = self.process_event_from_chunk(&records).await?;
 
-        debug!("Processed {len} records");
+        debug!("Processed {len} records, size = {size} Bytes");
         Ok(())
     }
 }

src/handlers/http/ingest.rs

Lines changed: 26 additions & 26 deletions
@@ -28,17 +28,15 @@ use serde_json::Value;
 
 use crate::event;
 use crate::event::error::EventError;
-use crate::event::format::{self, EventFormat, LogSource};
+use crate::event::format::LogSource;
 use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
-use crate::metadata::SchemaVersion;
 use crate::option::Mode;
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
 use crate::utils::json::flatten::JsonFlattenError;
 
 use super::logstream::error::{CreateStreamError, StreamError};
-use super::modal::utils::ingest_utils::push_logs;
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
 
@@ -72,31 +70,21 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         return Err(PostError::OtelNotSupported);
     }
 
-    push_logs(&stream_name, json, &log_source).await?;
+    PARSEABLE
+        .get_or_create_stream(&stream_name)
+        .push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
 
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
-    let size: usize = body.len();
     let json: Value = serde_json::from_slice(&body)?;
-    let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
-
-    // For internal streams, use old schema
-    format::json::Event::new(json)
-        .into_event(
-            stream_name,
-            size as u64,
-            &schema,
-            false,
-            None,
-            None,
-            None,
-            SchemaVersion::V0,
-            &LogSource::Pmeta,
-            StreamType::Internal,
-        )?
-        .process()?;
+
+    PARSEABLE
+        .get_stream(&stream_name)?
+        .push_logs(json, &LogSource::Pmeta)
+        .await?;
 
     Ok(())
 }
@@ -125,7 +113,10 @@ pub async fn handle_otel_logs_ingestion(
         .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
         .await?;
 
-    push_logs(&stream_name, json, &log_source).await?;
+    PARSEABLE
+        .get_or_create_stream(&stream_name)
+        .push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -156,7 +147,10 @@
        )
        .await?;
 
-    push_logs(&stream_name, json, &log_source).await?;
+    PARSEABLE
+        .get_or_create_stream(&stream_name)
+        .push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -184,7 +178,10 @@
         .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
         .await?;
 
-    push_logs(&stream_name, json, &log_source).await?;
+    PARSEABLE
+        .get_or_create_stream(&stream_name)
+        .push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -233,7 +230,10 @@ pub async fn post_event(
         return Err(PostError::OtelNotSupported);
     }
 
-    push_logs(&stream_name, json, &log_source).await?;
+    PARSEABLE
+        .get_or_create_stream(&stream_name)
+        .push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
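
Two access patterns remain after this change: user-facing handlers call get_or_create_stream, while ingest_internal_stream keeps get_stream(&stream_name)? and therefore still fails if the internal stream is absent. A hedged sketch of the distinction (stream names and the wrapper function are placeholders, not code from this commit):

use serde_json::Value;

use crate::{event::format::LogSource, parseable::PARSEABLE};

async fn both_paths(json: Value) -> anyhow::Result<()> {
    // User-facing ingestion: create the stream on first use, then push.
    PARSEABLE
        .get_or_create_stream("app-logs") // placeholder name
        .push_logs(json.clone(), &LogSource::Json)
        .await?;

    // Internal ingestion: the stream must already exist; `?` surfaces the lookup error.
    PARSEABLE
        .get_stream("pmeta")? // placeholder name
        .push_logs(json, &LogSource::Pmeta)
        .await?;

    Ok(())
}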

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 0 additions & 63 deletions
This file was deleted.

src/handlers/http/modal/utils/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -16,6 +16,5 @@
  *
  */
 
-pub mod ingest_utils;
 pub mod logstream_utils;
 pub mod rbac_utils;

src/parseable/streams.rs

Lines changed: 34 additions & 1 deletion
@@ -42,12 +42,16 @@ use parquet::{
 };
 use rand::distributions::DistString;
 use relative_path::RelativePathBuf;
+use serde_json::Value;
 use tokio::task::JoinSet;
 use tracing::{error, info, trace, warn};
 
 use crate::{
     cli::Options,
-    event::DEFAULT_TIMESTAMP_KEY,
+    event::{
+        format::{json, EventFormat, LogSource},
+        DEFAULT_TIMESTAMP_KEY,
+    },
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
     option::Mode,
@@ -109,6 +113,35 @@ impl Stream {
         })
     }
 
+    pub async fn push_logs(&self, json: Value, log_source: &LogSource) -> anyhow::Result<()> {
+        let time_partition = self.get_time_partition();
+        let time_partition_limit = self.get_time_partition_limit();
+        let static_schema_flag = self.get_static_schema_flag();
+        let custom_partition = self.get_custom_partition();
+        let schema_version = self.get_schema_version();
+        let schema = self.get_schema_raw();
+        let stream_type = self.get_stream_type();
+
+        let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
+
+        json::Event::new(json)
+            .into_event(
+                self.stream_name.to_owned(),
+                origin_size,
+                &schema,
+                static_schema_flag,
+                custom_partition.as_ref(),
+                time_partition.as_ref(),
+                time_partition_limit,
+                schema_version,
+                log_source,
+                stream_type,
+            )?
+            .process()?;
+
+        Ok(())
+    }
+
     // Concatenates record batches and puts them in memory store for each event.
     pub fn push(
         &self,

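The new Stream::push_logs gathers every per-stream setting (time and custom partitions, static-schema flag, schema version, stream type) from the handle itself, so callers no longer thread ten arguments through into_event; the origin size is recomputed by serializing the JSON back to bytes, presumably because the original request body is not available at this layer. A usage sketch built only from the API shown above (stream name, payload, and wrapper function are placeholders):

use serde_json::json;

use crate::{event::format::LogSource, parseable::PARSEABLE};

async fn push_twice() -> anyhow::Result<()> {
    // push_logs lives on the Stream handle, so a handle fetched once can accept
    // repeated pushes; each call re-reads the stream's current settings internally.
    let stream = PARSEABLE.get_or_create_stream("demo-stream"); // placeholder name
    stream
        .push_logs(json!({ "msg": "first" }), &LogSource::Json)
        .await?;
    stream
        .push_logs(json!({ "msg": "second" }), &LogSource::Json)
        .await?;
    Ok(())
}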