Skip to content

Commit 2f92ac2

Browse files
feat: add log source to stream info (#1092)
Adds an additional header, `X-P-Log-Source`, to stream creation. For otel logs ingested via `POST /v1/logs`, the server adds `log_source=OtelLogs` to the stream info, which can be verified using `GET /logstream/{logstream}/info`. Similarly, for otel metrics the server adds `log_source=OtelMetrics`, and for otel traces it adds `log_source=OtelTraces`. The server adds `log_source=Json` for a stream with an unknown log source value, or for a stream created without the log source header.
1 parent ba932ac commit 2f92ac2

File tree

8 files changed

+122
-17
lines changed

8 files changed

+122
-17
lines changed

src/event/format/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use anyhow::{anyhow, Error as AnyError};
2626
use arrow_array::RecordBatch;
2727
use arrow_schema::{DataType, Field, Schema, TimeUnit};
2828
use chrono::DateTime;
29+
use serde::{Deserialize, Serialize};
2930
use serde_json::Value;
3031

3132
use crate::{metadata::SchemaVersion, utils::arrow::get_field};
@@ -38,7 +39,7 @@ static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];
3839
type EventSchema = Vec<Arc<Field>>;
3940

4041
/// Source of the logs, used to perform special processing for certain sources
41-
#[derive(Default, Debug, Clone, PartialEq, Eq)]
42+
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
4243
pub enum LogSource {
4344
// AWS Kinesis sends logs in the format of a json array
4445
Kinesis,
@@ -51,6 +52,8 @@ pub enum LogSource {
5152
// OpenTelemetry sends traces according to the specification as explained here
5253
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/trace/v1
5354
OtelTraces,
55+
// Internal Stream format
56+
Pmeta,
5457
#[default]
5558
// Json object or array
5659
Json,
@@ -64,7 +67,8 @@ impl From<&str> for LogSource {
6467
"otel-logs" => LogSource::OtelLogs,
6568
"otel-metrics" => LogSource::OtelMetrics,
6669
"otel-traces" => LogSource::OtelTraces,
67-
custom => LogSource::Custom(custom.to_owned()),
70+
"pmeta" => LogSource::Pmeta,
71+
_ => LogSource::Json,
6872
}
6973
}
7074
}

src/handlers/http/ingest.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
6868
stream_name
6969
)));
7070
}
71-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
71+
create_stream_if_not_exists(
72+
&stream_name,
73+
&StreamType::UserDefined.to_string(),
74+
LogSource::default(),
75+
)
76+
.await?;
7277

7378
flatten_and_push_logs(req, body, &stream_name).await?;
7479
Ok(HttpResponse::Ok().finish())
@@ -130,7 +135,12 @@ pub async fn handle_otel_logs_ingestion(
130135
}
131136

132137
let stream_name = stream_name.to_str().unwrap().to_owned();
133-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
138+
create_stream_if_not_exists(
139+
&stream_name,
140+
&StreamType::UserDefined.to_string(),
141+
LogSource::OtelLogs,
142+
)
143+
.await?;
134144

135145
//custom flattening required for otel logs
136146
let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
@@ -163,7 +173,12 @@ pub async fn handle_otel_metrics_ingestion(
163173
)));
164174
}
165175
let stream_name = stream_name.to_str().unwrap().to_owned();
166-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
176+
create_stream_if_not_exists(
177+
&stream_name,
178+
&StreamType::UserDefined.to_string(),
179+
LogSource::OtelMetrics,
180+
)
181+
.await?;
167182

168183
//custom flattening required for otel metrics
169184
let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
@@ -197,7 +212,12 @@ pub async fn handle_otel_traces_ingestion(
197212
)));
198213
}
199214
let stream_name = stream_name.to_str().unwrap().to_owned();
200-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
215+
create_stream_if_not_exists(
216+
&stream_name,
217+
&StreamType::UserDefined.to_string(),
218+
LogSource::OtelTraces,
219+
)
220+
.await?;
201221

