Skip to content

Commit 167d82f

Browse files
authored
feat: Geneva uploader - Add ingestion service (#235)
1 parent 1c888a0 commit 167d82f

File tree

7 files changed

+518
-30
lines changed

7 files changed

+518
-30
lines changed

opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ reqwest = { version = "0.12", features = ["native-tls", "native-tls-alpn"]}
1414
native-tls = "0.2"
1515
thiserror = "2.0"
1616
chrono = "0.4"
17+
url = "2.2"
18+
1719

1820
[features]
1921
self_signed_certs = [] # Empty by default for security
@@ -25,6 +27,8 @@ rcgen = "0.13"
2527
openssl = { version = "0.10", features = ["vendored"] }
2628
tempfile = "3.5"
2729
wiremock = "0.6"
30+
futures = "0.3"
31+
num_cpus = "1.16"
2832

2933
[lints]
3034
workspace = true

opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use uuid::Uuid;
1212

1313
use chrono::{DateTime, Utc};
1414
use native_tls::{Identity, Protocol};
15+
use std::fmt;
16+
use std::fmt::Write;
1517
use std::fs;
1618
use std::path::PathBuf;
1719
use std::sync::RwLock;
@@ -43,7 +45,7 @@ use std::sync::RwLock;
4345
/// openssl pkcs12 -export -in cert.pem -inkey key.pem -out client.p12 -name "alias"
4446
/// ```
4547
#[allow(dead_code)]
46-
#[derive(Clone)]
48+
#[derive(Clone, Debug)]
4749
pub(crate) enum AuthMethod {
4850
/// Certificate-based authentication
4951
///
@@ -116,7 +118,7 @@ pub(crate) type Result<T> = std::result::Result<T, GenevaConfigClientError>;
116118
/// };
117119
/// ```
118120
#[allow(dead_code)]
119-
#[derive(Clone)]
121+
#[derive(Clone, Debug)]
120122
pub(crate) struct GenevaConfigClientConfig {
121123
pub(crate) endpoint: String,
122124
pub(crate) environment: String,
@@ -191,6 +193,18 @@ pub(crate) struct GenevaConfigClient {
191193
static_headers: HeaderMap,
192194
}
193195

196+
impl fmt::Debug for GenevaConfigClient {
197+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198+
f.debug_struct("GenevaConfigClient")
199+
.field("config", &self.config)
200+
.field("precomputed_url_prefix", &self.precomputed_url_prefix)
201+
.field("agent_identity", &self.agent_identity)
202+
.field("agent_version", &self.agent_version)
203+
.field("static_headers", &self.static_headers)
204+
.finish()
205+
}
206+
}
207+
194208
/// Client for interacting with the Geneva Configuration Service.
195209
///
196210
/// This client handles authentication and communication with the Geneva Config
@@ -250,21 +264,18 @@ impl GenevaConfigClient {
250264
let version_str = format!("Ver{}v0", config.config_major_version);
251265

252266
let mut pre_url = String::with_capacity(config.endpoint.len() + 200);
253-
pre_url.push_str(config.endpoint.trim_end_matches('/'));
254-
pre_url.push_str("/api/agent/v3/");
255-
pre_url.push_str(&config.environment);
256-
pre_url.push('/');
257-
pre_url.push_str(&config.account);
258-
pre_url.push_str("/MonitoringStorageKeys/?Namespace=");
259-
pre_url.push_str(&config.namespace);
260-
pre_url.push_str("&Region=");
261-
pre_url.push_str(&config.region);
262-
pre_url.push_str("&Identity=");
263-
pre_url.push_str(&encoded_identity);
264-
pre_url.push_str("&OSType=");
265-
pre_url.push_str(get_os_type());
266-
pre_url.push_str("&ConfigMajorVersion=");
267-
pre_url.push_str(&version_str);
267+
write!(
268+
&mut pre_url,
269+
"{}/api/agent/v3/{}/{}/MonitoringStorageKeys/?Namespace={}&Region={}&Identity={}&OSType={}&ConfigMajorVersion={}",
270+
config.endpoint.trim_end_matches('/'),
271+
config.environment,
272+
config.account,
273+
config.namespace,
274+
config.region,
275+
encoded_identity,
276+
get_os_type(),
277+
version_str
278+
).map_err(|e| GenevaConfigClientError::InternalError(format!("Failed to write URL: {e}")))?;
268279

269280
let http_client = client_builder.build()?;
270281

@@ -405,9 +416,9 @@ impl GenevaConfigClient {
405416
async fn fetch_ingestion_info(&self) -> Result<(IngestionGatewayInfo, MonikerInfo)> {
406417
let tag_id = Uuid::new_v4().to_string(); //TODO - uuid is costly, check if counter is enough?
407418
let mut url = String::with_capacity(self.precomputed_url_prefix.len() + 50); // Pre-allocate with reasonable capacity
408-
url.push_str(&self.precomputed_url_prefix);
409-
url.push_str("&TagId=");
410-
url.push_str(&tag_id);
419+
write!(&mut url, "{}&TagId={}", self.precomputed_url_prefix, tag_id).map_err(|e| {
420+
GenevaConfigClientError::InternalError(format!("Failed to write URL: {e}"))
421+
})?;
411422

412423
let req_id = Uuid::new_v4().to_string();
413424

opentelemetry-exporter-geneva/geneva-uploader/src/config_service/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mod client;
1+
pub(crate) mod client;
22

33
#[cfg(test)]
44
mod tests {
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
pub(crate) mod uploader;
2+
3+
#[cfg(test)]
4+
mod tests {
5+
use std::time::Instant;
6+
7+
mod test_helpers {
8+
use crate::{
9+
AuthMethod, GenevaConfigClient, GenevaConfigClientConfig, GenevaUploader,
10+
GenevaUploaderConfig,
11+
};
12+
use std::env;
13+
use std::fs;
14+
use std::sync::Arc;
15+
16+
pub struct TestUploadContext {
17+
pub data: Vec<u8>,
18+
pub uploader: GenevaUploader,
19+
pub event_name: String,
20+
pub event_version: String,
21+
}
22+
23+
pub async fn build_test_upload_context() -> TestUploadContext {
24+
// Load binary blob
25+
let blob_path =
26+
env::var("GENEVA_BLOB_PATH").expect("GENEVA_BLOB_PATH env var is required");
27+
let data = fs::read(&blob_path).expect("Failed to read binary blob");
28+
29+
// Read config from environment
30+
let endpoint = env::var("GENEVA_ENDPOINT").expect("GENEVA_ENDPOINT is required");
31+
let environment =
32+
env::var("GENEVA_ENVIRONMENT").expect("GENEVA_ENVIRONMENT is required");
33+
let account = env::var("GENEVA_ACCOUNT").expect("GENEVA_ACCOUNT is required");
34+
let namespace = env::var("GENEVA_NAMESPACE").expect("GENEVA_NAMESPACE is required");
35+
let region = env::var("GENEVA_REGION").expect("GENEVA_REGION is required");
36+
let cert_path = std::path::PathBuf::from(
37+
std::env::var("GENEVA_CERT_PATH").expect("GENEVA_CERT_PATH is required"),
38+
);
39+
let cert_password = env::var("GENEVA_CERT_PASSWORD").unwrap_or_default();
40+
let config_major_version = env::var("GENEVA_CONFIG_MAJOR_VERSION")
41+
.expect("GENEVA_CONFIG_MAJOR_VERSION is required")
42+
.parse::<u32>()
43+
.expect("GENEVA_CONFIG_MAJOR_VERSION must be a u32");
44+
let source_identity = env::var("GENEVA_SOURCE_IDENTITY").unwrap_or_else(|_| {
45+
"Tenant=Default/Role=Uploader/RoleInstance=devhost".to_string()
46+
});
47+
let schema_ids =
48+
"c1ce0ecea020359624c493bbe97f9e80;0da22cabbee419e000541a5eda732eb3".to_string();
49+
50+
// Define uploader config
51+
let uploader_config = GenevaUploaderConfig {
52+
namespace: namespace.clone(),
53+
source_identity,
54+
environment: environment.clone(),
55+
schema_ids,
56+
};
57+
58+
let config = GenevaConfigClientConfig {
59+
endpoint,
60+
environment,
61+
account,
62+
namespace,
63+
region,
64+
config_major_version,
65+
auth_method: AuthMethod::Certificate {
66+
path: cert_path,
67+
password: cert_password,
68+
},
69+
};
70+
71+
// Build client and uploader
72+
let config_client =
73+
GenevaConfigClient::new(config).expect("Failed to create config client");
74+
let uploader =
75+
GenevaUploader::from_config_client(Arc::new(config_client), uploader_config)
76+
.await
77+
.expect("Failed to create uploader");
78+
79+
// Event name/version
80+
let event_name = "Log".to_string();
81+
let event_version = "Ver2v0".to_string();
82+
83+
TestUploadContext {
84+
data,
85+
uploader,
86+
event_name,
87+
event_version,
88+
}
89+
}
90+
}
91+
92+
#[tokio::test]
93+
/// To run this test against a real Geneva Config Service and GIG, set the following environment variables:
94+
///
95+
/// ```bash
96+
/// export GENEVA_ENDPOINT="xxxhttps://<gcs-endpoint>"
97+
/// export GENEVA_ENVIRONMENT="Test"
98+
/// export GENEVA_ACCOUNT="YourAccountName"
99+
/// export GENEVA_NAMESPACE="YourNamespace"
100+
/// export GENEVA_REGION="YourRegion"
101+
/// export GENEVA_CONFIG_MAJOR_VERSION="2"
102+
/// export GENEVA_CERT_PATH="/path/to/client.p12"
103+
/// export GENEVA_CERT_PASSWORD="your-cert-password"
104+
/// export GENEVA_SOURCE_IDENTITY="Tenant=YourTenant/Role=YourRole/RoleInstance=YourInstance"
105+
/// export GENEVA_BLOB_PATH="/path/to/blob.bin"
106+
///
107+
/// cargo test test_upload_to_gig_real_server -- --ignored --nocapture
108+
/// ```
109+
#[ignore]
110+
async fn test_upload_to_gig_real_server() {
111+
let ctx = test_helpers::build_test_upload_context().await;
112+
println!("✅ Loaded blob ({} bytes)", ctx.data.len());
113+
// below call is only for logging purposes, to get endpoint and auth info.
114+
let (auth_info, _, _) = ctx
115+
.uploader
116+
.config_client
117+
.get_ingestion_info()
118+
.await
119+
.unwrap();
120+
println!("🚀 Uploading to: {}", auth_info.endpoint);
121+
122+
let start = Instant::now();
123+
let response = ctx
124+
.uploader
125+
.upload(ctx.data, &ctx.event_name, &ctx.event_version)
126+
.await
127+
.expect("Upload failed");
128+
129+
println!(
130+
"✅ Upload complete in {:.2?}. Ticket: {}",
131+
start.elapsed(),
132+
response.ticket
133+
);
134+
}
135+
136+
/// To run this test with parallel uploads:
137+
///
138+
/// ```bash
139+
/// export GENEVA_ENDPOINT="https://<gcs-endpoint>"
140+
/// export GENEVA_ENVIRONMENT="Test"
141+
/// export GENEVA_ACCOUNT="YourAccount"
142+
/// export GENEVA_NAMESPACE="YourNamespace"
143+
/// export GENEVA_REGION="YourRegion"
144+
/// export GENEVA_CONFIG_MAJOR_VERSION="2"
145+
/// export GENEVA_CERT_PATH="/path/to/client.p12"
146+
/// export GENEVA_CERT_PASSWORD="your-password"
147+
/// export GENEVA_SOURCE_IDENTITY="Tenant=YourTenant/Role=Role/RoleInstance=Instance"
148+
/// export GENEVA_BLOB_PATH="/path/to/blob.bin"
149+
/// export GENEVA_PARALLEL_UPLOADS="10"
150+
///
151+
/// cargo test test_parallel_uploads -- --ignored --nocapture
152+
/// Output:
153+
// 🔥 Performing warm-up upload...
154+
// 🔥 Warm-up upload complete in 222.42ms
155+
// 🚀 Launching 5 parallel uploads...
156+
// ✅ Upload 2 complete in 120.43ms. Ticket: ...
157+
// ✅ Upload 4 complete in 120.35ms. Ticket: ...
158+
// ✅ Upload 3 complete in 120.50ms. Ticket: ...
159+
// ✅ Upload 1 complete in 154.62ms. Ticket: ...
160+
// ✅ Upload 0 complete in 154.65ms. Ticket: ...
161+
// 📊 Average upload duration: 133.60 ms
162+
// ⏱️ Total elapsed for 5 parallel uploads: 154.93ms
163+
164+
#[tokio::test(flavor = "multi_thread")]
165+
#[ignore]
166+
async fn test_parallel_uploads() {
167+
use std::env;
168+
use std::time::Instant;
169+
170+
// Read parallelism level from env
171+
// Use env variable if provided, else saturate all tokio threads by default (num_cpus::get())
172+
let parallel_uploads: usize = env::var("GENEVA_PARALLEL_UPLOADS")
173+
.ok()
174+
.and_then(|v| v.parse().ok())
175+
.unwrap_or_else(num_cpus::get);
176+
let ctx = test_helpers::build_test_upload_context().await;
177+
178+
// --- Warm-up: do the first upload to populate the token cache ---
179+
println!("🔥 Performing warm-up upload...");
180+
let start_warmup = Instant::now();
181+
let _ = ctx
182+
.uploader
183+
.upload(ctx.data.clone(), &ctx.event_name, &ctx.event_version)
184+
.await
185+
.expect("Warm-up upload failed");
186+
println!(
187+
"🔥 Warm-up upload complete in {:.2?}",
188+
start_warmup.elapsed()
189+
);
190+
191+
println!("🚀 Launching {parallel_uploads} parallel uploads...");
192+
193+
let start_all = Instant::now();
194+
195+
let mut handles = vec![];
196+
for i in 0..parallel_uploads {
197+
let uploader = ctx.uploader.clone();
198+
let data = ctx.data.clone();
199+
let event_name = ctx.event_name.to_string();
200+
let event_version = ctx.event_version.to_string();
201+
202+
let handle = tokio::spawn(async move {
203+
let start = Instant::now();
204+
let resp = uploader
205+
.upload(data, &event_name, &event_version)
206+
.await
207+
.unwrap_or_else(|_| panic!("Upload {} failed", i));
208+
let elapsed = start.elapsed();
209+
println!(
210+
"✅ Upload {} complete in {:.2?}. Ticket: {}",
211+
i, elapsed, resp.ticket
212+
);
213+
elapsed
214+
});
215+
216+
handles.push(handle);
217+
}
218+
219+
let durations: Vec<_> = futures::future::join_all(handles)
220+
.await
221+
.into_iter()
222+
.map(|res| res.expect("Join error in upload task"))
223+
.collect();
224+
225+
let total_time = start_all.elapsed();
226+
227+
let avg_ms =
228+
durations.iter().map(|d| d.as_millis()).sum::<u128>() as f64 / durations.len() as f64;
229+
println!("📊 Average upload duration: {:.2} ms", avg_ms);
230+
231+
println!(
232+
"⏱️ Total elapsed for {} parallel uploads: {:.2?}",
233+
parallel_uploads, total_time
234+
);
235+
}
236+
}

0 commit comments

Comments
 (0)