Skip to content

Commit a453b7a

Browse files
authored
feat: [Geneva Exporter] Split Client API into batching and uploading (#398)
1 parent fcf1ca3 commit a453b7a

File tree

10 files changed

+162
-92
lines changed

10 files changed

+162
-92
lines changed

opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,9 @@ mod benchmarks {
178178
.collect();
179179

180180
b.iter(|| {
181-
let res =
182-
encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata));
181+
let res = encoder
182+
.encode_log_batch(black_box(logs.iter()), black_box(metadata))
183+
.unwrap();
183184
black_box(res); // double sure the return value is generated
184185
});
185186
},
@@ -207,7 +208,9 @@ mod benchmarks {
207208

208209
b.iter(|| {
209210
let res = black_box(
210-
encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata)),
211+
encoder
212+
.encode_log_batch(black_box(logs.iter()), black_box(metadata))
213+
.unwrap(),
211214
);
212215
black_box(res); // double sure the return value is generated
213216
});
@@ -232,7 +235,9 @@ mod benchmarks {
232235

233236
b.iter(|| {
234237
let res = black_box(
235-
encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata)),
238+
encoder
239+
.encode_log_batch(black_box(logs.iter()), black_box(metadata))
240+
.unwrap(),
236241
);
237242
black_box(res);
238243
});

opentelemetry-exporter-geneva/geneva-uploader/src/client.rs

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,19 @@
22
33
use crate::config_service::client::{AuthMethod, GenevaConfigClient, GenevaConfigClientConfig};
44
use crate::ingestion_service::uploader::{GenevaUploader, GenevaUploaderConfig};
5-
use crate::payload_encoder::lz4_chunked_compression::lz4_chunked_compression;
65
use crate::payload_encoder::otlp_encoder::OtlpEncoder;
7-
use futures::stream::{self, StreamExt};
86
use opentelemetry_proto::tonic::logs::v1::ResourceLogs;
97
use std::sync::Arc;
108

