Skip to content

Commit 41f5c1e

Browse files
add telemetry type to a dataset
1 parent fc29387 commit 41f5c1e

File tree

11 files changed

+134
-48
lines changed

11 files changed

+134
-48
lines changed

src/handlers/http/ingest.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ use crate::event::error::EventError;
2929
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
3030
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
3131
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
32-
use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
32+
use crate::handlers::{
33+
EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
34+
};
3335
use crate::metadata::SchemaVersion;
3436
use crate::option::Mode;
3537
use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
@@ -68,6 +70,12 @@ pub async fn ingest(
6870
.and_then(|h| h.to_str().ok())
6971
.map_or(LogSource::default(), LogSource::from);
7072

73+
let telemetry_type = req
74+
.headers()
75+
.get(TELEMETRY_TYPE_KEY)
76+
.and_then(|h| h.to_str().ok())
77+
.map_or(TelemetryType::default(), TelemetryType::from);
78+
7179
let extract_log = req
7280
.headers()
7381
.get(EXTRACT_LOG_KEY)
@@ -102,6 +110,7 @@ pub async fn ingest(
102110
StreamType::UserDefined,
103111
None,
104112
vec![log_source_entry.clone()],
113+
telemetry_type,
105114
)
106115
.await?;
107116

@@ -186,6 +195,7 @@ pub async fn handle_otel_logs_ingestion(
186195
StreamType::UserDefined,
187196
None,
188197
vec![log_source_entry.clone()],
198+
TelemetryType::Logs,
189199
)
190200
.await?;
191201

@@ -252,6 +262,7 @@ pub async fn handle_otel_metrics_ingestion(
252262
StreamType::UserDefined,
253263
None,
254264
vec![log_source_entry.clone()],
265+
TelemetryType::Metrics,
255266
)
256267
.await?;
257268

@@ -318,6 +329,7 @@ pub async fn handle_otel_traces_ingestion(
318329
StreamType::UserDefined,
319330
None,
320331
vec![log_source_entry.clone()],
332+
TelemetryType::Traces,
321333
)
322334
.await?;
323335

src/handlers/http/logstream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ pub async fn put_stream(
188188
body: Bytes,
189189
) -> Result<impl Responder, StreamError> {
190190
let stream_name = stream_name.into_inner();
191-
192191
PARSEABLE
193192
.create_update_stream(req.headers(), &body, &stream_name)
194193
.await?;
@@ -377,6 +376,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
377376
custom_partition: stream_meta.custom_partition.clone(),
378377
static_schema_flag: stream_meta.static_schema_flag,
379378
log_source: stream_meta.log_source.clone(),
379+
telemetry_type: stream_meta.telemetry_type,
380380
};
381381

382382
Ok((web::Json(stream_info), StatusCode::OK))

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use crate::{
2222
event::format::LogSource,
2323
handlers::{
2424
CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
25-
TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
25+
TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType,
26+
UPDATE_STREAM_KEY,
2627
},
2728
storage::StreamType,
2829
};
@@ -36,6 +37,7 @@ pub struct PutStreamHeaders {
3637
pub update_stream_flag: bool,
3738
pub stream_type: StreamType,
3839
pub log_source: LogSource,
40+
pub telemetry_type: TelemetryType,
3941
}
4042

4143
impl From<&HeaderMap> for PutStreamHeaders {
@@ -65,6 +67,10 @@ impl From<&HeaderMap> for PutStreamHeaders {
6567
log_source: headers
6668
.get(LOG_SOURCE_KEY)
6769
.map_or(LogSource::default(), |v| v.to_str().unwrap().into()),
70+
telemetry_type: headers
71+
.get(TELEMETRY_TYPE_KEY)
72+
.and_then(|v| v.to_str().ok())
73+
.map_or(TelemetryType::Logs, TelemetryType::from),
6874
}
6975
}
7076
}

src/handlers/mod.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
*
1717
*/
1818

19+
use std::fmt::Display;
20+
21+
use serde::{Deserialize, Serialize};
22+
1923
pub mod airplane;
2024
pub mod http;
2125
pub mod livetail;
@@ -30,6 +34,7 @@ const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
3034
const AUTHORIZATION_KEY: &str = "authorization";
3135
const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
3236
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
37+
pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
3338
const COOKIE_AGE_DAYS: usize = 7;
3439
const SESSION_COOKIE_NAME: &str = "session";
3540
const USER_COOKIE_NAME: &str = "username";
@@ -39,3 +44,36 @@ const LOG_SOURCE_KINESIS: &str = "kinesis";
3944

4045
// AWS Kinesis constants
4146
const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";
47+
48+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
49+
#[serde(rename_all = "lowercase")]
50+
pub enum TelemetryType {
51+
#[default]
52+
Logs,
53+
Metrics,
54+
Traces,
55+
Events,
56+
}
57+
58+
impl From<&str> for TelemetryType {
59+
fn from(s: &str) -> Self {
60+
match s.to_lowercase().as_str() {
61+
"logs" => TelemetryType::Logs,
62+
"metrics" => TelemetryType::Metrics,
63+
"traces" => TelemetryType::Traces,
64+
"events" => TelemetryType::Events,
65+
_ => TelemetryType::Logs,
66+
}
67+
}
68+
}
69+
70+
impl Display for TelemetryType {
71+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72+
f.write_str(match self {
73+
TelemetryType::Logs => "logs",
74+
TelemetryType::Metrics => "metrics",
75+
TelemetryType::Traces => "traces",
76+
TelemetryType::Events => "events",
77+
})
78+
}
79+
}

