diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/central_blob_decoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/central_blob_decoder.rs
new file mode 100644
index 000000000..4d5429f07
--- /dev/null
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/central_blob_decoder.rs
@@ -0,0 +1,254 @@
+#[cfg(test)]
+mod tests {
+    use std::io::{Cursor, Read};
+
+    const TERMINATOR: u64 = 0xdeadc0dedeadc0de;
+
+    /// A decoded schema from the CentralBlob
+    #[derive(Debug, Clone, PartialEq)]
+    pub struct DecodedSchema {
+        pub id: u64,
+        pub md5: [u8; 16],
+        pub schema_bytes: Vec<u8>,
+    }
+
+    /// A decoded event from the CentralBlob
+    #[derive(Debug, Clone, PartialEq)]
+    pub struct DecodedEvent {
+        pub schema_id: u64,
+        pub level: u8,
+        pub event_name: String,
+        pub row_data: Vec<u8>,
+    }
+
+    impl DecodedEvent {
+        /// Check if a string value is present in the row data
+        /// This is the only method actually used in the tests
+        pub fn contains_string_value(&self, value: &str) -> bool {
+            let value_bytes = value.as_bytes();
+
+            // Try different string length encodings that Bond might use
+            // Bond can use variable-length encoding for strings
+
+            // First try with u32 length prefix (most common)
+            let length_bytes = (value_bytes.len() as u32).to_le_bytes();
+            if let Some(pos) = self
+                .row_data
+                .windows(length_bytes.len())
+                .position(|window| window == length_bytes)
+            {
+                let string_start = pos + length_bytes.len();
+                if string_start + value_bytes.len() <= self.row_data.len() {
+                    if &self.row_data[string_start..string_start + value_bytes.len()] == value_bytes
+                    {
+                        return true;
+                    }
+                }
+            }
+
+            // Try with u16 length prefix
+            if value_bytes.len() <= u16::MAX as usize {
+                let length_bytes = (value_bytes.len() as u16).to_le_bytes();
+                if let Some(pos) = self
+                    .row_data
+                    .windows(length_bytes.len())
+                    .position(|window| window == length_bytes)
+                {
+                    let string_start = pos + length_bytes.len();
+                    if string_start + value_bytes.len() <= self.row_data.len() {
+                        if &self.row_data[string_start..string_start + value_bytes.len()]
+                            == value_bytes
+                        {
+                            return true;
+                        }
+                    }
+                }
+            }
+
+            // Try with u8 length prefix for short strings
+            if value_bytes.len() <= u8::MAX as usize {
+                let length_byte = value_bytes.len() as u8;
+                if let Some(pos) = self.row_data.iter().position(|&b| b == length_byte) {
+                    let string_start = pos + 1;
+                    if string_start + value_bytes.len() <= self.row_data.len() {
+                        if &self.row_data[string_start..string_start + value_bytes.len()]
+                            == value_bytes
+                        {
+                            return true;
+                        }
+                    }
+                }
+            }
+
+            // As a fallback, just check if the string bytes appear anywhere in the data
+            // This is less precise but more likely to catch the value
+            self.row_data
+                .windows(value_bytes.len())
+                .any(|window| window == value_bytes)
+        }
+    }
+
+    /// The decoded CentralBlob payload
+    #[derive(Debug, Clone, PartialEq)]
+    pub struct DecodedCentralBlob {
+        pub version: u32,
+        pub format: u32,
+        pub metadata: String,
+        pub schemas: Vec<DecodedSchema>,
+        pub events: Vec<DecodedEvent>,
+    }
+
+    /// Simple CentralBlob decoder for testing purposes
+    pub struct CentralBlobDecoder;
+
+    impl CentralBlobDecoder {
+        /// Decode a CentralBlob from bytes
+        pub fn decode(data: &[u8]) -> Result<DecodedCentralBlob, String> {
+            let mut cursor = Cursor::new(data);
+
+            // Read header
+            let version = Self::read_u32(&mut cursor)?;
+            let format = Self::read_u32(&mut cursor)?;
+
+            // Read metadata
+            let metadata_len = Self::read_u32(&mut cursor)?;
+            let metadata = Self::read_utf16le_string(&mut cursor, metadata_len as usize)?;
+
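+            // Blob layout, as inferred from the reads in this function:
+            // [u32 version][u32 format][u32 metadata byte length]
+            // [UTF-16LE metadata], followed by entities tagged with a u16
+            // entity type (0 = schema, 2 = event), each ending in TERMINATOR.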
+            // Read schemas and events
+            let mut schemas = Vec::new();
+            let mut events = Vec::new();
+
+            while cursor.position() < data.len() as u64 {
+                let entity_type = Self::read_u16(&mut cursor)?;
+
+                match entity_type {
+                    0 => {
+                        // Schema entry
+                        let schema = Self::decode_schema(&mut cursor)?;
+                        schemas.push(schema);
+                    }
+                    2 => {
+                        // Event entry
+                        let event = Self::decode_event(&mut cursor)?;
+                        events.push(event);
+                    }
+                    _ => return Err(format!("Invalid entity type: {}", entity_type)),
+                }
+            }
+
+            Ok(DecodedCentralBlob {
+                version,
+                format,
+                metadata,
+                schemas,
+                events,
+            })
+        }
+
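+        // Entry framing, as read by the two helpers below:
+        //   schema: [u64 id][16-byte MD5][u32 len][schema bytes][u64 TERMINATOR]
+        //   event:  [u64 schema_id][u8 level][u16 name byte len][UTF-16LE name]
+        //           [u32 row len][row bytes][u64 TERMINATOR]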
+        fn decode_schema(cursor: &mut Cursor<&[u8]>) -> Result<DecodedSchema, String> {
+            let id = Self::read_u64(cursor)?;
+            let mut md5 = [0u8; 16];
+            cursor
+                .read_exact(&mut md5)
+                .map_err(|_| "Unexpected end of data".to_string())?;
+
+            let schema_len = Self::read_u32(cursor)?;
+            let mut schema_bytes = vec![0u8; schema_len as usize];
+            cursor
+                .read_exact(&mut schema_bytes)
+                .map_err(|_| "Unexpected end of data".to_string())?;
+
+            let terminator = Self::read_u64(cursor)?;
+            if terminator != TERMINATOR {
+                return Err("Invalid terminator".to_string());
+            }
+
+            Ok(DecodedSchema {
+                id,
+                md5,
+                schema_bytes,
+            })
+        }
+
+        fn decode_event(cursor: &mut Cursor<&[u8]>) -> Result<DecodedEvent, String> {
+            let schema_id = Self::read_u64(cursor)?;
+            let level = Self::read_u8(cursor)?;
+
+            let event_name_len = Self::read_u16(cursor)?;
+            let event_name = Self::read_utf16le_string(cursor, event_name_len as usize)?;
+
+            let row_len = Self::read_u32(cursor)?;
+            let mut row_data = vec![0u8; row_len as usize];
+            cursor
+                .read_exact(&mut row_data)
+                .map_err(|_| "Unexpected end of data".to_string())?;
+
+            let terminator = Self::read_u64(cursor)?;
+            if terminator != TERMINATOR {
+                return Err("Invalid terminator".to_string());
+            }
+
+            Ok(DecodedEvent {
+                schema_id,
+                level,
+                event_name,
+                row_data,
+            })
+        }
+
+        fn read_u8(cursor: &mut Cursor<&[u8]>) -> Result<u8, String> {
+            let mut buf = [0u8; 1];
+            cursor
+                .read_exact(&mut buf)
+                .map_err(|_| "Unexpected end of data".to_string())?;
+            Ok(buf[0])
+        }
+
+        fn read_u16(cursor: &mut Cursor<&[u8]>) -> Result<u16, String> {
+            let mut buf = [0u8; 2];
+            cursor
+                .read_exact(&mut buf)
+                .map_err(|_| "Unexpected end of data".to_string())?;
+            Ok(u16::from_le_bytes(buf))
+        }
+
+        fn read_u32(cursor: &mut Cursor<&[u8]>) -> Result<u32, String> {
+            let mut buf = [0u8; 4];
+            cursor
+                .read_exact(&mut buf)
+                .map_err(|_| "Unexpected end of data".to_string())?;
+            Ok(u32::from_le_bytes(buf))
+        }
+
+        fn read_u64(cursor: &mut Cursor<&[u8]>) -> Result<u64, String> {
+            let mut buf = [0u8; 8];
+            cursor
+                .read_exact(&mut buf)
+                .map_err(|_| "Unexpected end of data".to_string())?;
+            Ok(u64::from_le_bytes(buf))
+        }
+
+        fn read_utf16le_string(
+            cursor: &mut Cursor<&[u8]>,
+            byte_len: usize,
+        ) -> Result<String, String> {
+            let mut buf = vec![0u8; byte_len];
+            cursor
+                .read_exact(&mut buf)
+                .map_err(|_| "Unexpected end of data".to_string())?;
+
+            // Convert UTF-16LE bytes to UTF-16 code units
+            let mut utf16_chars = Vec::new();
+            for chunk in buf.chunks_exact(2) {
+                let code_unit = u16::from_le_bytes([chunk[0], chunk[1]]);
+                utf16_chars.push(code_unit);
+            }
+
+            String::from_utf16(&utf16_chars).map_err(|_| "Invalid UTF-16 data".to_string())
+        }
+    }
+}
+
+// Re-export the test types for use in other test modules
+#[cfg(test)]
+pub use tests::{CentralBlobDecoder, DecodedCentralBlob};
diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs
index 33c155f8b..daa67b4d6 100644
--- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs
@@ -3,6 +3,9 @@
 pub(crate) mod central_blob;
 pub(crate) mod lz4_chunked_compression;
 pub mod otlp_encoder;
+#[cfg(test)]
+pub(crate) mod central_blob_decoder;
+
 #[cfg(test)]
 mod tests {
     use crate::payload_encoder::bond_encoder::{BondDataType, BondEncodedSchema, BondWriter};
diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs
index cb1f806c7..8f1245e1a 100644
--- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs
@@ -341,240 +341,415 @@ impl OtlpEncoder {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::payload_encoder::central_blob_decoder::{CentralBlobDecoder, DecodedCentralBlob};
     use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
 
-    #[test]
-    fn test_encoding() {
-        let encoder = OtlpEncoder::new();
+    const TEST_METADATA: &str = "namespace=testNamespace/eventVersion=Ver1v0";
 
-        let mut log = LogRecord {
+    /// Helper to create a basic log record with the given event name and severity
+    fn create_log_record(event_name: &str, severity: i32) -> LogRecord {
+        LogRecord {
             observed_time_unix_nano: 1_700_000_000_000_000_000,
-            event_name: "test_event".to_string(),
-            severity_number: 9,
+            event_name: event_name.to_string(),
+            severity_number: severity,
             severity_text: "INFO".to_string(),
             ..Default::default()
-        };
+        }
+    }
 
-        // Add some attributes
+    /// Helper to add an attribute to a log record
+    fn add_attribute(log: &mut LogRecord, key: &str, value: Value) {
         log.attributes.push(KeyValue {
-            key: "user_id".to_string(),
-            value: Some(AnyValue {
-                value: Some(Value::StringValue("user123".to_string())),
-            }),
+            key: key.to_string(),
+            value: Some(AnyValue { value: Some(value) }),
         });
+    }
 
-        log.attributes.push(KeyValue {
-            key: "request_count".to_string(),
-            value: Some(AnyValue {
-                value: Some(Value::IntValue(42)),
-            }),
-        });
+    /// Helper to add trace context to a log record
+    fn add_trace_context(log: &mut LogRecord, trace_id: Vec<u8>, span_id: Vec<u8>, flags: u32) {
+        log.trace_id = trace_id;
+        log.span_id = span_id;
+        log.flags = flags;
+    }
 
-        let metadata = "namespace=testNamespace/eventVersion=Ver1v0";
-        let result = encoder.encode_log_batch([log].iter(), metadata);
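+    // encode_log_batch is expected to yield one (event_name, blob_bytes,
+    // event_count) tuple per batch; the helper below decodes each blob and
+    // cross-checks the header fields and event count against that tuple.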
+    /// Helper to decode and validate basic structure
+    fn decode_and_validate_structure(
+        result: &[(String, Vec<u8>, usize)],
+        expected_batches: usize,
+    ) -> Vec<(String, DecodedCentralBlob)> {
+        assert_eq!(result.len(), expected_batches);
 
-        assert!(!result.is_empty());
+        result
+            .iter()
+            .map(|(event_name, blob, event_count)| {
+                let decoded =
+                    CentralBlobDecoder::decode(blob).expect("Blob should decode successfully");
+
+                // Basic structure validation
+                assert_eq!(decoded.version, 1);
+                assert_eq!(decoded.format, 2);
+                assert_eq!(decoded.metadata, TEST_METADATA);
+                assert_eq!(decoded.events.len(), *event_count);
+                assert!(!decoded.schemas.is_empty());
+
+                (event_name.clone(), decoded)
+            })
+            .collect()
     }
 
     #[test]
-    fn test_schema_caching() {
+    fn test_schema_caching_behavior() {
         let encoder = OtlpEncoder::new();
 
-        let log1 = LogRecord {
-            observed_time_unix_nano: 1_700_000_000_000_000_000,
-            severity_number: 9,
-            ..Default::default()
-        };
+        // Test 1: Same schema should reuse cache
+        let log1 = create_log_record("test_event", 9);
+        let log2 = create_log_record("test_event", 10); // Same structure, different values
 
-        let mut log2 = LogRecord {
-            observed_time_unix_nano: 1_700_000_001_000_000_000,
-            severity_number: 10,
-            ..Default::default()
-        };
+        encoder.encode_log_batch([log1].iter(), TEST_METADATA);
+        assert_eq!(encoder.schema_cache_size(), 1);
 
-        let metadata = "namespace=test";
+        encoder.encode_log_batch([log2].iter(), TEST_METADATA);
+        assert_eq!(encoder.schema_cache_size(), 1); // No new schema
 
-        // First encoding creates schema
-        let _result1 = encoder.encode_log_batch([log1].iter(), metadata);
-        assert_eq!(encoder.schema_cache.read().unwrap().len(), 1);
+        // Test 2: Different schema should create new cache entry
+        let mut log3 = create_log_record("test_event", 11);
+        log3.trace_id = vec![1; 16]; // Different structure
 
-        // Second encoding with same schema structure reuses schema
-        let _result2 = encoder.encode_log_batch([log2.clone()].iter(), metadata);
-        assert_eq!(encoder.schema_cache.read().unwrap().len(), 1);
-
-        // Add trace_id to create different schema
-        log2.trace_id = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
-        let _result3 = encoder.encode_log_batch([log2].iter(), metadata);
-        assert_eq!(encoder.schema_cache.read().unwrap().len(), 2);
+        encoder.encode_log_batch([log3].iter(), TEST_METADATA);
+        assert_eq!(encoder.schema_cache_size(), 2); // New schema added
     }
 
     #[test]
-    fn test_single_event_single_schema() {
+    fn test_event_name_and_batching() {
         let encoder = OtlpEncoder::new();
 
-        let log = LogRecord {
-            observed_time_unix_nano: 1_700_000_000_000_000_000,
-            event_name: "test_event".to_string(),
-            severity_number: 9,
-            ..Default::default()
-        };
+        // Test 1: Empty event name defaults to "Log"
+        let empty_name_log = create_log_record("", 9);
+        let result = encoder.encode_log_batch([empty_name_log].iter(), TEST_METADATA);
+        let decoded = decode_and_validate_structure(&result, 1);
+        assert_eq!(decoded[0].0, "Log");
+
+        // Test 2: Different event names create separate batches
+        let log1 = create_log_record("login", 9);
+        let log2 = create_log_record("logout", 10);
+        let result = encoder.encode_log_batch([log1, log2].iter(), TEST_METADATA);
+        let decoded = decode_and_validate_structure(&result, 2);
+
+        let event_names: Vec<&String> = decoded.iter().map(|(name, _)| name).collect();
+        assert!(event_names.contains(&&"login".to_string()));
+        assert!(event_names.contains(&&"logout".to_string()));
+
+        // Test 3: Same event name with different schemas batched together
+        let log3 = create_log_record("user_action", 9);
+        let mut log4 = create_log_record("user_action", 10);
+        log4.trace_id = vec![1; 16]; // Different schema
 
-        let result = encoder.encode_log_batch([log].iter(), "test");
+        let result = encoder.encode_log_batch([log3, log4].iter(), TEST_METADATA);
+        let decoded = decode_and_validate_structure(&result, 1);
 
-        assert_eq!(result.len(), 1);
-        assert_eq!(result[0].0, "test_event");
-        assert_eq!(result[0].2, 1); // events_count
+        assert_eq!(decoded[0].0, "user_action");
+        assert_eq!(decoded[0].1.events.len(), 2);
+        assert_eq!(decoded[0].1.schemas.len(), 2); // Different schemas in same batch
     }
 
     #[test]
-    fn test_same_event_name_multiple_schemas() {
+    fn test_comprehensive_field_encoding() {
         let encoder = OtlpEncoder::new();
 
-        // Schema 1: Basic log
-        let log1 = LogRecord {
-            event_name: "user_action".to_string(),
+        // Create log with all possible field types
+        let mut comprehensive_log = LogRecord {
+            observed_time_unix_nano: 1_700_000_123_456_789_000,
+            event_name: "comprehensive_test".to_string(),
             severity_number: 9,
+            severity_text: "INFO".to_string(),
+            body: Some(AnyValue {
+                value: Some(Value::StringValue("Log body content".to_string())),
+            }),
             ..Default::default()
         };
 
-        // Schema 2: With trace_id
-        let mut log2 = LogRecord {
-            event_name: "user_action".to_string(),
-            severity_number: 10,
-            ..Default::default()
-        };
-        log2.trace_id = vec![1; 16];
-
-        // Schema 3: With attributes
-        let mut log3 = LogRecord {
-            event_name: "user_action".to_string(),
-            severity_number: 11,
-            ..Default::default()
-        };
-        log3.attributes.push(KeyValue {
-            key: "user_id".to_string(),
-            value: Some(AnyValue {
-                value: Some(Value::StringValue("user123".to_string())),
-            }),
-        });
-
-        let result = encoder.encode_log_batch([log1, log2, log3].iter(), "test");
+        // Add trace context
+        add_trace_context(
+            &mut comprehensive_log,
+            vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
+            vec![1, 2, 3, 4, 5, 6, 7, 8],
+            1,
+        );
+
+        // Add all attribute types
+        add_attribute(
+            &mut comprehensive_log,
+            "string_attr",
+            Value::StringValue("test_value".to_string()),
+        );
+        add_attribute(&mut comprehensive_log, "int_attr", Value::IntValue(42));
+        add_attribute(
+            &mut comprehensive_log,
+            "double_attr",
+            Value::DoubleValue(3.14159),
+        );
+        add_attribute(&mut comprehensive_log, "bool_attr", Value::BoolValue(true));
+
+        let result = encoder.encode_log_batch([comprehensive_log].iter(), TEST_METADATA);
+        let decoded = decode_and_validate_structure(&result, 1);
+        let event = &decoded[0].1.events[0];
+
+        // Validate string values are present in the encoded data
+        assert!(
+            event.contains_string_value("comprehensive_test"),
+            "Should contain event name"
+        );
+        assert!(
+            event.contains_string_value("INFO"),
+            "Should contain severity text"
+        );
+        assert!(
+            event.contains_string_value("test_value"),
+            "Should contain string attribute"
+        );
+        assert!(
+            event.contains_string_value("TestEnv"),
+            "Should contain env_name"
+        );
+        assert!(event.contains_string_value("4.0"), "Should contain env_ver");
+        assert!(
+            event.contains_string_value("Log body content"),
+            "Should contain body content"
+        );
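+        // The IDs are expected hex-encoded: trace_id bytes
+        // [0x01, 0x02, ..., 0x10] become "0102030405060708090a0b0c0d0e0f10"
+        // and span_id bytes [0x01..0x08] become "0102030405060708".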
+        assert!(
+            event.contains_string_value("0102030405060708090a0b0c0d0e0f10"),
+            "Should contain trace ID"
+        );
+        assert!(
+            event.contains_string_value("0102030405060708"),
+            "Should contain span ID"
+        );
+
+        // Validate that the log has the expected event name
+        assert_eq!(event.event_name, "comprehensive_test");
+        assert_eq!(event.level, 9);
+        assert!(!event.row_data.is_empty());
+    }
 
-        // All should be in one batch with same event_name
-        assert_eq!(result.len(), 1);
-        assert_eq!(result[0].0, "user_action");
-        assert_eq!(result[0].2, 3); // events_count
+    #[test]
+    fn test_field_ordering_consistency() {
+        let encoder = OtlpEncoder::new();
 
-        // Should have 3 different schemas cached
-        assert_eq!(encoder.schema_cache.read().unwrap().len(), 3);
+        // Test that attribute order doesn't affect schema ID (fields are sorted)
+        let mut log1 = create_log_record("ordering_test", 9);
+        add_attribute(
+            &mut log1,
+            "attr_z",
+            Value::StringValue("value_z".to_string()),
+        );
+        add_attribute(
+            &mut log1,
+            "attr_a",
+            Value::StringValue("value_a".to_string()),
+        );
+
+        let mut log2 = create_log_record("ordering_test", 10);
+        add_attribute(
+            &mut log2,
+            "attr_a",
+            Value::StringValue("value_a".to_string()),
+        );
+        add_attribute(
+            &mut log2,
+            "attr_z",
+            Value::StringValue("value_z".to_string()),
+        );
+
+        let result1 = encoder.encode_log_batch([log1].iter(), TEST_METADATA);
+        let result2 = encoder.encode_log_batch([log2].iter(), TEST_METADATA);
+
+        let decoded1 = decode_and_validate_structure(&result1, 1);
+        let decoded2 = decode_and_validate_structure(&result2, 1);
 
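+        // The schema ID is assumed to be computed over the sorted field set,
+        // so permuting attr_a/attr_z at insertion time must not change it.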
+        // Should have same schema ID despite different attribute order
+        assert_eq!(decoded1[0].1.schemas[0].id, decoded2[0].1.schemas[0].id);
     }
 
     #[test]
-    fn test_different_event_names() {
+    fn test_multiple_schemas_per_batch() {
         let encoder = OtlpEncoder::new();
 
-        let log1 = LogRecord {
-            event_name: "login".to_string(),
-            severity_number: 9,
-            ..Default::default()
-        };
-
-        let log2 = LogRecord {
-            event_name: "logout".to_string(),
-            severity_number: 10,
-            ..Default::default()
-        };
+        // Create logs with same event name but different schemas
+        let base_log = create_log_record("mixed_batch", 5);
 
-        let result = encoder.encode_log_batch([log1, log2].iter(), "test");
+        let mut trace_log = create_log_record("mixed_batch", 6);
+        add_trace_context(&mut trace_log, vec![1; 16], vec![1; 8], 1);
 
-        // Should create 2 separate batches
-        assert_eq!(result.len(), 2);
+        let mut attr_log = create_log_record("mixed_batch", 7);
+        add_attribute(
+            &mut attr_log,
+            "custom_attr",
+            Value::StringValue("value".to_string()),
+        );
 
-        let event_names: Vec<&String> = result.iter().map(|(name, _, _)| name).collect();
-        assert!(event_names.contains(&&"login".to_string()));
-        assert!(event_names.contains(&&"logout".to_string()));
+        let mut full_log = create_log_record("mixed_batch", 8);
+        add_trace_context(&mut full_log, vec![2; 16], vec![2; 8], 2);
+        add_attribute(&mut full_log, "another_attr", Value::IntValue(100));
 
-        // Each batch should have 1 event
-        assert!(result.iter().all(|(_, _, count)| *count == 1));
-    }
+        let result = encoder.encode_log_batch(
+            [base_log, trace_log, attr_log, full_log].iter(),
+            TEST_METADATA,
+        );
 
-    #[test]
-    fn test_empty_event_name_defaults_to_log() {
-        let encoder = OtlpEncoder::new();
+        let decoded = decode_and_validate_structure(&result, 1);
+        let batch = &decoded[0].1;
 
-        let log = LogRecord {
-            event_name: "".to_string(),
-            severity_number: 9,
-            ..Default::default()
-        };
+        // Verify batch structure
+        assert_eq!(batch.events.len(), 4);
+        assert_eq!(batch.schemas.len(), 4); // Each log has a different schema
 
-        let result = encoder.encode_log_batch([log].iter(), "test");
+        // Verify each event references a valid schema
+        for event in &batch.events {
+            assert!(batch.schemas.iter().any(|s| s.id == event.schema_id));
+            assert_eq!(event.event_name, "mixed_batch");
+        }
 
-        assert_eq!(result.len(), 1);
-        assert_eq!(result[0].0, "Log"); // Should default to "Log"
-        assert_eq!(result[0].2, 1);
+        // Verify schema uniqueness
+        let mut schema_ids: Vec<u64> = batch.schemas.iter().map(|s| s.id).collect();
+        schema_ids.sort();
+        schema_ids.dedup();
+        assert_eq!(schema_ids.len(), batch.schemas.len());
     }
 
     #[test]
-    fn test_mixed_scenario() {
+    fn test_minimal_vs_maximal_logs() {
         let encoder = OtlpEncoder::new();
 
-        // event_name1 with schema1
-        let log1 = LogRecord {
-            event_name: "user_action".to_string(),
-            severity_number: 9,
-            ..Default::default()
-        };
-
-        // event_name1 with schema2 (different schema, same event)
-        let mut log2 = LogRecord {
-            event_name: "user_action".to_string(),
-            severity_number: 10,
-            ..Default::default()
-        };
-        log2.trace_id = vec![1; 16];
-
-        // event_name2 with schema3
-        let log3 = LogRecord {
-            event_name: "system_alert".to_string(),
-            severity_number: 11,
-            ..Default::default()
-        };
+        // Minimal log (only required fields)
+        let minimal = create_log_record("minimal", 5);
 
-        // empty event_name (defaults to "Log") with schema4
-        let mut log4 = LogRecord {
-            event_name: "".to_string(),
+        // Maximal log (all possible fields)
+        let mut maximal = LogRecord {
+            observed_time_unix_nano: 1_700_000_000_000_000_000,
+            event_name: "maximal".to_string(),
             severity_number: 12,
+            severity_text: "ERROR".to_string(),
+            trace_id: vec![1; 16],
+            span_id: vec![1; 8],
+            flags: 3,
+            body: Some(AnyValue {
+                value: Some(Value::StringValue("Error message".to_string())),
+            }),
             ..Default::default()
         };
-        log4.attributes.push(KeyValue {
-            key: "error_code".to_string(),
-            value: Some(AnyValue {
-                value: Some(Value::IntValue(404)),
-            }),
-        });
 
-        let result = encoder.encode_log_batch([log1, log2, log3, log4].iter(), "test");
+        // Add multiple attributes of different types
+        for (key, value) in [
+            ("str", Value::StringValue("string".to_string())),
+            ("num", Value::IntValue(999)),
+            ("float", Value::DoubleValue(99.9)),
+            ("flag", Value::BoolValue(false)),
+        ] {
+            add_attribute(&mut maximal, key, value);
+        }
 
-        // Should create 3 batches: "user_action", "system_alert", "Log"
-        assert_eq!(result.len(), 3);
+        let result = encoder.encode_log_batch([minimal, maximal].iter(), TEST_METADATA);
+        let decoded = decode_and_validate_structure(&result, 2);
+
+        // Find each batch
+        let minimal_batch = decoded.iter().find(|(name, _)| name == "minimal").unwrap();
+        let maximal_batch = decoded.iter().find(|(name, _)| name == "maximal").unwrap();
+
+        // Verify minimal log has basic required fields
+        let minimal_event = &minimal_batch.1.events[0];
+        assert!(
+            minimal_event.contains_string_value("TestEnv"),
+            "Should contain env_name"
+        );
+        assert!(
+            minimal_event.contains_string_value("4.0"),
+            "Should contain env_ver"
+        );
+        assert!(
+            minimal_event.contains_string_value("minimal"),
+            "Should contain event name"
+        );
+        assert!(
+            minimal_event.contains_string_value("INFO"),
+            "Should contain severity text"
+        );
+
+        // Verify maximal log has all fields
+        let maximal_event = &maximal_batch.1.events[0];
+        assert!(
+            maximal_event.contains_string_value("TestEnv"),
+            "Should contain env_name"
+        );
+        assert!(
+            maximal_event.contains_string_value("4.0"),
+            "Should contain env_ver"
+        );
+        assert!(
+            maximal_event.contains_string_value("maximal"),
+            "Should contain event name"
+        );
+        assert!(
+            maximal_event.contains_string_value("ERROR"),
+            "Should contain severity text"
+        );
+        assert!(
+            maximal_event.contains_string_value("Error message"),
+            "Should contain body"
+        );
+        assert!(
+            maximal_event.contains_string_value("string"),
+            "Should contain string attribute"
+        );
+        // Contains trace context - check for hex patterns that should be present
+        // The trace ID should be present in some form in the encoded data
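+        // trace_id [0x01; 16] hex-encodes to thirty-two "01" characters and
+        // span_id [0x01; 8] to sixteen, so this substring matches either one.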
+        assert!(
+            maximal_event.contains_string_value("0101010101010101"),
+            "Should contain part of trace/span ID"
+        );
+
+        // Schemas should be different
+        assert_ne!(minimal_batch.1.schemas[0].id, maximal_batch.1.schemas[0].id);
+    }
 
-        // Find each batch and verify counts
-        let user_action = result
-            .iter()
-            .find(|(name, _, _)| name == "user_action")
-            .unwrap();
-        let system_alert = result
-            .iter()
-            .find(|(name, _, _)| name == "system_alert")
-            .unwrap();
-        let log_batch = result.iter().find(|(name, _, _)| name == "Log").unwrap();
+    #[test]
+    fn test_timestamp_and_id_encoding() {
+        let encoder = OtlpEncoder::new();
 
-        assert_eq!(user_action.2, 2); // 2 events with different schemas
-        assert_eq!(system_alert.2, 1); // 1 event
-        assert_eq!(log_batch.2, 1); // 1 event
+        let log = LogRecord {
+            observed_time_unix_nano: 1_234_567_890_123_456_789, // Specific timestamp
+            event_name: "timestamp_test".to_string(),
+            severity_number: 6,
+            trace_id: vec![
+                0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
+                0x77, 0x88,
+            ],
+            span_id: vec![0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11],
+            ..Default::default()
+        };
 
-        // Should have 4 different schemas cached
-        assert_eq!(encoder.schema_cache.read().unwrap().len(), 4);
+        let result = encoder.encode_log_batch([log].iter(), TEST_METADATA);
+        let decoded = decode_and_validate_structure(&result, 1);
+        let event = &decoded[0].1.events[0];
+
+        // Validate that hex encodings of the IDs are present in the encoded data
+        assert!(
+            event.contains_string_value("123456789abcdef01122334455667788"),
+            "Should contain trace ID"
+        );
+        assert!(
+            event.contains_string_value("aabbccddeeff0011"),
+            "Should contain span ID"
+        );
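+        // 1_234_567_890_123_456_789 ns is Unix time 1234567890.123456789 s,
+        // i.e. 2009-02-13T23:31:30Z, so the formatted timestamp should carry
+        // that date.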
event_name: "timestamp_test".to_string(), + severity_number: 6, + trace_id: vec![ + 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, + 0x77, 0x88, + ], + span_id: vec![0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11], + ..Default::default() + }; - // Should have 4 different schemas cached - assert_eq!(encoder.schema_cache.read().unwrap().len(), 4); + let result = encoder.encode_log_batch([log].iter(), TEST_METADATA); + let decoded = decode_and_validate_structure(&result, 1); + let event = &decoded[0].1.events[0]; + + // Validate hex encoding of IDs are present in the encoded data + assert!( + event.contains_string_value("123456789abcdef01122334455667788"), + "Should contain trace ID" + ); + assert!( + event.contains_string_value("aabbccddeeff0011"), + "Should contain span ID" + ); + + // Validate timestamp is properly formatted (contains the expected date) + assert!( + event.contains_string_value("2009-02-13"), + "Should contain formatted date from timestamp" + ); + + // Validate basic structure + assert_eq!(event.event_name, "timestamp_test"); + assert_eq!(event.level, 6); + assert!(!event.row_data.is_empty()); } }