diff --git a/Cargo.lock b/Cargo.lock index 6f78204e44..c80a3cd21c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5707,6 +5707,7 @@ dependencies = [ "google-cloud-storage", "google-cloud-telcoautomation-v1", "google-cloud-test-utils", + "google-cloud-trace-v1", "google-cloud-wkt", "google-cloud-workflows-executions-v1", "google-cloud-workflows-v1", diff --git a/src/integration-tests/Cargo.toml b/src/integration-tests/Cargo.toml index 26b1e82b6d..6e483d2a00 100644 --- a/src/integration-tests/Cargo.toml +++ b/src/integration-tests/Cargo.toml @@ -46,6 +46,7 @@ opentelemetry_sdk.workspace = true opentelemetry-otlp.workspace = true opentelemetry-proto.workspace = true rand = { workspace = true, features = ["thread_rng"] } +reqwest.workspace = true serde_json.workspace = true tokio = { workspace = true, features = ["process", "test-util"] } tokio-stream = { workspace = true, features = ["net"] } @@ -118,12 +119,15 @@ path = "../../src/generated/cloud/workflows/v1" package = "google-cloud-workflows-executions-v1" path = "../../src/generated/cloud/workflows/executions/v1" +[dependencies.google-cloud-trace-v1] +package = "google-cloud-trace-v1" +path = "../../src/generated/devtools/cloudtrace/v1" + [dev-dependencies] anyhow.workspace = true httptest.workspace = true mockall.workspace = true opentelemetry_sdk = { workspace = true, features = ["testing"] } -reqwest.workspace = true serde.workspace = true serde_with.workspace = true static_assertions.workspace = true diff --git a/src/integration-tests/src/observability/auth.rs b/src/integration-tests/src/observability/auth.rs index f1aedb47e4..9d6b84efe6 100644 --- a/src/integration-tests/src/observability/auth.rs +++ b/src/integration-tests/src/observability/auth.rs @@ -50,9 +50,15 @@ pub struct CloudTelemetryAuthInterceptor { impl CloudTelemetryAuthInterceptor { /// Creates a new `CloudTelemetryAuthInterceptor` and starts a background task to keep /// credentials refreshed. - pub fn new(credentials: Credentials) -> Self { - let (tx, rx) = watch::channel(None); + pub async fn new(credentials: Credentials) -> Self { + let (tx, mut rx) = watch::channel(None); tokio::spawn(refresh_task(credentials, tx)); + + // Wait for the first refresh to complete. + // We ignore the result because if the sender is dropped (unlikely), + // the interceptor will just fail requests, which is the correct behavior. + let _ = rx.changed().await; + Self { rx } } } @@ -139,6 +145,12 @@ mod tests { #[tokio::test] async fn test_interceptor_injects_headers() { let (tx, rx) = watch::channel(None); + // Manually construct because new() spawns a task we don't want here, + // or we could just use new() and let it spawn. + // But since we want to control the channel, we construct manually as before. + // Wait, the previous test code manually constructed it: + // let mut interceptor = CloudTelemetryAuthInterceptor { rx }; + // So we don't need to change this test if it doesn't use new(). let mut interceptor = CloudTelemetryAuthInterceptor { rx }; // 1. Initial state (no headers) diff --git a/src/integration-tests/src/observability/otlp.rs b/src/integration-tests/src/observability/otlp.rs index bc5612ac0b..552ab41457 100644 --- a/src/integration-tests/src/observability/otlp.rs +++ b/src/integration-tests/src/observability/otlp.rs @@ -101,7 +101,7 @@ impl CloudTelemetryTracerProviderBuilder { .build() .map_err(|e| TraceError::Other(e.into()))?, }; - let interceptor = CloudTelemetryAuthInterceptor::new(credentials); + let interceptor = CloudTelemetryAuthInterceptor::new(credentials).await; let resource = opentelemetry_sdk::Resource::builder_empty() .with_attributes(vec![ diff --git a/src/integration-tests/tests/telemetry.rs b/src/integration-tests/tests/telemetry.rs new file mode 100644 index 0000000000..0c840743d3 --- /dev/null +++ b/src/integration-tests/tests/telemetry.rs @@ -0,0 +1,152 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(all(test, feature = "run-integration-tests", google_cloud_unstable_tracing))] +mod telemetry { + use google_cloud_trace_v1::client::TraceService; + use httptest::{Expectation, Server, matchers::*, responders::status_code}; + use integration_tests::observability::otlp::CloudTelemetryTracerProviderBuilder; + use opentelemetry::trace::TraceContextExt; + use tracing_opentelemetry::OpenTelemetrySpanExt; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_telemetry_e2e() -> integration_tests::Result<()> { + // 1. Setup Mock Server (Traffic Destination) + let echo_server = Server::run(); + echo_server.expect( + Expectation::matching(all_of![ + request::method("POST"), + request::path("/v1beta1/echo:echo"), + ]) + .respond_with(status_code(200).body(r#"{"content": "test"}"#)), + ); + + // 2. Setup Telemetry (Real Google Cloud Destination) + // This requires GOOGLE_CLOUD_PROJECT to be set. + let project_id = integration_tests::project_id()?; + let service_name = "e2e-telemetry-test"; + + // Configure OTLP provider (sends to telemetry.googleapis.com) + // This uses ADC automatically from the environment. + let provider = CloudTelemetryTracerProviderBuilder::new(&project_id, service_name) + .build() + .await?; + + // Install subscriber + let _guard = tracing_subscriber::Registry::default() + .with(integration_tests::observability::tracing::layer( + provider.clone(), + )) + .set_default(); + + // 3. Generate Trace + let span_name = "e2e-showcase-test"; + + // Start a root span + let root_span = tracing::info_span!("e2e_root", "otel.name" = span_name); + let trace_id = { + let _enter = root_span.enter(); + let trace_id = root_span + .context() + .span() + .span_context() + .trace_id() + .to_string(); + + // Initialize showcase client pointing to local mock server + let client = showcase::client::Echo::builder() + .with_endpoint(format!("http://{}", echo_server.addr())) + .with_credentials(auth::credentials::anonymous::Builder::new().build()) + .with_tracing() + .build() + .await?; + + // Make the API call + // This will generate child spans within the library + let _ = client.echo().set_content("test").send().await?; + + trace_id + }; + // explicitly drop the span to end it + drop(root_span); + + println!( + "View generated trace in Console: https://console.cloud.google.com/traces/explorer;traceId={}?project={}", + trace_id, project_id + ); + + // 4. Force flush to ensure spans are sent. + let _ = provider.force_flush(); + + // 5. Verify (Poll Cloud Trace API) + let client = TraceService::builder().build().await?; + + // Because we are limited by quota, start with a backoff. + // Traces can take several minutes to propagate after they have been written. + // Implement a generous retry loop to account for this. + let backoff_delays = [10, 60, 120, 120, 120]; + let mut trace = None; + + for delay in backoff_delays { + tokio::time::sleep(std::time::Duration::from_secs(delay)).await; + + match client + .get_trace() + .set_project_id(&project_id) + .set_trace_id(&trace_id) + .send() + .await + { + Ok(t) => { + trace = Some(t); + break; + } + Err(e) => { + if let Some(status) = e.status() { + if status.code == gax::error::rpc::Code::NotFound + || status.code == gax::error::rpc::Code::Internal + { + println!( + "Trace not found yet (or internal error), retrying... Error: {:?}", + e + ); + continue; + } + } + return Err(e.into()); + } + } + } + + let trace = trace.ok_or_else(|| anyhow::anyhow!("Timed out waiting for trace"))?; + + // 6. Assertions + // Check for root span + let root_found = trace.spans.iter().any(|s| s.name == span_name); + assert!(root_found, "Root span '{}' not found in trace", span_name); + + // Check for showcase client span + let client_span_name = "google-cloud-showcase-v1beta1::client::Echo::echo"; + let client_found = trace.spans.iter().any(|s| s.name == client_span_name); + assert!( + client_found, + "Client library span '{}' not found in trace", + client_span_name + ); + + Ok(()) + } +}