9+
/// Public batch type (already LZ4 chunked compressed).
10+
/// Produced by `OtlpEncoder::encode_log_batch` and returned to callers.
11+
#[derive(Debug, Clone)]
12+
pub struct EncodedBatch {
13+
pub event_name: String,
14+
pub data: Vec<u8>,
15+
pub metadata: crate::payload_encoder::central_blob::BatchMetadata,
16+
}
17+
1118
/// Configuration for GenevaClient (user-facing)
1219
#[derive(Clone, Debug)]
1320
pub struct GenevaClientConfig {
@@ -21,8 +28,6 @@ pub struct GenevaClientConfig {
2128
pub tenant: String,
2229
pub role_name: String,
2330
pub role_instance: String,
24-
/// Maximum number of concurrent uploads. If None, defaults to number of CPU cores.
25-
pub max_concurrent_uploads: Option<usize>,
2631
// Add event name/version here if constant, or per-upload if you want them per call.
2732
}
2833

@@ -32,7 +37,6 @@ pub struct GenevaClient {
3237
uploader: Arc<GenevaUploader>,
3338
encoder: OtlpEncoder,
3439
metadata: String,
35-
max_concurrent_uploads: usize,
3640
}
3741

3842
impl GenevaClient {
@@ -78,57 +82,35 @@ impl GenevaClient {
7882
let uploader = GenevaUploader::from_config_client(config_client, uploader_config)
7983
.await
8084
.map_err(|e| format!("GenevaUploader init failed: {e}"))?;
81-
let max_concurrent_uploads = cfg.max_concurrent_uploads.unwrap_or_else(|| {
82-
// TODO - Use a more sophisticated method to determine concurrency if needed
83-
// currently using number of CPU cores
84-
std::thread::available_parallelism()
85-
.map(|p| p.get())
86-
.unwrap_or(4)
87-
});
8885
Ok(Self {
8986
uploader: Arc::new(uploader),
9087
encoder: OtlpEncoder::new(),
9188
metadata,
92-
max_concurrent_uploads,
9389
})
9490
}
9591

96-
/// Upload OTLP logs (as ResourceLogs).
97-
pub async fn upload_logs(&self, logs: &[ResourceLogs]) -> Result<(), String> {
92+
/// Encode OTLP logs into LZ4 chunked compressed batches.
93+
pub fn encode_and_compress_logs(
94+
&self,
95+
logs: &[ResourceLogs],
96+
) -> Result<Vec<EncodedBatch>, String> {
9897
let log_iter = logs
9998
.iter()
10099
.flat_map(|resource_log| resource_log.scope_logs.iter())
101100
.flat_map(|scope_log| scope_log.log_records.iter());
102-
// TODO: Investigate using tokio::spawn_blocking for event encoding to avoid blocking
103-
// the async executor thread for CPU-intensive work.
104-
let blobs = self.encoder.encode_log_batch(log_iter, &self.metadata);
105101

106-
// create an iterator that yields futures for each upload
107-
let upload_futures = blobs.into_iter().map(|batch| {
108-
async move {
109-
// TODO: Investigate using tokio::spawn_blocking for LZ4 compression to avoid blocking
110-
// the async executor thread for CPU-intensive work.
111-
let compressed_blob = lz4_chunked_compression(&batch.data).map_err(|e| {
112-
format!("LZ4 compression failed: {e} Event: {}", batch.event_name)
113-
})?;
114-
self.uploader
115-
.upload(compressed_blob, &batch.event_name, &batch.metadata)
116-
.await
117-
.map(|_| ())
118-
.map_err(|e| format!("Geneva upload failed: {e} Event: {}", batch.event_name))
119-
}
120-
});
121-
// Execute uploads concurrently with configurable concurrency
122-
let errors: Vec<String> = stream::iter(upload_futures)
123-
.buffer_unordered(self.max_concurrent_uploads)
124-
.filter_map(|result| async move { result.err() })
125-
.collect()
126-
.await;
102+
self.encoder
103+
.encode_log_batch(log_iter, &self.metadata)
104+
.map_err(|e| format!("Compression failed: {e}"))
105+
}
127106

128-
// Return error if any uploads failed
129-
if !errors.is_empty() {
130-
return Err(format!("Upload failures: {}", errors.join("; ")));
131-
}
132-
Ok(())
107+
/// Upload a single compressed batch.
108+
/// This allows for granular control over uploads, including custom retry logic for individual batches.
109+
pub async fn upload_batch(&self, batch: &EncodedBatch) -> Result<(), String> {
110+
self.uploader
111+
.upload(batch.data.clone(), &batch.event_name, &batch.metadata)
112+
.await
113+
.map(|_| ())
114+
.map_err(|e| format!("Geneva upload failed: {e} Event: {}", batch.event_name))
133115
}
134116
}

opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,13 @@ impl From<reqwest::Error> for GenevaUploaderError {
8888

8989
pub(crate) type Result<T> = std::result::Result<T, GenevaUploaderError>;
9090

91-
#[allow(dead_code)]
9291
/// Response from the ingestion API when submitting data
9392
#[derive(Debug, Clone, Deserialize)]
9493
pub(crate) struct IngestionResponse {
94+
#[allow(dead_code)]
9595
pub(crate) ticket: String,
9696
#[serde(flatten)]
97+
#[allow(dead_code)]
9798
pub(crate) extra: HashMap<String, Value>,
9899
}
99100

opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
mod config_service;
22
mod ingestion_service;
3-
pub mod payload_encoder;
3+
mod payload_encoder;
44

55
pub mod client;
66

@@ -14,8 +14,9 @@ pub(crate) use config_service::client::{
1414

1515
#[allow(unused_imports)]
1616
pub(crate) use ingestion_service::uploader::{
17-
GenevaUploader, GenevaUploaderConfig, GenevaUploaderError, IngestionResponse, Result,
17+
GenevaUploader, GenevaUploaderConfig, GenevaUploaderError, Result,
1818
};
1919

20+
pub use client::EncodedBatch;
2021
pub use client::{GenevaClient, GenevaClientConfig};
2122
pub use config_service::client::AuthMethod;

opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/central_blob.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,6 @@ impl BatchMetadata {
5252
}
5353
}
5454

55-
/// Represents an encoded batch with all necessary metadata
56-
#[derive(Debug, Clone)]
57-
pub(crate) struct EncodedBatch {
58-
/// The event name for this batch
59-
pub(crate) event_name: String,
60-
/// The encoded binary data
61-
pub(crate) data: Vec<u8>,
62-
/// Batch metadata containing timestamps and schema information
63-
pub(crate) metadata: BatchMetadata,
64-
}
65-
6655
/// Helper to encode UTF-8 Rust str to UTF-16LE bytes
6756
/// TODO - consider avoiding temporary allocation, by passing a mutable buffer
6857
#[allow(dead_code)]

opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use crate::client::EncodedBatch;
12
use crate::payload_encoder::bond_encoder::{BondDataType, BondEncodedSchema, BondWriter, FieldDef};
23
use crate::payload_encoder::central_blob::{
3-
BatchMetadata, CentralBlob, CentralEventEntry, CentralSchemaEntry, EncodedBatch,
4+
BatchMetadata, CentralBlob, CentralEventEntry, CentralSchemaEntry,
45
};
6+
use crate::payload_encoder::lz4_chunked_compression::lz4_chunked_compression;
57
use chrono::{TimeZone, Utc};
68
use opentelemetry_proto::tonic::common::v1::any_value::Value;
79
use opentelemetry_proto::tonic::logs::v1::LogRecord;
@@ -29,8 +31,14 @@ impl OtlpEncoder {
2931
OtlpEncoder {}
3032
}
3133

32-
/// Encode a batch of logs into a vector of (event_name, bytes, schema_ids, start_time_nanos, end_time_nanos)
33-
pub(crate) fn encode_log_batch<'a, I>(&self, logs: I, metadata: &str) -> Vec<EncodedBatch>
34+
/// Encode a batch of logs into a vector of (event_name, compressed_bytes, schema_ids, start_time_nanos, end_time_nanos)
35+
/// The returned `data` field contains LZ4 chunked compressed bytes.
36+
/// On compression failure, the error is returned (no logging, no fallback).
37+
pub(crate) fn encode_log_batch<'a, I>(
38+
&self,
39+
logs: I,
40+
metadata: &str,
41+
) -> Result<Vec<EncodedBatch>, String>
3442
where
3543
I: IntoIterator<Item = &'a opentelemetry_proto::tonic::logs::v1::LogRecord>,
3644
{
@@ -143,14 +151,16 @@ impl OtlpEncoder {
143151
schemas: batch_data.schemas,
144152
events: batch_data.events,
145153
};
146-
let bytes = blob.to_bytes();
154+
let uncompressed = blob.to_bytes();
155+
let compressed = lz4_chunked_compression(&uncompressed)
156+
.map_err(|e| format!("compression failed: {e}"))?;
147157
blobs.push(EncodedBatch {
148158
event_name: batch_event_name,
149-
data: bytes,
159+
data: compressed,
150160
metadata: batch_data.metadata,
151161
});
152162
}
153-
blobs
163+
Ok(blobs)
154164
}
155165

156166
/// Determine fields and calculate schema ID in a single pass for optimal performance
@@ -391,7 +401,7 @@ mod tests {
391401
});
392402

