Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/access.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use s3s::S3Result;
use s3s::access::{S3Access, S3AccessContext};

use crate::telemetry;

pub(crate) struct TelemetryAccess;

#[async_trait::async_trait]
impl S3Access for TelemetryAccess {
async fn check(&self, cx: &mut S3AccessContext<'_>) -> S3Result<()> {
// to use this check to record telemetry data is an abuse of the S3Access trait, but we use it here to have a single location where we can record the requested operations
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Die alternative wäre ein decorator für den S3 Trait, oder? bedeutet man müsste alle methoden wrappen.

telemetry::record_endpoint_call(cx.s3_op().name());

if cx.credentials().is_none() {
return Err(s3s::s3_error!(AccessDenied, "Signature is required"));
}

Ok(())
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use self::proxy::{S3CachingProxy, range_to_string};
pub use self::s3_cache::{CacheKey, CachedObject, S3Cache};
pub use self::statistics::UniqueRequestedObjectsStatisticsTracker;

mod access;
mod auth;
mod config;
mod error;
Expand Down Expand Up @@ -148,6 +149,7 @@ where
let service = {
let mut b = S3ServiceBuilder::new(caching_proxy);
b.set_auth(auth::create_auth(&config));
b.set_access(access::TelemetryAccess);
let upstream_health_endpoint = Url::parse(&config.upstream_endpoint)
.unwrap()
.join("/minio/health/ready")
Expand Down
31 changes: 30 additions & 1 deletion src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use opentelemetry::KeyValue;
use opentelemetry::metrics::{Counter, Gauge, Histogram};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{Compression, WithExportConfig, WithTonicConfig};
use prometheus::{HistogramOpts, IntCounter, IntGauge, Registry};
use prometheus::{HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry};
use tracing::{error, info};
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};

Expand Down Expand Up @@ -681,6 +681,35 @@ pub(crate) fn record_request_duration(data: RequestDuration) {
REQUEST_DURATION_MS.record(milliseconds, &attributes);
}

// MARK: - Endpoint Calls

pub(crate) fn record_endpoint_call(method: &str) {
static ENDPOINT_TOTAL: LazyLock<Counter<u64>> = LazyLock::new(|| {
opentelemetry::global::meter(CARGO_CRATE_NAME)
.u64_counter("s3_cache.endpoint_call_total")
.with_description("Number of S3 endpoint method calls")
.build()
});

static PROM_ENDPOINT_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
let counter = IntCounterVec::new(
Opts::new(
"s3_cache_endpoint_call_total",
"Number of S3 endpoint method calls",
),
&["rpc_method"],
)
.unwrap();
PROMETHEUS_REGISTRY
.register(Box::new(counter.clone()))
.unwrap();
counter
});

PROM_ENDPOINT_TOTAL.with_label_values(&[method]).inc();
ENDPOINT_TOTAL.add(1, &[KeyValue::new("rpc.method", method.to_owned())]);
}

// MARK: Response Body Sizes

/// Attributes based on: https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#http-server
Expand Down