src/metadata.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::sync::Arc;
2525

2626
use crate::catalog::snapshot::ManifestItem;
2727
use crate::event::format::LogSourceEntry;
28+
use crate::handlers::TelemetryType;
2829
use crate::metrics::{
2930
EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
3031
EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
@@ -88,6 +89,7 @@ pub struct LogStreamMetadata {
8889
pub hot_tier_enabled: bool,
8990
pub stream_type: StreamType,
9091
pub log_source: Vec<LogSourceEntry>,
92+
pub telemetry_type: TelemetryType,
9193
}
9294

9395
impl LogStreamMetadata {
@@ -102,6 +104,7 @@ impl LogStreamMetadata {
102104
stream_type: StreamType,
103105
schema_version: SchemaVersion,
104106
log_source: Vec<LogSourceEntry>,
107+
telemetry_type: TelemetryType,
105108
) -> Self {
106109
LogStreamMetadata {
107110
created_at: if created_at.is_empty() {
@@ -125,6 +128,7 @@ impl LogStreamMetadata {
125128
stream_type,
126129
schema_version,
127130
log_source,
131+
telemetry_type,
128132
..Default::default()
129133
}
130134
}

src/migration/mod.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ async fn migrate_stream_metadata(
276276
stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value);
277277
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
278278
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
279+
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);
279280

280281
storage
281282
.put_object(&path, to_bytes(&stream_metadata_value))
@@ -290,6 +291,7 @@ async fn migrate_stream_metadata(
290291
stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value);
291292
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
292293
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
294+
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);
293295

294296
storage
295297
.put_object(&path, to_bytes(&stream_metadata_value))
@@ -304,6 +306,7 @@ async fn migrate_stream_metadata(
304306
stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value);
305307
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
306308
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
309+
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);
307310

308311
storage
309312
.put_object(&path, to_bytes(&stream_metadata_value))
@@ -312,24 +315,29 @@ async fn migrate_stream_metadata(
312315
Some("v4") => {
313316
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
314317
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
318+
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);
315319

316320
storage
317321
.put_object(&path, to_bytes(&stream_metadata_value))
318322
.await?;
319323
}
320324
Some("v5") => {
321325
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
326+
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);
322327
storage
323328
.put_object(&path, to_bytes(&stream_metadata_value))
324329
.await?;
325330
}
326-
_ => {
327-
stream_metadata_value =
328-
stream_metadata_migration::rename_log_source_v6(stream_metadata_value);
331+
Some("v6") => {
332+
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);
329333
storage
330334
.put_object(&path, to_bytes(&stream_metadata_value))
331335
.await?;
332336
}
337+
_ => {
338+
// If the version is not recognized, we assume it's already in the latest format
339+
return Ok(stream_metadata_value);
340+
}
333341
}
334342

335343
Ok(stream_metadata_value)
@@ -354,6 +362,7 @@ async fn setup_logstream_metadata(
354362
hot_tier_enabled,
355363
stream_type,
356364
log_source,
365+
telemetry_type,
357366
..
358367
} = serde_json::from_value(stream_metadata_value).unwrap_or_default();
359368

@@ -387,6 +396,7 @@ async fn setup_logstream_metadata(
387396
hot_tier_enabled,
388397
stream_type,
389398
log_source,
399+
telemetry_type,
390400
};
391401

392402
Ok(metadata)

src/migration/stream_metadata_migration.rs

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@
1717
*
1818
*/
1919

20-
use std::collections::HashMap;
21-
22-
use serde_json::{Value, json};
23-
2420
use crate::{
25-
catalog::snapshot::CURRENT_SNAPSHOT_VERSION, handlers::http::cluster::INTERNAL_STREAM_NAME,
21+
catalog::snapshot::CURRENT_SNAPSHOT_VERSION,
22+
handlers::{TelemetryType, http::cluster::INTERNAL_STREAM_NAME},
2623
storage,
2724
};
25+
use serde_json::{Value, json};
2826

2927
pub fn v1_v4(mut stream_metadata: Value) -> Value {
3028
let stream_metadata_map = stream_metadata.as_object_mut().unwrap();
@@ -232,38 +230,45 @@ fn default_log_source_entry() -> Value {
232230
})
233231
}
234232

