Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ opentelemetry-semantic-conventions = { version = "0.30", features = [
criterion = "0.7"

[workspace.lints.rust]
rust_2024_compatibility = { level = "warn", priority = -1 }
# No need to enable those, because it is either unnecessary or results in ugly syntax
if_let_rescope = "allow"
tail_expr_drop_order = "allow"
# Note: rust_2024_compatibility, if_let_rescope, and tail_expr_drop_order lints
# are only available in Rust 1.76+ and are incompatible with MSRV 1.75.0
# rust_2024_compatibility = { level = "warn", priority = -1 }
# if_let_rescope = "allow"
# tail_expr_drop_order = "allow"

[workspace.lints.clippy]
all = { level = "warn", priority = 1 }
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ workspace = true

[dev-dependencies]
otlp_builder = { path = "examples/otlp_builder" }
wiremock = "0.6"
wiremock = "0.5"
base64 = "0.22"
chrono = "0.4"
10 changes: 8 additions & 2 deletions opentelemetry-exporter-geneva/geneva-uploader-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,15 @@ pub unsafe extern "C" fn geneva_client_free(handle: *mut GenevaClientHandle) {
#[cfg(test)]
mod tests {
use super::*;
use std::ffi::CString;

#[cfg(feature = "mock_auth")]
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
#[cfg(feature = "mock_auth")]
use prost::Message;
use std::ffi::CString;

// Build a minimal unsigned JWT with the Endpoint claim and an exp. Matches what extract_endpoint_from_token expects.
#[cfg(feature = "mock_auth")]
fn generate_mock_jwt_and_expiry(endpoint: &str, ttl_secs: i64) -> (String, String) {
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
use chrono::{Duration, Utc};
Expand Down Expand Up @@ -716,6 +720,7 @@ mod tests {
// Integration-style test: encode via FFI then upload via FFI using MockAuth + Wiremock server.
// Uses otlp_builder to construct an ExportLogsServiceRequest payload.
#[test]
#[cfg(feature = "mock_auth")]
fn test_encode_and_upload_with_mock_server() {
use otlp_builder::builder::build_otlp_logs_minimal;
use wiremock::matchers::method;
Expand Down Expand Up @@ -827,6 +832,7 @@ mod tests {
// multiple different event_names in one request produce multiple batches,
// and each batch upload hits ingestion with the corresponding event query param.
#[test]
#[cfg(feature = "mock_auth")]
fn test_encode_batching_by_event_name_and_upload() {
use wiremock::matchers::method;
use wiremock::{Mock, MockServer, ResponseTemplate};
Expand Down Expand Up @@ -945,7 +951,7 @@ mod tests {
let reqs = mock_server.received_requests().await.unwrap();
let posts: Vec<String> = reqs
.iter()
.filter(|r| r.method.as_str() == "POST")
.filter(|r| r.method.as_ref() == "POST")
.map(|r| r.url.to_string())
.collect();

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ tokio = { version = "1", features = ["full"] }
rcgen = "0.14"
openssl = { version = "0.10", features = ["vendored"] }
tempfile = "3.5"
wiremock = "0.6"
wiremock = "0.5"
futures = "0.3"
num_cpus = "1.16"
lz4_flex = { version = "0.11" }
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct EncodedBatch {
pub event_name: String,
pub data: Vec<u8>,
pub data: Arc<Vec<u8>>,
pub metadata: crate::payload_encoder::central_blob::BatchMetadata,
}

Expand Down Expand Up @@ -103,7 +103,7 @@ impl GenevaClient {
/// This allows for granular control over uploads, including custom retry logic for individual batches.
pub async fn upload_batch(&self, batch: &EncodedBatch) -> Result<(), String> {
self.uploader
.upload(batch.data.clone(), &batch.event_name, &batch.metadata)
.upload(Arc::clone(&batch.data), &batch.event_name, &batch.metadata)
.await
.map(|_| ())
.map_err(|e| format!("Geneva upload failed: {e} Event: {}", batch.event_name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub(crate) mod uploader;

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Instant;

mod test_helpers {
Expand Down Expand Up @@ -127,7 +128,7 @@ mod tests {

let response = ctx
.uploader
.upload(ctx.data, &ctx.event_name, &metadata)
.upload(Arc::new(ctx.data), &ctx.event_name, &metadata)
.await
.expect("Upload failed");

Expand Down Expand Up @@ -194,7 +195,11 @@ mod tests {

let _ = ctx
.uploader
.upload(ctx.data.clone(), &ctx.event_name, &warmup_metadata)
.upload(
Arc::new(ctx.data.clone()),
&ctx.event_name,
&warmup_metadata,
)
.await
.expect("Warm-up upload failed");
let warmup_elapsed = start_warmup.elapsed();
Expand All @@ -220,7 +225,7 @@ mod tests {
};

let resp = uploader
.upload(data, &event_name, &metadata)
.upload(Arc::new(data), &event_name, &metadata)
.await
.unwrap_or_else(|_| panic!("Upload {i} failed"));
let elapsed = start.elapsed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl GenevaUploader {
#[allow(dead_code)]
pub(crate) async fn upload(
&self,
data: Vec<u8>,
data: Arc<Vec<u8>>,
event_name: &str,
metadata: &BatchMetadata,
) -> Result<IngestionResponse> {
Expand All @@ -225,14 +225,20 @@ impl GenevaUploader {
upload_uri
);
// Send the upload request
// Use Arc::try_unwrap for efficiency if this is the only reference,
// otherwise clone the data
let body_data = match Arc::try_unwrap(data) {
Ok(vec) => vec,
Err(arc) => (*arc).clone(),
};
let response = self
.http_client
.post(&full_url)
.header(
header::AUTHORIZATION,
format!("Bearer {}", auth_info.auth_token),
)
.body(data)
.body(body_data)
.send()
.await?;
let status = response.status();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl OtlpEncoder {
.map_err(|e| format!("compression failed: {e}"))?;
blobs.push(EncodedBatch {
event_name: batch_event_name,
data: compressed,
data: Arc::new(compressed),
metadata: batch_data.metadata,
});
}
Expand Down Expand Up @@ -674,4 +674,44 @@ mod tests {
assert!(!log_batch.data.is_empty()); // Should have encoded data
assert_eq!(log_batch.metadata.schema_ids.matches(';').count(), 0); // 1 schema = 0 semicolons
}

#[test]
fn test_arc_data_sharing() {
let encoder = OtlpEncoder::new();

let log = LogRecord {
event_name: "test_event".to_string(),
severity_number: 9,
severity_text: "INFO".to_string(),
observed_time_unix_nano: 1_700_000_000_000_000_000,
..Default::default()
};

let metadata = "namespace=testNamespace/eventVersion=Ver1v0";
let result = encoder.encode_log_batch([log].iter(), metadata).unwrap();

assert_eq!(result.len(), 1);
let batch = &result[0];

// Clone the batch to simulate retry scenario
let batch_clone = batch.clone();

// Verify that both batches point to the same underlying data
// Arc::ptr_eq checks if two Arcs point to the same allocation
assert!(
Arc::ptr_eq(&batch.data, &batch_clone.data),
"Arc should share the same memory allocation after clone"
);

// Verify the strong count is 2 (original + cloned)
assert_eq!(
Arc::strong_count(&batch.data),
2,
"Arc should have reference count of 2 after cloning"
);

// Verify that data is accessible from both references
assert_eq!(batch.data.len(), batch_clone.data.len());
assert_eq!(&**batch.data, &**batch_clone.data);
}
}
2 changes: 1 addition & 1 deletion stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ num_cpus = "1.15.0"
num-format = "0.4.4"
sysinfo = { version = "0.36", optional = true }
tokio = { version = "1", features = ["full", "test-util"] }
wiremock = "0.6"
wiremock = "0.5"
futures = "0.3"

opentelemetry-appender-tracing = { version = "0.30", features= ["spec_unstable_logs_enabled"] }
Expand Down
Loading