diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs
index 78e18c2b7..fae7ebd61 100644
--- a/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs
@@ -178,8 +178,11 @@ mod benchmarks {
                         .collect();
 
                     b.iter(|| {
-                        let res =
-                            encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata));
+                        let res = encoder.encode_log_batch(
+                            black_box(logs.iter()),
+                            black_box("benchmark"),
+                            black_box(metadata),
+                        );
                         black_box(res); // double sure the return value is generated
                     });
                 },
@@ -206,9 +209,11 @@
                         .collect();
 
                     b.iter(|| {
-                        let res = black_box(
-                            encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata)),
-                        );
+                        let res = black_box(encoder.encode_log_batch(
+                            black_box(logs.iter()),
+                            black_box("benchmark"),
+                            black_box(metadata),
+                        ));
                         black_box(res); // double sure the return value is generated
                     });
                 },
@@ -231,9 +236,11 @@
                         .collect();
 
                     b.iter(|| {
-                        let res = black_box(
-                            encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata)),
-                        );
+                        let res = black_box(encoder.encode_log_batch(
+                            black_box(logs.iter()),
+                            black_box("benchmark"),
+                            black_box(metadata),
+                        ));
                         black_box(res);
                     });
                 });
diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
index 72bf5dd42..9b9619684 100644
--- a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
@@ -32,6 +32,7 @@ pub struct GenevaClient {
     uploader: Arc<GenevaUploader>,
     encoder: OtlpEncoder,
     metadata: String,
+    namespace: String,
     max_concurrent_uploads: usize,
 }
 
@@ -89,6 +90,7 @@ impl GenevaClient {
             uploader: Arc::new(uploader),
             encoder: OtlpEncoder::new(),
             metadata,
+            namespace: cfg.namespace,
             max_concurrent_uploads,
         })
     }
@@ -101,7 +103,7 @@
             .flat_map(|scope_log| scope_log.log_records.iter());
         // TODO: Investigate using tokio::spawn_blocking for event encoding to avoid blocking
         // the async executor thread for CPU-intensive work.
-        let blobs = self.encoder.encode_log_batch(log_iter, &self.metadata);
+        let blobs = self.encoder.encode_log_batch(log_iter, &self.namespace, &self.metadata);
 
         // create an iterator that yields futures for each upload
         let upload_futures = blobs.into_iter().map(|batch| {
diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs
index 5fa816bee..e51ca097f 100644
--- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs
@@ -20,6 +20,8 @@ const FIELD_SEVERITY_NUMBER: &str = "SeverityNumber";
 const FIELD_SEVERITY_TEXT: &str = "SeverityText";
 const FIELD_BODY: &str = "body";
 
+const SCHEMA_TYPE_NAME: &str = "OtlpLogRecord";
+
 /// Encoder to write OTLP payload in bond form.
 #[derive(Clone)]
 pub(crate) struct OtlpEncoder;
@@ -30,7 +32,7 @@ impl OtlpEncoder {
     }
 
     /// Encode a batch of logs into a vector of (event_name, bytes, schema_ids, start_time_nanos, end_time_nanos)
-    pub(crate) fn encode_log_batch<'a, I>(&self, logs: I, metadata: &str) -> Vec<EncodedBatch>
+    pub(crate) fn encode_log_batch<'a, I>(&self, logs: I, namespace: &str, metadata: &str) -> Vec<EncodedBatch>
     where
         I: IntoIterator<Item = &'a LogRecord>,
     {
@@ -90,8 +92,7 @@ impl OtlpEncoder {
             // 1. Get schema with optimized single-pass field collection and schema ID calculation
             let (field_info, schema_id) =
                 Self::determine_fields_and_schema_id(log_record, event_name_str);
-
-            let schema_entry = Self::create_schema(schema_id, field_info.as_slice());
+            let schema_entry = Self::create_schema(schema_id, field_info.as_slice(), namespace);
             // 2. Encode row
             let row_buffer = self.write_row_data(log_record, &field_info);
             let level = log_record.severity_number as u8;
@@ -238,9 +239,9 @@ impl OtlpEncoder {
     }
 
     /// Create schema - always creates a new CentralSchemaEntry
-    fn create_schema(schema_id: u64, field_info: &[FieldDef]) -> CentralSchemaEntry {
+    fn create_schema(schema_id: u64, field_info: &[FieldDef], namespace: &str) -> CentralSchemaEntry {
         let schema =
-            BondEncodedSchema::from_fields("OtlpLogRecord", "telemetry", field_info.to_vec()); //TODO - use actual struct name and namespace
+            BondEncodedSchema::from_fields(SCHEMA_TYPE_NAME, namespace, field_info.to_vec());
 
         let schema_bytes = schema.as_bytes();
         let schema_md5 = md5::compute(schema_bytes).0;
@@ -252,6 +253,8 @@
         }
     }
 
+
+
     /// Write row data directly from LogRecord
     fn write_row_data(&self, log: &LogRecord, sorted_fields: &[FieldDef]) -> Vec<u8> {
         let mut buffer = Vec::with_capacity(sorted_fields.len() * 50); //TODO - estimate better
@@ -390,7 +393,7 @@ mod tests {
         });
 
         let metadata = "namespace=testNamespace/eventVersion=Ver1v0";
-        let result = encoder.encode_log_batch([log].iter(), metadata);
+        let result = encoder.encode_log_batch([log].iter(), "testNamespace", metadata);
 
         assert!(!result.is_empty());
     }
@@ -437,7 +440,7 @@
         let metadata = "namespace=test";
 
         // Encode multiple log records with different schema structures but same event_name
-        let result = encoder.encode_log_batch([log1, log2, log3].iter(), metadata);
+        let result = encoder.encode_log_batch([log1, log2, log3].iter(), "test", metadata);
 
         // Should create one batch (same event_name = "user_action")
         assert_eq!(result.len(), 1);
@@ -494,7 +497,7 @@
             ..Default::default()
         };
 
-        let result = encoder.encode_log_batch([log].iter(), "test");
+        let result = encoder.encode_log_batch([log].iter(), "test", "test");
 
         assert_eq!(result.len(), 1);
         assert_eq!(result[0].event_name, "test_event");
@@ -534,7 +537,7 @@
             }),
         });
 