235-
pub fn rename_log_source_v6(mut stream_metadata: Value) -> Value {
236-
let format_mapping = create_format_mapping();
233+
pub fn v6_v7(mut stream_metadata: Value) -> Value {
234+
let stream_metadata_map = stream_metadata.as_object_mut().unwrap();
235+
stream_metadata_map.insert(
236+
"objectstore-format".to_owned(),
237+
Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()),
238+
);
239+
stream_metadata_map.insert(
240+
"version".to_owned(),
241+
Value::String(storage::CURRENT_SCHEMA_VERSION.into()),
242+
);
243+
244+
// fetch log_source, if log_source=otel-traces, telemetry_type=traces
245+
// if log_source=otel-metrics, telemetry_type=metrics
246+
// else telemetry_type=logs
247+
let log_source = stream_metadata_map
248+
.get("log_source")
249+
.and_then(|v| v.as_array())
250+
.and_then(|arr| arr.first())
251+
.and_then(|v| v.get("log_source_format"))
252+
.and_then(|v| v.as_str())
253+
.unwrap_or("json");
254+
let telemetry_type = match log_source {
255+
"otel-logs" => TelemetryType::Logs,
256+
"otel-traces" => TelemetryType::Traces,
257+
"otel-metrics" => TelemetryType::Metrics,
258+
_ => TelemetryType::Logs, // Default to Logs if not recognized
259+
};
237260

238-
if let Some(log_sources) = stream_metadata
239-
.get_mut("log_source")
240-
.and_then(|v| v.as_array_mut())
241-
{
242-
for source in log_sources.iter_mut() {
243-
if let Some(format_value) = source.get_mut("log_source_format") {
244-
if let Some(format_str) = format_value.as_str() {
245-
if let Some(new_format) = format_mapping.get(format_str) {
246-
*format_value = json!(new_format);
247-
}
248-
}
249-
}
250-
}
261+
// add telemetry_type if not present
262+
if !stream_metadata_map.contains_key("telemetry_type") {
263+
stream_metadata_map.insert(
264+
"telemetry_type".to_owned(),
265+
Value::String(telemetry_type.to_string()),
266+
);
251267
}
252268

253269
stream_metadata
254270
}
255271

256-
fn create_format_mapping() -> HashMap<&'static str, &'static str> {
257-
HashMap::from([
258-
("Kinesis", "kinesis"),
259-
("OtelLogs", "otel-logs"),
260-
("OtelTraces", "otel-traces"),
261-
("OtelMetrics", "otel-metrics"),
262-
("Pmeta", "pmeta"),
263-
("Json", "json"),
264-
])
265-
}
266-
267272
fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value {
268273
let manifest_list = snapshot.get("manifest_list").unwrap();
269274
let mut new_manifest_list = Vec::new();
@@ -331,12 +336,4 @@ mod tests {
331336
let updated_stream_metadata = super::v5_v6(stream_metadata.clone());
332337
assert_eq!(updated_stream_metadata, expected);
333338
}
334-
335-
#[test]
336-
fn test_rename_log_source_v6() {
337-
let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v1","objectstore-format":"v6","created-at":"2025-03-25T02:37:00.664625075+00:00","first-event-at":"2025-03-24T22:37:00.665-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":94,"ingestion":146530,"storage":29248},"current_stats":{"events":94,"ingestion":146530,"storage":29248},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test11/date=2025-03-25/manifest.json","time_lower_bound":"2025-03-25T00:00:00Z","time_upper_bound":"2025-03-25T23:59:59.999999999Z","events_ingested":94,"ingestion_size":146530,"storage_size":29248}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"OtelLogs","fields":["span_id","trace_id","time_unix_nano","severity_text","severity_number","body"]},{"log_source_format":"OtelTraces","fields":["span_status_code","flags","span_parent_span_id","span_trace_id","span_status_message","event_name","span_span_id","span_name","span_kind_description","event_time_unix_nano","span_end_time_unix_nano","span_status_description","span_start_time_unix_nano","span_kind","name"]},{"log_source_format":"OtelMetrics","fields":["metric_unit","start_time_unix_nano","time_unix_nano","metric_name","metric_description"]}]});
338-
let expected = serde_json::json!({"version":"v6","schema_version":"v1","objectstore-format":"v6","created-at":"2025-03-25T02:37:00.664625075+00:00","first-event-at":"2025-03-24T22:37:00.665-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":94,"ingestion":146530,"storage":29248},"current_stats":{"events":94,"ingestion":146530,"storage":29248},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test11/date=2025-03-25/manifest.json","time_lower_bound":"2025-03-25T00:00:00Z","time_upper_bound":"2025-03-25T23:59:59.999999999Z","events_ingested":94,"ingestion_size":146530,"storage_size":29248}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-logs","fields":["span_id","trace_id","time_unix_nano","severity_text","severity_number","body"]},{"log_source_format":"otel-traces","fields":["span_status_code","flags","span_parent_span_id","span_trace_id","span_status_message","event_name","span_span_id","span_name","span_kind_description","event_time_unix_nano","span_end_time_unix_nano","span_status_description","span_start_time_unix_nano","span_kind","name"]},{"log_source_format":"otel-metrics","fields":["metric_unit","start_time_unix_nano","time_unix_nano","metric_name","metric_description"]}]});
339-
let updated_stream_metadata = super::rename_log_source_v6(stream_metadata.clone());
340-
assert_eq!(updated_stream_metadata, expected);
341-
}
342339
}

0 commit comments

Comments
 (0)