Skip to content

Commit 9e235e7

Browse files
use access layer for observing enpoint call telemetry
1 parent fa72cab commit 9e235e7

File tree

4 files changed

+22
-22
lines changed

4 files changed

+22
-22
lines changed

src/access.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use s3s::S3Result;
2+
use s3s::access::{S3Access, S3AccessContext};
3+
4+
use crate::telemetry;
5+
6+
pub(crate) struct TelemetryAccess;
7+
8+
#[async_trait::async_trait]
9+
impl S3Access for TelemetryAccess {
10+
async fn check(&self, cx: &mut S3AccessContext<'_>) -> S3Result<()> {
11+
telemetry::record_endpoint_call(cx.s3_op().name());
12+
13+
match cx.credentials() {
14+
Some(_) => Ok(()),
15+
None => Err(s3s::s3_error!(AccessDenied, "Signature is required")),
16+
}
17+
}
18+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub use self::proxy::{S3CachingProxy, range_to_string};
3636
pub use self::s3_cache::{CacheKey, CachedObject, S3Cache};
3737
pub use self::statistics::UniqueRequestedObjectsStatisticsTracker;
3838

39+
mod access;
3940
mod auth;
4041
mod config;
4142
mod error;
@@ -148,6 +149,7 @@ where
148149
let service = {
149150
let mut b = S3ServiceBuilder::new(caching_proxy);
150151
b.set_auth(auth::create_auth(&config));
152+
b.set_access(access::TelemetryAccess);
151153
let upstream_health_endpoint = Url::parse(&config.upstream_endpoint)
152154
.unwrap()
153155
.join("/minio/health/ready")

src/proxy.rs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
115115
&self,
116116
req: S3Request<GetObjectInput>,
117117
) -> S3Result<S3Response<GetObjectOutput>> {
118-
telemetry::record_endpoint_call("GetObject");
119118
let start = Instant::now();
120119
let method = req.method.to_string();
121120
let scheme = req.uri.scheme_str().map(str::to_owned);
@@ -373,7 +372,6 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
373372
&self,
374373
req: S3Request<PutObjectInput>,
375374
) -> S3Result<S3Response<PutObjectOutput>> {
376-
telemetry::record_endpoint_call("PutObject");
377375
let bucket = req.input.bucket.clone();
378376
let key = req.input.key.clone();
379377

@@ -402,7 +400,6 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
402400
&self,
403401
req: S3Request<DeleteObjectInput>,
404402
) -> S3Result<S3Response<DeleteObjectOutput>> {
405-
telemetry::record_endpoint_call("DeleteObject");
406403
let bucket = req.input.bucket.clone();
407404
let key = req.input.key.clone();
408405

@@ -431,7 +428,6 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
431428
&self,
432429
req: S3Request<DeleteObjectsInput>,
433430
) -> S3Result<S3Response<DeleteObjectsOutput>> {
434-
telemetry::record_endpoint_call("DeleteObjects");
435431
let bucket = req.input.bucket.clone();
436432
let keys: Vec<String> = req
437433
.input
@@ -468,7 +464,6 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
468464
&self,
469465
req: S3Request<CopyObjectInput>,
470466
) -> S3Result<S3Response<CopyObjectOutput>> {
471-
telemetry::record_endpoint_call("CopyObject");
472467
let dest_bucket = req.input.bucket.clone();
473468
let dest_key = req.input.key.clone();
474469

@@ -497,15 +492,13 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
497492
&self,
498493
req: S3Request<AbortMultipartUploadInput>,
499494
) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
500-
telemetry::record_endpoint_call("AbortMultipartUpload");
501495
self.inner.abort_multipart_upload(req).await
502496
}
503497

504498
async fn complete_multipart_upload(
505499
&self,
506500
req: S3Request<CompleteMultipartUploadInput>,
507501
) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
508-
telemetry::record_endpoint_call("CompleteMultipartUpload");
509502
let bucket = req.input.bucket.clone();
510503
let key = req.input.key.clone();
511504

@@ -534,103 +527,90 @@ impl<T: S3 + Send + Sync> S3 for S3CachingProxy<T> {
534527
&self,
535528
req: S3Request<CreateBucketInput>,
536529
) -> S3Result<S3Response<CreateBucketOutput>> {
537-
telemetry::record_endpoint_call("CreateBucket");
538530
self.inner.create_bucket(req).await
539531
}
540532

541533
async fn create_multipart_upload(
542534
&self,
543535
req: S3Request<CreateMultipartUploadInput>,
544536
) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
545-
telemetry::record_endpoint_call("CreateMultipartUpload");
546537
self.inner.create_multipart_upload(req).await
547538
}
548539

