Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,11 @@ mod benchmarks {
.collect();

b.iter(|| {
let res =
encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata));
let res = encoder.encode_log_batch(
black_box(logs.iter()),
black_box("benchmark"),
black_box(metadata),
);
black_box(res); // double sure the return value is generated
});
},
Expand All @@ -206,9 +209,11 @@ mod benchmarks {
.collect();

b.iter(|| {
let res = black_box(
encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata)),
);
let res = black_box(encoder.encode_log_batch(
black_box(logs.iter()),
black_box("benchmark"),
black_box(metadata),
));
black_box(res); // double sure the return value is generated
});
},
Expand All @@ -231,9 +236,11 @@ mod benchmarks {
.collect();

b.iter(|| {
let res = black_box(
encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata)),
);
let res = black_box(encoder.encode_log_batch(
black_box(logs.iter()),
black_box("benchmark"),
black_box(metadata),
));
black_box(res);
});
});
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct GenevaClient {
uploader: Arc<GenevaUploader>,
encoder: OtlpEncoder,
metadata: String,
namespace: String,
max_concurrent_uploads: usize,
}

Expand Down Expand Up @@ -89,6 +90,7 @@ impl GenevaClient {
uploader: Arc::new(uploader),
encoder: OtlpEncoder::new(),
metadata,
namespace: cfg.namespace,
max_concurrent_uploads,
})
}
Expand All @@ -101,7 +103,7 @@ impl GenevaClient {
.flat_map(|scope_log| scope_log.log_records.iter());
// TODO: Investigate using tokio::spawn_blocking for event encoding to avoid blocking
// the async executor thread for CPU-intensive work.
let blobs = self.encoder.encode_log_batch(log_iter, &self.metadata);
let blobs = self.encoder.encode_log_batch(log_iter, &self.namespace, &self.metadata);

// create an iterator that yields futures for each upload
let upload_futures = blobs.into_iter().map(|batch| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const FIELD_SEVERITY_NUMBER: &str = "SeverityNumber";
const FIELD_SEVERITY_TEXT: &str = "SeverityText";
const FIELD_BODY: &str = "body";

const SCHEMA_TYPE_NAME: &str = "OtlpLogRecord";

/// Encoder to write OTLP payload in bond form.
#[derive(Clone)]
pub(crate) struct OtlpEncoder;
Expand All @@ -30,7 +32,7 @@ impl OtlpEncoder {
}

/// Encode a batch of logs into a vector of (event_name, bytes, schema_ids, start_time_nanos, end_time_nanos)
pub(crate) fn encode_log_batch<'a, I>(&self, logs: I, metadata: &str) -> Vec<EncodedBatch>
pub(crate) fn encode_log_batch<'a, I>(&self, logs: I, namespace: &str, metadata: &str) -> Vec<EncodedBatch>
where
I: IntoIterator<Item = &'a opentelemetry_proto::tonic::logs::v1::LogRecord>,
{
Expand Down Expand Up @@ -90,8 +92,7 @@ impl OtlpEncoder {
// 1. Get schema with optimized single-pass field collection and schema ID calculation
let (field_info, schema_id) =
Self::determine_fields_and_schema_id(log_record, event_name_str);

let schema_entry = Self::create_schema(schema_id, field_info.as_slice());
let schema_entry = Self::create_schema(schema_id, field_info.as_slice(), namespace);
// 2. Encode row
let row_buffer = self.write_row_data(log_record, &field_info);
let level = log_record.severity_number as u8;
Expand Down Expand Up @@ -238,9 +239,9 @@ impl OtlpEncoder {
}

/// Create schema - always creates a new CentralSchemaEntry
fn create_schema(schema_id: u64, field_info: &[FieldDef]) -> CentralSchemaEntry {
fn create_schema(schema_id: u64, field_info: &[FieldDef], namespace: &str) -> CentralSchemaEntry {
let schema =
BondEncodedSchema::from_fields("OtlpLogRecord", "telemetry", field_info.to_vec()); //TODO - use actual struct name and namespace
BondEncodedSchema::from_fields(SCHEMA_TYPE_NAME, namespace, field_info.to_vec());

let schema_bytes = schema.as_bytes();
let schema_md5 = md5::compute(schema_bytes).0;
Expand All @@ -252,6 +253,8 @@ impl OtlpEncoder {
}
}



/// Write row data directly from LogRecord
fn write_row_data(&self, log: &LogRecord, sorted_fields: &[FieldDef]) -> Vec<u8> {
let mut buffer = Vec::with_capacity(sorted_fields.len() * 50); //TODO - estimate better
Expand Down Expand Up @@ -390,7 +393,7 @@ mod tests {
});

let metadata = "namespace=testNamespace/eventVersion=Ver1v0";
let result = encoder.encode_log_batch([log].iter(), metadata);
let result = encoder.encode_log_batch([log].iter(), "testNamespace", metadata);

assert!(!result.is_empty());
}
Expand Down Expand Up @@ -437,7 +440,7 @@ mod tests {
let metadata = "namespace=test";

// Encode multiple log records with different schema structures but same event_name
let result = encoder.encode_log_batch([log1, log2, log3].iter(), metadata);
let result = encoder.encode_log_batch([log1, log2, log3].iter(), "test", metadata);

// Should create one batch (same event_name = "user_action")
assert_eq!(result.len(), 1);
Expand Down Expand Up @@ -494,7 +497,7 @@ mod tests {
..Default::default()
};

let result = encoder.encode_log_batch([log].iter(), "test");
let result = encoder.encode_log_batch([log].iter(), "test", "test");

assert_eq!(result.len(), 1);
assert_eq!(result[0].event_name, "test_event");
Expand Down Expand Up @@ -534,7 +537,7 @@ mod tests {
}),
});

let result = encoder.encode_log_batch([log1, log2, log3].iter(), "test");
let result = encoder.encode_log_batch([log1, log2, log3].iter(), "test", "test");

// All should be in one batch with same event_name
assert_eq!(result.len(), 1);
Expand All @@ -560,7 +563,7 @@ mod tests {
..Default::default()
};

let result = encoder.encode_log_batch([log1, log2].iter(), "test");
let result = encoder.encode_log_batch([log1, log2].iter(), "test", "test");

// Should create 2 separate batches
assert_eq!(result.len(), 2);
Expand All @@ -583,7 +586,7 @@ mod tests {
..Default::default()
};

let result = encoder.encode_log_batch([log].iter(), "test");
let result = encoder.encode_log_batch([log].iter(), "test", "test");

assert_eq!(result.len(), 1);
assert_eq!(result[0].event_name, "Log"); // Should default to "Log"
Expand Down Expand Up @@ -629,7 +632,7 @@ mod tests {
}),
});

