Skip to content

Commit 61257af

Browse files
authored
feat(geneva uploader): Add rowCount parameter to ingestion URL (#484)
1 parent c869d96 commit 61257af

File tree

4 files changed

+74
-7
lines changed

4 files changed

+74
-7
lines changed

opentelemetry-exporter-geneva/geneva-uploader/src/client.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub struct EncodedBatch {
1616
pub event_name: String,
1717
pub data: Vec<u8>,
1818
pub metadata: crate::payload_encoder::central_blob::BatchMetadata,
19+
pub row_count: usize,
1920
}
2021

2122
/// Configuration for GenevaClient (user-facing)
@@ -212,7 +213,12 @@ impl GenevaClient {
212213
);
213214

214215
self.uploader
215-
.upload(batch.data.clone(), &batch.event_name, &batch.metadata)
216+
.upload(
217+
batch.data.clone(),
218+
&batch.event_name,
219+
&batch.metadata,
220+
batch.row_count,
221+
)
216222
.await
217223
.map(|_| {
218224
debug!(

opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ mod tests {
128128

129129
let response = ctx
130130
.uploader
131-
.upload(ctx.data, &ctx.event_name, &metadata)
131+
.upload(ctx.data, &ctx.event_name, &metadata, 1)
132132
.await
133133
.expect("Upload failed");
134134

@@ -195,7 +195,7 @@ mod tests {
195195

196196
let _ = ctx
197197
.uploader
198-
.upload(ctx.data.clone(), &ctx.event_name, &warmup_metadata)
198+
.upload(ctx.data.clone(), &ctx.event_name, &warmup_metadata, 1)
199199
.await
200200
.expect("Warm-up upload failed");
201201
let warmup_elapsed = start_warmup.elapsed();
@@ -221,7 +221,7 @@ mod tests {
221221
};
222222

223223
let resp = uploader
224-
.upload(data, &event_name, &metadata)
224+
.upload(data, &event_name, &metadata, 1)
225225
.await
226226
.unwrap_or_else(|_| panic!("Upload {i} failed"));
227227
let elapsed = start.elapsed();

opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ impl GenevaUploader {
163163
data_size: usize,
164164
event_name: &str,
165165
metadata: &BatchMetadata,
166+
row_count: usize,
166167
) -> Result<String> {
167168
// Get already formatted schema IDs and format timestamps using BatchMetadata methods
168169
let schema_ids = &metadata.schema_ids;
@@ -181,7 +182,7 @@ impl GenevaUploader {
181182

182183
// Create the query string
183184
let mut query = String::with_capacity(512); // Preallocate enough space for the query string (decided based on expected size)
184-
write!(&mut query, "api/v1/ingestion/ingest?endpoint={}&moniker={}&namespace={}&event={}&version={}&sourceUniqueId={}&sourceIdentity={}&startTime={}&endTime={}&format=centralbond/lz4hc&dataSize={}&minLevel={}&schemaIds={}",
185+
write!(&mut query, "api/v1/ingestion/ingest?endpoint={}&moniker={}&namespace={}&event={}&version={}&sourceUniqueId={}&sourceIdentity={}&startTime={}&endTime={}&format=centralbond/lz4hc&dataSize={}&minLevel={}&schemaIds={}&rowCount={}",
185186
encoded_monitoring_endpoint,
186187
moniker,
187188
self.config.namespace,
@@ -193,7 +194,8 @@ impl GenevaUploader {
193194
end_time_str,
194195
data_size,
195196
2,
196-
schema_ids
197+
schema_ids,
198+
row_count
197199
).map_err(|e| GenevaUploaderError::InternalError(format!("Failed to write query string: {e}")))?;
198200
Ok(query)
199201
}
@@ -205,6 +207,7 @@ impl GenevaUploader {
205207
/// * `event_name` - Name of the event
206208
/// * `event_version` - Version of the event
207209
/// * `metadata` - Batch metadata containing timestamps and schema information
210+
/// * `row_count` - Number of rows/events in the batch
208211
///
209212
/// # Returns
210213
/// * `Result<IngestionResponse>` - The response containing the ticket ID or an error
@@ -214,6 +217,7 @@ impl GenevaUploader {
214217
data: Vec<u8>,
215218
event_name: &str,
216219
metadata: &BatchMetadata,
220+
row_count: usize,
217221
) -> Result<IngestionResponse> {
218222
debug!(
219223
name: "uploader.upload",
@@ -233,6 +237,7 @@ impl GenevaUploader {
233237
data_size,
234238
event_name,
235239
metadata,
240+
row_count,
236241
)?;
237242
let full_url = format!(
238243
"{}/{}",

opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ impl OtlpEncoder {
190190
event_name: batch_event_name.to_string(),
191191
data: compressed,
192192
metadata: batch_data.metadata,
193+
row_count: events_count,
193194
});
194195
}
195196
Ok(blobs)
@@ -292,7 +293,6 @@ impl OtlpEncoder {
292293

293294
let schemas_count = schemas.len();
294295
let events_count = events.len();
295-
296296
let blob = CentralBlob {
297297
version: 1,
298298
format: 2,
@@ -327,6 +327,7 @@ impl OtlpEncoder {
327327
event_name: EVENT_NAME.to_string(),
328328
data: compressed,
329329
metadata: batch_metadata,
330+
row_count: events_count,
330331
}])
331332
}
332333

@@ -1282,4 +1283,59 @@ mod tests {
12821283
assert!(single_result.starts_with('['));
12831284
assert!(single_result.ends_with(']'));
12841285
}
1286+
1287+
#[test]
1288+
fn test_row_count_in_encoded_batch() {
1289+
let encoder = OtlpEncoder::new();
1290+
1291+
// Test with logs
1292+
let logs = [
1293+
LogRecord {
1294+
observed_time_unix_nano: 1_700_000_000_000_000_000,
1295+
event_name: "test_event".to_string(),
1296+
severity_number: 9,
1297+
..Default::default()
1298+
},
1299+
LogRecord {
1300+
observed_time_unix_nano: 1_700_000_001_000_000_000,
1301+
event_name: "test_event".to_string(),
1302+
severity_number: 10,
1303+
..Default::default()
1304+
},
1305+
LogRecord {
1306+
observed_time_unix_nano: 1_700_000_002_000_000_000,
1307+
event_name: "test_event".to_string(),
1308+
severity_number: 11,
1309+
..Default::default()
1310+
},
1311+
];
1312+
1313+
let result = encoder
1314+
.encode_log_batch(logs.iter(), "namespace=test")
1315+
.unwrap();
1316+
1317+
assert_eq!(result.len(), 1);
1318+
assert_eq!(result[0].row_count, 3);
1319+
1320+
// Test with spans
1321+
let spans = [
1322+
Span {
1323+
start_time_unix_nano: 1_700_000_000_000_000_000,
1324+
end_time_unix_nano: 1_700_000_001_000_000_000,
1325+
..Default::default()
1326+
},
1327+
Span {
1328+
start_time_unix_nano: 1_700_000_002_000_000_000,
1329+
end_time_unix_nano: 1_700_000_003_000_000_000,
1330+
..Default::default()
1331+
},
1332+
];
1333+
1334+
let span_result = encoder
1335+
.encode_span_batch(spans.iter(), "namespace=test")
1336+
.unwrap();
1337+
1338+
assert_eq!(span_result.len(), 1);
1339+
assert_eq!(span_result[0].row_count, 2);
1340+
}
12851341
}

0 commit comments

Comments
 (0)