549540
async fn delete_bucket(
550541
&self,
551542
req: S3Request<DeleteBucketInput>,
552543
) -> S3Result<S3Response<DeleteBucketOutput>> {
553-
telemetry::record_endpoint_call("DeleteBucket");
554544
self.inner.delete_bucket(req).await
555545
}
556546

557547
async fn get_bucket_location(
558548
&self,
559549
req: S3Request<GetBucketLocationInput>,
560550
) -> S3Result<S3Response<GetBucketLocationOutput>> {
561-
telemetry::record_endpoint_call("GetBucketLocation");
562551
self.inner.get_bucket_location(req).await
563552
}
564553

565554
async fn head_bucket(
566555
&self,
567556
req: S3Request<HeadBucketInput>,
568557
) -> S3Result<S3Response<HeadBucketOutput>> {
569-
telemetry::record_endpoint_call("HeadBucket");
570558
self.inner.head_bucket(req).await
571559
}
572560

573561
async fn head_object(
574562
&self,
575563
req: S3Request<HeadObjectInput>,
576564
) -> S3Result<S3Response<HeadObjectOutput>> {
577-
telemetry::record_endpoint_call("HeadObject");
578565
self.inner.head_object(req).await
579566
}
580567

581568
async fn list_buckets(
582569
&self,
583570
req: S3Request<ListBucketsInput>,
584571
) -> S3Result<S3Response<ListBucketsOutput>> {
585-
telemetry::record_endpoint_call("ListBuckets");
586572
self.inner.list_buckets(req).await
587573
}
588574

589575
async fn list_multipart_uploads(
590576
&self,
591577
req: S3Request<ListMultipartUploadsInput>,
592578
) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
593-
telemetry::record_endpoint_call("ListMultipartUploads");
594579
self.inner.list_multipart_uploads(req).await
595580
}
596581

597582
async fn list_objects(
598583
&self,
599584
req: S3Request<ListObjectsInput>,
600585
) -> S3Result<S3Response<ListObjectsOutput>> {
601-
telemetry::record_endpoint_call("ListObjects");
602586
self.inner.list_objects(req).await
603587
}
604588

605589
async fn list_objects_v2(
606590
&self,
607591
req: S3Request<ListObjectsV2Input>,
608592
) -> S3Result<S3Response<ListObjectsV2Output>> {
609-
telemetry::record_endpoint_call("ListObjectsV2");
610593
self.inner.list_objects_v2(req).await
611594
}
612595

613596
async fn list_parts(
614597
&self,
615598
req: S3Request<ListPartsInput>,
616599
) -> S3Result<S3Response<ListPartsOutput>> {
617-
telemetry::record_endpoint_call("ListParts");
618600
self.inner.list_parts(req).await
619601
}
620602

621603
async fn upload_part(
622604
&self,
623605
req: S3Request<UploadPartInput>,
624606
) -> S3Result<S3Response<UploadPartOutput>> {
625-
telemetry::record_endpoint_call("UploadPart");
626607
self.inner.upload_part(req).await
627608
}
628609

629610
async fn upload_part_copy(
630611
&self,
631612
req: S3Request<UploadPartCopyInput>,
632613
) -> S3Result<S3Response<UploadPartCopyOutput>> {
633-
telemetry::record_endpoint_call("UploadPartCopy");
634614
self.inner.upload_part_copy(req).await
635615
}
636616
}

src/telemetry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ pub(crate) fn record_request_duration(data: RequestDuration) {
683683

684684
// MARK: - Endpoint Calls
685685

686-
pub(crate) fn record_endpoint_call(method: &'static str) {
686+
pub(crate) fn record_endpoint_call(method: &str) {
687687
static ENDPOINT_TOTAL: LazyLock<Counter<u64>> = LazyLock::new(|| {
688688
opentelemetry::global::meter(CARGO_CRATE_NAME)
689689
.u64_counter("s3_cache.endpoint_call_total")
@@ -707,7 +707,7 @@ pub(crate) fn record_endpoint_call(method: &'static str) {
707707
});
708708

709709
PROM_ENDPOINT_TOTAL.with_label_values(&[method]).inc();
710-
ENDPOINT_TOTAL.add(1, &[KeyValue::new("rpc.method", method)]);
710+
ENDPOINT_TOTAL.add(1, &[KeyValue::new("rpc.method", method.to_owned())]);
711711
}
712712

713713
// MARK: Response Body Sizes

0 commit comments

Comments
 (0)