let result = encoder.encode_log_batch([log1, log2, log3, log4].iter(), "test");
let result = encoder.encode_log_batch([log1, log2, log3, log4].iter(), "test", "test");

// Should create 3 batches: "user_action", "system_alert", "Log"
assert_eq!(result.len(), 3);
Expand All @@ -656,4 +659,33 @@ mod tests {
assert!(!log_batch.data.is_empty()); // Should have encoded data
assert_eq!(log_batch.metadata.schema_ids.matches(';').count(), 0); // 1 schema = 0 semicolons
}


#[test]
fn test_schema_uses_different_namespaces() {
    // Encode the same record under two different namespaces (via two
    // independent encoder instances) and confirm each call succeeds and
    // yields one batch carrying the record's event name.
    let base_log = LogRecord {
        observed_time_unix_nano: 1_700_000_000_000_000_000,
        event_name: "test_event".to_string(),
        severity_number: 9,
        ..Default::default()
    };

    let metadata = "eventVersion=Ver1v0";

    let first_encoder = OtlpEncoder::new();
    let second_encoder = OtlpEncoder::new();

    let custom_result =
        first_encoder.encode_log_batch([base_log.clone()].iter(), "customNamespace", metadata);
    let another_result =
        second_encoder.encode_log_batch([base_log].iter(), "anotherNamespace", metadata);

    // Both namespace variants must produce output.
    assert!(!custom_result.is_empty());
    assert!(!another_result.is_empty());

    // Each encode call yields exactly one batch for the single event name.
    // (No schema cache exists anymore, so this just exercises the path.)
    assert_eq!(custom_result.len(), 1);
    assert_eq!(another_result.len(), 1);
    assert_eq!(custom_result[0].event_name, "test_event");
    assert_eq!(another_result[0].event_name, "test_event");
}
}
Loading