Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion src/integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ opentelemetry_sdk.workspace = true
opentelemetry-otlp.workspace = true
opentelemetry-proto.workspace = true
rand = { workspace = true, features = ["thread_rng"] }
reqwest.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["process", "test-util"] }
tokio-stream = { workspace = true, features = ["net"] }
Expand Down Expand Up @@ -118,12 +119,15 @@ path = "../../src/generated/cloud/workflows/v1"
package = "google-cloud-workflows-executions-v1"
path = "../../src/generated/cloud/workflows/executions/v1"

[dependencies.google-cloud-trace-v1]
package = "google-cloud-trace-v1"
path = "../../src/generated/devtools/cloudtrace/v1"

[dev-dependencies]
anyhow.workspace = true
httptest.workspace = true
mockall.workspace = true
opentelemetry_sdk = { workspace = true, features = ["testing"] }
reqwest.workspace = true
serde.workspace = true
serde_with.workspace = true
static_assertions.workspace = true
Expand Down
16 changes: 14 additions & 2 deletions src/integration-tests/src/observability/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,15 @@ pub struct CloudTelemetryAuthInterceptor {
impl CloudTelemetryAuthInterceptor {
/// Creates a new `CloudTelemetryAuthInterceptor` and starts a background task to keep
/// credentials refreshed.
pub fn new(credentials: Credentials) -> Self {
let (tx, rx) = watch::channel(None);
pub async fn new(credentials: Credentials) -> Self {
let (tx, mut rx) = watch::channel(None);
tokio::spawn(refresh_task(credentials, tx));

// Wait for the first refresh to complete.
// We ignore the result because if the sender is dropped (unlikely),
// the interceptor will just fail requests, which is the correct behavior.
let _ = rx.changed().await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only send headers in the case of a success, so this hangs on bad credentials, right?

Err(e) => {
tracing::warn!("Failed to refresh GCP credentials: {e:?}");
sleep(ERROR_RETRY_DELAY).await;
}

So should we:

match headers {
  // omitted...
  Err(e) if e.is_transient() => {
     tracing::warn!("Failed to refresh GCP credentials: {e:?}"); 
     sleep(ERROR_RETRY_DELAY).await; 
  },
  Err(e) => break,
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, seems like this can't get hit anymore:

// If the first refresh hasn't completed yet, fail the request.
// The OTLP exporter is expected to handle this transient failure
// with its built-in retry mechanism.
Status::unauthenticated("GCP credentials not yet available")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is getting complicated enough that we should probably defer it to a separate PR.


Self { rx }
}
}
Expand Down Expand Up @@ -139,6 +145,12 @@ mod tests {
#[tokio::test]
async fn test_interceptor_injects_headers() {
let (tx, rx) = watch::channel(None);
// Construct the interceptor directly instead of calling `new()`: `new()` spawns
// a background refresh task, and this test needs to drive the watch channel itself.
let mut interceptor = CloudTelemetryAuthInterceptor { rx };

// 1. Initial state (no headers)
Expand Down
2 changes: 1 addition & 1 deletion src/integration-tests/src/observability/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl CloudTelemetryTracerProviderBuilder {
.build()
.map_err(|e| TraceError::Other(e.into()))?,
};
let interceptor = CloudTelemetryAuthInterceptor::new(credentials);
let interceptor = CloudTelemetryAuthInterceptor::new(credentials).await;

let resource = opentelemetry_sdk::Resource::builder_empty()
.with_attributes(vec![
Expand Down
152 changes: 152 additions & 0 deletions src/integration-tests/tests/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// End-to-end telemetry test: emits spans through the Cloud Telemetry OTLP
/// tracer provider and reads them back through the Cloud Trace API.
///
/// Compiled only for test builds that enable the `run-integration-tests`
/// feature and set the `google_cloud_unstable_tracing` cfg flag.
#[cfg(all(test, feature = "run-integration-tests", google_cloud_unstable_tracing))]
mod telemetry {
use google_cloud_trace_v1::client::TraceService;
use httptest::{Expectation, Server, matchers::*, responders::status_code};
use integration_tests::observability::otlp::CloudTelemetryTracerProviderBuilder;
use opentelemetry::trace::TraceContextExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

/// Sends one `Echo::echo` request to a local mock server under a root span,
/// flushes the tracer provider, then polls `get_trace` until the trace is
/// returned and asserts that both the root span and the client-library span
/// are present in it.
///
/// Returns an error if a setup step or RPC fails with a non-retryable error,
/// or if the trace is still unavailable after every polling attempt.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_telemetry_e2e() -> integration_tests::Result<()> {
// 1. Setup Mock Server (Traffic Destination)
// The mock accepts `POST /v1beta1/echo:echo` and answers 200 with a fixed JSON body.
let echo_server = Server::run();
echo_server.expect(
Expectation::matching(all_of![
request::method("POST"),
request::path("/v1beta1/echo:echo"),
])
.respond_with(status_code(200).body(r#"{"content": "test"}"#)),
);

// 2. Setup Telemetry (Real Google Cloud Destination)
// This requires GOOGLE_CLOUD_PROJECT to be set.
let project_id = integration_tests::project_id()?;
let service_name = "e2e-telemetry-test";

// Configure OTLP provider (sends to telemetry.googleapis.com)
// This uses ADC automatically from the environment.
let provider = CloudTelemetryTracerProviderBuilder::new(&project_id, service_name)
.build()
.await?;

// Install subscriber
// `_guard` is kept alive for the rest of the test.
// NOTE(review): presumably `set_default()` uninstalls the subscriber when the
// guard is dropped -- confirm against the tracing-subscriber docs.
let _guard = tracing_subscriber::Registry::default()
.with(integration_tests::observability::tracing::layer(
provider.clone(),
))
.set_default();

// 3. Generate Trace
// `span_name` is attached to the root span through the "otel.name" field and is
// the name the assertions in step 6 look for.
let span_name = "e2e-showcase-test";

// Start a root span
let root_span = tracing::info_span!("e2e_root", "otel.name" = span_name);
// The block below evaluates to the root span's trace id (as a string), which
// is later used to look the trace up.
let trace_id = {
// NOTE(review): this guard stays alive across the `.await` points below.
// The `tracing` docs caution against holding an `enter()` guard across
// awaits in async code; consider `Instrument::instrument` -- TODO confirm.
let _enter = root_span.enter();
let trace_id = root_span
.context()
.span()
.span_context()
.trace_id()
.to_string();

// Initialize showcase client pointing to local mock server
let client = showcase::client::Echo::builder()
.with_endpoint(format!("http://{}", echo_server.addr()))
.with_credentials(auth::credentials::anonymous::Builder::new().build())
.with_tracing()
.build()
.await?;

// Make the API call
// This will generate child spans within the library
// The response body is discarded; only a transport/RPC error fails the test here.
let _ = client.echo().set_content("test").send().await?;

trace_id
};
// explicitly drop the span to end it
drop(root_span);

println!(
"View generated trace in Console: https://console.cloud.google.com/traces/explorer;traceId={}?project={}",
trace_id, project_id
);

// 4. Force flush to ensure spans are sent.
// NOTE(review): the result is discarded. If the flush fails, the test does not
// stop here and instead proceeds to the polling loop in step 5 (worst case 430s
// of sleeps) -- consider surfacing or logging the error.
let _ = provider.force_flush();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: can we use ?. Are you trying to avoid workaround Timeout errors? https://docs.rs/opentelemetry_sdk/latest/opentelemetry_sdk/error/enum.OTelSdkError.html#variant.Timeout


// 5. Verify (Poll Cloud Trace API)
let client = TraceService::builder().build().await?;

// Because we are limited by quota, start with a backoff.
// Traces can take several minutes to propagate after they have been written.
// Implement a generous retry loop to account for this.
// Delays are in seconds and are slept *before* each attempt, including the
// first one: at most 5 attempts over 10 + 60 + 120 + 120 + 120 = 430 seconds.
let backoff_delays = [10, 60, 120, 120, 120];
// Stays `None` until a `get_trace` call succeeds.
let mut trace = None;

for delay in backoff_delays {
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;

match client
.get_trace()
.set_project_id(&project_id)
.set_trace_id(&trace_id)
.send()
.await
{
Ok(t) => {
trace = Some(t);
break;
}
Err(e) => {
// Only `NotFound` and `Internal` status codes are retried. Any other
// error, including one that carries no status, aborts the test.
if let Some(status) = e.status() {
if status.code == gax::error::rpc::Code::NotFound
|| status.code == gax::error::rpc::Code::Internal
{
println!(
"Trace not found yet (or internal error), retrying... Error: {:?}",
e
);
continue;
}
}
return Err(e.into());
}
}
}

// Reached with `None` only when every attempt ended in a retryable error.
let trace = trace.ok_or_else(|| anyhow::anyhow!("Timed out waiting for trace"))?;

// 6. Assertions
// Check for root span
let root_found = trace.spans.iter().any(|s| s.name == span_name);
assert!(root_found, "Root span '{}' not found in trace", span_name);

// Check for showcase client span
let client_span_name = "google-cloud-showcase-v1beta1::client::Echo::echo";
let client_found = trace.spans.iter().any(|s| s.name == client_span_name);
assert!(
client_found,
"Client library span '{}' not found in trace",
client_span_name
);

Ok(())
}
}