Skip to content

Commit 03e67b4

Browse files
add protobuf support for otel logs
1 parent 32bfc04 commit 03e67b4

File tree

6 files changed

+175
-41
lines changed

6 files changed

+175
-41
lines changed

Cargo.lock

Lines changed: 89 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ actix-web-prometheus = { version = "0.1" }
3333
actix-web-static-files = "4.0"
3434
http = "0.2.7"
3535
http-auth-basic = "0.3.3"
36-
tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] }
36+
tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd", "prost"] }
3737
tonic-web = "0.12.3"
3838
tower-http = { version = "0.6.1", features = ["cors"] }
3939
url = "2.4.0"
@@ -76,7 +76,13 @@ tokio-stream = { version = "0.1", features = ["fs"] }
7676
tokio-util = { version = "0.7" }
7777

7878
# Logging and Metrics
79-
opentelemetry-proto = { git = "https://github.com/parseablehq/opentelemetry-rust", branch = "fix-metrics-u64-serialization" }
79+
opentelemetry-proto = { version = "0.30.0", features = [
80+
"gen-tonic",
81+
"with-serde",
82+
"logs",
83+
"metrics",
84+
"trace",
85+
] }
8086
prometheus = { version = "0.13", features = ["process"] }
8187
prometheus-parse = "0.2.5"
8288
tracing = "0.1"
@@ -133,6 +139,7 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] }
133139
futures-core = "0.3.31"
134140
tempfile = "3.20.0"
135141
lazy_static = "1.4.0"
142+
prost = "0.13.1"
136143

137144
[build-dependencies]
138145
cargo_toml = "0.21"

src/handlers/http/ingest.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use std::collections::{HashMap, HashSet};
2020

21-
use actix_web::web::{Json, Path};
21+
use actix_web::web::{self, Json, Path};
2222
use actix_web::{HttpRequest, HttpResponse, http::header::ContentType};
2323
use arrow_array::RecordBatch;
2424
use bytes::Bytes;
@@ -29,18 +29,21 @@ use crate::event::error::EventError;
2929
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
3030
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
3131
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
32+
use crate::handlers::http::modal::utils::ingest_utils::push_logs;
3233
use crate::handlers::{
3334
EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
3435
};
3536
use crate::metadata::SchemaVersion;
3637
use crate::option::Mode;
37-
use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
38+
use crate::otel::logs::{OTEL_LOG_KNOWN_FIELD_LIST, flatten_otel_protobuf};
3839
use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
3940
use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
4041
use crate::parseable::{PARSEABLE, StreamNotFound};
4142
use crate::storage::{ObjectStorageError, StreamType};
4243
use crate::utils::header_parsing::ParseHeaderError;
4344
use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue};
45+
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
46+
use prost::Message;
4447

4548
use super::logstream::error::{CreateStreamError, StreamError};
4649
use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
@@ -166,7 +169,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
166169
// creates if stream does not exist
167170
pub async fn handle_otel_logs_ingestion(
168171
req: HttpRequest,
169-
Json(json): Json<StrictValue>,
172+
body: web::Bytes,
170173
) -> Result<HttpResponse, PostError> {
171174
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
172175
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -217,14 +220,35 @@ pub async fn handle_otel_logs_ingestion(
217220
.await?;
218221

219222
let p_custom_fields = get_custom_fields_from_header(&req);
223+
match req.headers().get("Content-Type") {
224+
Some(content_type) => {
225+
if content_type == "application/json" {
226+
flatten_and_push_logs(
227+
serde_json::from_slice(&body)?,
228+
&stream_name,
229+
&log_source,
230+
&p_custom_fields,
231+
)
232+
.await?;
233+
}
220234

221-
flatten_and_push_logs(
222-
json.into_inner(),
223-
&stream_name,
224-
&log_source,
225-
&p_custom_fields,
226-
)
227-
.await?;
235+
if content_type == "application/x-protobuf" {
236+
match ExportLogsServiceRequest::decode(body) {
237+
Ok(json) => {
238+
for record in flatten_otel_protobuf(&json) {
239+
push_logs(&stream_name, record, &log_source, &p_custom_fields).await?;
240+
}
241+
}
242+
Err(e) => {
243+
return Err(PostError::Invalid(e.into()));
244+
}
245+
}
246+
}
247+
}
248+
None => {
249+
return Err(PostError::Header(ParseHeaderError::InvalidValue));
250+
}
251+
}
228252

229253
Ok(HttpResponse::Ok().finish())
230254
}

0 commit comments

Comments
 (0)