diff --git a/crates/dips/package.json b/crates/dips/package.json index 059e59dc6..755c8a338 100644 --- a/crates/dips/package.json +++ b/crates/dips/package.json @@ -1,6 +1,6 @@ { "name": "@graphprotocol/dips-proto", - "version": "0.2.2", + "version": "0.3.0", "main": "generated/index.js", "types": "generated/index.d.ts", "files": [ diff --git a/crates/dips/proto/gateway.proto b/crates/dips/proto/gateway.proto index 47453b7df..a9c127d92 100644 --- a/crates/dips/proto/gateway.proto +++ b/crates/dips/proto/gateway.proto @@ -18,6 +18,14 @@ service GatewayDipsService { * and receive payment for the indexing work done. */ rpc CollectPayment(CollectPaymentRequest) returns (CollectPaymentResponse); + + /** + * Get the status of a payment receipt by ID. + * + * This method allows the indexer to poll for the status of a previously + * initiated payment collection. + */ + rpc GetReceiptById(GetReceiptByIdRequest) returns (GetReceiptByIdResponse); } @@ -58,7 +66,9 @@ message CollectPaymentRequest { message CollectPaymentResponse { uint64 version = 1; CollectPaymentStatus status = 2; - bytes tap_receipt = 3; + string receipt_id = 3; // Receipt ID for polling + string amount = 4; // Payment amount in GRT + string payment_status = 5; // Initial status: "PENDING" } /** @@ -71,3 +81,28 @@ enum CollectPaymentStatus { ERR_AMOUNT_OUT_OF_BOUNDS = 3; /// The payment request is for too large an amount ERR_UNKNOWN = 99; /// Something else went terribly wrong } + +/** + * A request to get receipt status by ID. + * + * See the `GatewayDipsService.GetReceiptById` method. + */ +message GetReceiptByIdRequest { + uint64 version = 1; + string receipt_id = 2; +} + +/** + * A response containing the receipt status. + * + * See the `GatewayDipsService.GetReceiptById` method. + */ +message GetReceiptByIdResponse { + uint64 version = 1; + string receipt_id = 2; + string status = 3; // "PENDING" | "SUBMITTED" | "FAILED" + string transaction_hash = 4; // Present when SUBMITTED + string error_message = 5; // Present when FAILED + string amount = 6; + string payment_submitted_at = 7; // ISO timestamp when SUBMITTED +} diff --git a/crates/dips/src/ipfs.rs b/crates/dips/src/ipfs.rs index 9279081a4..b5d2fe35f 100644 --- a/crates/dips/src/ipfs.rs +++ b/crates/dips/src/ipfs.rs @@ -1,12 +1,12 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use derivative::Derivative; use futures::TryStreamExt; -use ipfs_api_backend_hyper::{IpfsApi, TryFromUri}; +use ipfs_api_backend_hyper::{BackendWithGlobalOptions, GlobalOptions, IpfsApi, TryFromUri}; use serde::Deserialize; use crate::DipsError; @@ -27,12 +27,17 @@ impl IpfsFetcher for Arc { #[derivative(Debug)] pub struct IpfsClient { #[derivative(Debug = "ignore")] - client: ipfs_api_backend_hyper::IpfsClient, + client: ipfs_api_backend_hyper::BackendWithGlobalOptions, } impl IpfsClient { pub fn new(url: &str) -> anyhow::Result { - let client = ipfs_api_backend_hyper::IpfsClient::from_str(url)?; + let opts = GlobalOptions { + offline: None, + timeout: Some(Duration::from_secs(30)), + }; + let backend = ipfs_api_backend_hyper::IpfsClient::from_str(url)?; + let client = BackendWithGlobalOptions::new(backend, opts); Ok(Self { client }) } } diff --git a/crates/dips/src/proto/graphprotocol.gateway.dips.rs b/crates/dips/src/proto/graphprotocol.gateway.dips.rs index b35823706..d7454b4c5 100644 --- a/crates/dips/src/proto/graphprotocol.gateway.dips.rs +++ b/crates/dips/src/proto/graphprotocol.gateway.dips.rs @@ -40,8 +40,51 @@ pub struct CollectPaymentResponse { pub version: u64, #[prost(enumeration = "CollectPaymentStatus", tag = "2")] pub status: i32, - #[prost(bytes = "vec", tag = "3")] - pub tap_receipt: ::prost::alloc::vec::Vec, + /// Receipt ID for polling + #[prost(string, tag = "3")] + pub receipt_id: ::prost::alloc::string::String, + /// Payment amount in GRT + #[prost(string, tag = "4")] + pub amount: ::prost::alloc::string::String, + /// Initial status: "PENDING" + #[prost(string, tag = "5")] + pub payment_status: ::prost::alloc::string::String, +} +/// * +/// A request to get receipt status by ID. +/// +/// See the `GatewayDipsService.GetReceiptById` method. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetReceiptByIdRequest { + #[prost(uint64, tag = "1")] + pub version: u64, + #[prost(string, tag = "2")] + pub receipt_id: ::prost::alloc::string::String, +} +/// * +/// A response containing the receipt status. +/// +/// See the `GatewayDipsService.GetReceiptById` method. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetReceiptByIdResponse { + #[prost(uint64, tag = "1")] + pub version: u64, + #[prost(string, tag = "2")] + pub receipt_id: ::prost::alloc::string::String, + /// "PENDING" | "SUBMITTED" | "FAILED" + #[prost(string, tag = "3")] + pub status: ::prost::alloc::string::String, + /// Present when SUBMITTED + #[prost(string, tag = "4")] + pub transaction_hash: ::prost::alloc::string::String, + /// Present when FAILED + #[prost(string, tag = "5")] + pub error_message: ::prost::alloc::string::String, + #[prost(string, tag = "6")] + pub amount: ::prost::alloc::string::String, + /// ISO timestamp when SUBMITTED + #[prost(string, tag = "7")] + pub payment_submitted_at: ::prost::alloc::string::String, } /// * /// The status on response to collect an _indexing agreement_. @@ -244,6 +287,40 @@ pub mod gateway_dips_service_client { ); self.inner.unary(req, path, codec).await } + /// * + /// Get the status of a payment receipt by ID. + /// + /// This method allows the indexer to poll for the status of a previously + /// initiated payment collection. + pub async fn get_receipt_by_id( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.gateway.dips.GatewayDipsService/GetReceiptById", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.gateway.dips.GatewayDipsService", + "GetReceiptById", + ), + ); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -283,6 +360,18 @@ pub mod gateway_dips_service_server { tonic::Response, tonic::Status, >; + /// * + /// Get the status of a payment receipt by ID. + /// + /// This method allows the indexer to poll for the status of a previously + /// initiated payment collection. + async fn get_receipt_by_id( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct GatewayDipsServiceServer { @@ -452,6 +541,55 @@ pub mod gateway_dips_service_server { }; Box::pin(fut) } + "/graphprotocol.gateway.dips.GatewayDipsService/GetReceiptById" => { + #[allow(non_camel_case_types)] + struct GetReceiptByIdSvc(pub Arc); + impl< + T: GatewayDipsService, + > tonic::server::UnaryService + for GetReceiptByIdSvc { + type Response = super::GetReceiptByIdResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_receipt_by_id( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetReceiptByIdSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { let mut response = http::Response::new( diff --git a/migrations/20250801191304_dips_receipts.down.sql b/migrations/20250801191304_dips_receipts.down.sql new file mode 100644 index 000000000..7a998d6dc --- /dev/null +++ b/migrations/20250801191304_dips_receipts.down.sql @@ -0,0 +1,3 @@ +-- Add down migration script here + +DROP TABLE IF EXISTS dips_receipts; \ No newline at end of file diff --git a/migrations/20250801191304_dips_receipts.up.sql b/migrations/20250801191304_dips_receipts.up.sql new file mode 100644 index 000000000..aa174622c --- /dev/null +++ b/migrations/20250801191304_dips_receipts.up.sql @@ -0,0 +1,17 @@ +-- Add up migration script here + +CREATE TABLE IF NOT EXISTS dips_receipts ( + id VARCHAR(255) PRIMARY KEY, + agreement_id UUID NOT NULL REFERENCES indexing_agreements(id), + amount NUMERIC(39) NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'PENDING', + transaction_hash CHAR(66), + error_message TEXT, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL, + retry_count INTEGER NOT NULL DEFAULT 0, + CONSTRAINT valid_status CHECK (status IN ('PENDING', 'SUBMITTED', 'FAILED')) +); + +CREATE INDEX idx_dips_receipts_agreement_id ON dips_receipts(agreement_id); +CREATE INDEX idx_dips_receipts_status ON dips_receipts(status); \ No newline at end of file