diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 0d0a68971..0d4bbd695 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -16,24 +16,24 @@ * */ -use async_trait::async_trait; -use futures_util::StreamExt; -use rdkafka::consumer::{CommitMode, Consumer}; -use serde_json::Value; -use std::collections::HashMap; -use std::sync::Arc; -use tokio_stream::wrappers::ReceiverStream; -use tracing::{debug, error}; - use crate::{ connectors::common::processor::Processor, event::{ Event as ParseableEvent, USER_AGENT_KEY, format::{EventFormat, LogSourceEntry, json}, }, + handlers::TelemetryType, parseable::PARSEABLE, storage::StreamType, }; +use async_trait::async_trait; +use futures_util::StreamExt; +use rdkafka::consumer::{CommitMode, Consumer}; +use serde_json::Value; +use std::collections::HashMap; +use std::sync::Arc; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{debug, error}; use super::{ConsumerRecord, StreamConsumer, TopicPartition, config::BufferConfig}; @@ -50,13 +50,13 @@ impl ParseableSinkProcessor { .map(|r| r.topic.as_str()) .unwrap_or_default(); let log_source_entry = LogSourceEntry::default(); - PARSEABLE .create_stream_if_not_exists( stream_name, StreamType::UserDefined, None, vec![log_source_entry], + TelemetryType::default(), ) .await?; diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 5c326c308..09ad09fbd 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -29,7 +29,9 @@ use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; -use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; +use crate::handlers::{ + EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType, +}; use crate::metadata::SchemaVersion; use crate::option::Mode; use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST; @@ -68,6 +70,12 @@ pub async fn ingest( .and_then(|h| h.to_str().ok()) .map_or(LogSource::default(), LogSource::from); + let telemetry_type = req + .headers() + .get(TELEMETRY_TYPE_KEY) + .and_then(|h| h.to_str().ok()) + .map_or(TelemetryType::default(), TelemetryType::from); + let extract_log = req .headers() .get(EXTRACT_LOG_KEY) @@ -102,6 +110,7 @@ pub async fn ingest( StreamType::UserDefined, None, vec![log_source_entry.clone()], + telemetry_type, ) .await?; @@ -186,6 +195,7 @@ pub async fn handle_otel_logs_ingestion( StreamType::UserDefined, None, vec![log_source_entry.clone()], + TelemetryType::Logs, ) .await?; @@ -252,6 +262,7 @@ pub async fn handle_otel_metrics_ingestion( StreamType::UserDefined, None, vec![log_source_entry.clone()], + TelemetryType::Metrics, ) .await?; @@ -318,6 +329,7 @@ pub async fn handle_otel_traces_ingestion( StreamType::UserDefined, None, vec![log_source_entry.clone()], + TelemetryType::Traces, ) .await?; diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index eb1f6911f..ab68fce41 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -188,7 +188,6 @@ pub async fn put_stream( body: Bytes, ) -> Result { let stream_name = stream_name.into_inner(); - PARSEABLE .create_update_stream(req.headers(), &body, &stream_name) .await?; @@ -377,6 +376,7 @@ pub async fn get_stream_info(stream_name: Path) -> Result for PutStreamHeaders { @@ -65,6 +67,10 @@ impl From<&HeaderMap> for PutStreamHeaders { log_source: headers .get(LOG_SOURCE_KEY) .map_or(LogSource::default(), |v| v.to_str().unwrap().into()), + telemetry_type: headers + .get(TELEMETRY_TYPE_KEY) + .and_then(|v| v.to_str().ok()) + .map_or(TelemetryType::Logs, TelemetryType::from), } } } diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index 48b6c9a72..c1e9cbd40 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -16,6 +16,10 @@ * */ +use std::fmt::Display; + +use serde::{Deserialize, Serialize}; + pub mod airplane; pub mod http; pub mod livetail; @@ -30,6 +34,7 @@ const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag"; const AUTHORIZATION_KEY: &str = "authorization"; const UPDATE_STREAM_KEY: &str = "x-p-update-stream"; pub const STREAM_TYPE_KEY: &str = "x-p-stream-type"; +pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type"; const COOKIE_AGE_DAYS: usize = 7; const SESSION_COOKIE_NAME: &str = "session"; const USER_COOKIE_NAME: &str = "username"; @@ -39,3 +44,36 @@ const LOG_SOURCE_KINESIS: &str = "kinesis"; // AWS Kinesis constants const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes"; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum TelemetryType { + #[default] + Logs, + Metrics, + Traces, + Events, +} + +impl From<&str> for TelemetryType { + fn from(s: &str) -> Self { + match s.to_lowercase().as_str() { + "logs" => TelemetryType::Logs, + "metrics" => TelemetryType::Metrics, + "traces" => TelemetryType::Traces, + "events" => TelemetryType::Events, + _ => TelemetryType::Logs, + } + } +} + +impl Display for TelemetryType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + TelemetryType::Logs => "logs", + TelemetryType::Metrics => "metrics", + TelemetryType::Traces => "traces", + TelemetryType::Events => "events", + }) + } +} diff --git a/src/metadata.rs b/src/metadata.rs index 1b150f5c9..9b706993a 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use crate::catalog::snapshot::ManifestItem; use crate::event::format::LogSourceEntry; +use crate::handlers::TelemetryType; use crate::metrics::{ EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, @@ -88,6 +89,7 @@ pub struct LogStreamMetadata { pub hot_tier_enabled: bool, pub stream_type: StreamType, pub log_source: Vec, + pub telemetry_type: TelemetryType, } impl LogStreamMetadata { @@ -102,6 +104,7 @@ impl LogStreamMetadata { stream_type: StreamType, schema_version: SchemaVersion, log_source: Vec, + telemetry_type: TelemetryType, ) -> Self { LogStreamMetadata { created_at: if created_at.is_empty() { @@ -125,6 +128,7 @@ impl LogStreamMetadata { stream_type, schema_version, log_source, + telemetry_type, ..Default::default() } } diff --git a/src/migration/mod.rs b/src/migration/mod.rs index a77cf5802..9f3b7b4dc 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -276,6 +276,7 @@ async fn migrate_stream_metadata( stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value); stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) @@ -290,6 +291,7 @@ async fn migrate_stream_metadata( stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value); stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) @@ -304,6 +306,7 @@ async fn migrate_stream_metadata( stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value); stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) @@ -312,6 +315,7 @@ async fn migrate_stream_metadata( Some("v4") => { stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream); stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) @@ -319,17 +323,21 @@ async fn migrate_stream_metadata( } Some("v5") => { stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value); + stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) .await?; } - _ => { - stream_metadata_value = - stream_metadata_migration::rename_log_source_v6(stream_metadata_value); + Some("v6") => { + stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value); storage .put_object(&path, to_bytes(&stream_metadata_value)) .await?; } + _ => { + // If the version is not recognized, we assume it's already in the latest format + return Ok(stream_metadata_value); + } } Ok(stream_metadata_value) @@ -354,6 +362,7 @@ async fn setup_logstream_metadata( hot_tier_enabled, stream_type, log_source, + telemetry_type, .. } = serde_json::from_value(stream_metadata_value).unwrap_or_default(); @@ -387,6 +396,7 @@ async fn setup_logstream_metadata( hot_tier_enabled, stream_type, log_source, + telemetry_type, }; Ok(metadata) diff --git a/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs index e3ddaac62..d75a81114 100644 --- a/src/migration/stream_metadata_migration.rs +++ b/src/migration/stream_metadata_migration.rs @@ -17,14 +17,12 @@ * */ -use std::collections::HashMap; - -use serde_json::{Value, json}; - use crate::{ - catalog::snapshot::CURRENT_SNAPSHOT_VERSION, handlers::http::cluster::INTERNAL_STREAM_NAME, + catalog::snapshot::CURRENT_SNAPSHOT_VERSION, + handlers::{TelemetryType, http::cluster::INTERNAL_STREAM_NAME}, storage, }; +use serde_json::{Value, json}; pub fn v1_v4(mut stream_metadata: Value) -> Value { let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); @@ -232,38 +230,45 @@ fn default_log_source_entry() -> Value { }) } -pub fn rename_log_source_v6(mut stream_metadata: Value) -> Value { - let format_mapping = create_format_mapping(); - - if let Some(log_sources) = stream_metadata - .get_mut("log_source") - .and_then(|v| v.as_array_mut()) - { - for source in log_sources.iter_mut() { - if let Some(format_value) = source.get_mut("log_source_format") { - if let Some(format_str) = format_value.as_str() { - if let Some(new_format) = format_mapping.get(format_str) { - *format_value = json!(new_format); - } - } - } - } +pub fn v6_v7(mut stream_metadata: Value) -> Value { + let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); + stream_metadata_map.insert( + "objectstore-format".to_owned(), + Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), + ); + stream_metadata_map.insert( + "version".to_owned(), + Value::String(storage::CURRENT_SCHEMA_VERSION.into()), + ); + + // fetch log_source, if log_source=otel-traces, telemetry_type=traces + // if log_source=otel-metrics, telemetry_type=metrics + // else telemetry_type=logs + let log_source = stream_metadata_map + .get("log_source") + .and_then(|v| v.as_array()) + .and_then(|arr| arr.first()) + .and_then(|v| v.get("log_source_format")) + .and_then(|v| v.as_str()) + .unwrap_or("json"); + let telemetry_type = match log_source { + "otel-logs" => TelemetryType::Logs, + "otel-traces" => TelemetryType::Traces, + "otel-metrics" => TelemetryType::Metrics, + _ => TelemetryType::Logs, // Default to Logs if not recognized + }; + + // add telemetry_type if not present + if !stream_metadata_map.contains_key("telemetry_type") { + stream_metadata_map.insert( + "telemetry_type".to_owned(), + Value::String(telemetry_type.to_string()), + ); } stream_metadata } -fn create_format_mapping() -> HashMap<&'static str, &'static str> { - HashMap::from([ - ("Kinesis", "kinesis"), - ("OtelLogs", "otel-logs"), - ("OtelTraces", "otel-traces"), - ("OtelMetrics", "otel-metrics"), - ("Pmeta", "pmeta"), - ("Json", "json"), - ]) -} - fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value { let manifest_list = snapshot.get("manifest_list").unwrap(); let mut new_manifest_list = Vec::new(); @@ -295,7 +300,7 @@ mod tests { #[test] fn test_v5_v6_with_log_source() { let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"OtelLogs"}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-logs","fields":[]}]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-logs","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } @@ -303,7 +308,7 @@ mod tests { #[test] fn test_v5_v6_with_default_log_source() { let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"Json"}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } @@ -311,7 +316,7 @@ mod tests { #[test] fn test_v5_v6_without_log_source() { let stream_metadata = serde_json::json!({"version":"v4","schema_version":"v0","objectstore-format":"v4","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined"}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } @@ -319,7 +324,7 @@ mod tests { #[test] fn test_v5_v6_unknown_log_source() { let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"Invalid"}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } @@ -327,16 +332,72 @@ mod tests { #[test] fn test_v5_v6_invalid_log_source() { let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":{"log_source": "Invalid"}}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } #[test] - fn test_rename_log_source_v6() { - let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v1","objectstore-format":"v6","created-at":"2025-03-25T02:37:00.664625075+00:00","first-event-at":"2025-03-24T22:37:00.665-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":94,"ingestion":146530,"storage":29248},"current_stats":{"events":94,"ingestion":146530,"storage":29248},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test11/date=2025-03-25/manifest.json","time_lower_bound":"2025-03-25T00:00:00Z","time_upper_bound":"2025-03-25T23:59:59.999999999Z","events_ingested":94,"ingestion_size":146530,"storage_size":29248}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"OtelLogs","fields":["span_id","trace_id","time_unix_nano","severity_text","severity_number","body"]},{"log_source_format":"OtelTraces","fields":["span_status_code","flags","span_parent_span_id","span_trace_id","span_status_message","event_name","span_span_id","span_name","span_kind_description","event_time_unix_nano","span_end_time_unix_nano","span_status_description","span_start_time_unix_nano","span_kind","name"]},{"log_source_format":"OtelMetrics","fields":["metric_unit","start_time_unix_nano","time_unix_nano","metric_name","metric_description"]}]}); - let expected = serde_json::json!({"version":"v6","schema_version":"v1","objectstore-format":"v6","created-at":"2025-03-25T02:37:00.664625075+00:00","first-event-at":"2025-03-24T22:37:00.665-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":94,"ingestion":146530,"storage":29248},"current_stats":{"events":94,"ingestion":146530,"storage":29248},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test11/date=2025-03-25/manifest.json","time_lower_bound":"2025-03-25T00:00:00Z","time_upper_bound":"2025-03-25T23:59:59.999999999Z","events_ingested":94,"ingestion_size":146530,"storage_size":29248}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-logs","fields":["span_id","trace_id","time_unix_nano","severity_text","severity_number","body"]},{"log_source_format":"otel-traces","fields":["span_status_code","flags","span_parent_span_id","span_trace_id","span_status_message","event_name","span_span_id","span_name","span_kind_description","event_time_unix_nano","span_end_time_unix_nano","span_status_description","span_start_time_unix_nano","span_kind","name"]},{"log_source_format":"otel-metrics","fields":["metric_unit","start_time_unix_nano","time_unix_nano","metric_name","metric_description"]}]}); - let updated_stream_metadata = super::rename_log_source_v6(stream_metadata.clone()); + fn test_v6_v7_otel_logs() { + let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-logs","fields":[]}]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-logs","fields":[]}],"telemetry_type":"logs"}); + let updated_stream_metadata = super::v6_v7(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v6_v7_otel_traces() { + let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-traces","fields":[]}]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-traces","fields":[]}],"telemetry_type":"traces"}); + let updated_stream_metadata = super::v6_v7(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v6_v7_otel_metrics() { + let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-metrics","fields":[]}]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-metrics","fields":[]}],"telemetry_type":"metrics"}); + let updated_stream_metadata = super::v6_v7(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v6_v7_json_defaults_to_logs() { + let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"json","fields":[]}],"telemetry_type":"logs"}); + let updated_stream_metadata = super::v6_v7(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v6_v7_kinesis_defaults_to_logs() { + let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"kinesis","fields":[]}]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"kinesis","fields":[]}],"telemetry_type":"logs"}); + let updated_stream_metadata = super::v6_v7(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v6_v7_existing_telemetry_type_not_overwritten() { + let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-traces","fields":[]}],"telemetry_type":"CustomType"}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"otel-traces","fields":[]}],"telemetry_type":"CustomType"}); + let updated_stream_metadata = super::v6_v7(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v6_v7_missing_log_source_defaults_to_logs() { + let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined"}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","telemetry_type":"logs"}); + let updated_stream_metadata = super::v6_v7(stream_metadata.clone()); + assert_eq!(updated_stream_metadata, expected); + } + + #[test] + fn test_v6_v7_empty_log_source_array_defaults_to_logs() { + let stream_metadata = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[]}); + let expected = serde_json::json!({"version":"v7","schema_version":"v0","objectstore-format":"v7","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[],"telemetry_type":"logs"}); + let updated_stream_metadata = super::v6_v7(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); } } diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 92c00582a..6ec3ae542 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -47,7 +47,7 @@ use crate::{ format::{LogSource, LogSourceEntry}, }, handlers::{ - STREAM_TYPE_KEY, + STREAM_TYPE_KEY, TelemetryType, http::{ cluster::{INTERNAL_STREAM_NAME, sync_streams_with_ingestors}, ingest::PostError, @@ -322,6 +322,7 @@ impl Parseable { let stream_type = stream_metadata.stream_type; let schema_version = stream_metadata.schema_version; let log_source = stream_metadata.log_source; + let telemetry_type = stream_metadata.telemetry_type; let metadata = LogStreamMetadata::new( created_at, time_partition, @@ -332,6 +333,7 @@ impl Parseable { stream_type, schema_version, log_source, + telemetry_type, ); let ingestor_id = INGESTOR_META .get() @@ -359,6 +361,7 @@ impl Parseable { StreamType::Internal, None, vec![log_source_entry], + TelemetryType::Logs, ) .await { @@ -384,6 +387,7 @@ impl Parseable { stream_type: StreamType, custom_partition: Option<&String>, log_source: Vec, + telemetry_type: TelemetryType, ) -> Result { if self.streams.contains(stream_name) { return Ok(true); @@ -414,6 +418,7 @@ impl Parseable { Arc::new(Schema::empty()), stream_type, log_source, + telemetry_type, ) .await?; @@ -485,6 +490,7 @@ impl Parseable { update_stream_flag, stream_type, log_source, + telemetry_type, } = headers.into(); let stream_in_memory_dont_update = @@ -547,6 +553,7 @@ impl Parseable { schema, stream_type, vec![log_source_entry], + telemetry_type, ) .await?; @@ -603,6 +610,7 @@ impl Parseable { schema: Arc, stream_type: StreamType, log_source: Vec, + telemetry_type: TelemetryType, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name if stream_type != StreamType::Internal { @@ -625,6 +633,7 @@ impl Parseable { group: PARSEABLE.options.username.clone(), }, log_source: log_source.clone(), + telemetry_type, ..Default::default() }; @@ -653,6 +662,7 @@ impl Parseable { stream_type, SchemaVersion::V1, // New stream log_source, + telemetry_type, ); let ingestor_id = INGESTOR_META .get() diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index e7c6eaaac..81a2342f2 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -29,8 +29,10 @@ use tracing::error; use crate::{ alerts::{ALERTS, AlertError, AlertsSummary, get_alerts_summary}, correlation::{CORRELATIONS, CorrelationError}, - event::format::LogSource, - handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError}, + handlers::{ + TelemetryType, + http::{cluster::fetch_daily_stats, logstream::error::StreamError}, + }, parseable::PARSEABLE, rbac::{Users, map::SessionKey, role::Action}, stats::Stats, @@ -38,7 +40,8 @@ use crate::{ users::{dashboards::DASHBOARDS, filters::FILTERS}, }; -type StreamMetadataResponse = Result<(String, Vec, DataSetType), PrismHomeError>; +type StreamMetadataResponse = + Result<(String, Vec, TelemetryType), PrismHomeError>; #[derive(Debug, Serialize, Default)] pub struct DatedStats { @@ -48,17 +51,10 @@ pub struct DatedStats { storage: u64, } -#[derive(Debug, Serialize)] -enum DataSetType { - Logs, - Metrics, - Traces, -} - #[derive(Debug, Serialize)] pub struct DataSet { title: String, - dataset_type: DataSetType, + dataset_type: TelemetryType, } #[derive(Debug, Serialize)] @@ -208,7 +204,7 @@ fn get_top_5_streams_by_ingestion( async fn get_stream_metadata( stream: String, -) -> Result<(String, Vec, DataSetType), PrismHomeError> { +) -> Result<(String, Vec, TelemetryType), PrismHomeError> { let path = RelativePathBuf::from_iter([&stream, STREAM_ROOT_DIRECTORY]); let obs = PARSEABLE .storage @@ -237,18 +233,7 @@ async fn get_stream_metadata( ))); } - // let log_source = &stream_jsons[0].clone().log_source; - let log_source_format = stream_jsons - .iter() - .find(|sj| !sj.log_source.is_empty()) - .map(|sj| sj.log_source[0].log_source_format.clone()) - .unwrap_or_default(); - - let dataset_type = match log_source_format { - LogSource::OtelMetrics => DataSetType::Metrics, - LogSource::OtelTraces => DataSetType::Traces, - _ => DataSetType::Logs, - }; + let dataset_type = stream_jsons[0].telemetry_type; Ok((stream, stream_jsons, dataset_type)) } diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 3150b1fdb..56b9aa314 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -187,6 +187,7 @@ async fn get_stream_info_helper(stream_name: &str) -> Result, + #[serde(default)] + pub telemetry_type: TelemetryType, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -149,6 +152,8 @@ pub struct StreamInfo { #[serde(default)] pub stream_type: StreamType, pub log_source: Vec, + #[serde(default)] + pub telemetry_type: TelemetryType, } #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)] @@ -226,6 +231,7 @@ impl Default for ObjectStoreFormat { static_schema_flag: false, hot_tier_enabled: false, log_source: vec![LogSourceEntry::default()], + telemetry_type: TelemetryType::Logs, } } }