Skip to content

Commit 79bc61a

Browse files
committed
refactor: make auth interceptor await initial credential refresh and remove manual delays in tests.
1 parent 21a2dae commit 79bc61a

File tree

3 files changed

+28
-31
lines changed

3 files changed

+28
-31
lines changed

src/integration-tests/src/observability/auth.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,15 @@ pub struct CloudTelemetryAuthInterceptor {
5050
impl CloudTelemetryAuthInterceptor {
5151
/// Creates a new `CloudTelemetryAuthInterceptor` and starts a background task to keep
5252
/// credentials refreshed.
53-
pub fn new(credentials: Credentials) -> Self {
54-
let (tx, rx) = watch::channel(None);
53+
pub async fn new(credentials: Credentials) -> Self {
54+
let (tx, mut rx) = watch::channel(None);
5555
tokio::spawn(refresh_task(credentials, tx));
56+
57+
// Wait for the first refresh to complete.
58+
// We ignore the result because if the sender is dropped (unlikely),
59+
// the interceptor will just fail requests, which is the correct behavior.
60+
let _ = rx.changed().await;
61+
5662
Self { rx }
5763
}
5864
}
@@ -139,6 +145,12 @@ mod tests {
139145
#[tokio::test]
140146
async fn test_interceptor_injects_headers() {
141147
let (tx, rx) = watch::channel(None);
148+
// Manually construct because new() spawns a task we don't want here,
149+
// or we could just use new() and let it spawn.
150+
// But since we want to control the channel, we construct manually as before.
151+
// Wait, the previous test code manually constructed it:
152+
// let mut interceptor = CloudTelemetryAuthInterceptor { rx };
153+
// So we don't need to change this test if it doesn't use new().
142154
let mut interceptor = CloudTelemetryAuthInterceptor { rx };
143155

144156
// 1. Initial state (no headers)

src/integration-tests/src/observability/otlp.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl CloudTelemetryTracerProviderBuilder {
101101
.build()
102102
.map_err(|e| TraceError::Other(e.into()))?,
103103
};
104-
let interceptor = CloudTelemetryAuthInterceptor::new(credentials);
104+
let interceptor = CloudTelemetryAuthInterceptor::new(credentials).await;
105105

106106
let resource = opentelemetry_sdk::Resource::builder_empty()
107107
.with_attributes(vec![

src/integration-tests/tests/telemetry.rs

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ mod telemetry {
3838
// This requires GOOGLE_CLOUD_PROJECT to be set.
3939
let project_id = integration_tests::project_id()?;
4040
let service_name = "e2e-telemetry-test";
41-
println!("Project ID: {}", project_id);
4241

4342
// Configure OTLP provider (sends to telemetry.googleapis.com)
4443
// This uses ADC automatically from the environment.
@@ -53,28 +52,23 @@ mod telemetry {
5352
))
5453
.set_default();
5554

56-
// Wait for credentials to be refreshed in the background task
57-
println!("Waiting for credentials to initialize...");
58-
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
59-
6055
// 3. Generate Trace
6156
let span_name = "e2e-showcase-test";
6257

6358
// Start a root span
6459
let root_span = tracing::info_span!("e2e_root", "otel.name" = span_name);
6560
let trace_id = {
6661
let _enter = root_span.enter();
67-
let otel_ctx = root_span.context();
68-
let otel_span_ref = otel_ctx.span();
69-
let span_context = otel_span_ref.span_context();
70-
let trace_id = span_context.trace_id().to_string();
62+
let trace_id = root_span
63+
.context()
64+
.span()
65+
.span_context()
66+
.trace_id()
67+
.to_string();
7168

7269
// Initialize showcase client pointing to local mock server
73-
// We use anonymous credentials for the *client* because it's talking to httptest
74-
let endpoint = format!("http://{}", echo_server.addr());
75-
7670
let client = showcase::client::Echo::builder()
77-
.with_endpoint(endpoint)
71+
.with_endpoint(format!("http://{}", echo_server.addr()))
7872
.with_credentials(auth::credentials::anonymous::Builder::new().build())
7973
.with_tracing()
8074
.build()
@@ -89,30 +83,24 @@ mod telemetry {
8983
// explicitly drop the span to end it
9084
drop(root_span);
9185

92-
println!("Generated Trace ID: {}", trace_id);
93-
println!("Span Name: {}", span_name);
9486
println!(
95-
"View in Console: https://console.cloud.google.com/traces/explorer;traceId={}?project={}",
87+
"View generated trace in Console: https://console.cloud.google.com/traces/explorer;traceId={}?project={}",
9688
trace_id, project_id
9789
);
9890

99-
// 4. Flush
100-
// Force flush
101-
for _ in 0..5 {
102-
let _ = provider.force_flush();
103-
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
104-
}
105-
println!("Spans flushed.");
91+
// 4. Force flush to ensure spans are sent.
92+
let _ = provider.force_flush();
10693

10794
// 5. Verify (Poll Cloud Trace API)
10895
let client = TraceService::builder().build().await?;
10996

110-
println!("Polling for trace...");
97+
// Because we are limited by quota, start with a backoff.
98+
// Traces can take several minutes to propagate after they have been written.
99+
// Implement a generous retry loop to account for this.
111100
let backoff_delays = [10, 60, 120, 120, 120];
112101
let mut trace = None;
113102

114103
for delay in backoff_delays {
115-
println!("Waiting {}s before polling...", delay);
116104
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
117105

118106
match client
@@ -145,9 +133,6 @@ mod telemetry {
145133

146134
let trace = trace.ok_or_else(|| anyhow::anyhow!("Timed out waiting for trace"))?;
147135

148-
println!("Trace found!");
149-
println!("Response: {:?}", trace);
150-
151136
// 6. Assertions
152137
// Check for root span
153138
let root_found = trace.spans.iter().any(|s| s.name == span_name);

0 commit comments

Comments
 (0)