Skip to content

Commit d860819

Browse files
author
Devdutt Shenoi
authored
Merge branch 'main' into filewriter
2 parents 4b3f3df + e1e2f2a commit d860819

File tree

13 files changed

+302
-497
lines changed

13 files changed

+302
-497
lines changed

src/connectors/kafka/processor.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,7 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
109109
let len = records.len();
110110
debug!("Processing {} records", len);
111111

112-
self.build_event_from_chunk(&records)
113-
.await?
114-
.process()
115-
.await?;
112+
self.build_event_from_chunk(&records).await?.process()?;
116113

117114
debug!("Processed {} records", len);
118115
Ok(())

src/event/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub struct Event {
4646

4747
// Events holds the schema related to each event for a single log stream
4848
impl Event {
49-
pub async fn process(self) -> Result<(), EventError> {
49+
pub fn process(self) -> Result<(), EventError> {
5050
let mut key = get_schema_key(&self.rb.schema().fields);
5151
if self.time_partition.is_some() {
5252
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();

src/handlers/http/ingest.rs

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,21 @@ use arrow_array::RecordBatch;
2424
use bytes::Bytes;
2525
use chrono::Utc;
2626
use http::StatusCode;
27-
use opentelemetry_proto::tonic::logs::v1::LogsData;
28-
use opentelemetry_proto::tonic::metrics::v1::MetricsData;
29-
use opentelemetry_proto::tonic::trace::v1::TracesData;
3027
use serde_json::Value;
3128

3229
use crate::event::error::EventError;
3330
use crate::event::format::{self, EventFormat, LogSource};
3431
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3532
use crate::metadata::SchemaVersion;
3633
use crate::option::Mode;
37-
use crate::otel::logs::flatten_otel_logs;
38-
use crate::otel::metrics::flatten_otel_metrics;
39-
use crate::otel::traces::flatten_otel_traces;
4034
use crate::parseable::{StreamNotFound, PARSEABLE};
4135
use crate::storage::{ObjectStorageError, StreamType};
4236
use crate::utils::header_parsing::ParseHeaderError;
4337
use crate::utils::json::flatten::JsonFlattenError;
4438
use crate::{event, LOCK_EXPECT};
4539

4640
use super::logstream::error::{CreateStreamError, StreamError};
47-
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
41+
use super::modal::utils::ingest_utils::flatten_and_push_logs;
4842
use super::users::dashboards::DashboardError;
4943
use super::users::filters::FiltersError;
5044

@@ -70,6 +64,14 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7064
.get(LOG_SOURCE_KEY)
7165
.and_then(|h| h.to_str().ok())
7266
.map_or(LogSource::default(), LogSource::from);
67+
68+
if matches!(
69+
log_source,
70+
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
71+
) {
72+
return Err(PostError::OtelNotSupported);
73+
}
74+
7375
flatten_and_push_logs(json, &stream_name, &log_source).await?;
7476

7577
Ok(HttpResponse::Ok().finish())
@@ -104,8 +106,8 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
104106
custom_partition_values: HashMap::new(),
105107
stream_type: StreamType::Internal,
106108
}
107-
.process()
108-
.await?;
109+
.process()?;
110+
109111
Ok(())
110112
}
111113

@@ -133,11 +135,7 @@ pub async fn handle_otel_logs_ingestion(
133135
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
134136
.await?;
135137

136-
//custom flattening required for otel logs
137-
let logs: LogsData = serde_json::from_value(json)?;
138-
for record in flatten_otel_logs(&logs) {
139-
push_logs(&stream_name, record, &log_source).await?;
140-
}
138+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
141139

142140
Ok(HttpResponse::Ok().finish())
143141
}
@@ -168,11 +166,7 @@ pub async fn handle_otel_metrics_ingestion(
168166
)
169167
.await?;
170168

171-
//custom flattening required for otel metrics
172-
let metrics: MetricsData = serde_json::from_value(json)?;
173-
for record in flatten_otel_metrics(metrics) {
174-
push_logs(&stream_name, record, &log_source).await?;
175-
}
169+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
176170

177171
Ok(HttpResponse::Ok().finish())
178172
}
@@ -200,11 +194,7 @@ pub async fn handle_otel_traces_ingestion(
200194
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
201195
.await?;
202196

203-
//custom flattening required for otel traces
204-
let traces: TracesData = serde_json::from_value(json)?;
205-
for record in flatten_otel_traces(&traces) {
206-
push_logs(&stream_name, record, &log_source).await?;
207-
}
197+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
208198

209199
Ok(HttpResponse::Ok().finish())
210200
}
@@ -245,6 +235,14 @@ pub async fn post_event(
245235
.get(LOG_SOURCE_KEY)
246236
.and_then(|h| h.to_str().ok())
247237
.map_or(LogSource::default(), LogSource::from);
238+
239+
if matches!(
240+
log_source,
241+
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
242+
) {
243+
return Err(PostError::OtelNotSupported);
244+
}
245+
248246
flatten_and_push_logs(json, &stream_name, &log_source).await?;
249247

250248
Ok(HttpResponse::Ok().finish())

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
use arrow_schema::Field;
2020
use chrono::{DateTime, NaiveDateTime, Utc};
2121
use itertools::Itertools;
22+
use opentelemetry_proto::tonic::{
23+
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
24+
};
2225
use serde_json::Value;
2326
use std::{collections::HashMap, sync::Arc};
2427

@@ -32,6 +35,7 @@ use crate::{
3235
kinesis::{flatten_kinesis_logs, Message},
3336
},
3437
metadata::SchemaVersion,
38+
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
3539
parseable::{StreamNotFound, PARSEABLE},
3640
storage::StreamType,
3741
utils::json::{convert_array_to_object, flatten::convert_to_array},
@@ -45,14 +49,32 @@ pub async fn flatten_and_push_logs(
4549
) -> Result<(), PostError> {
4650
match log_source {
4751
LogSource::Kinesis => {
52+
//custom flattening required for Amazon Kinesis
4853
let message: Message = serde_json::from_value(json)?;
49-
let json = flatten_kinesis_logs(message);
50-
for record in json {
54+
for record in flatten_kinesis_logs(message) {
5155
push_logs(stream_name, record, &LogSource::default()).await?;
5256
}
5357
}
54-
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
55-
return Err(PostError::OtelNotSupported);
58+
LogSource::OtelLogs => {
59+
//custom flattening required for otel logs
60+
let logs: LogsData = serde_json::from_value(json)?;
61+
for record in flatten_otel_logs(&logs) {
62+
push_logs(stream_name, record, log_source).await?;
63+
}
64+
}
65+
LogSource::OtelTraces => {
66+
//custom flattening required for otel traces
67+
let traces: TracesData = serde_json::from_value(json)?;
68+
for record in flatten_otel_traces(&traces) {
69+
push_logs(stream_name, record, log_source).await?;
70+
}
71+
}
72+
LogSource::OtelMetrics => {
73+
//custom flattening required for otel metrics
74+
let metrics: MetricsData = serde_json::from_value(json)?;
75+
for record in flatten_otel_metrics(metrics) {
76+
push_logs(stream_name, record, log_source).await?;
77+
}
5678
}
5779
_ => {
5880
push_logs(stream_name, json, log_source).await?;
@@ -61,7 +83,7 @@ pub async fn flatten_and_push_logs(
6183
Ok(())
6284
}
6385

64-
pub async fn push_logs(
86+
async fn push_logs(
6587
stream_name: &str,
6688
json: Value,
6789
log_source: &LogSource,
@@ -138,8 +160,7 @@ pub async fn push_logs(
138160
custom_partition_values,
139161
stream_type: StreamType::UserDefined,
140162
}
141-
.process()
142-
.await?;
163+
.process()?;
143164
}
144165
Ok(())
145166
}

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
pub mod about;
20-
mod alerts;
20+
pub mod alerts;
2121
pub mod analytics;
2222
pub mod audit;
2323
pub mod banner;

src/otel/traces.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,15 @@ fn flatten_span_record(span_record: &Span) -> Vec<Map<String, Value>> {
293293
span_record_json.extend(flatten_status(status));
294294
}
295295

296-
for span_json in &mut span_records_json {
297-
for (key, value) in &span_record_json {
298-
span_json.insert(key.clone(), value.clone());
296+
// if span_record.events is null, code should still flatten other elements in the span record - this is handled in the if block
297+
// else block handles flattening of a span record that includes events and links records in each span record
298+
if span_records_json.is_empty() {
299+
span_records_json = vec![span_record_json];
300+
} else {
301+
for span_json in &mut span_records_json {
302+
for (key, value) in &span_record_json {
303+
span_json.insert(key.clone(), value.clone());
304+
}
299305
}
300306
}
301307

src/parseable/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -761,16 +761,14 @@ impl Parseable {
761761
.await
762762
{
763763
error!(
764-
"Failed to update first_event_at in storage for stream {:?}: {err:?}",
765-
stream_name
764+
"Failed to update first_event_at in storage for stream {stream_name:?}: {err:?}"
766765
);
767766
}
768767

769768
match self.get_stream(stream_name) {
770769
Ok(stream) => stream.set_first_event_at(first_event_at),
771770
Err(err) => error!(
772-
"Failed to update first_event_at in stream info for stream {:?}: {err:?}",
773-
stream_name
771+
"Failed to update first_event_at in stream info for stream {stream_name:?}: {err:?}"
774772
),
775773
}
776774

0 commit comments

Comments
 (0)