diff --git a/Cargo.toml b/Cargo.toml index 0813f8f9..144abdff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,10 +41,11 @@ opentelemetry-semantic-conventions = { version = "0.30", features = [ criterion = "0.7" [workspace.lints.rust] -rust_2024_compatibility = { level = "warn", priority = -1 } -# No need to enable those, because it is either unnecessary or results in ugly syntax -if_let_rescope = "allow" -tail_expr_drop_order = "allow" +# Note: rust_2024_compatibility, if_let_rescope, and tail_expr_drop_order lints +# are only available in Rust 1.76+ and are incompatible with MSRV 1.75.0 +# rust_2024_compatibility = { level = "warn", priority = -1 } +# if_let_rescope = "allow" +# tail_expr_drop_order = "allow" [workspace.lints.clippy] all = { level = "warn", priority = 1 } diff --git a/opentelemetry-exporter-geneva/geneva-uploader-ffi/Cargo.toml b/opentelemetry-exporter-geneva/geneva-uploader-ffi/Cargo.toml index b12b2ca3..b1dda4c4 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader-ffi/Cargo.toml +++ b/opentelemetry-exporter-geneva/geneva-uploader-ffi/Cargo.toml @@ -26,6 +26,6 @@ workspace = true [dev-dependencies] otlp_builder = { path = "examples/otlp_builder" } -wiremock = "0.6" +wiremock = "0.5" base64 = "0.22" chrono = "0.4" diff --git a/opentelemetry-exporter-geneva/geneva-uploader-ffi/src/lib.rs b/opentelemetry-exporter-geneva/geneva-uploader-ffi/src/lib.rs index 04d382c1..85ea95db 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader-ffi/src/lib.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader-ffi/src/lib.rs @@ -495,11 +495,15 @@ pub unsafe extern "C" fn geneva_client_free(handle: *mut GenevaClientHandle) { #[cfg(test)] mod tests { use super::*; + use std::ffi::CString; + + #[cfg(feature = "mock_auth")] use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; + #[cfg(feature = "mock_auth")] use prost::Message; - use std::ffi::CString; // Build a minimal unsigned JWT with the Endpoint claim and an exp. Matches what extract_endpoint_from_token expects. + #[cfg(feature = "mock_auth")] fn generate_mock_jwt_and_expiry(endpoint: &str, ttl_secs: i64) -> (String, String) { use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; use chrono::{Duration, Utc}; @@ -716,6 +720,7 @@ mod tests { // Integration-style test: encode via FFI then upload via FFI using MockAuth + Wiremock server. // Uses otlp_builder to construct an ExportLogsServiceRequest payload. #[test] + #[cfg(feature = "mock_auth")] fn test_encode_and_upload_with_mock_server() { use otlp_builder::builder::build_otlp_logs_minimal; use wiremock::matchers::method; @@ -827,6 +832,7 @@ mod tests { // multiple different event_names in one request produce multiple batches, // and each batch upload hits ingestion with the corresponding event query param. #[test] + #[cfg(feature = "mock_auth")] fn test_encode_batching_by_event_name_and_upload() { use wiremock::matchers::method; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -945,7 +951,7 @@ mod tests { let reqs = mock_server.received_requests().await.unwrap(); let posts: Vec = reqs .iter() - .filter(|r| r.method.as_str() == "POST") + .filter(|r| r.method.as_ref() == "POST") .map(|r| r.url.to_string()) .collect(); diff --git a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml index 81b86463..386cf78b 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml +++ b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml @@ -35,7 +35,7 @@ tokio = { version = "1", features = ["full"] } rcgen = "0.14" openssl = { version = "0.10", features = ["vendored"] } tempfile = "3.5" -wiremock = "0.6" +wiremock = "0.5" futures = "0.3" num_cpus = "1.16" lz4_flex = { version = "0.11" } diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs index bcd6485e..21f4e02e 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs @@ -11,7 +11,7 @@ use std::sync::Arc; #[derive(Debug, Clone)] pub struct EncodedBatch { pub event_name: String, - pub data: Vec, + pub data: Arc>, pub metadata: crate::payload_encoder::central_blob::BatchMetadata, } @@ -103,7 +103,7 @@ impl GenevaClient { /// This allows for granular control over uploads, including custom retry logic for individual batches. pub async fn upload_batch(&self, batch: &EncodedBatch) -> Result<(), String> { self.uploader - .upload(batch.data.clone(), &batch.event_name, &batch.metadata) + .upload(Arc::clone(&batch.data), &batch.event_name, &batch.metadata) .await .map(|_| ()) .map_err(|e| format!("Geneva upload failed: {e} Event: {}", batch.event_name)) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs index 172d5686..f3c6f9fe 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs @@ -2,6 +2,7 @@ pub(crate) mod uploader; #[cfg(test)] mod tests { + use std::sync::Arc; use std::time::Instant; mod test_helpers { @@ -127,7 +128,7 @@ mod tests { let response = ctx .uploader - .upload(ctx.data, &ctx.event_name, &metadata) + .upload(Arc::new(ctx.data), &ctx.event_name, &metadata) .await .expect("Upload failed"); @@ -194,7 +195,11 @@ mod tests { let _ = ctx .uploader - .upload(ctx.data.clone(), &ctx.event_name, &warmup_metadata) + .upload( + Arc::new(ctx.data.clone()), + &ctx.event_name, + &warmup_metadata, + ) .await .expect("Warm-up upload failed"); let warmup_elapsed = start_warmup.elapsed(); @@ -220,7 +225,7 @@ mod tests { }; let resp = uploader - .upload(data, &event_name, &metadata) + .upload(Arc::new(data), &event_name, &metadata) .await .unwrap_or_else(|_| panic!("Upload {i} failed")); let elapsed = start.elapsed(); diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs index 20f26a51..4c1da8fb 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs @@ -204,7 +204,7 @@ impl GenevaUploader { #[allow(dead_code)] pub(crate) async fn upload( &self, - data: Vec, + data: Arc>, event_name: &str, metadata: &BatchMetadata, ) -> Result { @@ -225,6 +225,12 @@ impl GenevaUploader { upload_uri ); // Send the upload request + // Use Arc::try_unwrap for efficiency if this is the only reference, + // otherwise clone the data + let body_data = match Arc::try_unwrap(data) { + Ok(vec) => vec, + Err(arc) => (*arc).clone(), + }; let response = self .http_client .post(&full_url) @@ -232,7 +238,7 @@ impl GenevaUploader { header::AUTHORIZATION, format!("Bearer {}", auth_info.auth_token), ) - .body(data) + .body(body_data) .send() .await?; let status = response.status(); diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs index ff314829..006fc811 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs @@ -156,7 +156,7 @@ impl OtlpEncoder { .map_err(|e| format!("compression failed: {e}"))?; blobs.push(EncodedBatch { event_name: batch_event_name, - data: compressed, + data: Arc::new(compressed), metadata: batch_data.metadata, }); } @@ -674,4 +674,44 @@ mod tests { assert!(!log_batch.data.is_empty()); // Should have encoded data assert_eq!(log_batch.metadata.schema_ids.matches(';').count(), 0); // 1 schema = 0 semicolons } + + #[test] + fn test_arc_data_sharing() { + let encoder = OtlpEncoder::new(); + + let log = LogRecord { + event_name: "test_event".to_string(), + severity_number: 9, + severity_text: "INFO".to_string(), + observed_time_unix_nano: 1_700_000_000_000_000_000, + ..Default::default() + }; + + let metadata = "namespace=testNamespace/eventVersion=Ver1v0"; + let result = encoder.encode_log_batch([log].iter(), metadata).unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + + // Clone the batch to simulate retry scenario + let batch_clone = batch.clone(); + + // Verify that both batches point to the same underlying data + // Arc::ptr_eq checks if two Arcs point to the same allocation + assert!( + Arc::ptr_eq(&batch.data, &batch_clone.data), + "Arc should share the same memory allocation after clone" + ); + + // Verify the strong count is 2 (original + cloned) + assert_eq!( + Arc::strong_count(&batch.data), + 2, + "Arc should have reference count of 2 after cloning" + ); + + // Verify that data is accessible from both references + assert_eq!(batch.data.len(), batch_clone.data.len()); + assert_eq!(&**batch.data, &**batch_clone.data); + } } diff --git a/stress/Cargo.toml b/stress/Cargo.toml index f84159e1..817fa368 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -26,7 +26,7 @@ num_cpus = "1.15.0" num-format = "0.4.4" sysinfo = { version = "0.36", optional = true } tokio = { version = "1", features = ["full", "test-util"] } -wiremock = "0.6" +wiremock = "0.5" futures = "0.3" opentelemetry-appender-tracing = { version = "0.30", features= ["spec_unstable_logs_enabled"] }