diff --git a/Cargo.lock b/Cargo.lock
index 32fffdbaf..053897936 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -81,7 +81,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb"
 dependencies = [
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -218,7 +218,7 @@ dependencies = [
  "actix-router",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -248,7 +248,7 @@ dependencies = [
  "pin-project",
  "prometheus",
  "quanta",
- "thiserror",
+ "thiserror 1.0.64",
 ]

 [[package]]
@@ -712,7 +712,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -723,7 +723,7 @@ checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -990,7 +990,7 @@ dependencies = [
  "semver",
  "serde",
  "serde_json",
- "thiserror",
+ "thiserror 1.0.64",
 ]

 [[package]]
@@ -1106,7 +1106,7 @@ dependencies = [
  "heck 0.5.0",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -1336,7 +1336,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "strsim",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -1347,7 +1347,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f"
 dependencies = [
  "darling_core",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -1776,7 +1776,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -1789,7 +1789,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "rustc_version",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -1811,7 +1811,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -1978,7 +1978,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -2938,7 +2938,7 @@ dependencies = [
  "proc-macro-crate",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -3009,7 +3009,7 @@ dependencies = [
  "reqwest 0.12.8",
  "serde",
  "serde_json",
- "thiserror",
+ "thiserror 1.0.64",
  "url",
  "validator",
 ]
@@ -3020,6 +3020,49 @@ version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"

+[[package]]
+name = "opentelemetry"
+version = "0.27.1"
+source = "git+https://github.com/parseablehq/opentelemetry-rust?branch=fix-metrics-u64-serialization#7e84c98d75ae16993a37bd5ff75a9768d652fe8f"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+ "js-sys",
+ "pin-project-lite",
+ "thiserror 2.0.9",
+ "tracing",
+]
+
+[[package]]
+name = "opentelemetry-proto"
+version = "0.27.0"
+source = "git+https://github.com/parseablehq/opentelemetry-rust?branch=fix-metrics-u64-serialization#7e84c98d75ae16993a37bd5ff75a9768d652fe8f"
+dependencies = [
+ "hex",
+ "opentelemetry",
+ "opentelemetry_sdk",
+ "prost",
+ "serde",
+ "tonic",
+]
+
+[[package]]
+name = "opentelemetry_sdk"
+version = "0.27.1"
+source = "git+https://github.com/parseablehq/opentelemetry-rust?branch=fix-metrics-u64-serialization#7e84c98d75ae16993a37bd5ff75a9768d652fe8f"
+dependencies = [
+ "async-trait",
+ "futures-channel",
+ "futures-executor",
+ "futures-util",
+ "glob",
+ "opentelemetry",
+ "percent-encoding",
+ "rand",
+ "serde_json",
+ "thiserror 2.0.9",
+]
+
 [[package]]
 name = "ordered-float"
 version = "2.10.1"
@@ -3161,6 +3204,7 @@ dependencies = [
  "object_store",
  "once_cell",
  "openid",
+ "opentelemetry-proto",
  "parquet",
  "path-clean",
  "prometheus",
@@ -3183,7 +3227,7 @@ dependencies = [
  "sha2",
  "static-files",
  "sysinfo",
- "thiserror",
+ "thiserror 2.0.9",
  "thread-priority",
  "tokio",
  "tokio-stream",
@@ -3311,7 +3355,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -3351,7 +3395,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5ac2cf0f2e4f42b49f5ffd07dae8d746508ef7526c13940e5f524012ae6c6550"
 dependencies = [
  "proc-macro2",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -3423,7 +3467,7 @@ dependencies = [
  "parking_lot",
  "procfs",
  "protobuf",
- "thiserror",
+ "thiserror 1.0.64",
 ]

 [[package]]
@@ -3465,7 +3509,7 @@ dependencies = [
  "prost",
  "prost-types",
  "regex",
- "syn 2.0.79",
+ "syn 2.0.87",
  "tempfile",
 ]
@@ -3479,7 +3523,7 @@ dependencies = [
  "itertools 0.13.0",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -3536,7 +3580,7 @@ dependencies = [
  "rustc-hash",
  "rustls 0.23.13",
  "socket2",
- "thiserror",
+ "thiserror 1.0.64",
  "tokio",
  "tracing",
 ]
@@ -3553,7 +3597,7 @@ dependencies = [
  "rustc-hash",
  "rustls 0.23.13",
  "slab",
- "thiserror",
+ "thiserror 1.0.64",
  "tinyvec",
  "tracing",
 ]
@@ -3864,7 +3908,7 @@ dependencies = [
  "regex",
  "relative-path",
  "rustc_version",
- "syn 2.0.79",
+ "syn 2.0.87",
  "unicode-ident",
 ]
@@ -4117,7 +4161,7 @@ checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -4140,7 +4184,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -4282,7 +4326,7 @@ dependencies = [
  "heck 0.5.0",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -4325,7 +4369,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -4370,7 +4414,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "rustversion",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -4391,9 +4435,9 @@ dependencies = [

 [[package]]
 name = "syn"
-version = "2.0.79"
+version = "2.0.87"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590"
+checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -4468,7 +4512,16 @@ version = "1.0.64"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84"
 dependencies = [
- "thiserror-impl",
+ "thiserror-impl 1.0.64",
+]
+
+[[package]]
+name = "thiserror"
+version = "2.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f072643fd0190df67a8bab670c20ef5d8737177d6ac6b2e9a236cb096206b2cc"
+dependencies = [
+ "thiserror-impl 2.0.9",
 ]

 [[package]]
@@ -4479,7 +4532,18 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
+]
+
+[[package]]
+name = "thiserror-impl"
+version = "2.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -4601,7 +4665,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -4831,7 +4895,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -4962,7 +5026,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f4e71ddbefed856d5881821d6ada4e606bbb91fd332296963ed596e2ad2100f3"
 dependencies = [
  "libc",
- "thiserror",
+ "thiserror 1.0.64",
  "windows 0.52.0",
 ]
@@ -5037,7 +5101,7 @@ dependencies = [
  "proc-macro-error",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -5124,7 +5188,7 @@ dependencies = [
  "once_cell",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
  "wasm-bindgen-shared",
 ]
@@ -5158,7 +5222,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
  "wasm-bindgen-backend",
  "wasm-bindgen-shared",
 ]
@@ -5297,7 +5361,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -5308,7 +5372,7 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -5606,7 +5670,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.87",
 ]

 [[package]]
@@ -5628,7 +5692,7 @@ dependencies = [
  "flate2",
  "indexmap 2.5.0",
  "memchr",
- "thiserror",
+ "thiserror 1.0.64",
  "zopfli",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 21938aadd..d57b900ec 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,9 +64,10 @@ humantime-serde = "1.1"
 itertools = "0.13.0"
 num_cpus = "1.15"
 once_cell = "1.17.1"
+opentelemetry-proto = {git = "https://github.com/parseablehq/opentelemetry-rust", branch="fix-metrics-u64-serialization"}
 prometheus = { version = "0.13", features = ["process"] }
 rand = "0.8.5"
-rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]}
+rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
 regex = "1.7.3"
 relative-path = { version = "1.7", features = ["serde"] }
 reqwest = { version = "0.11.27", default-features = false, features = [
@@ -80,7 +81,7 @@ serde = { version = "1.0", features = ["rc", "derive"] }
 serde_json = "1.0"
 static-files = "0.2"
 sysinfo = "0.31.4"
-thiserror = "1.0.64"
+thiserror = "2.0.0"
 thread-priority = "1.0.0"
 tokio = { version = "1.28", default-features = false, features = [
     "sync",
diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 6d1cf3419..ab9116eb7 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -29,7 +29,7 @@ use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};
 use tracing::error;

-use super::{EventFormat, Metadata, Tags};
+use super::{EventFormat, LogSource, Metadata, Tags};
 use crate::{
     metadata::SchemaVersion,
     utils::{arrow::get_field, json::flatten_json_body},
@@ -52,8 +52,17 @@ impl EventFormat for Event {
         static_schema_flag: Option<&String>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        log_source: &LogSource,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
-        let data = flatten_json_body(self.data, None, None, None, schema_version, false)?;
+        let data = flatten_json_body(
+            self.data,
+            None,
+            None,
+            None,
+            schema_version,
+            false,
+            log_source,
+        )?;
         let stream_schema = schema;

         // incoming event may be a single json or a json array
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index b3cb8e4dd..593e82f1e 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -43,6 +43,38 @@ type Tags = String;
 type Metadata = String;
 type EventSchema = Vec<Arc<Field>>;

+/// Source of the logs, used to perform special processing for certain sources
+#[derive(Default, Debug, Clone, PartialEq, Eq)]
+pub enum LogSource {
+    // AWS Kinesis sends logs in the format of a json array
+    Kinesis,
+    // OpenTelemetry sends logs according to the specification as explained here
+    // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1
+    OtelLogs,
+    // OpenTelemetry sends metrics according to the specification as explained here
+    // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1
+    OtelMetrics,
+    // OpenTelemetry sends traces according to the specification as explained here
+    // https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto
+    OtelTraces,
+    #[default]
+    // Json object or array
+    Json,
+    Custom(String),
+}
+
+impl From<&str> for LogSource {
+    fn from(s: &str) -> Self {
+        match s {
+            "kinesis" => LogSource::Kinesis,
+            "otel-logs" => LogSource::OtelLogs,
+            "otel-metrics" => LogSource::OtelMetrics,
+            "otel-traces" => LogSource::OtelTraces,
+            custom => LogSource::Custom(custom.to_owned()),
+        }
+    }
+}
+
 // Global Trait for event format
 // This trait is implemented by all the event formats
 pub trait EventFormat: Sized {
@@ -54,6 +86,7 @@ pub trait EventFormat: Sized {
         static_schema_flag: Option<&String>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        log_source: &LogSource,
     ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;

     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;

@@ -64,12 +97,14 @@ pub trait EventFormat: Sized {
         static_schema_flag: Option<&String>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        log_source: &LogSource,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let (data, mut schema, is_first, tags, metadata) = self.to_data(
             storage_schema,
             static_schema_flag,
             time_partition,
             schema_version,
+            log_source,
         )?;

         // DEFAULT_TAGS_KEY, DEFAULT_METADATA_KEY and DEFAULT_TIMESTAMP_KEY are reserved field names
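As an aside (not part of the diff): the `From<&str>` impl above is a straight string match, and unknown values are preserved rather than rejected. A minimal sketch of how a header value drives the enum, assuming these modules are publicly reachable as `parseable::event::format`:

```rust
use parseable::event::format::LogSource; // module path assumed public

fn main() {
    // Known header values map to dedicated variants...
    assert_eq!(LogSource::from("kinesis"), LogSource::Kinesis);
    assert_eq!(LogSource::from("otel-logs"), LogSource::OtelLogs);
    // ...anything else is carried along as a custom source...
    assert_eq!(
        LogSource::from("my-agent"),
        LogSource::Custom("my-agent".to_owned())
    );
    // ...and a missing header falls back to the default: plain JSON.
    assert_eq!(LogSource::default(), LogSource::Json);
}
```

Keeping `Custom(String)` as the fallback means a typo in the header silently becomes a custom source rather than an error; only the dedicated OTEL endpoints below validate the value explicitly.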
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index ad6fd0089..3b5c327c7 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -20,16 +20,20 @@ use super::logstream::error::{CreateStreamError, StreamError};
 use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
+use crate::event::format::LogSource;
 use crate::event::{
     self,
     error::EventError,
     format::{self, EventFormat},
 };
 use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
-use crate::handlers::STREAM_NAME_HEADER_KEY;
+use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::error::stream_info::MetadataError;
 use crate::metadata::{SchemaVersion, STREAM_INFO};
 use crate::option::{Mode, CONFIG};
+use crate::otel::logs::flatten_otel_logs;
+use crate::otel::metrics::flatten_otel_metrics;
+use crate::otel::traces::flatten_otel_traces;
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
@@ -38,6 +42,10 @@ use arrow_schema::Schema;
 use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
+use nom::AsBytes;
+use opentelemetry_proto::tonic::logs::v1::LogsData;
+use opentelemetry_proto::tonic::metrics::v1::MetricsData;
+use opentelemetry_proto::tonic::trace::v1::TracesData;
 use serde_json::Value;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -85,7 +93,13 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
             metadata: String::default(),
         };
         // For internal streams, use old schema
-        event.into_recordbatch(&schema, None, None, SchemaVersion::V0)?
+        event.into_recordbatch(
+            &schema,
+            None,
+            None,
+            SchemaVersion::V0,
+            &LogSource::default(),
+        )?
     };
     event::Event {
         rb,
@@ -106,21 +120,102 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
 // Handler for POST /v1/logs to ingest OTEL logs
 // ingests events by extracting stream name from header
 // creates if stream does not exist
-pub async fn handle_otel_ingestion(
+pub async fn handle_otel_logs_ingestion(
     req: HttpRequest,
     body: Bytes,
 ) -> Result<HttpResponse, PostError> {
-    if let Some((_, stream_name)) = req
-        .headers()
-        .iter()
-        .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
-    {
-        let stream_name = stream_name.to_str().unwrap();
-        create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
-        push_logs(stream_name, &req, &body).await?;
-    } else {
+    let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
+    };
+
+    let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
+        return Err(PostError::Header(ParseHeaderError::MissingLogSource));
+    };
+    let log_source = LogSource::from(log_source.to_str().unwrap());
+    if log_source != LogSource::OtelLogs {
+        return Err(PostError::Invalid(anyhow::anyhow!(
+            "Please use x-p-log-source: otel-logs for ingesting otel logs"
+        )));
+    }
+
+    let stream_name = stream_name.to_str().unwrap().to_owned();
+    create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+
+    //custom flattening required for otel logs
+    let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
+    let mut json = flatten_otel_logs(&logs);
+    for record in json.iter_mut() {
+        let body: Bytes = serde_json::to_vec(record).unwrap().into();
+        push_logs(&stream_name, &req, &body, &log_source).await?;
+    }
+
+    Ok(HttpResponse::Ok().finish())
+}
+
+// Handler for POST /v1/metrics to ingest OTEL metrics
+// ingests events by extracting stream name from header
+// creates if stream does not exist
+pub async fn handle_otel_metrics_ingestion(
+    req: HttpRequest,
+    body: Bytes,
+) -> Result<HttpResponse, PostError> {
+    let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
+        return Err(PostError::Header(ParseHeaderError::MissingStreamName));
+    };
+    let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
+        return Err(PostError::Header(ParseHeaderError::MissingLogSource));
+    };
+    let log_source = LogSource::from(log_source.to_str().unwrap());
+    if log_source != LogSource::OtelMetrics {
+        return Err(PostError::Invalid(anyhow::anyhow!(
+            "Please use x-p-log-source: otel-metrics for ingesting otel metrics"
+        )));
+    }
+    let stream_name = stream_name.to_str().unwrap().to_owned();
+    create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+
+    //custom flattening required for otel metrics
+    let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
+    let mut json = flatten_otel_metrics(metrics);
+    for record in json.iter_mut() {
+        let body: Bytes = serde_json::to_vec(record).unwrap().into();
+        push_logs(&stream_name, &req, &body, &log_source).await?;
+    }
+
+    Ok(HttpResponse::Ok().finish())
+}
+
+// Handler for POST /v1/traces to ingest OTEL traces
+// ingests events by extracting stream name from header
+// creates if stream does not exist
+pub async fn handle_otel_traces_ingestion(
+    req: HttpRequest,
+    body: Bytes,
+) -> Result<HttpResponse, PostError> {
+    let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
+        return Err(PostError::Header(ParseHeaderError::MissingStreamName));
+    };
+
+    let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
+        return Err(PostError::Header(ParseHeaderError::MissingLogSource));
+    };
+    let log_source = LogSource::from(log_source.to_str().unwrap());
+    if log_source != LogSource::OtelTraces {
+        return Err(PostError::Invalid(anyhow::anyhow!(
+            "Please use x-p-log-source: otel-traces for ingesting otel traces"
+        )));
+    }
+    let stream_name = stream_name.to_str().unwrap().to_owned();
+    create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+
+    //custom flattening required for otel traces
+    let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
+    let mut json = flatten_otel_traces(&traces);
+    for record in json.iter_mut() {
+        let body: Bytes = serde_json::to_vec(record).unwrap().into();
+        push_logs(&stream_name, &req, &body, &log_source).await?;
     }
+
     Ok(HttpResponse::Ok().finish())
 }
@@ -280,7 +375,7 @@ mod tests {
     use serde_json::json;

     use crate::{
-        event,
+        event::{self, format::LogSource},
         handlers::{http::modal::utils::ingest_utils::into_event_batch, PREFIX_META, PREFIX_TAGS},
         metadata::SchemaVersion,
     };
@@ -329,6 +424,7 @@
             None,
             None,
             SchemaVersion::V0,
+            &LogSource::default(),
         )
         .unwrap();
@@ -379,6 +475,7 @@
             None,
             None,
             SchemaVersion::V0,
+            &LogSource::default(),
         )
         .unwrap();
@@ -412,7 +509,16 @@
         let req = TestRequest::default().to_http_request();

-        let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            &req,
+            &json,
+            schema,
+            None,
+            None,
+            SchemaVersion::V0,
+            &LogSource::default(),
+        )
+        .unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
@@ -444,7 +550,16 @@
         let req = TestRequest::default().to_http_request();

-        assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).is_err());
+        assert!(into_event_batch(
+            &req,
+            &json,
+            schema,
+            None,
+            None,
+            SchemaVersion::V0,
+            &LogSource::default()
+        )
+        .is_err());
     }

     #[test]
@@ -462,7 +577,16 @@
         let req = TestRequest::default().to_http_request();

-        let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            &req,
+            &json,
+            schema,
+            None,
+            None,
+            SchemaVersion::V0,
+            &LogSource::default(),
+        )
+        .unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -480,7 +604,8 @@
             HashMap::default(),
             None,
             None,
-            SchemaVersion::V0
+            SchemaVersion::V0,
+            &LogSource::default()
         )
         .is_err())
     }
@@ -512,6 +637,7 @@
             None,
             None,
             SchemaVersion::V0,
+            &LogSource::default(),
         )
         .unwrap();
@@ -568,6 +694,7 @@
             None,
             None,
             SchemaVersion::V0,
+            &LogSource::default(),
         )
         .unwrap();
@@ -617,7 +744,16 @@
         );
         let req = TestRequest::default().to_http_request();
-        let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            &req,
+            &json,
+            schema,
+            None,
+            None,
+            SchemaVersion::V0,
+            &LogSource::default(),
+        )
+        .unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
@@ -666,7 +802,16 @@
             .into_iter(),
         );

-        assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).is_err());
+        assert!(into_event_batch(
+            &req,
+            &json,
+            schema,
+            None,
+            None,
+            SchemaVersion::V0,
+            &LogSource::default()
+        )
+        .is_err());
     }

     #[test]
@@ -701,6 +846,7 @@
             None,
             None,
             SchemaVersion::V0,
+            &LogSource::default(),
         )
         .unwrap();
@@ -781,6 +927,7 @@
             None,
             None,
             SchemaVersion::V1,
+            &LogSource::default(),
         )
         .unwrap();
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index d50edb96e..34a3d00af 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -438,7 +438,7 @@ impl Server {
             web::resource("/logs")
                 .route(
                     web::post()
-                        .to(ingest::handle_otel_ingestion)
+                        .to(ingest::handle_otel_logs_ingestion)
                         .authorize_for_stream(Action::Ingest),
                 )
                 .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
@@ -447,7 +447,7 @@ impl Server {
             web::resource("/metrics")
                 .route(
                     web::post()
-                        .to(ingest::handle_otel_ingestion)
+                        .to(ingest::handle_otel_metrics_ingestion)
                         .authorize_for_stream(Action::Ingest),
                 )
                 .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
@@ -456,7 +456,7 @@ impl Server {
             web::resource("/traces")
                 .route(
                     web::post()
-                        .to(ingest::handle_otel_ingestion)
+                        .to(ingest::handle_otel_traces_ingestion)
                         .authorize_for_stream(Action::Ingest),
                 )
                 .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
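To make the new routes concrete, here is a hedged client-side sketch (not from the PR). Only the `x-p-log-source` values are confirmed by the handlers above; the local host/port, `admin`/`admin` credentials, `x-p-stream` header name, and tokio's `macros` feature are all assumptions:

```rust
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Minimal OTLP/JSON LogsData payload; real collectors send much richer records.
    let body = r#"{"resourceLogs":[{"scopeLogs":[{"logRecords":[{"severityText":"INFO"}]}]}]}"#;
    let resp = Client::new()
        .post("http://localhost:8000/v1/logs") // host/port assumed
        .basic_auth("admin", Some("admin")) // credentials assumed
        .header("x-p-stream", "otel_logs_demo") // stream header name assumed
        .header("x-p-log-source", "otel-logs") // must match the endpoint, or the handler rejects
        .header("Content-Type", "application/json")
        .body(body)
        .send()
        .await?;
    println!("ingest status: {}", resp.status());
    Ok(())
}
```

The same pattern applies to `/v1/metrics` with `otel-metrics` and `/v1/traces` with `otel-traces`.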
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 4ea637c00..31cff6d9f 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -16,23 +16,22 @@
  *
  */

-use std::{collections::HashMap, sync::Arc};
-
 use actix_web::HttpRequest;
 use anyhow::anyhow;
 use arrow_schema::Field;
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Utc};
 use serde_json::Value;
+use std::{collections::HashMap, sync::Arc};

 use crate::{
     event::{
         self,
-        format::{self, EventFormat},
+        format::{self, EventFormat, LogSource},
     },
     handlers::{
         http::{ingest::PostError, kinesis},
-        LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
+        LOG_SOURCE_KEY, PREFIX_META, PREFIX_TAGS, SEPARATOR,
     },
     metadata::{SchemaVersion, STREAM_INFO},
     storage::StreamType,
@@ -47,16 +46,26 @@ pub async fn flatten_and_push_logs(
     let log_source = req
         .headers()
         .get(LOG_SOURCE_KEY)
-        .map(|header| header.to_str().unwrap_or_default())
+        .map(|h| h.to_str().unwrap_or(""))
+        .map(LogSource::from)
         .unwrap_or_default();
-    if log_source == LOG_SOURCE_KINESIS {
-        let json = kinesis::flatten_kinesis_logs(&body);
-        for record in json.iter() {
-            let body: Bytes = serde_json::to_vec(record).unwrap().into();
-            push_logs(stream_name, &req, &body).await?;
+
+    match log_source {
+        LogSource::Kinesis => {
+            let json = kinesis::flatten_kinesis_logs(&body);
+            for record in json.iter() {
+                let body: Bytes = serde_json::to_vec(record).unwrap().into();
+                push_logs(stream_name, &req, &body, &LogSource::default()).await?;
+            }
+        }
+        LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
+            return Err(PostError::Invalid(anyhow!(
+                "Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces"
+            )));
+        }
+        _ => {
+            push_logs(stream_name, &req, &body, &log_source).await?;
         }
-    } else {
-        push_logs(stream_name, &req, &body).await?;
     }
     Ok(())
 }
@@ -65,6 +74,7 @@ pub async fn push_logs(
     stream_name: &str,
     req: &HttpRequest,
     body: &Bytes,
+    log_source: &LogSource,
 ) -> Result<(), PostError> {
     let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
     let time_partition_limit = STREAM_INFO.get_time_partition_limit(stream_name)?;
@@ -88,6 +98,7 @@ pub async fn push_logs(
             &HashMap::new(),
             size,
             schema_version,
+            log_source,
         )
         .await?;
     } else {
@@ -97,6 +108,7 @@ pub async fn push_logs(
             None,
             custom_partition.as_ref(),
             schema_version,
+            log_source,
         )?;
         let custom_partition = custom_partition.unwrap();
         let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
@@ -116,6 +128,7 @@ pub async fn push_logs(
             &custom_partition_values,
             size,
             schema_version,
+            log_source,
         )
         .await?;
     }
@@ -127,6 +140,7 @@ pub async fn push_logs(
             time_partition_limit,
             None,
             schema_version,
+            log_source,
         )?;
         for value in data {
             parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref().unwrap())?;
@@ -141,6 +155,7 @@ pub async fn push_logs(
             &HashMap::new(),
             size,
             schema_version,
+            log_source,
         )
         .await?;
     }
@@ -151,6 +166,7 @@ pub async fn push_logs(
             time_partition_limit,
             custom_partition.as_ref(),
             schema_version,
+            log_source,
         )?;
         let custom_partition = custom_partition.unwrap();
         let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
@@ -171,6 +187,7 @@ pub async fn push_logs(
             &custom_partition_values,
             size,
             schema_version,
+            log_source,
         )
         .await?;
     }
@@ -190,6 +207,7 @@ pub async fn create_process_record_batch(
     custom_partition_values: &HashMap<String, String>,
     origin_size: u64,
     schema_version: SchemaVersion,
+    log_source: &LogSource,
 ) -> Result<(), PostError> {
     let (rb, is_first_event) = get_stream_schema(
         stream_name,
@@ -198,6 +216,7 @@ pub async fn create_process_record_batch(
         static_schema_flag,
         time_partition,
         schema_version,
+        log_source,
     )?;
     event::Event {
         rb,
@@ -223,6 +242,7 @@ pub fn get_stream_schema(
     static_schema_flag: Option<&String>,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
+    log_source: &LogSource,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {
     let hash_map = STREAM_INFO.read().unwrap();
     let schema = hash_map
@@ -237,6 +257,7 @@ pub fn get_stream_schema(
         static_schema_flag,
         time_partition,
         schema_version,
+        log_source,
     )
 }

@@ -247,6 +268,7 @@ pub fn into_event_batch(
     static_schema_flag: Option<&String>,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
+    log_source: &LogSource,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {
     let tags = collect_labelled_headers(req, PREFIX_TAGS, SEPARATOR)?;
     let metadata = collect_labelled_headers(req, PREFIX_META, SEPARATOR)?;
@@ -255,8 +277,13 @@ pub fn into_event_batch(
         tags,
         metadata,
     };
-    let (rb, is_first) =
-        event.into_recordbatch(&schema, static_schema_flag, time_partition, schema_version)?;
+    let (rb, is_first) = event.into_recordbatch(
+        &schema,
+        static_schema_flag,
+        time_partition,
+        schema_version,
+        log_source,
+    )?;
     Ok((rb, is_first))
 }
diff --git a/src/handlers/http/otel/opentelemetry.proto.common.v1.rs b/src/handlers/http/otel/opentelemetry.proto.common.v1.rs
deleted file mode 100644
index bc40d0720..000000000
--- a/src/handlers/http/otel/opentelemetry.proto.common.v1.rs
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Parseable Server (C) 2022 - 2024 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- *
- */
-
- // This file was generated by protoc-gen-rust-protobuf. The file was edited after the generation.
- // All the repeated fields were changed to Option<Vec<T>> and the `oneof` fields were changed to Option<T>.
-
- use serde::{Deserialize, Serialize};
- #[derive(Serialize, Deserialize, Debug, Clone)]
- /// AnyValue is used to represent any type of attribute value. AnyValue may contain a
- /// primitive value such as a string or integer or it may contain an arbitrary nested
- /// object containing arrays, key-value lists and primitives.
- pub struct AnyValue {
-     /// The value is one of the listed fields. It is valid for all values to be unspecified
-     /// in which case this AnyValue is considered to be "empty".
-     pub value: Value,
- }
-
- #[derive(Serialize, Deserialize, Debug, Clone)]
- pub struct Value {
-     #[serde(rename = "stringValue")]
-     pub str_val: Option<String>,
-     #[serde(rename = "boolValue")]
-     pub bool_val: Option<bool>,
-     #[serde(rename = "intValue")]
-     pub int_val: Option<i64>,
-     #[serde(rename = "doubleValue")]
-     pub double_val: Option<f64>,
-     #[serde(rename = "arrayValue")]
-     pub array_val: Option<ArrayValue>,
-     #[serde(rename = "keyVauleList")]
-     pub kv_list_val: Option<KeyValueList>,
-     #[serde(rename = "bytesValue")]
-     pub bytes_val: Option<String>,
- }
-
- #[derive(Serialize, Deserialize, Debug, Clone)]
- /// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message
- /// since oneof in AnyValue does not allow repeated fields.
- pub struct ArrayValue {
-     /// Array of values. The array may be empty (contain 0 elements).
-     pub values: Vec<AnyValue>,
- }
-
- #[derive(Serialize, Deserialize, Debug, Clone)]
- /// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message
- /// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need
- /// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to
- /// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches
- /// are semantically equivalent.
- pub struct KeyValueList {
-     /// A collection of key/value pairs of key-value pairs. The list may be empty (may
-     /// contain 0 elements).
-     /// The keys MUST be unique (it is not allowed to have more than one
-     /// value with the same key).
-     pub values: Vec<KeyValue>,
- }
-
- #[derive(Serialize, Deserialize, Debug, Clone)]
- /// KeyValue is a key-value pair that is used to store Span attributes, Link
- /// attributes, etc.
- pub struct KeyValue {
-     pub key: String,
-     pub value: Option<Value>,
- }
-
- #[derive(Serialize, Deserialize, Debug)]
- /// InstrumentationScope is a message representing the instrumentation scope information
- /// such as the fully qualified name and version.
- pub struct InstrumentationScope {
-     /// An empty instrumentation scope name means the name is unknown.
-     pub name: Option<String>,
-     pub version: Option<String>,
-     /// Additional attributes that describe the scope. \[Optional\].
-     /// Attribute keys MUST be unique (it is not allowed to have more than one
-     /// attribute with the same key).
-     pub attributes: Option<Vec<KeyValue>>,
-     #[serde(rename = "droppedAttributesCount")]
-     pub dropped_attributes_count: Option<u32>,
- }
- 
\ No newline at end of file
diff --git a/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs b/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs
deleted file mode 100644
index dc63286e3..000000000
--- a/src/handlers/http/otel/opentelemetry.proto.logs.v1.rs
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Parseable Server (C) 2022 - 2024 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- *
- */
-
-// This file was generated by protoc-gen-rust-protobuf. The file was edited after the generation.
- // All the repeated fields were changed to Option<Vec<T>>.
-
- use crate::handlers::http::otel::proto::common::v1::InstrumentationScope;
- use crate::handlers::http::otel::proto::common::v1::KeyValue;
- use crate::handlers::http::otel::proto::common::v1::Value;
- use crate::handlers::http::otel::proto::resource::v1::Resource;
- use serde::{Deserialize, Serialize};
-
- #[derive(Serialize, Deserialize, Debug)]
- /// LogsData represents the logs data that can be stored in a persistent storage,
- /// OR can be embedded by other protocols that transfer OTLP logs data but do not
- /// implement the OTLP protocol.
- ///
- /// The main difference between this message and collector protocol is that
- /// in this message there will not be any "control" or "metadata" specific to
- /// OTLP protocol.
- ///
- /// When new fields are added into this message, the OTLP request MUST be updated
- /// as well.
- pub struct LogsData {
-     /// An array of ResourceLogs.
-     /// For data coming from a single resource this array will typically contain
-     /// one element. Intermediary nodes that receive data from multiple origins
-     /// typically batch the data before forwarding further and in that case this
-     /// array will contain multiple elements.
-     #[serde(rename = "resourceLogs")]
-     pub resource_logs: Option<Vec<ResourceLogs>>,
- }
-
- #[derive(Serialize, Deserialize, Debug)]
- /// A collection of ScopeLogs from a Resource.
- pub struct ResourceLogs {
-     /// The resource for the logs in this message.
-     /// If this field is not set then resource info is unknown.
-     pub resource: Option<Resource>,
-     /// A list of ScopeLogs that originate from a resource.
-     #[serde(rename = "scopeLogs")]
-     pub scope_logs: Option<Vec<ScopeLogs>>,
-     /// This schema_url applies to the data in the "resource" field. It does not apply
-     /// to the data in the "scope_logs" field which have their own schema_url field.
-     #[serde(rename = "schemaUrl")]
-     pub schema_url: Option<String>,
- }
-
- #[derive(Serialize, Deserialize, Debug)]
- /// A collection of Logs produced by a Scope.
- pub struct ScopeLogs {
-     /// The instrumentation scope information for the logs in this message.
-     /// Semantically when InstrumentationScope isn't set, it is equivalent with
-     /// an empty instrumentation scope name (unknown).
-     pub scope: Option<InstrumentationScope>,
-     /// A list of log records.
-     #[serde(rename = "logRecords")]
-     pub log_records: Vec<LogRecord>,
-     /// This schema_url applies to all logs in the "logs" field.
-     #[serde(rename = "schemaUrl")]
-     pub schema_url: Option<String>,
- }
-
- #[derive(Serialize, Deserialize, Debug)]
- /// A log record according to OpenTelemetry Log Data Model:
- /// <https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md>
- pub struct LogRecord {
-     /// time_unix_nano is the time when the event occurred.
-     /// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
-     /// Value of 0 indicates unknown or missing timestamp.
-     #[serde(rename = "timeUnixNano")]
-     pub time_unix_nano: Option<String>,
-     /// Time when the event was observed by the collection system.
-     /// For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK)
-     /// this timestamp is typically set at the generation time and is equal to Timestamp.
-     /// For events originating externally and collected by OpenTelemetry (e.g. using
-     /// Collector) this is the time when OpenTelemetry's code observed the event measured
-     /// by the clock of the OpenTelemetry code. This field MUST be set once the event is
-     /// observed by OpenTelemetry.
-     ///
-     /// For converting OpenTelemetry log data to formats that support only one timestamp or
-     /// when receiving OpenTelemetry log data by recipients that support only one timestamp
-     /// internally the following logic is recommended:
-     ///   - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano.
-     ///
-     /// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
-     /// Value of 0 indicates unknown or missing timestamp.
-     #[serde(rename = "observedTimeUnixNano")]
-     pub observed_time_unix_nano: Option<String>,
-     /// Numerical value of the severity, normalized to values described in Log Data Model.
-     /// \[Optional\].
-     #[serde(rename = "severityNumber")]
-     pub severity_number: Option<i32>,
-     /// The severity text (also known as log level). The original string representation as
-     /// it is known at the source. \[Optional\].
-     #[serde(rename = "severityText")]
-     pub severity_text: Option<String>,
-     /// A value containing the body of the log record. Can be for example a human-readable
-     /// string message (including multi-line) describing the event in a free form or it can
-     /// be a structured data composed of arrays and maps of other values. \[Optional\].
-     pub body: Option<Value>,
-     /// Additional attributes that describe the specific event occurrence. \[Optional\].
-     /// Attribute keys MUST be unique (it is not allowed to have more than one
-     /// attribute with the same key).
-     pub attributes: Option<Vec<KeyValue>>,
-     #[serde(rename = "droppedAttributesCount")]
-     pub dropped_attributes_count: Option<u32>,
-     /// Flags, a bit field. 8 least significant bits are the trace flags as
-     /// defined in W3C Trace Context specification. 24 most significant bits are reserved
-     /// and must be set to 0. Readers must not assume that 24 most significant bits
-     /// will be zero and must correctly mask the bits when reading 8-bit trace flag (use
-     /// flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). \[Optional\].
-     pub flags: Option<u32>,
-     /// A unique identifier for a trace. All logs from the same trace share
-     /// the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR
-     /// of length other than 16 bytes is considered invalid (empty string in OTLP/JSON
-     /// is zero-length and thus is also invalid).
-     ///
-     /// This field is optional.
-     ///
-     /// The receivers SHOULD assume that the log record is not associated with a
-     /// trace if any of the following is true:
-     ///   - the field is not present,
-     ///   - the field contains an invalid value.
-     #[serde(rename = "traceId")]
-     pub trace_id: Option<String>,
-     /// A unique identifier for a span within a trace, assigned when the span
-     /// is created. The ID is an 8-byte array. An ID with all zeroes OR of length
-     /// other than 8 bytes is considered invalid (empty string in OTLP/JSON
-     /// is zero-length and thus is also invalid).
-     ///
-     /// This field is optional. If the sender specifies a valid span_id then it SHOULD also
-     /// specify a valid trace_id.
-     ///
-     /// The receivers SHOULD assume that the log record is not associated with a
-     /// span if any of the following is true:
-     ///   - the field is not present,
-     ///   - the field contains an invalid value.
-     #[serde(rename = "spanId")]
-     pub span_id: Option<String>,
- }
- /// Possible values for LogRecord.SeverityNumber.
- #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
- #[repr(i32)]
- pub enum SeverityNumber {
-     /// UNSPECIFIED is the default SeverityNumber, it MUST NOT be used.
-     Unspecified = 0,
-     Trace = 1,
-     Trace2 = 2,
-     Trace3 = 3,
-     Trace4 = 4,
-     Debug = 5,
-     Debug2 = 6,
-     Debug3 = 7,
-     Debug4 = 8,
-     Info = 9,
-     Info2 = 10,
-     Info3 = 11,
-     Info4 = 12,
-     Warn = 13,
-     Warn2 = 14,
-     Warn3 = 15,
-     Warn4 = 16,
-     Error = 17,
-     Error2 = 18,
-     Error3 = 19,
-     Error4 = 20,
-     Fatal = 21,
-     Fatal2 = 22,
-     Fatal3 = 23,
-     Fatal4 = 24,
- }
- impl SeverityNumber {
-     /// String value of the enum field names used in the ProtoBuf definition.
-     ///
-     /// The values are not transformed in any way and thus are considered stable
-     /// (if the ProtoBuf definition does not change) and safe for programmatic use.
-     pub fn as_str_name(severity_number: i32) -> &'static str {
-         match severity_number {
-             0 => "SEVERITY_NUMBER_UNSPECIFIED",
-             1 => "SEVERITY_NUMBER_TRACE",
-             2 => "SEVERITY_NUMBER_TRACE2",
-             3 => "SEVERITY_NUMBER_TRACE3",
-             4 => "SEVERITY_NUMBER_TRACE4",
-             5 => "SEVERITY_NUMBER_DEBUG",
-             6 => "SEVERITY_NUMBER_DEBUG2",
-             7 => "SEVERITY_NUMBER_DEBUG3",
-             8 => "SEVERITY_NUMBER_DEBUG4",
-             9 => "SEVERITY_NUMBER_INFO",
-             10 => "SEVERITY_NUMBER_INFO2",
-             11 => "SEVERITY_NUMBER_INFO3",
-             12 => "SEVERITY_NUMBER_INFO4",
-             13 => "SEVERITY_NUMBER_WARN",
-             14 => "SEVERITY_NUMBER_WARN2",
-             15 => "SEVERITY_NUMBER_WARN3",
-             16 => "SEVERITY_NUMBER_WARN4",
-             17 => "SEVERITY_NUMBER_ERROR",
-             18 => "SEVERITY_NUMBER_ERROR2",
-             19 => "SEVERITY_NUMBER_ERROR3",
-             20 => "SEVERITY_NUMBER_ERROR4",
-             21 => "SEVERITY_NUMBER_FATAL",
-             22 => "SEVERITY_NUMBER_FATAL2",
-             23 => "SEVERITY_NUMBER_FATAL3",
-             24 => "SEVERITY_NUMBER_FATAL4",
-             _ => "Invalid severity number",
-         }
-     }
-     /// Creates an enum from field names used in the ProtoBuf definition.
-     pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
-         match value {
-             "SEVERITY_NUMBER_UNSPECIFIED" => Some(Self::Unspecified),
-             "SEVERITY_NUMBER_TRACE" => Some(Self::Trace),
-             "SEVERITY_NUMBER_TRACE2" => Some(Self::Trace2),
-             "SEVERITY_NUMBER_TRACE3" => Some(Self::Trace3),
-             "SEVERITY_NUMBER_TRACE4" => Some(Self::Trace4),
-             "SEVERITY_NUMBER_DEBUG" => Some(Self::Debug),
-             "SEVERITY_NUMBER_DEBUG2" => Some(Self::Debug2),
-             "SEVERITY_NUMBER_DEBUG3" => Some(Self::Debug3),
-             "SEVERITY_NUMBER_DEBUG4" => Some(Self::Debug4),
-             "SEVERITY_NUMBER_INFO" => Some(Self::Info),
-             "SEVERITY_NUMBER_INFO2" => Some(Self::Info2),
-             "SEVERITY_NUMBER_INFO3" => Some(Self::Info3),
-             "SEVERITY_NUMBER_INFO4" => Some(Self::Info4),
-             "SEVERITY_NUMBER_WARN" => Some(Self::Warn),
-             "SEVERITY_NUMBER_WARN2" => Some(Self::Warn2),
-             "SEVERITY_NUMBER_WARN3" => Some(Self::Warn3),
-             "SEVERITY_NUMBER_WARN4" => Some(Self::Warn4),
-             "SEVERITY_NUMBER_ERROR" => Some(Self::Error),
-             "SEVERITY_NUMBER_ERROR2" => Some(Self::Error2),
-             "SEVERITY_NUMBER_ERROR3" => Some(Self::Error3),
-             "SEVERITY_NUMBER_ERROR4" => Some(Self::Error4),
-             "SEVERITY_NUMBER_FATAL" => Some(Self::Fatal),
-             "SEVERITY_NUMBER_FATAL2" => Some(Self::Fatal2),
-             "SEVERITY_NUMBER_FATAL3" => Some(Self::Fatal3),
-             "SEVERITY_NUMBER_FATAL4" => Some(Self::Fatal4),
-             _ => None,
-         }
-     }
- }
- /// LogRecordFlags is defined as a protobuf 'uint32' type and is to be used as
- /// bit-fields. Each non-zero value defined in this enum is a bit-mask.
- /// To extract the bit-field, for example, use an expression like:
- ///
- ///    (logRecord.flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK)
- ///
- #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
- #[repr(i32)]
- pub enum LogRecordFlags {
-     /// The zero value for the enum. Should not be used for comparisons.
-     /// Instead use bitwise "and" with the appropriate mask as shown above.
-     DoNotUse = 0,
-     /// Bits 0-7 are used for trace flags.
-     TraceFlagsMask = 255,
- }
- impl LogRecordFlags {
-     /// String value of the enum field names used in the ProtoBuf definition.
-     ///
-     /// The values are not transformed in any way and thus are considered stable
-     /// (if the ProtoBuf definition does not change) and safe for programmatic use.
-     pub fn as_str_name(flag: u32) -> &'static str {
-         match flag {
-             0 => "LOG_RECORD_FLAGS_DO_NOT_USE",
-             255 => "LOG_RECORD_FLAGS_TRACE_FLAGS_MASK",
-             _ => "Invalid flag",
-         }
-     }
-     /// Creates an enum from field names used in the ProtoBuf definition.
-     pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
-         match value {
-             "LOG_RECORD_FLAGS_DO_NOT_USE" => Some(Self::DoNotUse),
-             "LOG_RECORD_FLAGS_TRACE_FLAGS_MASK" => Some(Self::TraceFlagsMask),
-             _ => None,
-         }
-     }
- }
- 
\ No newline at end of file
diff --git a/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs b/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs
deleted file mode 100644
index 2f102628a..000000000
--- a/src/handlers/http/otel/opentelemetry.proto.resource.v1.rs
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Parseable Server (C) 2022 - 2024 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- *
- */
-// This file was generated by protoc-gen-rust-protobuf. The file was edited after the generation.
-// All the repeated fields were changed to Option<Vec<T>>
-
-use crate::handlers::http::otel::proto::common::v1::KeyValue;
-use serde::{Deserialize, Serialize};
-
-#[derive(Serialize, Deserialize, Debug)]
-/// Resource information.
-pub struct Resource {
-    /// Set of attributes that describe the resource.
-    /// Attribute keys MUST be unique (it is not allowed to have more than one
-    /// attribute with the same key).
-    #[serde(rename = "attributes")]
-    pub attributes: Option<Vec<KeyValue>>,
-    /// dropped_attributes_count is the number of dropped attributes. If the value is 0, then
-    /// no attributes were dropped.
-    #[serde(rename = "droppedAttributesCount")]
-    pub dropped_attributes_count: Option<u32>,
-}
diff --git a/src/handlers/http/otel/opentelemetry/proto/README.md b/src/handlers/http/otel/opentelemetry/proto/README.md
deleted file mode 100644
index d0281330e..000000000
--- a/src/handlers/http/otel/opentelemetry/proto/README.md
+++ /dev/null
@@ -1,2 +0,0 @@
-The following protobuf definitions are vendored from:
-https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto
diff --git a/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto b/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto
deleted file mode 100644
index f7ee8f265..000000000
--- a/src/handlers/http/otel/opentelemetry/proto/common/v1/common.proto
+++ /dev/null
@@ -1,81 +0,0 @@
-// Copyright 2019, OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package opentelemetry.proto.common.v1;
-
-option csharp_namespace = "OpenTelemetry.Proto.Common.V1";
-option java_multiple_files = true;
-option java_package = "io.opentelemetry.proto.common.v1";
-option java_outer_classname = "CommonProto";
-option go_package = "go.opentelemetry.io/proto/otlp/common/v1";
-
-// AnyValue is used to represent any type of attribute value. AnyValue may contain a
-// primitive value such as a string or integer or it may contain an arbitrary nested
-// object containing arrays, key-value lists and primitives.
-message AnyValue {
-  // The value is one of the listed fields. It is valid for all values to be unspecified
-  // in which case this AnyValue is considered to be "empty".
-  oneof value {
-    string string_value = 1;
-    bool bool_value = 2;
-    int64 int_value = 3;
-    double double_value = 4;
-    ArrayValue array_value = 5;
-    KeyValueList kvlist_value = 6;
-    bytes bytes_value = 7;
-  }
-}
-
-// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message
-// since oneof in AnyValue does not allow repeated fields.
-message ArrayValue {
-  // Array of values. The array may be empty (contain 0 elements).
-  repeated AnyValue values = 1;
-}
-
-// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message
-// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need
-// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to
-// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches
-// are semantically equivalent.
-message KeyValueList {
-  // A collection of key/value pairs of key-value pairs. The list may be empty (may
-  // contain 0 elements).
-  // The keys MUST be unique (it is not allowed to have more than one
-  // value with the same key).
-  repeated KeyValue values = 1;
-}
-
-// KeyValue is a key-value pair that is used to store Span attributes, Link
-// attributes, etc.
-message KeyValue {
-  string key = 1;
-  AnyValue value = 2;
-}
-
-// InstrumentationScope is a message representing the instrumentation scope information
-// such as the fully qualified name and version.
-message InstrumentationScope {
-  // An empty instrumentation scope name means the name is unknown.
-  string name = 1;
-  string version = 2;
-
-  // Additional attributes that describe the scope. [Optional].
-  // Attribute keys MUST be unique (it is not allowed to have more than one
-  // attribute with the same key).
-  repeated KeyValue attributes = 3;
-  uint32 dropped_attributes_count = 4;
-}
diff --git a/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto b/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto
deleted file mode 100644
index 0b4b64972..000000000
--- a/src/handlers/http/otel/opentelemetry/proto/logs/v1/logs.proto
+++ /dev/null
@@ -1,203 +0,0 @@
-// Copyright 2020, OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package opentelemetry.proto.logs.v1;
-
-import "opentelemetry/proto/common/v1/common.proto";
-import "opentelemetry/proto/resource/v1/resource.proto";
-
-option csharp_namespace = "OpenTelemetry.Proto.Logs.V1";
-option java_multiple_files = true;
-option java_package = "io.opentelemetry.proto.logs.v1";
-option java_outer_classname = "LogsProto";
-option go_package = "go.opentelemetry.io/proto/otlp/logs/v1";
-
-// LogsData represents the logs data that can be stored in a persistent storage,
-// OR can be embedded by other protocols that transfer OTLP logs data but do not
-// implement the OTLP protocol.
-//
-// The main difference between this message and collector protocol is that
-// in this message there will not be any "control" or "metadata" specific to
-// OTLP protocol.
-//
-// When new fields are added into this message, the OTLP request MUST be updated
-// as well.
-message LogsData {
-  // An array of ResourceLogs.
-  // For data coming from a single resource this array will typically contain
-  // one element. Intermediary nodes that receive data from multiple origins
-  // typically batch the data before forwarding further and in that case this
-  // array will contain multiple elements.
-  repeated ResourceLogs resource_logs = 1;
-}
-
-// A collection of ScopeLogs from a Resource.
-message ResourceLogs {
-  reserved 1000;
-
-  // The resource for the logs in this message.
-  // If this field is not set then resource info is unknown.
-  opentelemetry.proto.resource.v1.Resource resource = 1;
-
-  // A list of ScopeLogs that originate from a resource.
-  repeated ScopeLogs scope_logs = 2;
-
-  // This schema_url applies to the data in the "resource" field. It does not apply
-  // to the data in the "scope_logs" field which have their own schema_url field.
-  string schema_url = 3;
-}
-
-// A collection of Logs produced by a Scope.
-message ScopeLogs {
-  // The instrumentation scope information for the logs in this message.
-  // Semantically when InstrumentationScope isn't set, it is equivalent with
-  // an empty instrumentation scope name (unknown).
-  opentelemetry.proto.common.v1.InstrumentationScope scope = 1;
-
-  // A list of log records.
-  repeated LogRecord log_records = 2;
-
-  // This schema_url applies to all logs in the "logs" field.
-  string schema_url = 3;
-}
-
-// Possible values for LogRecord.SeverityNumber.
-enum SeverityNumber {
-  // UNSPECIFIED is the default SeverityNumber, it MUST NOT be used.
-  SEVERITY_NUMBER_UNSPECIFIED = 0;
-  SEVERITY_NUMBER_TRACE  = 1;
-  SEVERITY_NUMBER_TRACE2 = 2;
-  SEVERITY_NUMBER_TRACE3 = 3;
-  SEVERITY_NUMBER_TRACE4 = 4;
-  SEVERITY_NUMBER_DEBUG  = 5;
-  SEVERITY_NUMBER_DEBUG2 = 6;
-  SEVERITY_NUMBER_DEBUG3 = 7;
-  SEVERITY_NUMBER_DEBUG4 = 8;
-  SEVERITY_NUMBER_INFO   = 9;
-  SEVERITY_NUMBER_INFO2  = 10;
-  SEVERITY_NUMBER_INFO3  = 11;
-  SEVERITY_NUMBER_INFO4  = 12;
-  SEVERITY_NUMBER_WARN   = 13;
-  SEVERITY_NUMBER_WARN2  = 14;
-  SEVERITY_NUMBER_WARN3  = 15;
-  SEVERITY_NUMBER_WARN4  = 16;
-  SEVERITY_NUMBER_ERROR  = 17;
-  SEVERITY_NUMBER_ERROR2 = 18;
-  SEVERITY_NUMBER_ERROR3 = 19;
-  SEVERITY_NUMBER_ERROR4 = 20;
-  SEVERITY_NUMBER_FATAL  = 21;
-  SEVERITY_NUMBER_FATAL2 = 22;
-  SEVERITY_NUMBER_FATAL3 = 23;
-  SEVERITY_NUMBER_FATAL4 = 24;
-}
-
-// LogRecordFlags is defined as a protobuf 'uint32' type and is to be used as
-// bit-fields. Each non-zero value defined in this enum is a bit-mask.
-// To extract the bit-field, for example, use an expression like:
-//
-//   (logRecord.flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK)
-//
-enum LogRecordFlags {
-  // The zero value for the enum. Should not be used for comparisons.
-  // Instead use bitwise "and" with the appropriate mask as shown above.
-  LOG_RECORD_FLAGS_DO_NOT_USE = 0;
-
-  // Bits 0-7 are used for trace flags.
-  LOG_RECORD_FLAGS_TRACE_FLAGS_MASK = 0x000000FF;
-
-  // Bits 8-31 are reserved for future use.
-}
-
-// A log record according to OpenTelemetry Log Data Model:
-// https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md
-message LogRecord {
-  reserved 4;
-
-  // time_unix_nano is the time when the event occurred.
-  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
-  // Value of 0 indicates unknown or missing timestamp.
-  fixed64 time_unix_nano = 1;
-
-  // Time when the event was observed by the collection system.
-  // For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK)
-  // this timestamp is typically set at the generation time and is equal to Timestamp.
-  // For events originating externally and collected by OpenTelemetry (e.g. using
-  // Collector) this is the time when OpenTelemetry's code observed the event measured
-  // by the clock of the OpenTelemetry code. This field MUST be set once the event is
-  // observed by OpenTelemetry.
-  //
-  // For converting OpenTelemetry log data to formats that support only one timestamp or
-  // when receiving OpenTelemetry log data by recipients that support only one timestamp
-  // internally the following logic is recommended:
-  //   - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano.
-  //
-  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
-  // Value of 0 indicates unknown or missing timestamp.
-  fixed64 observed_time_unix_nano = 11;
-
-  // Numerical value of the severity, normalized to values described in Log Data Model.
-  // [Optional].
-  SeverityNumber severity_number = 2;
-
-  // The severity text (also known as log level). The original string representation as
-  // it is known at the source. [Optional].
-  string severity_text = 3;
-
-  // A value containing the body of the log record. Can be for example a human-readable
-  // string message (including multi-line) describing the event in a free form or it can
-  // be a structured data composed of arrays and maps of other values. [Optional].
-  opentelemetry.proto.common.v1.AnyValue body = 5;
-
-  // Additional attributes that describe the specific event occurrence. [Optional].
-  // Attribute keys MUST be unique (it is not allowed to have more than one
-  // attribute with the same key).
-  repeated opentelemetry.proto.common.v1.KeyValue attributes = 6;
-  uint32 dropped_attributes_count = 7;
-
-  // Flags, a bit field. 8 least significant bits are the trace flags as
-  // defined in W3C Trace Context specification. 24 most significant bits are reserved
-  // and must be set to 0. Readers must not assume that 24 most significant bits
-  // will be zero and must correctly mask the bits when reading 8-bit trace flag (use
-  // flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). [Optional].
-  fixed32 flags = 8;
-
-  // A unique identifier for a trace. All logs from the same trace share
-  // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR
-  // of length other than 16 bytes is considered invalid (empty string in OTLP/JSON
-  // is zero-length and thus is also invalid).
-  //
-  // This field is optional.
-  //
-  // The receivers SHOULD assume that the log record is not associated with a
-  // trace if any of the following is true:
-  //   - the field is not present,
-  //   - the field contains an invalid value.
-  bytes trace_id = 9;
-
-  // A unique identifier for a span within a trace, assigned when the span
-  // is created. The ID is an 8-byte array. An ID with all zeroes OR of length
-  // other than 8 bytes is considered invalid (empty string in OTLP/JSON
-  // is zero-length and thus is also invalid).
-  //
-  // This field is optional. If the sender specifies a valid span_id then it SHOULD also
-  // specify a valid trace_id.
-  //
-  // The receivers SHOULD assume that the log record is not associated with a
-  // span if any of the following is true:
-  //   - the field is not present,
-  //   - the field contains an invalid value.
-  bytes span_id = 10;
-}
diff --git a/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto b/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto
deleted file mode 100644
index 6637560bc..000000000
--- a/src/handlers/http/otel/opentelemetry/proto/resource/v1/resource.proto
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright 2019, OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package opentelemetry.proto.resource.v1;
-
-import "opentelemetry/proto/common/v1/common.proto";
-
-option csharp_namespace = "OpenTelemetry.Proto.Resource.V1";
-option java_multiple_files = true;
-option java_package = "io.opentelemetry.proto.resource.v1";
-option java_outer_classname = "ResourceProto";
-option go_package = "go.opentelemetry.io/proto/otlp/resource/v1";
-
-// Resource information.
-message Resource {
-  // Set of attributes that describe the resource.
-  // Attribute keys MUST be unique (it is not allowed to have more than one
-  // attribute with the same key).
-  repeated opentelemetry.proto.common.v1.KeyValue attributes = 1;
-
-  // dropped_attributes_count is the number of dropped attributes. If the value is 0, then
-  // no attributes were dropped.
-  uint32 dropped_attributes_count = 2;
-}
{ - // conf.set("api.version.request", val.to_string()); - // } - if let Some(ssl_protocol) = CONFIG.parseable.kafka_security_protocol.as_ref() { conf.set("security.protocol", serde_json::to_string(&ssl_protocol)?); } @@ -234,6 +199,7 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> { static_schema_flag.as_ref(), time_partition.as_ref(), schema_version, + &LogSource::default(), ) .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?; diff --git a/src/lib.rs b/src/lib.rs index 5c8e09274..951bb432a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ pub mod metrics; pub mod migration; mod oidc; pub mod option; +pub mod otel; mod query; pub mod rbac; mod response; @@ -43,7 +44,6 @@ pub mod sync; pub mod users; mod utils; mod validator; - pub use handlers::http::modal::{ ingest_server::IngestServer, query_server::QueryServer, server::Server, ParseableServer, }; diff --git a/src/handlers/http/otel/proto.rs b/src/otel.rs similarity index 64% rename from src/handlers/http/otel/proto.rs rename to src/otel.rs index 9322bfcc5..11f98d89e 100644 --- a/src/handlers/http/otel/proto.rs +++ b/src/otel.rs @@ -16,23 +16,7 @@ * */ -/// Common types used across all event types. -pub mod common { - pub mod v1 { - include!("opentelemetry.proto.common.v1.rs"); - } -} - -/// Generated types used for logs. -pub mod logs { - pub mod v1 { - include!("opentelemetry.proto.logs.v1.rs"); - } -} - -/// Generated types used in resources. -pub mod resource { - pub mod v1 { - include!("opentelemetry.proto.resource.v1.rs"); - } -} +pub mod logs; +pub mod metrics; +pub mod otel_utils; +pub mod traces; diff --git a/src/otel/logs.rs b/src/otel/logs.rs new file mode 100644 index 000000000..fcdffe1af --- /dev/null +++ b/src/otel/logs.rs @@ -0,0 +1,162 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
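With the vendored `.proto` sources and the `include!`-based `proto.rs` removed, the new `src/otel` modules declared above consume the generated types from the `opentelemetry-proto` crate directly. For orientation, these are the entry-point types each module flattens; the paths below are copied from the `use` statements in the added files:

```rust
use opentelemetry_proto::tonic::logs::v1::LogsData;       // flattened by otel::logs
use opentelemetry_proto::tonic::metrics::v1::MetricsData; // flattened by otel::metrics
use opentelemetry_proto::tonic::trace::v1::TracesData;    // flattened by otel::traces
```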
+ * + */ + +use opentelemetry_proto::tonic::logs::v1::LogRecord; +use opentelemetry_proto::tonic::logs::v1::LogsData; +use opentelemetry_proto::tonic::logs::v1::ScopeLogs; +use opentelemetry_proto::tonic::logs::v1::SeverityNumber; +use serde_json::Value; +use std::collections::BTreeMap; + +use super::otel_utils::collect_json_from_values; +use super::otel_utils::convert_epoch_nano_to_timestamp; +use super::otel_utils::insert_attributes; + +/// otel log event has severity number +/// there is a mapping of severity number to severity text provided in proto +/// this function fetches the severity text from the severity number +/// and adds it to the flattened json +fn flatten_severity(severity_number: i32) -> BTreeMap { + let mut severity_json: BTreeMap = BTreeMap::new(); + severity_json.insert( + "severity_number".to_string(), + Value::Number(severity_number.into()), + ); + let severity = SeverityNumber::try_from(severity_number).unwrap(); + severity_json.insert( + "severity_text".to_string(), + Value::String(severity.as_str_name().to_string()), + ); + severity_json +} + +/// this function flattens the `LogRecord` object +/// and returns a `BTreeMap` of the flattened json +/// this function is called recursively for each log record object in the otel logs +pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap { + let mut log_record_json: BTreeMap = BTreeMap::new(); + log_record_json.insert( + "time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + log_record.time_unix_nano as i64, + )), + ); + log_record_json.insert( + "observed_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + log_record.observed_time_unix_nano as i64, + )), + ); + + log_record_json.extend(flatten_severity(log_record.severity_number)); + + if log_record.body.is_some() { + let body = &log_record.body; + let body_json = collect_json_from_values(body, &"body".to_string()); + for key in body_json.keys() { + log_record_json.insert(key.to_owned(), body_json[key].to_owned()); + } + } + insert_attributes(&mut log_record_json, &log_record.attributes); + log_record_json.insert( + "log_record_dropped_attributes_count".to_string(), + Value::Number(log_record.dropped_attributes_count.into()), + ); + + log_record_json.insert( + "flags".to_string(), + Value::Number((log_record.flags).into()), + ); + log_record_json.insert( + "span_id".to_string(), + Value::String(hex::encode(&log_record.span_id)), + ); + log_record_json.insert( + "trace_id".to_string(), + Value::String(hex::encode(&log_record.trace_id)), + ); + + log_record_json +} + +/// this function flattens the `ScopeLogs` object +/// and returns a `Vec` of `BTreeMap` of the flattened json +fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { + let mut vec_scope_log_json = Vec::new(); + let mut scope_log_json = BTreeMap::new(); + + if let Some(scope) = &scope_log.scope { + scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); + scope_log_json.insert( + "scope_version".to_string(), + Value::String(scope.version.clone()), + ); + insert_attributes(&mut scope_log_json, &scope.attributes); + scope_log_json.insert( + "scope_dropped_attributes_count".to_string(), + Value::Number(scope.dropped_attributes_count.into()), + ); + } + scope_log_json.insert( + "scope_log_schema_url".to_string(), + Value::String(scope_log.schema_url.clone()), + ); + + for log_record in &scope_log.log_records { + let log_record_json = flatten_log_record(log_record); + let mut combined_json = scope_log_json.clone(); + 
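+        // each row starts from a fresh copy of the scope-level fields; the
+        // record-level fields for this log record are merged in just below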
combined_json.extend(log_record_json); + vec_scope_log_json.push(combined_json); + } + + vec_scope_log_json +} + +/// this function performs the custom flattening of the otel logs +/// and returns a `Vec` of `BTreeMap` of the flattened json +pub fn flatten_otel_logs(message: &LogsData) -> Vec> { + let mut vec_otel_json = Vec::new(); + for record in &message.resource_logs { + let mut resource_log_json = BTreeMap::new(); + + if let Some(resource) = &record.resource { + insert_attributes(&mut resource_log_json, &resource.attributes); + resource_log_json.insert( + "resource_dropped_attributes_count".to_string(), + Value::Number(resource.dropped_attributes_count.into()), + ); + } + + let mut vec_resource_logs_json = Vec::new(); + for scope_log in &record.scope_logs { + vec_resource_logs_json.extend(flatten_scope_log(scope_log)); + } + resource_log_json.insert( + "schema_url".to_string(), + Value::String(record.schema_url.clone()), + ); + + for resource_logs_json in &mut vec_resource_logs_json { + resource_logs_json.extend(resource_log_json.clone()); + } + + vec_otel_json.extend(vec_resource_logs_json); + } + + vec_otel_json +} diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs new file mode 100644 index 000000000..f5aa1c072 --- /dev/null +++ b/src/otel/metrics.rs @@ -0,0 +1,524 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
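For context, a hedged usage sketch of the `flatten_otel_logs` function completed above. The protobuf decode step is an assumption here (the HTTP handler wiring is outside this hunk); `prost::Message::decode` accepts any `bytes::Buf`, including `&[u8]`:

```rust
use std::collections::BTreeMap;

use opentelemetry_proto::tonic::logs::v1::LogsData;
use prost::Message;
use serde_json::Value;

// Hypothetical helper: decode an OTLP logs payload, then flatten every
// log record into a flat map with resource/scope fields denormalized in.
fn logs_body_to_rows(body: &[u8]) -> Result<Vec<BTreeMap<String, Value>>, prost::DecodeError> {
    let logs = LogsData::decode(body)?;
    Ok(crate::otel::logs::flatten_otel_logs(&logs))
}
```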
+ * + */ + +use std::collections::BTreeMap; + +use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue; +use opentelemetry_proto::tonic::metrics::v1::{ + exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar, + ExponentialHistogram, Gauge, Histogram, Metric, MetricsData, NumberDataPoint, Sum, Summary, +}; +use serde_json::Value; + +use super::otel_utils::{ + convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, +}; + +/// otel metrics event has json array for exemplar +/// this function flatten the exemplar json array +/// and returns a `BTreeMap` of the exemplar json +/// this function is reused in all json objects that have exemplar +fn flatten_exemplar(exemplars: &[Exemplar]) -> BTreeMap { + let mut exemplar_json = BTreeMap::new(); + for exemplar in exemplars { + insert_attributes(&mut exemplar_json, &exemplar.filtered_attributes); + exemplar_json.insert( + "exemplar_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + exemplar.time_unix_nano as i64, + )), + ); + exemplar_json.insert( + "exemplar_span_id".to_string(), + Value::String(hex::encode(&exemplar.span_id)), + ); + exemplar_json.insert( + "exemplar_trace_id".to_string(), + Value::String(hex::encode(&exemplar.trace_id)), + ); + if let Some(value) = &exemplar.value { + match value { + ExemplarValue::AsDouble(double_val) => { + exemplar_json.insert( + "exemplar_value".to_string(), + Value::Number(serde_json::Number::from_f64(*double_val).unwrap()), + ); + } + ExemplarValue::AsInt(int_val) => { + exemplar_json.insert( + "exemplar_value".to_string(), + Value::Number(serde_json::Number::from(*int_val)), + ); + } + } + } + } + exemplar_json +} + +/// otel metrics event has json array for number data points +/// this function flatten the number data points json array +/// and returns a `Vec` of `BTreeMap` of the flattened json +/// this function is reused in all json objects that have number data points +fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec> { + data_points + .iter() + .map(|data_point| { + let mut data_point_json = BTreeMap::new(); + insert_attributes(&mut data_point_json, &data_point.attributes); + data_point_json.insert( + "start_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.start_time_unix_nano as i64, + )), + ); + data_point_json.insert( + "time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.time_unix_nano as i64, + )), + ); + let exemplar_json = flatten_exemplar(&data_point.exemplars); + for (key, value) in exemplar_json { + data_point_json.insert(key, value); + } + data_point_json.extend(flatten_data_point_flags(data_point.flags)); + if let Some(value) = &data_point.value { + match value { + NumberDataPointValue::AsDouble(double_val) => { + data_point_json.insert( + "data_point_value".to_string(), + Value::Number(serde_json::Number::from_f64(*double_val).unwrap()), + ); + } + NumberDataPointValue::AsInt(int_val) => { + data_point_json.insert( + "data_point_value".to_string(), + Value::Number(serde_json::Number::from(*int_val)), + ); + } + } + } + data_point_json + }) + .collect() +} + +/// otel metrics event has json object for gauge +/// each gauge object has json array for data points +/// this function flatten the gauge json object +/// and returns a `Vec` of `BTreeMap` for each data point +fn flatten_gauge(gauge: &Gauge) -> Vec> { + let mut vec_gauge_json = Vec::new(); + let 
data_points_json = flatten_number_data_points(&gauge.data_points); + for data_point_json in data_points_json { + let mut gauge_json = BTreeMap::new(); + for (key, value) in &data_point_json { + gauge_json.insert(key.clone(), value.clone()); + } + vec_gauge_json.push(gauge_json); + } + vec_gauge_json +} + +/// otel metrics event has json object for sum +/// each sum object has json array for data points +/// this function flatten the sum json object +/// and returns a `Vec` of `BTreeMap` for each data point +fn flatten_sum(sum: &Sum) -> Vec> { + let mut vec_sum_json = Vec::new(); + let data_points_json = flatten_number_data_points(&sum.data_points); + for data_point_json in data_points_json { + let mut sum_json = BTreeMap::new(); + for (key, value) in &data_point_json { + sum_json.insert(key.clone(), value.clone()); + } + vec_sum_json.push(sum_json); + } + let mut sum_json = BTreeMap::new(); + sum_json.extend(flatten_aggregation_temporality(sum.aggregation_temporality)); + sum_json.insert("is_monotonic".to_string(), Value::Bool(sum.is_monotonic)); + for data_point_json in &mut vec_sum_json { + for (key, value) in &sum_json { + data_point_json.insert(key.clone(), value.clone()); + } + } + vec_sum_json +} + +/// otel metrics event has json object for histogram +/// each histogram object has json array for data points +/// this function flatten the histogram json object +/// and returns a `Vec` of `BTreeMap` for each data point +fn flatten_histogram(histogram: &Histogram) -> Vec> { + let mut data_points_json = Vec::new(); + for data_point in &histogram.data_points { + let mut data_point_json = BTreeMap::new(); + insert_attributes(&mut data_point_json, &data_point.attributes); + data_point_json.insert( + "start_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.start_time_unix_nano as i64, + )), + ); + data_point_json.insert( + "time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.time_unix_nano as i64, + )), + ); + data_point_json.insert( + "data_point_count".to_string(), + Value::Number(data_point.count.into()), + ); + insert_number_if_some(&mut data_point_json, "data_point_sum", &data_point.sum); + data_point_json.insert( + "data_point_bucket_counts".to_string(), + Value::Array( + data_point + .bucket_counts + .iter() + .map(|&count| Value::Number(count.into())) + .collect(), + ), + ); + data_point_json.insert( + "data_point_explicit_bounds".to_string(), + Value::Array( + data_point + .explicit_bounds + .iter() + .map(|bound| Value::String(bound.to_string())) + .collect(), + ), + ); + let exemplar_json = flatten_exemplar(&data_point.exemplars); + for (key, value) in exemplar_json { + data_point_json.insert(key.to_string(), value); + } + data_point_json.extend(flatten_data_point_flags(data_point.flags)); + insert_number_if_some(&mut data_point_json, "min", &data_point.min); + insert_number_if_some(&mut data_point_json, "max", &data_point.max); + data_points_json.push(data_point_json); + } + let mut histogram_json = BTreeMap::new(); + histogram_json.extend(flatten_aggregation_temporality( + histogram.aggregation_temporality, + )); + for data_point_json in &mut data_points_json { + for (key, value) in &histogram_json { + data_point_json.insert(key.clone(), value.clone()); + } + } + data_points_json +} + +/// otel metrics event has json object for buckets +/// this function flatten the buckets json object +/// and returns a `BTreeMap` of the flattened json +fn flatten_buckets(bucket: &Buckets) -> BTreeMap { + let mut 
bucket_json = BTreeMap::new(); + bucket_json.insert("offset".to_string(), Value::Number(bucket.offset.into())); + bucket_json.insert( + "bucket_count".to_string(), + Value::Array( + bucket + .bucket_counts + .iter() + .map(|&count| Value::Number(count.into())) + .collect(), + ), + ); + bucket_json +} + +/// otel metrics event has json object for exponential histogram +/// each exponential histogram object has json array for data points +/// this function flatten the exponential histogram json object +/// and returns a `Vec` of `BTreeMap` for each data point +fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec> { + let mut data_points_json = Vec::new(); + for data_point in &exp_histogram.data_points { + let mut data_point_json = BTreeMap::new(); + insert_attributes(&mut data_point_json, &data_point.attributes); + data_point_json.insert( + "start_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.start_time_unix_nano as i64, + )), + ); + data_point_json.insert( + "time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.time_unix_nano as i64, + )), + ); + data_point_json.insert( + "data_point_count".to_string(), + Value::Number(data_point.count.into()), + ); + insert_number_if_some(&mut data_point_json, "data_point_sum", &data_point.sum); + data_point_json.insert( + "data_point_scale".to_string(), + Value::Number(data_point.scale.into()), + ); + data_point_json.insert( + "data_point_zero_count".to_string(), + Value::Number(data_point.zero_count.into()), + ); + if let Some(positive) = &data_point.positive { + let positive_json = flatten_buckets(positive); + for (key, value) in positive_json { + data_point_json.insert(format!("positive_{}", key), value); + } + } + if let Some(negative) = &data_point.negative { + let negative_json = flatten_buckets(negative); + for (key, value) in negative_json { + data_point_json.insert(format!("negative_{}", key), value); + } + } + let exemplar_json = flatten_exemplar(&data_point.exemplars); + for (key, value) in exemplar_json { + data_point_json.insert(key, value); + } + data_points_json.push(data_point_json); + } + let mut exp_histogram_json = BTreeMap::new(); + exp_histogram_json.extend(flatten_aggregation_temporality( + exp_histogram.aggregation_temporality, + )); + for data_point_json in &mut data_points_json { + for (key, value) in &exp_histogram_json { + data_point_json.insert(key.clone(), value.clone()); + } + } + data_points_json +} + +/// otel metrics event has json object for summary +/// each summary object has json array for data points +/// this function flatten the summary json object +/// and returns a `Vec` of `BTreeMap` for each data point +fn flatten_summary(summary: &Summary) -> Vec> { + let mut data_points_json = Vec::new(); + for data_point in &summary.data_points { + let mut data_point_json = BTreeMap::new(); + insert_attributes(&mut data_point_json, &data_point.attributes); + data_point_json.insert( + "start_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.start_time_unix_nano as i64, + )), + ); + data_point_json.insert( + "time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + data_point.time_unix_nano as i64, + )), + ); + data_point_json.insert( + "data_point_count".to_string(), + Value::Number(data_point.count.into()), + ); + data_point_json.insert( + "data_point_sum".to_string(), + Value::Number(serde_json::Number::from_f64(data_point.sum).unwrap()), + ); + 
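+        // note: serde_json::Number::from_f64 returns None for NaN/±inf (JSON
+        // cannot represent them), so the unwrap above panics on a non-finite
+        // summary sum; optional numeric fields elsewhere in this module go
+        // through insert_number_if_some, which silently skips such values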
data_point_json.insert( + "data_point_quantile_values".to_string(), + Value::Array( + data_point + .quantile_values + .iter() + .map(|quantile_value| { + Value::Object( + vec![ + ( + "quantile", + Value::Number( + serde_json::Number::from_f64(quantile_value.quantile) + .unwrap(), + ), + ), + ( + "value", + Value::Number( + serde_json::Number::from_f64(quantile_value.value).unwrap(), + ), + ), + ] + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(), + ) + }) + .collect(), + ), + ); + data_points_json.push(data_point_json); + } + data_points_json +} + +/// this function flattens the `Metric` object +/// each metric object has json object for gauge, sum, histogram, exponential histogram, summary +/// this function flatten the metric json object +/// and returns a `Vec` of `BTreeMap` of the flattened json +/// this function is called recursively for each metric record object in the otel metrics event +pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { + let mut data_points_json = Vec::new(); + let mut metric_json = BTreeMap::new(); + + match &metrics_record.data { + Some(metric::Data::Gauge(gauge)) => { + data_points_json.extend(flatten_gauge(gauge)); + } + Some(metric::Data::Sum(sum)) => { + data_points_json.extend(flatten_sum(sum)); + } + Some(metric::Data::Histogram(histogram)) => { + data_points_json.extend(flatten_histogram(histogram)); + } + Some(metric::Data::ExponentialHistogram(exp_histogram)) => { + data_points_json.extend(flatten_exp_histogram(exp_histogram)); + } + Some(metric::Data::Summary(summary)) => { + data_points_json.extend(flatten_summary(summary)); + } + None => {} + } + metric_json.insert( + "metric_name".to_string(), + Value::String(metrics_record.name.clone()), + ); + metric_json.insert( + "metric_description".to_string(), + Value::String(metrics_record.description.clone()), + ); + metric_json.insert( + "metric_unit".to_string(), + Value::String(metrics_record.unit.clone()), + ); + insert_attributes(&mut metric_json, &metrics_record.metadata); + for data_point_json in &mut data_points_json { + for (key, value) in &metric_json { + data_point_json.insert(key.clone(), value.clone()); + } + } + if data_points_json.is_empty() { + data_points_json.push(metric_json); + } + data_points_json +} + +/// this function performs the custom flattening of the otel metrics +/// and returns a `Vec` of `BTreeMap` of the flattened json +pub fn flatten_otel_metrics(message: MetricsData) -> Vec> { + let mut vec_otel_json = Vec::new(); + for record in &message.resource_metrics { + let mut resource_metrics_json = BTreeMap::new(); + if let Some(resource) = &record.resource { + insert_attributes(&mut resource_metrics_json, &resource.attributes); + resource_metrics_json.insert( + "resource_dropped_attributes_count".to_string(), + Value::Number(resource.dropped_attributes_count.into()), + ); + } + let mut vec_scope_metrics_json = Vec::new(); + for scope_metric in &record.scope_metrics { + let mut scope_metrics_json = BTreeMap::new(); + for metrics_record in &scope_metric.metrics { + vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record)); + } + if let Some(scope) = &scope_metric.scope { + scope_metrics_json + .insert("scope_name".to_string(), Value::String(scope.name.clone())); + scope_metrics_json.insert( + "scope_version".to_string(), + Value::String(scope.version.clone()), + ); + insert_attributes(&mut scope_metrics_json, &scope.attributes); + scope_metrics_json.insert( + "scope_dropped_attributes_count".to_string(), + 
Value::Number(scope.dropped_attributes_count.into()), + ); + } + scope_metrics_json.insert( + "scope_metrics_schema_url".to_string(), + Value::String(scope_metric.schema_url.clone()), + ); + + for scope_metric_json in &mut vec_scope_metrics_json { + for (key, value) in &scope_metrics_json { + scope_metric_json.insert(key.clone(), value.clone()); + } + } + } + resource_metrics_json.insert( + "resource_metrics_schema_url".to_string(), + Value::String(record.schema_url.clone()), + ); + for resource_metric_json in &mut vec_scope_metrics_json { + for (key, value) in &resource_metrics_json { + resource_metric_json.insert(key.clone(), value.clone()); + } + } + vec_otel_json.extend(vec_scope_metrics_json); + } + vec_otel_json +} + +/// otel metrics event has json object for aggregation temporality +/// there is a mapping of aggregation temporality to its description provided in proto +/// this function fetches the description from the aggregation temporality +/// and adds it to the flattened json +fn flatten_aggregation_temporality(aggregation_temporality: i32) -> BTreeMap { + let mut aggregation_temporality_json = BTreeMap::new(); + aggregation_temporality_json.insert( + "aggregation_temporality".to_string(), + Value::Number(aggregation_temporality.into()), + ); + let description = match aggregation_temporality { + 0 => "AGGREGATION_TEMPORALITY_UNSPECIFIED", + 1 => "AGGREGATION_TEMPORALITY_DELTA", + 2 => "AGGREGATION_TEMPORALITY_CUMULATIVE", + _ => "", + }; + aggregation_temporality_json.insert( + "aggregation_temporality_description".to_string(), + Value::String(description.to_string()), + ); + + aggregation_temporality_json +} + +fn flatten_data_point_flags(flags: u32) -> BTreeMap { + let mut data_point_flags_json = BTreeMap::new(); + data_point_flags_json.insert("data_point_flags".to_string(), Value::Number(flags.into())); + let description = match flags { + 0 => "DATA_POINT_FLAGS_DO_NOT_USE", + 1 => "DATA_POINT_FLAGS_NO_RECORDED_VALUE_MASK", + _ => "", + }; + data_point_flags_json.insert( + "data_point_flags_description".to_string(), + Value::String(description.to_string()), + ); + data_point_flags_json +} diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs new file mode 100644 index 000000000..3ac051771 --- /dev/null +++ b/src/otel/otel_utils.rs @@ -0,0 +1,162 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
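One detail of the shared helpers that follow: `convert_epoch_nano_to_timestamp` at the bottom of the module renders epoch nanoseconds as an ISO-8601-style string with microsecond precision, via chrono's `DateTime::from_timestamp_nanos`. A quick self-contained check of the expected output:

```rust
use chrono::DateTime;

fn main() {
    // 2024-01-01T00:00:00Z expressed in epoch nanoseconds
    let epoch_ns: i64 = 1_704_067_200_000_000_000;
    let dt = DateTime::from_timestamp_nanos(epoch_ns).naive_utc();
    assert_eq!(
        dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string(),
        "2024-01-01T00:00:00.000000Z"
    );
}
```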
+ *
+ */
+
+use chrono::DateTime;
+use opentelemetry_proto::tonic::common::v1::{any_value::Value as OtelValue, AnyValue, KeyValue};
+use serde_json::Value;
+use std::collections::BTreeMap;
+
+// Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte
+pub fn collect_json_from_value(key: &String, value: OtelValue) -> BTreeMap<String, Value> {
+    let mut value_json: BTreeMap<String, Value> = BTreeMap::new();
+    match value {
+        OtelValue::StringValue(str_val) => {
+            value_json.insert(key.to_string(), Value::String(str_val));
+        }
+        OtelValue::BoolValue(bool_val) => {
+            value_json.insert(key.to_string(), Value::Bool(bool_val));
+        }
+        OtelValue::IntValue(int_val) => {
+            value_json.insert(key.to_string(), Value::Number(int_val.into()));
+        }
+        OtelValue::DoubleValue(double_val) => {
+            if let Some(number) = serde_json::Number::from_f64(double_val) {
+                value_json.insert(key.to_string(), Value::Number(number));
+            }
+        }
+        OtelValue::ArrayValue(array_val) => {
+            let values = &array_val.values;
+            for value in values {
+                let array_value_json = collect_json_from_anyvalue(key, value.clone());
+                for key in array_value_json.keys() {
+                    value_json.insert(
+                        format!(
+                            "{}_{}",
+                            key.to_owned(),
+                            value_to_string(array_value_json[key].to_owned())
+                        ),
+                        array_value_json[key].to_owned(),
+                    );
+                }
+            }
+        }
+        OtelValue::KvlistValue(kv_list_val) => {
+            for key_value in kv_list_val.values {
+                let value = key_value.value;
+                if value.is_some() {
+                    let value = value.unwrap();
+                    let key_value_json = collect_json_from_anyvalue(key, value.clone());
+
+                    for key in key_value_json.keys() {
+                        value_json.insert(
+                            format!(
+                                "{}_{}_{}",
+                                key.to_owned(),
+                                key_value.key,
+                                value_to_string(key_value_json[key].to_owned())
+                            ),
+                            key_value_json[key].to_owned(),
+                        );
+                    }
+                }
+            }
+        }
+        OtelValue::BytesValue(bytes_val) => {
+            value_json.insert(
+                key.to_string(),
+                Value::String(String::from_utf8_lossy(&bytes_val).to_string()),
+            );
+        }
+    }
+
+    value_json
+}
+
+pub fn collect_json_from_anyvalue(key: &String, value: AnyValue) -> BTreeMap<String, Value> {
+    collect_json_from_value(key, value.value.unwrap())
+}
+
+// traverse through the optional AnyValue by calling collect_json_from_anyvalue
+pub fn collect_json_from_values(
+    values: &Option<AnyValue>,
+    key: &String,
+) -> BTreeMap<String, Value> {
+    let mut value_json: BTreeMap<String, Value> = BTreeMap::new();
+
+    for value in values.iter() {
+        value_json = collect_json_from_anyvalue(key, value.clone());
+    }
+
+    value_json
+}
+
+pub fn value_to_string(value: serde_json::Value) -> String {
+    match value.clone() {
+        e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
+        Value::String(s) => s,
+        _ => "".to_string(),
+    }
+}
+
+pub fn flatten_attributes(attributes: &Vec<KeyValue>) -> BTreeMap<String, Value> {
+    let mut attributes_json: BTreeMap<String, Value> = BTreeMap::new();
+    for attribute in attributes {
+        let key = &attribute.key;
+        let value = &attribute.value;
+        let value_json = collect_json_from_values(value, &key.to_string());
+        for key in value_json.keys() {
+            attributes_json.insert(key.to_owned(), value_json[key].to_owned());
+        }
+    }
+    attributes_json
+}
+
+pub fn insert_if_some<T: ToString>(
+    map: &mut BTreeMap<String, Value>,
+    key: &str,
+    option: &Option<T>,
+) {
+    if let Some(value) = option {
+        map.insert(key.to_string(), Value::String(value.to_string()));
+    }
+}
+
+pub fn insert_number_if_some(map: &mut BTreeMap<String, Value>, key: &str, option: &Option<f64>) {
+    if let Some(value) = option {
+        if let Some(number) = serde_json::Number::from_f64(*value) {
+            map.insert(key.to_string(), Value::Number(number));
+        }
+    }
+}
+
+pub fn insert_bool_if_some(map: &mut BTreeMap<String, Value>, key: &str, option: &Option<bool>) {
+    if
let Some(value) = option { + map.insert(key.to_string(), Value::Bool(*value)); + } +} + +pub fn insert_attributes(map: &mut BTreeMap, attributes: &Vec) { + let attributes_json = flatten_attributes(attributes); + for (key, value) in attributes_json { + map.insert(key, value); + } +} + +pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String { + let dt = DateTime::from_timestamp_nanos(epoch_ns).naive_utc(); + dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string() +} diff --git a/src/otel/traces.rs b/src/otel/traces.rs new file mode 100644 index 000000000..8ba137b33 --- /dev/null +++ b/src/otel/traces.rs @@ -0,0 +1,304 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use opentelemetry_proto::tonic::trace::v1::span::Event; +use opentelemetry_proto::tonic::trace::v1::span::Link; +use opentelemetry_proto::tonic::trace::v1::ScopeSpans; +use opentelemetry_proto::tonic::trace::v1::Span; +use opentelemetry_proto::tonic::trace::v1::Status; +use opentelemetry_proto::tonic::trace::v1::TracesData; +use serde_json::Value; +use std::collections::BTreeMap; + +use super::otel_utils::convert_epoch_nano_to_timestamp; +use super::otel_utils::insert_attributes; + +/// this function flattens the `ScopeSpans` object +/// and returns a `Vec` of `BTreeMap` of the flattened json +fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { + let mut vec_scope_span_json = Vec::new(); + let mut scope_span_json = BTreeMap::new(); + + for span in &scope_span.spans { + let span_record_json = flatten_span_record(span); + vec_scope_span_json.extend(span_record_json); + } + + if let Some(scope) = &scope_span.scope { + scope_span_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); + scope_span_json.insert( + "scope_version".to_string(), + Value::String(scope.version.clone()), + ); + insert_attributes(&mut scope_span_json, &scope.attributes); + scope_span_json.insert( + "scope_dropped_attributes_count".to_string(), + Value::Number(scope.dropped_attributes_count.into()), + ); + + for span_json in &mut vec_scope_span_json { + for (key, value) in &scope_span_json { + span_json.insert(key.clone(), value.clone()); + } + } + } + + for span_json in &mut vec_scope_span_json { + span_json.insert( + "schema_url".to_string(), + Value::String(scope_span.schema_url.clone()), + ); + } + + vec_scope_span_json +} + +/// this function performs the custom flattening of the otel traces event +/// and returns a `Vec` of `BTreeMap` of the flattened json +pub fn flatten_otel_traces(message: &TracesData) -> Vec> { + let mut vec_otel_json = Vec::new(); + + for record in &message.resource_spans { + let mut resource_span_json = BTreeMap::new(); + + if let Some(resource) = &record.resource { + insert_attributes(&mut resource_span_json, &resource.attributes); + resource_span_json.insert( + "resource_dropped_attributes_count".to_string(), + 
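+                // kept verbatim so downstream consumers can detect
+                // attribute loss on the resource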
Value::Number(resource.dropped_attributes_count.into()), + ); + } + + let mut vec_resource_spans_json = Vec::new(); + for scope_span in &record.scope_spans { + let scope_span_json = flatten_scope_span(scope_span); + vec_resource_spans_json.extend(scope_span_json); + } + + resource_span_json.insert( + "schema_url".to_string(), + Value::String(record.schema_url.clone()), + ); + + for resource_spans_json in &mut vec_resource_spans_json { + for (key, value) in &resource_span_json { + resource_spans_json.insert(key.clone(), value.clone()); + } + } + + vec_otel_json.extend(vec_resource_spans_json); + } + + vec_otel_json +} + +/// otel traces has json array of events +/// this function flattens the `Event` object +/// and returns a `Vec` of `BTreeMap` of the flattened json +fn flatten_events(events: &[Event]) -> Vec> { + events + .iter() + .map(|event| { + let mut event_json = BTreeMap::new(); + event_json.insert( + "event_time_unix_nano".to_string(), + Value::String( + convert_epoch_nano_to_timestamp(event.time_unix_nano as i64).to_string(), + ), + ); + event_json.insert("event_name".to_string(), Value::String(event.name.clone())); + insert_attributes(&mut event_json, &event.attributes); + event_json.insert( + "event_dropped_attributes_count".to_string(), + Value::Number(event.dropped_attributes_count.into()), + ); + event_json + }) + .collect() +} + +/// otel traces has json array of links +/// this function flattens the `Link` object +/// and returns a `Vec` of `BTreeMap` of the flattened json +fn flatten_links(links: &[Link]) -> Vec> { + links + .iter() + .map(|link| { + let mut link_json = BTreeMap::new(); + link_json.insert( + "link_span_id".to_string(), + Value::String(hex::encode(&link.span_id)), + ); + link_json.insert( + "link_trace_id".to_string(), + Value::String(hex::encode(&link.trace_id)), + ); + + insert_attributes(&mut link_json, &link.attributes); + link_json.insert( + "link_dropped_attributes_count".to_string(), + Value::Number(link.dropped_attributes_count.into()), + ); + link_json + }) + .collect() +} + +/// otel trace event has status +/// there is a mapping of status code to status description provided in proto +/// this function fetches the status description from the status code +/// and adds it to the flattened json +fn flatten_status(status: &Status) -> BTreeMap { + let mut status_json = BTreeMap::new(); + status_json.insert( + "span_status_message".to_string(), + Value::String(status.message.clone()), + ); + status_json.insert( + "span_status_code".to_string(), + Value::Number(status.code.into()), + ); + let description = match status.code { + 0 => "STATUS_CODE_UNSET", + 1 => "STATUS_CODE_OK", + 2 => "STATUS_CODE_ERROR", + _ => "", + }; + status_json.insert( + "span_status_description".to_string(), + Value::String(description.to_string()), + ); + + status_json +} + +/// otel log event has flags +/// there is a mapping of flags to flags description provided in proto +/// this function fetches the flags description from the flags +/// and adds it to the flattened json +fn flatten_flags(flags: u32) -> BTreeMap { + let mut flags_json = BTreeMap::new(); + flags_json.insert("span_flags".to_string(), Value::Number(flags.into())); + let description = match flags { + 0 => "SPAN_FLAGS_DO_NOT_USE", + 255 => "SPAN_FLAGS_TRACE_FLAGS_MASK", + 256 => "SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK", + 512 => "SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK", + _ => "", + }; + flags_json.insert( + "span_flags_description".to_string(), + Value::String(description.to_string()), + ); + + flags_json +} + +/// 
otel span event has kind +/// there is a mapping of kind to kind description provided in proto +/// this function fetches the kind description from the kind +/// and adds it to the flattened json +fn flatten_kind(kind: i32) -> BTreeMap { + let mut kind_json = BTreeMap::new(); + kind_json.insert("span_kind".to_string(), Value::Number(kind.into())); + let description = match kind { + 0 => "SPAN_KIND_UNSPECIFIED", + 1 => "SPAN_KIND_INTERNAL", + 2 => "SPAN_KIND_SERVER", + 3 => "SPAN_KIND_CLIENT", + 4 => "SPAN_KIND_PRODUCER", + 5 => "SPAN_KIND_CONSUMER", + _ => "", + }; + kind_json.insert( + "span_kind_description".to_string(), + Value::String(description.to_string()), + ); + + kind_json +} + +/// this function flattens the `Span` object +/// and returns a `Vec` of `BTreeMap` of the flattened json +/// this function is called recursively for each span record object in the otel traces event +fn flatten_span_record(span_record: &Span) -> Vec> { + let mut span_records_json = Vec::new(); + + let mut span_record_json = BTreeMap::new(); + span_record_json.insert( + "span_trace_id".to_string(), + Value::String(hex::encode(&span_record.trace_id)), + ); + span_record_json.insert( + "span_span_id".to_string(), + Value::String(hex::encode(&span_record.span_id)), + ); + span_record_json.insert( + "span_trace_state".to_string(), + Value::String(span_record.trace_state.clone()), + ); + span_record_json.insert( + "span_parent_span_id".to_string(), + Value::String(hex::encode(&span_record.parent_span_id)), + ); + span_record_json.extend(flatten_flags(span_record.flags)); + span_record_json.insert( + "span_name".to_string(), + Value::String(span_record.name.clone()), + ); + span_record_json.extend(flatten_kind(span_record.kind)); + span_record_json.insert( + "span_start_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + span_record.start_time_unix_nano as i64, + )), + ); + span_record_json.insert( + "span_end_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + span_record.end_time_unix_nano as i64, + )), + ); + insert_attributes(&mut span_record_json, &span_record.attributes); + span_record_json.insert( + "span_dropped_attributes_count".to_string(), + Value::Number(span_record.dropped_attributes_count.into()), + ); + span_records_json.extend(flatten_events(&span_record.events)); + span_record_json.insert( + "span_dropped_events_count".to_string(), + Value::Number(span_record.dropped_events_count.into()), + ); + span_records_json.extend(flatten_links(&span_record.links)); + span_record_json.insert( + "span_dropped_links_count".to_string(), + Value::Number(span_record.dropped_links_count.into()), + ); + + if let Some(status) = &span_record.status { + span_record_json.extend(flatten_status(status)); + } + + for span_json in &mut span_records_json { + for (key, value) in &span_record_json { + span_json.insert(key.clone(), value.clone()); + } + } + + span_records_json +} diff --git a/src/utils/header_parsing.rs b/src/utils/header_parsing.rs index 8d4feab1e..89caf4d9e 100644 --- a/src/utils/header_parsing.rs +++ b/src/utils/header_parsing.rs @@ -68,6 +68,8 @@ pub enum ParseHeaderError { SeperatorInValue(char), #[error("Stream name not found in header [x-p-stream]")] MissingStreamName, + #[error("Log source not found in header [x-p-log-source]")] + MissingLogSource, } impl ResponseError for ParseHeaderError { diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 0d3ac1e79..4c794afba 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ 
-22,6 +22,7 @@ use flatten::{convert_to_array, generic_flattening, has_more_than_four_levels}; use serde_json; use serde_json::Value; +use crate::event::format::LogSource; use crate::metadata::SchemaVersion; pub mod flatten; @@ -36,15 +37,20 @@ pub fn flatten_json_body( custom_partition: Option<&String>, schema_version: SchemaVersion, validation_required: bool, + log_source: &LogSource, ) -> Result { // Flatten the json body only if new schema and has less than 4 levels of nesting - let mut nested_value = - if schema_version == SchemaVersion::V0 || has_more_than_four_levels(&body, 1) { - body - } else { - let flattened_json = generic_flattening(&body)?; - convert_to_array(flattened_json)? - }; + let mut nested_value = if schema_version == SchemaVersion::V1 + && !has_more_than_four_levels(&body, 1) + && matches!( + log_source, + LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis + ) { + let flattened_json = generic_flattening(&body)?; + convert_to_array(flattened_json)? + } else { + body + }; flatten::flatten( &mut nested_value, "_", @@ -62,6 +68,7 @@ pub fn convert_array_to_object( time_partition_limit: Option, custom_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &LogSource, ) -> Result, anyhow::Error> { let data = flatten_json_body( body, @@ -70,6 +77,7 @@ pub fn convert_array_to_object( custom_partition, schema_version, true, + log_source, )?; let value_arr = match data { Value::Array(arr) => arr, @@ -101,6 +109,8 @@ pub fn convert_to_string(value: &Value) -> Value { #[cfg(test)] mod tests { + use crate::event::format::LogSource; + use super::flatten_json_body; use serde_json::json; @@ -115,7 +125,8 @@ mod tests { None, None, crate::metadata::SchemaVersion::V1, - false + false, + &LogSource::default() ) .unwrap(), expected @@ -133,7 +144,8 @@ mod tests { None, None, crate::metadata::SchemaVersion::V1, - false + false, + &LogSource::default() ) .unwrap(), expected
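Taken together, the new `log_source` parameter means OTel payloads, which the dedicated flatteners above have already expanded, bypass generic flattening, while JSON, custom, and Kinesis sources keep it when the schema is V1 and nesting stays within four levels. The gating condition in isolation, as a sketch using the same variants the hunk matches on (assumes `SchemaVersion` is `Copy + PartialEq`, as its use by value above suggests):

```rust
use crate::event::format::LogSource;
use crate::metadata::SchemaVersion;

// Illustrative predicate only; flatten_json_body inlines this check.
fn runs_generic_flattening(
    schema_version: SchemaVersion,
    within_four_levels: bool,
    log_source: &LogSource,
) -> bool {
    schema_version == SchemaVersion::V1
        && within_four_levels
        && matches!(
            log_source,
            LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis
        )
}
```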