44use std:: sync:: Arc ;
55
66use crate :: {
7- error:: IndexerServiceError , metrics:: FAILED_RECEIPT , middleware:: Sender , tap:: AgoraQuery ,
7+ error:: IndexerServiceError ,
8+ metrics:: FAILED_RECEIPT ,
9+ middleware:: { Allocation , Sender } ,
10+ tap:: AgoraQuery ,
811} ;
912use axum:: {
1013 extract:: { Path , State } ,
@@ -24,7 +27,8 @@ use crate::service::{AttestationOutput, IndexerServiceResponse, IndexerServiceSt
2427pub async fn request_handler (
2528 Path ( manifest_id) : Path < DeploymentId > ,
2629 TypedHeader ( receipt) : TypedHeader < TapReceipt > ,
27- Extension ( sender) : Extension < Sender > ,
30+ Extension ( Sender ( sender) ) : Extension < Sender > ,
31+ Extension ( Allocation ( allocation_id) ) : Extension < Allocation > ,
2832 State ( state) : State < Arc < IndexerServiceState > > ,
2933 headers : HeaderMap ,
3034 req : String ,
@@ -34,8 +38,35 @@ pub async fn request_handler(
3438 let request: QueryBody =
3539 serde_json:: from_str ( & req) . map_err ( |e| IndexerServiceError :: InvalidRequest ( e. into ( ) ) ) ?;
3640
37- let Some ( receipt) = receipt. into_signed_receipt ( ) else {
38- // Serve free query, NO METRICS
41+ if let Some ( receipt) = receipt. into_signed_receipt ( ) {
42+ let variables = request
43+ . variables
44+ . as_ref ( )
45+ . map ( ToString :: to_string)
46+ . unwrap_or_default ( ) ;
47+ let mut ctx = Context :: new ( ) ;
48+ ctx. insert ( AgoraQuery {
49+ deployment_id : manifest_id,
50+ query : request. query . clone ( ) ,
51+ variables,
52+ } ) ;
53+
54+ // Verify the receipt and store it in the database
55+ state
56+ . tap_manager
57+ . verify_and_store_receipt ( & ctx, receipt)
58+ . await
59+ . inspect_err ( |_| {
60+ FAILED_RECEIPT
61+ . with_label_values ( & [
62+ & manifest_id. to_string ( ) ,
63+ & allocation_id. to_string ( ) ,
64+ & sender. to_string ( ) ,
65+ ] )
66+ . inc ( )
67+ } )
68+ . map_err ( IndexerServiceError :: ReceiptError ) ?;
69+ } else {
3970 match headers
4071 . get ( "authorization" )
4172 . and_then ( |v| v. to_str ( ) . ok ( ) )
@@ -51,45 +82,7 @@ pub async fn request_handler(
5182 }
5283
5384 trace ! ( ?manifest_id, "New free query" ) ;
54-
55- let response = state
56- . service_impl
57- . process_request ( manifest_id, request)
58- . await
59- . map_err ( IndexerServiceError :: ProcessingError ) ?
60- . finalize ( AttestationOutput :: Attestable ) ;
61- return Ok ( ( StatusCode :: OK , response) ) ;
62- } ;
63-
64- let allocation_id = receipt. message . allocation_id ;
65-
66- let variables = request
67- . variables
68- . as_ref ( )
69- . map ( ToString :: to_string)
70- . unwrap_or_default ( ) ;
71- let mut ctx = Context :: new ( ) ;
72- ctx. insert ( AgoraQuery {
73- deployment_id : manifest_id,
74- query : request. query . clone ( ) ,
75- variables,
76- } ) ;
77-
78- // Verify the receipt and store it in the database
79- state
80- . tap_manager
81- . verify_and_store_receipt ( & ctx, receipt)
82- . await
83- . inspect_err ( |_| {
84- FAILED_RECEIPT
85- . with_label_values ( & [
86- & manifest_id. to_string ( ) ,
87- & allocation_id. to_string ( ) ,
88- & sender. 0 . to_string ( ) ,
89- ] )
90- . inc ( )
91- } )
92- . map_err ( IndexerServiceError :: ReceiptError ) ?;
85+ }
9386
9487 // Check if we have an attestation signer for the allocation the receipt was created for
9588 let signer = state
0 commit comments