393403
let metadata = "namespace=testNamespace/eventVersion=Ver1v0";
394-
let result = encoder.encode_log_batch([log].iter(), metadata);
404+
let result = encoder.encode_log_batch([log].iter(), metadata).unwrap();
395405

396406
assert!(!result.is_empty());
397407
}
@@ -438,7 +448,9 @@ mod tests {
438448
let metadata = "namespace=test";
439449

440450
// Encode multiple log records with different schema structures but same event_name
441-
let result = encoder.encode_log_batch([log1, log2, log3].iter(), metadata);
451+
let result = encoder
452+
.encode_log_batch([log1, log2, log3].iter(), metadata)
453+
.unwrap();
442454

443455
// Should create one batch (same event_name = "user_action")
444456
assert_eq!(result.len(), 1);
@@ -495,7 +507,7 @@ mod tests {
495507
..Default::default()
496508
};
497509

498-
let result = encoder.encode_log_batch([log].iter(), "test");
510+
let result = encoder.encode_log_batch([log].iter(), "test").unwrap();
499511

500512
assert_eq!(result.len(), 1);
501513
assert_eq!(result[0].event_name, "test_event");
@@ -535,7 +547,9 @@ mod tests {
535547
}),
536548
});
537549

538-
let result = encoder.encode_log_batch([log1, log2, log3].iter(), "test");
550+
let result = encoder
551+
.encode_log_batch([log1, log2, log3].iter(), "test")
552+
.unwrap();
539553

540554
// All should be in one batch with same event_name
541555
assert_eq!(result.len(), 1);
@@ -561,7 +575,9 @@ mod tests {
561575
..Default::default()
562576
};
563577

564-
let result = encoder.encode_log_batch([log1, log2].iter(), "test");
578+
let result = encoder
579+
.encode_log_batch([log1, log2].iter(), "test")
580+
.unwrap();
565581

566582
// Should create 2 separate batches
567583
assert_eq!(result.len(), 2);
@@ -584,7 +600,7 @@ mod tests {
584600
..Default::default()
585601
};
586602

587-
let result = encoder.encode_log_batch([log].iter(), "test");
603+
let result = encoder.encode_log_batch([log].iter(), "test").unwrap();
588604

589605
assert_eq!(result.len(), 1);
590606
assert_eq!(result[0].event_name, "Log"); // Should default to "Log"
@@ -630,7 +646,9 @@ mod tests {
630646
}),
631647
});
632648

633-
let result = encoder.encode_log_batch([log1, log2, log3, log4].iter(), "test");
649+
let result = encoder
650+
.encode_log_batch([log1, log2, log3, log4].iter(), "test")
651+
.unwrap();
634652

635653
// Should create 3 batches: "user_action", "system_alert", "Log"
636654
assert_eq!(result.len(), 3);

opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ rust-version = "1.75.0"
99
opentelemetry_sdk = {workspace = true, default-features = false, features = ["logs"]}
1010
opentelemetry-proto = {workspace = true, default-features = false, features = ["logs"]}
1111
geneva-uploader = {path = "../geneva-uploader/", version = "0.1.0"}
12+
futures = "0.3"
1213

1314
[dev-dependencies]
1415
opentelemetry-appender-tracing = {workspace = true}
@@ -19,4 +20,4 @@ tracing-subscriber = { version = "0.3.0", default-features = false, features = [
1920
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
2021

2122
[lints]
22-
workspace = true
23+
workspace = true

opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ async fn main() {
6262
tenant,
6363
role_name,
6464
role_instance,
65-
max_concurrent_uploads: None, // Use default
6665
};
6766

6867
let geneva_client = GenevaClient::new(config)

0 commit comments

Comments (0)