202222
//custom flattening required for otel traces
203223
let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
@@ -264,6 +284,7 @@ pub async fn push_logs_unchecked(
264284
pub async fn create_stream_if_not_exists(
265285
stream_name: &str,
266286
stream_type: &str,
287+
log_source: LogSource,
267288
) -> Result<bool, PostError> {
268289
let mut stream_exists = false;
269290
if STREAM_INFO.stream_exists(stream_name) {
@@ -288,6 +309,7 @@ pub async fn create_stream_if_not_exists(
288309
false,
289310
Arc::new(Schema::empty()),
290311
stream_type,
312+
log_source,
291313
)
292314
.await?;
293315

src/handlers/http/logstream.rs

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use super::modal::utils::logstream_utils::{
2626
use super::query::update_schema_when_distributed;
2727
use crate::alerts::Alerts;
2828
use crate::catalog::get_first_event;
29-
use crate::event::format::override_data_type;
29+
use crate::event::format::{override_data_type, LogSource};
3030
use crate::handlers::STREAM_TYPE_KEY;
3131
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
3232
use crate::metadata::{SchemaVersion, STREAM_INFO};
@@ -35,8 +35,8 @@ use crate::option::{Mode, CONFIG};
3535
use crate::rbac::role::Action;
3636
use crate::rbac::Users;
3737
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
38-
use crate::storage::StreamType;
39-
use crate::storage::{retention::Retention, StorageDir, StreamInfo};
38+
use crate::storage::{retention::Retention, StorageDir};
39+
use crate::storage::{StreamInfo, StreamType};
4040
use crate::utils::actix::extract_session_key_from_req;
4141
use crate::{event, stats};
4242

@@ -484,6 +484,7 @@ fn remove_id_from_alerts(value: &mut Value) {
484484
}
485485
}
486486

487+
#[allow(clippy::too_many_arguments)]
487488
pub async fn create_stream(
488489
stream_name: String,
489490
time_partition: &str,
@@ -492,6 +493,7 @@ pub async fn create_stream(
492493
static_schema_flag: bool,
493494
schema: Arc<Schema>,
494495
stream_type: &str,
496+
log_source: LogSource,
495497
) -> Result<(), CreateStreamError> {
496498
// fail to proceed if invalid stream name
497499
if stream_type != StreamType::Internal.to_string() {
@@ -509,6 +511,7 @@ pub async fn create_stream(
509511
static_schema_flag,
510512
schema.clone(),
511513
stream_type,
514+
log_source.clone(),
512515
)
513516
.await
514517
{
@@ -533,6 +536,7 @@ pub async fn create_stream(
533536
static_schema,
534537
stream_type,
535538
SchemaVersion::V1, // New stream
539+
log_source,
536540
);
537541
}
538542
Err(err) => {
@@ -583,6 +587,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
583587
.map(|limit| limit.to_string()),
584588
custom_partition: stream_meta.custom_partition.clone(),
585589
static_schema_flag: stream_meta.static_schema_flag,
590+
log_source: stream_meta.log_source.clone(),
586591
};
587592

588593
// get the other info from
@@ -725,8 +730,12 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
725730
}
726731

727732
pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
728-
if let Ok(stream_exists) =
729-
create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await
733+
if let Ok(stream_exists) = create_stream_if_not_exists(
734+
INTERNAL_STREAM_NAME,
735+
&StreamType::Internal.to_string(),
736+
LogSource::Pmeta,
737+
)
738+
.await
730739
{
731740
if stream_exists {
732741
return Ok(());
@@ -894,9 +903,9 @@ pub mod error {
894903
mod tests {
895904
use crate::handlers::http::logstream::error::StreamError;
896905
use crate::handlers::http::logstream::get_stats;
906+
use crate::handlers::http::modal::utils::logstream_utils::fetch_headers_from_put_stream_request;
897907
use actix_web::test::TestRequest;
898908
use anyhow::bail;
899-
900909
#[actix_web::test]
901910
#[should_panic]
902911
async fn get_stats_panics_without_logstream() {
@@ -915,4 +924,41 @@ mod tests {
915924
_ => bail!("expected StreamNotFound error"),
916925
}
917926
}
927+
928+
#[actix_web::test]
929+
async fn header_without_log_source() {
930+
let req = TestRequest::default().to_http_request();
931+
let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
932+
assert_eq!(log_source, crate::event::format::LogSource::Json);
933+
}
934+
935+
#[actix_web::test]
936+
async fn header_with_known_log_source() {
937+
let mut req = TestRequest::default()
938+
.insert_header(("X-P-Log-Source", "pmeta"))
939+
.to_http_request();
940+
let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
941+
assert_eq!(log_source, crate::event::format::LogSource::Pmeta);
942+
943+
req = TestRequest::default()
944+
.insert_header(("X-P-Log-Source", "otel-logs"))
945+
.to_http_request();
946+
let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
947+
assert_eq!(log_source, crate::event::format::LogSource::OtelLogs);
948+
949+
req = TestRequest::default()
950+
.insert_header(("X-P-Log-Source", "kinesis"))
951+
.to_http_request();
952+
let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
953+
assert_eq!(log_source, crate::event::format::LogSource::Kinesis);
954+
}
955+
956+
#[actix_web::test]
957+
async fn header_with_unknown_log_source() {
958+
let req = TestRequest::default()
959+
.insert_header(("X-P-Log-Source", "teststream"))
960+
.to_http_request();
961+
let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
962+
assert_eq!(log_source, crate::event::format::LogSource::Json);
963+
}
918964
}

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ use bytes::Bytes;
2424
use http::StatusCode;
2525

2626
use crate::{
27+
event::format::LogSource,
2728
handlers::{
2829
http::logstream::error::{CreateStreamError, StreamError},
29-
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
30-
TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
30+
CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
31+
TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
3132
},
3233
metadata::{self, SchemaVersion, STREAM_INFO},
3334
option::{Mode, CONFIG},
@@ -48,6 +49,7 @@ pub async fn create_update_stream(
4849
static_schema_flag,
4950
update_stream_flag,
5051
stream_type,
52+
log_source,
5153
) = fetch_headers_from_put_stream_request(req);
5254

5355
if metadata::STREAM_INFO.stream_exists(stream_name) && !update_stream_flag {
@@ -113,6 +115,7 @@ pub async fn create_update_stream(
113115
static_schema_flag,
114116
schema,
115117
&stream_type,
118+
log_source,
116119
)
117120
.await?;
118121

@@ -167,13 +170,14 @@ async fn validate_and_update_custom_partition(
167170

168171
pub fn fetch_headers_from_put_stream_request(
169172
req: &HttpRequest,
170-
) -> (String, String, String, bool, bool, String) {
173+
) -> (String, String, String, bool, bool, String, LogSource) {
171174
let mut time_partition = String::default();
172175
let mut time_partition_limit = String::default();
173176
let mut custom_partition = String::default();
174177
let mut static_schema_flag = false;
175178
let mut update_stream_flag = false;
176179
let mut stream_type = StreamType::UserDefined.to_string();
180+
let mut log_source = LogSource::default();
177181
req.headers().iter().for_each(|(key, value)| {
178182
if key == TIME_PARTITION_KEY {
179183
time_partition = value.to_str().unwrap().to_string();
@@ -193,6 +197,9 @@ pub fn fetch_headers_from_put_stream_request(
193197
if key == STREAM_TYPE_KEY {
194198
stream_type = value.to_str().unwrap().to_string();
195199
}
200+
if key == LOG_SOURCE_KEY {
201+
log_source = LogSource::from(value.to_str().unwrap());
202+
}
196203
});
197204

198205
(
@@ -202,6 +209,7 @@ pub fn fetch_headers_from_put_stream_request(
202209
static_schema_flag,
203210
update_stream_flag,
204211
stream_type,
212+
log_source,
205213
)
206214
}
207215

@@ -378,6 +386,7 @@ pub async fn update_custom_partition_in_stream(
378386
Ok(())
379387
}
380388

389+
#[allow(clippy::too_many_arguments)]
381390
pub async fn create_stream(
382391
stream_name: String,
383392
time_partition: &str,
@@ -386,6 +395,7 @@ pub async fn create_stream(
386395
static_schema_flag: bool,
387396
schema: Arc<Schema>,
388397
stream_type: &str,
398+
log_source: LogSource,
389399
) -> Result<(), CreateStreamError> {
390400
// fail to proceed if invalid stream name
391401
if stream_type != StreamType::Internal.to_string() {
@@ -403,6 +413,7 @@ pub async fn create_stream(
403413
static_schema_flag,
404414
schema.clone(),
405415
stream_type,
416+
log_source.clone(),
406417
)
407418
.await
408419
{
@@ -427,6 +438,7 @@ pub async fn create_stream(
427438
static_schema,
428439
stream_type,
429440
SchemaVersion::V1, // New stream
441+
log_source,
430442
);
431443
}
432444
Err(err) => {
@@ -476,7 +488,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
476488
let static_schema_flag = stream_metadata.static_schema_flag;
477489
let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
478490
let schema_version = stream_metadata.schema_version;
479-
491+
let log_source = stream_metadata.log_source;
480492
metadata::STREAM_INFO.add_stream(
481493
stream_name.to_string(),
482494
stream_metadata.created_at,
@@ -487,6 +499,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
487499
static_schema,
488500
stream_type,
489501
schema_version,
502+
log_source,
490503
);
491504
} else {
492505
return Ok(false);

src/kafka.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use std::{collections::HashMap, fmt::Debug};
3434
use tracing::{debug, error, info, warn};
3535

3636
use crate::audit::AuditLogBuilder;
37+
use crate::event::format::LogSource;
3738
use crate::option::CONFIG;
3839
use crate::{
3940
event::{
@@ -180,7 +181,12 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> {
180181
let stream_name = msg.topic();
181182

182183
// stream should get created only if there is an incoming event, not before that
183-
create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
184+
create_stream_if_not_exists(
185+
stream_name,
186+
&StreamType::UserDefined.to_string(),
187+
LogSource::default(),
188+
)
189+
.await?;
184190

185191
let schema = resolve_schema(stream_name)?;
186192
let event = format::json::Event {

0 commit comments

Comments
 (0)