Skip to content

Commit a835ced

Browse files
committed
Add metric for tracking S3 endpoint calls by RPC method
1 parent e1350ca commit a835ced

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

src/proxy.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
115115
&self,
116116
req: S3Request<GetObjectInput>,
117117
) -> S3Result<S3Response<GetObjectOutput>> {
118+
telemetry::record_endpoint_call("GetObject");
118119
let start = Instant::now();
119120
let method = req.method.to_string();
120121
let scheme = req.uri.scheme_str().map(str::to_owned);
@@ -372,6 +373,7 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
372373
&self,
373374
req: S3Request<PutObjectInput>,
374375
) -> S3Result<S3Response<PutObjectOutput>> {
376+
telemetry::record_endpoint_call("PutObject");
375377
let bucket = req.input.bucket.clone();
376378
let key = req.input.key.clone();
377379

@@ -400,6 +402,7 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
400402
&self,
401403
req: S3Request<DeleteObjectInput>,
402404
) -> S3Result<S3Response<DeleteObjectOutput>> {
405+
telemetry::record_endpoint_call("DeleteObject");
403406
let bucket = req.input.bucket.clone();
404407
let key = req.input.key.clone();
405408

@@ -428,6 +431,7 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
428431
&self,
429432
req: S3Request<DeleteObjectsInput>,
430433
) -> S3Result<S3Response<DeleteObjectsOutput>> {
434+
telemetry::record_endpoint_call("DeleteObjects");
431435
let bucket = req.input.bucket.clone();
432436
let keys: Vec<String> = req
433437
.input
@@ -464,6 +468,7 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
464468
&self,
465469
req: S3Request<CopyObjectInput>,
466470
) -> S3Result<S3Response<CopyObjectOutput>> {
471+
telemetry::record_endpoint_call("CopyObject");
467472
let dest_bucket = req.input.bucket.clone();
468473
let dest_key = req.input.key.clone();
469474

@@ -492,13 +497,15 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
492497
&self,
493498
req: S3Request<AbortMultipartUploadInput>,
494499
) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
500+
telemetry::record_endpoint_call("AbortMultipartUpload");
495501
self.inner.abort_multipart_upload(req).await
496502
}
497503

498504
async fn complete_multipart_upload(
499505
&self,
500506
req: S3Request<CompleteMultipartUploadInput>,
501507
) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
508+
telemetry::record_endpoint_call("CompleteMultipartUpload");
502509
let bucket = req.input.bucket.clone();
503510
let key = req.input.key.clone();
504511

@@ -527,90 +534,103 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
527534
&self,
528535
req: S3Request<CreateBucketInput>,
529536
) -> S3Result<S3Response<CreateBucketOutput>> {
537+
telemetry::record_endpoint_call("CreateBucket");
530538
self.inner.create_bucket(req).await
531539
}
532540

533541
async fn create_multipart_upload(
534542
&self,
535543
req: S3Request<CreateMultipartUploadInput>,
536544
) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
545+
telemetry::record_endpoint_call("CreateMultipartUpload");
537546
self.inner.create_multipart_upload(req).await
538547
}
539548

540549
async fn delete_bucket(
541550
&self,
542551
req: S3Request<DeleteBucketInput>,
543552
) -> S3Result<S3Response<DeleteBucketOutput>> {
553+
telemetry::record_endpoint_call("DeleteBucket");
544554
self.inner.delete_bucket(req).await
545555
}
546556

547557
async fn get_bucket_location(
548558
&self,
549559
req: S3Request<GetBucketLocationInput>,
550560
) -> S3Result<S3Response<GetBucketLocationOutput>> {
561+
telemetry::record_endpoint_call("GetBucketLocation");
551562
self.inner.get_bucket_location(req).await
552563
}
553564

554565
async fn head_bucket(
555566
&self,
556567
req: S3Request<HeadBucketInput>,
557568
) -> S3Result<S3Response<HeadBucketOutput>> {
569+
telemetry::record_endpoint_call("HeadBucket");
558570
self.inner.head_bucket(req).await
559571
}
560572

