14 | 14 |
15 | 15 | #[cfg(all(test, feature = "run-integration-tests", google_cloud_unstable_tracing))] |
16 | 16 | mod telemetry { |
17 | | - use gax::exponential_backoff::ExponentialBackoffBuilder; |
18 | 17 | use google_cloud_trace_v1::client::TraceService; |
19 | 18 | use httptest::{Expectation, Server, matchers::*, responders::status_code}; |
20 | 19 | use integration_tests::observability::otlp::CloudTelemetryTracerProviderBuilder; |
21 | 20 | use opentelemetry::trace::TraceContextExt; |
22 | | - use std::time::Duration; |
23 | 21 | use tracing_opentelemetry::OpenTelemetrySpanExt; |
24 | 22 | use tracing_subscriber::layer::SubscriberExt; |
25 | 23 | use tracing_subscriber::util::SubscriberInitExt; |
26 | 24 |
27 | | - #[derive(Debug, Clone)] |
28 | | - struct RetryNotFound; |
29 | | - |
30 | | - impl gax::retry_policy::RetryPolicy for RetryNotFound { |
31 | | - fn on_error( |
32 | | - &self, |
33 | | - _state: &gax::retry_state::RetryState, |
34 | | - error: gax::error::Error, |
35 | | - ) -> gax::retry_result::RetryResult { |
36 | | - if let Some(status) = error.status() { |
37 | | - if status.code == gax::error::rpc::Code::NotFound { |
38 | | - return gax::retry_result::RetryResult::Continue(error); |
39 | | - } |
40 | | - } |
41 | | - gax::retry_result::RetryResult::Permanent(error) |
42 | | - } |
43 | | - } |
44 | | - |
45 | 25 | #[tokio::test(flavor = "multi_thread", worker_threads = 2)] |
46 | 26 | async fn test_telemetry_e2e() -> integration_tests::Result<()> { |
47 | 27 | // 1. Setup Mock Server (Traffic Destination) |
@@ -125,31 +105,40 @@ mod telemetry { |
125 | 105 | println!("Spans flushed."); |
126 | 106 |
127 | 107 | // 5. Verify (Poll Cloud Trace API) |
128 | | - // Configure retry policy for NOT_FOUND errors |
129 | | - let retry_policy = RetryNotFound; |
130 | | - // Configure backoff policy (initial 10s, max 120s, scaling 6.0) |
131 | | - // This roughly matches the previous manual loop: 10, 60, 120... |
132 | | - let backoff = ExponentialBackoffBuilder::new() |
133 | | - .with_initial_delay(Duration::from_secs(10)) |
134 | | - .with_maximum_delay(Duration::from_secs(120)) |
135 | | - .with_scaling(6.0) |
136 | | - .build() |
137 | | - .map_err(|e| anyhow::anyhow!("Failed to build backoff: {}", e))?; |
138 | | - |
139 | | - // Use the Builder to create the client with retry and backoff policies |
140 | | - let client: TraceService = TraceService::builder() |
141 | | - .with_retry_policy(retry_policy) |
142 | | - .with_backoff_policy(backoff) |
143 | | - .build() |
144 | | - .await?; |
| 108 | + let client = TraceService::builder().build().await?; |
145 | 109 |
146 | 110 | println!("Polling for trace..."); |
147 | | - let trace = client |
148 | | - .get_trace() |
149 | | - .set_project_id(&project_id) |
150 | | - .set_trace_id(&trace_id) |
151 | | - .send() |
152 | | - .await?; |
| 111 | + let backoff_delays = [10, 60, 120, 120, 120]; |
| 112 | + let mut trace = None; |
| 113 | + |
| 114 | + for delay in backoff_delays { |
| 115 | + println!("Waiting {}s before polling...", delay); |
| 116 | + tokio::time::sleep(std::time::Duration::from_secs(delay)).await; |
| 117 | + |
| 118 | + match client |
| 119 | + .get_trace() |
| 120 | + .set_project_id(&project_id) |
| 121 | + .set_trace_id(&trace_id) |
| 122 | + .send() |
| 123 | + .await |
| 124 | + { |
| 125 | + Ok(t) => { |
| 126 | + trace = Some(t); |
| 127 | + break; |
| 128 | + } |
| 129 | + Err(e) => { |
| 130 | + if let Some(status) = e.status() { |
| 131 | + if status.code == gax::error::rpc::Code::NotFound { |
| 132 | + println!("Trace not found yet, retrying..."); |
| 133 | + continue; |
| 134 | + } |
| 135 | + } |
| 136 | + return Err(e.into()); |
| 137 | + } |
| 138 | + } |
| 139 | + } |
| 140 | + |
| 141 | + let trace = trace.ok_or_else(|| anyhow::anyhow!("Timed out waiting for trace"))?; |
153 | 142 |
154 | 143 | println!("Trace found!"); |
155 | 144 | println!("Response: {:?}", trace); |