Skip to content

Commit fa70652

Browse files
restrict /api/v1/ingest api for otel
1 parent 1681a75 commit fa70652

File tree

3 files changed

+46
-36
lines changed

3 files changed

+46
-36
lines changed

src/handlers/http/ingest.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ use crate::event::{
2626
format::{self, EventFormat},
2727
};
2828
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
29-
use crate::handlers::STREAM_NAME_HEADER_KEY;
29+
use crate::handlers::{
30+
LOG_SOURCE_KEY, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS, LOG_SOURCE_OTEL_TRACES,
31+
STREAM_NAME_HEADER_KEY,
32+
};
3033
use crate::metadata::error::stream_info::MetadataError;
3134
use crate::metadata::{SchemaVersion, STREAM_INFO};
3235
use crate::option::{Mode, CONFIG};
@@ -120,6 +123,16 @@ pub async fn handle_otel_logs_ingestion(
120123
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
121124
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
122125
};
126+
127+
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
128+
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
129+
};
130+
if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_LOGS {
131+
return Err(PostError::Invalid(anyhow::anyhow!(
132+
"Please use x-p-log-source: otel-logs for ingesting otel logs"
133+
)));
134+
}
135+
123136
let stream_name = stream_name.to_str().unwrap().to_owned();
124137
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
125138

@@ -144,6 +157,14 @@ pub async fn handle_otel_metrics_ingestion(
144157
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
145158
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
146159
};
160+
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
161+
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
162+
};
163+
if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_METRICS {
164+
return Err(PostError::Invalid(anyhow::anyhow!(
165+
"Please use x-p-log-source: otel-metrics for ingesting otel metrics"
166+
)));
167+
}
147168
let stream_name = stream_name.to_str().unwrap().to_owned();
148169
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
149170

@@ -168,6 +189,15 @@ pub async fn handle_otel_traces_ingestion(
168189
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
169190
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
170191
};
192+
193+
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
194+
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
195+
};
196+
if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_TRACES {
197+
return Err(PostError::Invalid(anyhow::anyhow!(
198+
"Please use x-p-log-source: otel-traces for ingesting otel traces"
199+
)));
200+
}
171201
let stream_name = stream_name.to_str().unwrap().to_owned();
172202
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
173203

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 13 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,8 @@ use anyhow::anyhow;
2121
use arrow_schema::Field;
2222
use bytes::Bytes;
2323
use chrono::{DateTime, NaiveDateTime, Utc};
24-
use nom::AsBytes;
25-
use opentelemetry_proto::tonic::{
26-
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
27-
};
2824
use serde_json::Value;
29-
use std::{
30-
collections::{BTreeMap, HashMap},
31-
sync::Arc,
32-
};
25+
use std::{collections::HashMap, sync::Arc};
3326

3427
use crate::{
3528
event::{
@@ -38,11 +31,9 @@ use crate::{
3831
},
3932
handlers::{
4033
http::{ingest::PostError, kinesis},
41-
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS,
42-
LOG_SOURCE_OTEL_TRACES, PREFIX_META, PREFIX_TAGS, SEPARATOR,
34+
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
4335
},
4436
metadata::{SchemaVersion, STREAM_INFO},
45-
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
4637
storage::StreamType,
4738
utils::{header_parsing::collect_labelled_headers, json::convert_array_to_object},
4839
};
@@ -56,32 +47,19 @@ pub async fn flatten_and_push_logs(
5647
push_logs(stream_name, &req, &body).await?;
5748
return Ok(());
5849
};
59-
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
60-
match log_source.to_str().unwrap() {
61-
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
62-
//custom flattening required for otel logs
63-
LOG_SOURCE_OTEL_LOGS => {
64-
let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
65-
json = flatten_otel_logs(&logs);
66-
}
67-
//custom flattening required for otel metrics
68-
LOG_SOURCE_OTEL_METRICS => {
69-
let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
70-
json = flatten_otel_metrics(metrics);
71-
}
72-
//custom flattening required for otel traces
73-
LOG_SOURCE_OTEL_TRACES => {
74-
let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
75-
json = flatten_otel_traces(&traces);
76-
}
77-
log_source => {
78-
tracing::warn!("Unknown log source: {}", log_source);
50+
let log_source = log_source.to_str().unwrap();
51+
if log_source == LOG_SOURCE_KINESIS {
52+
let json = kinesis::flatten_kinesis_logs(&body);
53+
for record in json.iter() {
54+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
7955
push_logs(stream_name, &req, &body).await?;
8056
}
81-
}
82-
83-
for record in json.iter_mut() {
84-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
57+
} else if log_source.contains("otel") {
58+
return Err(PostError::Invalid(anyhow!(
59+
"Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces"
60+
)));
61+
} else {
62+
tracing::warn!("Unknown log source: {}", log_source);
8563
push_logs(stream_name, &req, &body).await?;
8664
}
8765

src/utils/header_parsing.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ pub enum ParseHeaderError {
6868
SeperatorInValue(char),
6969
#[error("Stream name not found in header [x-p-stream]")]
7070
MissingStreamName,
71+
#[error("Log source not found in header [x-p-log-source]")]
72+
MissingLogSource,
7173
}
7274

7375
impl ResponseError for ParseHeaderError {

0 commit comments

Comments
 (0)