561573
async fn head_object(
562574
&self,
563575
req: S3Request<HeadObjectInput>,
564576
) -> S3Result<S3Response<HeadObjectOutput>> {
577+
telemetry::record_endpoint_call("HeadObject");
565578
self.inner.head_object(req).await
566579
}
567580

568581
async fn list_buckets(
569582
&self,
570583
req: S3Request<ListBucketsInput>,
571584
) -> S3Result<S3Response<ListBucketsOutput>> {
585+
telemetry::record_endpoint_call("ListBuckets");
572586
self.inner.list_buckets(req).await
573587
}
574588

575589
async fn list_multipart_uploads(
576590
&self,
577591
req: S3Request<ListMultipartUploadsInput>,
578592
) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
593+
telemetry::record_endpoint_call("ListMultipartUploads");
579594
self.inner.list_multipart_uploads(req).await
580595
}
581596

582597
async fn list_objects(
583598
&self,
584599
req: S3Request<ListObjectsInput>,
585600
) -> S3Result<S3Response<ListObjectsOutput>> {
601+
telemetry::record_endpoint_call("ListObjects");
586602
self.inner.list_objects(req).await
587603
}
588604

589605
async fn list_objects_v2(
590606
&self,
591607
req: S3Request<ListObjectsV2Input>,
592608
) -> S3Result<S3Response<ListObjectsV2Output>> {
609+
telemetry::record_endpoint_call("ListObjectsV2");
593610
self.inner.list_objects_v2(req).await
594611
}
595612

596613
async fn list_parts(
597614
&self,
598615
req: S3Request<ListPartsInput>,
599616
) -> S3Result<S3Response<ListPartsOutput>> {
617+
telemetry::record_endpoint_call("ListParts");
600618
self.inner.list_parts(req).await
601619
}
602620

603621
async fn upload_part(
604622
&self,
605623
req: S3Request<UploadPartInput>,
606624
) -> S3Result<S3Response<UploadPartOutput>> {
625+
telemetry::record_endpoint_call("UploadPart");
607626
self.inner.upload_part(req).await
608627
}
609628

610629
async fn upload_part_copy(
611630
&self,
612631
req: S3Request<UploadPartCopyInput>,
613632
) -> S3Result<S3Response<UploadPartCopyOutput>> {
633+
telemetry::record_endpoint_call("UploadPartCopy");
614634
self.inner.upload_part_copy(req).await
615635
}
616636
}

src/telemetry.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use opentelemetry::KeyValue;
55
use opentelemetry::metrics::{Counter, Gauge, Histogram};
66
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
77
use opentelemetry_otlp::{Compression, WithExportConfig, WithTonicConfig};
8-
use prometheus::{HistogramOpts, IntCounter, IntGauge, Registry};
8+
use prometheus::{HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry};
99
use tracing::{error, info};
1010
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
1111

@@ -681,6 +681,35 @@ pub(crate) fn record_request_duration(data: RequestDuration) {
681681
REQUEST_DURATION_MS.record(milliseconds, &attributes);
682682
}
683683

684+
// MARK: - Endpoint Calls
685+
686+
pub(crate) fn record_endpoint_call(method: &'static str) {
687+
static ENDPOINT_TOTAL: LazyLock<Counter<u64>> = LazyLock::new(|| {
688+
opentelemetry::global::meter(CARGO_CRATE_NAME)
689+
.u64_counter("s3_cache.endpoint_call_total")
690+
.with_description("Number of S3 endpoint method calls")
691+
.build()
692+
});
693+
694+
static PROM_ENDPOINT_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
695+
let counter = IntCounterVec::new(
696+
Opts::new(
697+
"s3_cache_endpoint_call_total",
698+
"Number of S3 endpoint method calls",
699+
),
700+
&["rpc_method"],
701+
)
702+
.unwrap();
703+
PROMETHEUS_REGISTRY
704+
.register(Box::new(counter.clone()))
705+
.unwrap();
706+
counter
707+
});
708+
709+
PROM_ENDPOINT_TOTAL.with_label_values(&[method]).inc();
710+
ENDPOINT_TOTAL.add(1, &[KeyValue::new("rpc.method", method)]);
711+
}
712+
684713
// MARK: Response Body Sizes
685714

686715
/// Attributes based on: https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#http-server

0 commit comments

Comments
 (0)