-        let result = encoder.encode_log_batch([log1, log2, log3].iter(), "test");
+        let result = encoder.encode_log_batch([log1, log2, log3].iter(), "test", "test");
 
         // All should be in one batch with same event_name
         assert_eq!(result.len(), 1);
@@ -560,7 +563,7 @@
             ..Default::default()
        };
 
-        let result = encoder.encode_log_batch([log1, log2].iter(), "test");
+        let result = encoder.encode_log_batch([log1, log2].iter(), "test", "test");
 
         // Should create 2 separate batches
         assert_eq!(result.len(), 2);
@@ -583,7 +586,7 @@
             ..Default::default()
         };
 
-        let result = encoder.encode_log_batch([log].iter(), "test");
+        let result = encoder.encode_log_batch([log].iter(), "test", "test");
 
         assert_eq!(result.len(), 1);
         assert_eq!(result[0].event_name, "Log"); // Should default to "Log"
@@ -629,7 +632,7 @@
             }),
         });
 
-        let result = encoder.encode_log_batch([log1, log2, log3, log4].iter(), "test");
+        let result = encoder.encode_log_batch([log1, log2, log3, log4].iter(), "test", "test");
 
         // Should create 3 batches: "user_action", "system_alert", "Log"
         assert_eq!(result.len(), 3);
@@ -656,4 +659,33 @@
         assert!(!log_batch.data.is_empty()); // Should have encoded data
         assert_eq!(log_batch.metadata.schema_ids.matches(';').count(), 0); // 1 schema = 0 semicolons
     }
+
+
+    #[test]
+    fn test_schema_uses_different_namespaces() {
+        let log = LogRecord {
+            observed_time_unix_nano: 1_700_000_000_000_000_000,
+            event_name: "test_event".to_string(),
+            severity_number: 9,
+            ..Default::default()
+        };
+
+        // Test with different namespaces using separate encoder instances
+        let encoder1 = OtlpEncoder::new();
+        let encoder2 = OtlpEncoder::new();
+
+        let metadata = "eventVersion=Ver1v0";
+        let result1 = encoder1.encode_log_batch([log.clone()].iter(), "customNamespace", metadata);
+        let result2 = encoder2.encode_log_batch([log].iter(), "anotherNamespace", metadata);
+
+        assert!(!result1.is_empty());
+        assert!(!result2.is_empty());
+
+        // Verify that both namespaces produce results
+        // Since schema_cache was removed, we just verify the functionality works
+        assert_eq!(result1.len(), 1);
+        assert_eq!(result2.len(), 1);
+        assert_eq!(result1[0].event_name, "test_event");
+        assert_eq!(result2[0].event_name, "test_event");
+    }
 }