diff --git a/.github/workflows/license_headers_check.yml b/.github/workflows/license_headers_check.yml index 1e588284d..2e9ade87a 100644 --- a/.github/workflows/license_headers_check.yml +++ b/.github/workflows/license_headers_check.yml @@ -30,4 +30,5 @@ jobs: -ignore '.github/*.yaml' \ -ignore 'migrations/*.sql' \ -ignore 'crates/dips/src/proto/*' \ + -ignore 'crates/dips/proto/*' \ . diff --git a/Cargo.lock b/Cargo.lock index 3f6c85034..4d65948a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5223,9 +5223,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +checksum = "2c0fef6c4230e4ccf618a35c59d7ede15dea37de8427500f50aff708806e42ec" dependencies = [ "bytes", "prost-derive", @@ -5254,9 +5254,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +checksum = "157c5a9d7ea5c2ed2d9fb8f495b64759f7816c7eaea54ba3978f0d63000162e3" dependencies = [ "anyhow", "itertools 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index 8b14c0e9f..369e776b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ rstest = "0.23.0" wiremock = "0.6.1" typed-builder = "0.20.0" tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] } -tonic-build = { version = "0.12.3", features = ["prost"] } -prost = "0.13.3" +prost = "0.13.4" prost-types = "0.13.3" +dipper-rpc = { git = "https://github.com/edgeandnode/dipper/", rev = "c8700e2", default-features = false } +tonic-build = "0.12.3" diff --git a/crates/dips/build.rs b/crates/dips/build.rs index 1d761845f..ee3926245 100644 --- a/crates/dips/build.rs +++ b/crates/dips/build.rs @@ -5,8 +5,17 @@ fn main() { println!("cargo:rerun-if-changed=proto"); tonic_build::configure() .out_dir("src/proto") - .include_file("mod.rs") + .include_file("indexer.rs") + .build_client(false) .protoc_arg("--experimental_allow_proto3_optional") - .compile_protos(&["proto/dips.proto"], &["proto"]) - .expect("Failed to compile dips proto(s)"); + .compile_protos(&["proto/indexer.proto"], &["proto/"]) + .expect("Failed to compile DIPs indexer RPC proto(s)"); + + tonic_build::configure() + .out_dir("src/proto") + .include_file("gateway.rs") + .build_server(false) + .protoc_arg("--experimental_allow_proto3_optional") + .compile_protos(&["proto/gateway.proto"], &["proto"]) + .expect("Failed to compile DIPs gateway RPC proto(s)"); } diff --git a/crates/dips/proto/dips.proto b/crates/dips/proto/dips.proto deleted file mode 100644 index 400c484a3..000000000 --- a/crates/dips/proto/dips.proto +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -syntax = "proto3"; - -package graphprotocol.indexer.dips; - -service AgreementService { - rpc CreateAgreement(CreateAgreementRequest) returns (CreateAgreementResponse); - rpc CancelAgreement(CancelAgreementRequest) returns (AgreementCanellationResponse); - rpc GetAgreementById(GetAgreementByIdRequest) returns (GetAgreementByIdResponse); - rpc GetPrice(PriceRequest) returns (PriceResponse); -} - -message GetAgreementByIdRequest { - -} - -message GetAgreementByIdResponse { - -} - -message CreateAgreementRequest { - string id = 1; - bytes signed_voucher = 2; -} - -message CancelAgreementRequest { - string id = 1; - bytes signed_voucher = 2; -} - -message CreateAgreementResponse { - string uuid = 1; -} - -message AgreementCanellationResponse { - string uuid = 1; -} - -message PriceRequest { - ProtocolNetwork protocol = 1; - string chain_id = 2; -} - -message PriceResponse { - optional Price price = 1; -} - -message Price { - string price_per_block = 1; - string chain_id = 2; - ProtocolNetwork protocol = 3; -} - -enum ProtocolNetwork { - UNKNOWN = 0; - EVM = 1; -} diff --git a/crates/dips/proto/gateway.proto b/crates/dips/proto/gateway.proto new file mode 100644 index 000000000..3ceee360b --- /dev/null +++ b/crates/dips/proto/gateway.proto @@ -0,0 +1,66 @@ +syntax = "proto3"; + +package graphprotocol.gateway.dips; + +service DipsService { + /** + * Cancel an _indexing agreement_. + * + * This method allows the indexer to notify the DIPs gateway that the agreement + * should be canceled. + */ + rpc CancelAgreement(CancelAgreementRequest) returns (CancelAgreementResponse); + + /** + * Report the progress of an _indexing agreement_. + * + * This method allows the indexer to report the work completed to the DIPs gateway + * and receive payment for the indexing work done. + */ + rpc ReportProgress(ReportProgressRequest) returns (ReportProgressResponse); +} + + +/** + * A request to cancel an _indexing agreement_. + * + * See the `DipsService.CancelAgreement` method. + */ +message CancelAgreementRequest { + bytes agreement_id = 1; /// The ID of the agreement to cancel. + reserved 2 to 20; /// Reserved for future use. + + bytes signature = 99; /// The signature of the message. +} + +/** + * A response to a request to cancel an _indexing agreement_. + * + * See the `DipsService.CancelAgreement` method. + */ +message CancelAgreementResponse { + // Empty message +} + + +/** + * A request to report the progress of an _indexing agreement_. + * + * See the `DipsService.ReportProgress` method. + */ +message ReportProgressRequest { + bytes agreement_id = 1; /// The ID of the agreement to report progress for. + // TODO(LNSD): Add fields to the message + + bytes signature = 99; /// The signature of the message. +} + +/** + * A response to a request to report the progress of an _indexing agreement_. + * + * See the `DipsService.ReportProgress` method. + */ +message ReportProgressResponse { + // TODO(LNSD): Add fields to the message +} + diff --git a/crates/dips/proto/indexer.proto b/crates/dips/proto/indexer.proto new file mode 100644 index 000000000..07763f105 --- /dev/null +++ b/crates/dips/proto/indexer.proto @@ -0,0 +1,67 @@ +syntax = "proto3"; + +package graphprotocol.indexer.dips; + +service DipsService { + /** + * Propose a new _indexing agreement_ to an _indexer_. + * + * The _indexer_ can `ACCEPT` or `REJECT` the agreement. + */ + rpc SubmitAgreementProposal(SubmitAgreementProposalRequest) returns (SubmitAgreementProposalResponse); + + /** + * Request to cancel an existing _indexing agreement_. + */ + rpc CancelAgreement(CancelAgreementRequest) returns (CancelAgreementResponse); +} + +/** + * A request to propose a new _indexing agreement_ to an _indexer_. + * + * See the `DipsService.SubmitAgreementProposal` method. + */ +message SubmitAgreementProposalRequest { + bytes agreementId = 1; /// The ID of the agreement to register. + reserved 2 to 19; /// Reserved for future use. + + bytes signed_voucher = 20; /// The voucher of the agreement. +} + +/** + * A response to a request to propose a new _indexing agreement_ to an _indexer_. + * + * See the `DipsService.SubmitAgreementProposal` method. + */ +message SubmitAgreementProposalResponse { + ProposalResponse response = 1; /// The response to the agreement proposal. +} + +/** + * The response to an _indexing agreement_ proposal. + */ +enum ProposalResponse { + ACCEPT = 0; /// The agreement proposal was accepted. + REJECT = 1; /// The agreement proposal was rejected. +} + +/** + * A request to cancel an existing _indexing agreement_. + * + * See the `DipsService.CancelAgreement` method. + */ +message CancelAgreementRequest { + bytes agreementId = 1; /// The ID of the agreement to cancel. + reserved 2 to 20; /// Reserved for future use. + + bytes signature = 99; /// The signature of the message. +} + +/** + * A response to a request to cancel an existing _indexing agreement_. + * + * See the `DipsService.CancelAgreement` method. + */ +message CancelAgreementResponse { + // Empty message +} diff --git a/crates/dips/src/lib.rs b/crates/dips/src/lib.rs index 2c369b872..62b8268e3 100644 --- a/crates/dips/src/lib.rs +++ b/crates/dips/src/lib.rs @@ -38,19 +38,17 @@ sol! { // should coincide with signer address payer; // should coincide with indexer - address payee; + address recipient; // data service that will initiate payment collection address service; - // initial indexing amount max + + uint32 durationEpochs; + uint256 maxInitialAmount; - uint256 maxOngoingAmountPerEpoch; - // time to accept the agreement, intended to be on the order - // of hours or mins - uint64 deadline; + uint256 minOngoingAmountPerEpoch; + uint32 maxEpochsPerCollection; uint32 minEpochsPerCollection; - // after which the agreement is complete - uint32 durationEpochs; bytes metadata; } @@ -152,10 +150,10 @@ impl SignedIndexingAgreementVoucher { return Err(DipsError::PayerNotAuthorised(payer)); } - if !self.voucher.payee.eq(expected_payee) { + if !self.voucher.recipient.eq(expected_payee) { return Err(DipsError::UnexpectedPayee { expected: *expected_payee, - actual: self.voucher.payee, + actual: self.voucher.recipient, }); } @@ -300,11 +298,10 @@ mod test { let voucher = IndexingAgreementVoucher { payer: payer_addr, - payee: payee_addr, + recipient: payee_addr, service: Address(FixedBytes::ZERO), maxInitialAmount: U256::from(10000_u64), - maxOngoingAmountPerEpoch: U256::from(10000_u64), - deadline: 1000, + minOngoingAmountPerEpoch: U256::from(10000_u64), maxEpochsPerCollection: 1000, minEpochsPerCollection: 1000, durationEpochs: 1000, @@ -353,11 +350,10 @@ mod test { let voucher = IndexingAgreementVoucher { payer: payer_addr, - payee: payee.address(), + recipient: payee.address(), service: Address(FixedBytes::ZERO), maxInitialAmount: U256::from(10000_u64), - maxOngoingAmountPerEpoch: U256::from(10000_u64), - deadline: 1000, + minOngoingAmountPerEpoch: U256::from(10000_u64), maxEpochsPerCollection: 1000, minEpochsPerCollection: 1000, durationEpochs: 1000, @@ -395,11 +391,10 @@ mod test { let voucher = IndexingAgreementVoucher { payer: payer_addr, - payee: payee_addr, + recipient: payee_addr, service: Address(FixedBytes::ZERO), maxInitialAmount: U256::from(10000_u64), - maxOngoingAmountPerEpoch: U256::from(10000_u64), - deadline: 1000, + minOngoingAmountPerEpoch: U256::from(10000_u64), maxEpochsPerCollection: 1000, minEpochsPerCollection: 1000, durationEpochs: 1000, diff --git a/crates/dips/src/proto/gateway.rs b/crates/dips/src/proto/gateway.rs new file mode 100644 index 000000000..fd13ab498 --- /dev/null +++ b/crates/dips/src/proto/gateway.rs @@ -0,0 +1,8 @@ +// This file is @generated by prost-build. +pub mod graphprotocol { + pub mod gateway { + pub mod dips { + include!("graphprotocol.gateway.dips.rs"); + } + } +} diff --git a/crates/dips/src/proto/graphprotocol.gateway.dips.rs b/crates/dips/src/proto/graphprotocol.gateway.dips.rs new file mode 100644 index 000000000..d1eec36ff --- /dev/null +++ b/crates/dips/src/proto/graphprotocol.gateway.dips.rs @@ -0,0 +1,204 @@ +// This file is @generated by prost-build. +/// * +/// A request to cancel an _indexing agreement_. +/// +/// See the `DipsService.CancelAgreement` method. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CancelAgreementRequest { + /// / The ID of the agreement to cancel. + #[prost(bytes = "vec", tag = "1")] + pub agreement_id: ::prost::alloc::vec::Vec, + /// / The signature of the message. + #[prost(bytes = "vec", tag = "99")] + pub signature: ::prost::alloc::vec::Vec, +} +/// * +/// A response to a request to cancel an _indexing agreement_. +/// +/// See the `DipsService.CancelAgreement` method. +/// +/// Empty message +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct CancelAgreementResponse {} +/// * +/// A request to report the progress of an _indexing agreement_. +/// +/// See the `DipsService.ReportProgress` method. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReportProgressRequest { + /// / The ID of the agreement to report progress for. + #[prost(bytes = "vec", tag = "1")] + pub agreement_id: ::prost::alloc::vec::Vec, + /// / The signature of the message. + #[prost(bytes = "vec", tag = "99")] + pub signature: ::prost::alloc::vec::Vec, +} +/// * +/// A response to a request to report the progress of an _indexing agreement_. +/// +/// See the `DipsService.ReportProgress` method. +/// +/// TODO(LNSD): Add fields to the message +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReportProgressResponse {} +/// Generated client implementations. +pub mod dips_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct DipsServiceClient { + inner: tonic::client::Grpc, + } + impl DipsServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl DipsServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> DipsServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + DipsServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// * + /// Cancel an _indexing agreement_. + /// + /// This method allows the indexer to notify the DIPs gateway that the agreement + /// should be canceled. + pub async fn cancel_agreement( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.gateway.dips.DipsService/CancelAgreement", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.gateway.dips.DipsService", + "CancelAgreement", + ), + ); + self.inner.unary(req, path, codec).await + } + /// * + /// Report the progress of an _indexing agreement_. + /// + /// This method allows the indexer to report the work completed to the DIPs gateway + /// and receive payment for the indexing work done. + pub async fn report_progress( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/graphprotocol.gateway.dips.DipsService/ReportProgress", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "graphprotocol.gateway.dips.DipsService", + "ReportProgress", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} diff --git a/crates/dips/src/proto/graphprotocol.indexer.dips.rs b/crates/dips/src/proto/graphprotocol.indexer.dips.rs index 57fce10e6..d01d4be96 100644 --- a/crates/dips/src/proto/graphprotocol.indexer.dips.rs +++ b/crates/dips/src/proto/graphprotocol.indexer.dips.rs @@ -1,287 +1,80 @@ // This file is @generated by prost-build. -#[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct GetAgreementByIdRequest {} -#[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct GetAgreementByIdResponse {} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct CreateAgreementRequest { - #[prost(string, tag = "1")] - pub id: ::prost::alloc::string::String, - #[prost(bytes = "vec", tag = "2")] - pub signed_voucher: ::prost::alloc::vec::Vec, -} +/// * +/// A request to propose a new _indexing agreement_ to an _indexer_. +/// +/// See the `DipsService.SubmitAgreementProposal` method. #[derive(Clone, PartialEq, ::prost::Message)] -pub struct CancelAgreementRequest { - #[prost(string, tag = "1")] - pub id: ::prost::alloc::string::String, - #[prost(bytes = "vec", tag = "2")] +pub struct SubmitAgreementProposalRequest { + /// / The ID of the agreement to register. + #[prost(bytes = "vec", tag = "1")] + pub agreement_id: ::prost::alloc::vec::Vec, + /// / The voucher of the agreement. + #[prost(bytes = "vec", tag = "20")] pub signed_voucher: ::prost::alloc::vec::Vec, } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct CreateAgreementResponse { - #[prost(string, tag = "1")] - pub uuid: ::prost::alloc::string::String, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct AgreementCanellationResponse { - #[prost(string, tag = "1")] - pub uuid: ::prost::alloc::string::String, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct PriceRequest { - #[prost(enumeration = "ProtocolNetwork", tag = "1")] - pub protocol: i32, - #[prost(string, tag = "2")] - pub chain_id: ::prost::alloc::string::String, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct PriceResponse { - #[prost(message, optional, tag = "1")] - pub price: ::core::option::Option, +/// * +/// A response to a request to propose a new _indexing agreement_ to an _indexer_. +/// +/// See the `DipsService.SubmitAgreementProposal` method. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct SubmitAgreementProposalResponse { + /// / The response to the agreement proposal. + #[prost(enumeration = "ProposalResponse", tag = "1")] + pub response: i32, } +/// * +/// A request to cancel an existing _indexing agreement_. +/// +/// See the `DipsService.CancelAgreement` method. #[derive(Clone, PartialEq, ::prost::Message)] -pub struct Price { - #[prost(string, tag = "1")] - pub price_per_block: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub chain_id: ::prost::alloc::string::String, - #[prost(enumeration = "ProtocolNetwork", tag = "3")] - pub protocol: i32, +pub struct CancelAgreementRequest { + /// / The ID of the agreement to cancel. + #[prost(bytes = "vec", tag = "1")] + pub agreement_id: ::prost::alloc::vec::Vec, + /// / The signature of the message. + #[prost(bytes = "vec", tag = "99")] + pub signature: ::prost::alloc::vec::Vec, } +/// * +/// A response to a request to cancel an existing _indexing agreement_. +/// +/// See the `DipsService.CancelAgreement` method. +/// +/// Empty message +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct CancelAgreementResponse {} +/// * +/// The response to an _indexing agreement_ proposal. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] -pub enum ProtocolNetwork { - Unknown = 0, - Evm = 1, +pub enum ProposalResponse { + /// / The agreement proposal was accepted. + Accept = 0, + /// / The agreement proposal was rejected. + Reject = 1, } -impl ProtocolNetwork { +impl ProposalResponse { /// String value of the enum field names used in the ProtoBuf definition. /// /// The values are not transformed in any way and thus are considered stable /// (if the ProtoBuf definition does not change) and safe for programmatic use. pub fn as_str_name(&self) -> &'static str { match self { - Self::Unknown => "UNKNOWN", - Self::Evm => "EVM", + Self::Accept => "ACCEPT", + Self::Reject => "REJECT", } } /// Creates an enum from field names used in the ProtoBuf definition. pub fn from_str_name(value: &str) -> ::core::option::Option { match value { - "UNKNOWN" => Some(Self::Unknown), - "EVM" => Some(Self::Evm), + "ACCEPT" => Some(Self::Accept), + "REJECT" => Some(Self::Reject), _ => None, } } } -/// Generated client implementations. -pub mod agreement_service_client { - #![allow( - unused_variables, - dead_code, - missing_docs, - clippy::wildcard_imports, - clippy::let_unit_value, - )] - use tonic::codegen::*; - use tonic::codegen::http::Uri; - #[derive(Debug, Clone)] - pub struct AgreementServiceClient { - inner: tonic::client::Grpc, - } - impl AgreementServiceClient { - /// Attempt to create a new client by connecting to a given endpoint. - pub async fn connect(dst: D) -> Result - where - D: TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; - Ok(Self::new(conn)) - } - } - impl AgreementServiceClient - where - T: tonic::client::GrpcService, - T::Error: Into, - T::ResponseBody: Body + std::marker::Send + 'static, - ::Error: Into + std::marker::Send, - { - pub fn new(inner: T) -> Self { - let inner = tonic::client::Grpc::new(inner); - Self { inner } - } - pub fn with_origin(inner: T, origin: Uri) -> Self { - let inner = tonic::client::Grpc::with_origin(inner, origin); - Self { inner } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> AgreementServiceClient> - where - F: tonic::service::Interceptor, - T::ResponseBody: Default, - T: tonic::codegen::Service< - http::Request, - Response = http::Response< - >::ResponseBody, - >, - >, - , - >>::Error: Into + std::marker::Send + std::marker::Sync, - { - AgreementServiceClient::new(InterceptedService::new(inner, interceptor)) - } - /// Compress requests with the given encoding. - /// - /// This requires the server to support it otherwise it might respond with an - /// error. - #[must_use] - pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.inner = self.inner.send_compressed(encoding); - self - } - /// Enable decompressing responses. - #[must_use] - pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.inner = self.inner.accept_compressed(encoding); - self - } - /// Limits the maximum size of a decoded message. - /// - /// Default: `4MB` - #[must_use] - pub fn max_decoding_message_size(mut self, limit: usize) -> Self { - self.inner = self.inner.max_decoding_message_size(limit); - self - } - /// Limits the maximum size of an encoded message. - /// - /// Default: `usize::MAX` - #[must_use] - pub fn max_encoding_message_size(mut self, limit: usize) -> Self { - self.inner = self.inner.max_encoding_message_size(limit); - self - } - pub async fn create_agreement( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/graphprotocol.indexer.dips.AgreementService/CreateAgreement", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "graphprotocol.indexer.dips.AgreementService", - "CreateAgreement", - ), - ); - self.inner.unary(req, path, codec).await - } - pub async fn cancel_agreement( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/graphprotocol.indexer.dips.AgreementService/CancelAgreement", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "graphprotocol.indexer.dips.AgreementService", - "CancelAgreement", - ), - ); - self.inner.unary(req, path, codec).await - } - pub async fn get_agreement_by_id( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/graphprotocol.indexer.dips.AgreementService/GetAgreementById", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "graphprotocol.indexer.dips.AgreementService", - "GetAgreementById", - ), - ); - self.inner.unary(req, path, codec).await - } - pub async fn get_price( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/graphprotocol.indexer.dips.AgreementService/GetPrice", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "graphprotocol.indexer.dips.AgreementService", - "GetPrice", - ), - ); - self.inner.unary(req, path, codec).await - } - } -} /// Generated server implementations. -pub mod agreement_service_server { +pub mod dips_service_server { #![allow( unused_variables, dead_code, @@ -290,44 +83,39 @@ pub mod agreement_service_server { clippy::let_unit_value, )] use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with AgreementServiceServer. + /// Generated trait containing gRPC methods that should be implemented for use with DipsServiceServer. #[async_trait] - pub trait AgreementService: std::marker::Send + std::marker::Sync + 'static { - async fn create_agreement( + pub trait DipsService: std::marker::Send + std::marker::Sync + 'static { + /// * + /// Propose a new _indexing agreement_ to an _indexer_. + /// + /// The _indexer_ can `ACCEPT` or `REJECT` the agreement. + async fn submit_agreement_proposal( &self, - request: tonic::Request, + request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; + /// * + /// Request to cancel an existing _indexing agreement_. async fn cancel_agreement( &self, request: tonic::Request, ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; - async fn get_agreement_by_id( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; - async fn get_price( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] - pub struct AgreementServiceServer { + pub struct DipsServiceServer { inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - impl AgreementServiceServer { + impl DipsServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } @@ -378,9 +166,9 @@ pub mod agreement_service_server { self } } - impl tonic::codegen::Service> for AgreementServiceServer + impl tonic::codegen::Service> for DipsServiceServer where - T: AgreementService, + T: DipsService, B: Body + std::marker::Send + 'static, B::Error: Into + std::marker::Send + 'static, { @@ -395,25 +183,30 @@ pub mod agreement_service_server { } fn call(&mut self, req: http::Request) -> Self::Future { match req.uri().path() { - "/graphprotocol.indexer.dips.AgreementService/CreateAgreement" => { + "/graphprotocol.indexer.dips.DipsService/SubmitAgreementProposal" => { #[allow(non_camel_case_types)] - struct CreateAgreementSvc(pub Arc); + struct SubmitAgreementProposalSvc(pub Arc); impl< - T: AgreementService, - > tonic::server::UnaryService - for CreateAgreementSvc { - type Response = super::CreateAgreementResponse; + T: DipsService, + > tonic::server::UnaryService + for SubmitAgreementProposalSvc { + type Response = super::SubmitAgreementProposalResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request< + super::SubmitAgreementProposalRequest, + >, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::create_agreement(&inner, request) + ::submit_agreement_proposal( + &inner, + request, + ) .await }; Box::pin(fut) @@ -425,7 +218,7 @@ pub mod agreement_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let method = CreateAgreementSvc(inner); + let method = SubmitAgreementProposalSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -441,14 +234,14 @@ pub mod agreement_service_server { }; Box::pin(fut) } - "/graphprotocol.indexer.dips.AgreementService/CancelAgreement" => { + "/graphprotocol.indexer.dips.DipsService/CancelAgreement" => { #[allow(non_camel_case_types)] - struct CancelAgreementSvc(pub Arc); + struct CancelAgreementSvc(pub Arc); impl< - T: AgreementService, + T: DipsService, > tonic::server::UnaryService for CancelAgreementSvc { - type Response = super::AgreementCanellationResponse; + type Response = super::CancelAgreementResponse; type Future = BoxFuture< tonic::Response, tonic::Status, @@ -459,8 +252,7 @@ pub mod agreement_service_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::cancel_agreement(&inner, request) - .await + ::cancel_agreement(&inner, request).await }; Box::pin(fut) } @@ -487,100 +279,6 @@ pub mod agreement_service_server { }; Box::pin(fut) } - "/graphprotocol.indexer.dips.AgreementService/GetAgreementById" => { - #[allow(non_camel_case_types)] - struct GetAgreementByIdSvc(pub Arc); - impl< - T: AgreementService, - > tonic::server::UnaryService - for GetAgreementByIdSvc { - type Response = super::GetAgreementByIdResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::get_agreement_by_id( - &inner, - request, - ) - .await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = GetAgreementByIdSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/graphprotocol.indexer.dips.AgreementService/GetPrice" => { - #[allow(non_camel_case_types)] - struct GetPriceSvc(pub Arc); - impl< - T: AgreementService, - > tonic::server::UnaryService - for GetPriceSvc { - type Response = super::PriceResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::get_price(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = GetPriceSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } _ => { Box::pin(async move { let mut response = http::Response::new(empty_body()); @@ -601,7 +299,7 @@ pub mod agreement_service_server { } } } - impl Clone for AgreementServiceServer { + impl Clone for DipsServiceServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -614,8 +312,8 @@ pub mod agreement_service_server { } } /// Generated gRPC service name - pub const SERVICE_NAME: &str = "graphprotocol.indexer.dips.AgreementService"; - impl tonic::server::NamedService for AgreementServiceServer { + pub const SERVICE_NAME: &str = "graphprotocol.indexer.dips.DipsService"; + impl tonic::server::NamedService for DipsServiceServer { const NAME: &'static str = SERVICE_NAME; } } diff --git a/crates/dips/src/proto/indexer.rs b/crates/dips/src/proto/indexer.rs new file mode 100644 index 000000000..8edb4be22 --- /dev/null +++ b/crates/dips/src/proto/indexer.rs @@ -0,0 +1,8 @@ +// This file is @generated by prost-build. +pub mod graphprotocol { + pub mod indexer { + pub mod dips { + include!("graphprotocol.indexer.dips.rs"); + } + } +} diff --git a/crates/dips/src/proto/mod.rs b/crates/dips/src/proto/mod.rs index 8edb4be22..873e26690 100644 --- a/crates/dips/src/proto/mod.rs +++ b/crates/dips/src/proto/mod.rs @@ -1,8 +1,2 @@ -// This file is @generated by prost-build. -pub mod graphprotocol { - pub mod indexer { - pub mod dips { - include!("graphprotocol.indexer.dips.rs"); - } - } -} +pub mod gateway; +pub mod indexer; diff --git a/crates/dips/src/server.rs b/crates/dips/src/server.rs index f92454002..a340739a0 100644 --- a/crates/dips/src/server.rs +++ b/crates/dips/src/server.rs @@ -1,18 +1,22 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::{str::FromStr, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; +use crate::{ + proto::indexer::graphprotocol::indexer::dips::{ + dips_service_server::DipsService, CancelAgreementRequest, CancelAgreementResponse, + ProposalResponse, SubmitAgreementProposalRequest, SubmitAgreementProposalResponse, + }, + store::AgreementStore, + validate_and_cancel_agreement, validate_and_create_agreement, DipsError, +}; use anyhow::anyhow; use async_trait::async_trait; use thegraph_core::alloy::{dyn_abi::Eip712Domain, primitives::Address}; +use tonic::{Request, Response, Status}; use uuid::Uuid; -use crate::{ - proto::graphprotocol::indexer::dips::*, store::AgreementStore, validate_and_cancel_agreement, - validate_and_create_agreement, DipsError, -}; - #[derive(Debug)] pub struct DipsServer { pub agreement_store: Arc, @@ -23,15 +27,19 @@ pub struct DipsServer { } #[async_trait] -impl agreement_service_server::AgreementService for DipsServer { - async fn create_agreement( +impl DipsService for DipsServer { + async fn submit_agreement_proposal( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let CreateAgreementRequest { id, signed_voucher } = request.into_inner(); - let uid = Uuid::from_str(&id).map_err(|_| { + request: Request, + ) -> Result, Status> { + let SubmitAgreementProposalRequest { + agreement_id, + signed_voucher, + } = request.into_inner(); + let bs: [u8; 16] = agreement_id.as_slice().try_into().map_err(|_| { Into::::into(DipsError::from(anyhow!("failed to parse uuid"))) })?; + let uid = Uuid::from_bytes(bs); validate_and_create_agreement( self.agreement_store.clone(), @@ -44,45 +52,79 @@ impl agreement_service_server::AgreementService for DipsServer { .await .map_err(Into::::into)?; - Ok(tonic::Response::new(CreateAgreementResponse { - uuid: uid.to_string(), + Ok(tonic::Response::new(SubmitAgreementProposalResponse { + response: ProposalResponse::Accept.into(), })) } - + /// * + /// Request to cancel an existing _indexing agreement_. async fn cancel_agreement( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - let CancelAgreementRequest { id, signed_voucher } = request.into_inner(); - let uid = Uuid::from_str(&id).map_err(|_| { + request: Request, + ) -> Result, Status> { + let CancelAgreementRequest { + agreement_id, + signature: _, + } = request.into_inner(); + + let bs: [u8; 16] = agreement_id.as_slice().try_into().map_err(|_| { Into::::into(DipsError::from(anyhow!("failed to parse uuid"))) })?; + let uid = Uuid::from_bytes(bs); validate_and_cancel_agreement( self.agreement_store.clone(), &self.domain, uid, - signed_voucher, + vec![], self.cancel_voucher_time_tolerance, ) .await .map_err(Into::::into)?; - Ok(tonic::Response::new(AgreementCanellationResponse { - uuid: uid.to_string(), - })) - } - async fn get_agreement_by_id( - &self, - _request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - todo!() + Ok(tonic::Response::new(CancelAgreementResponse {})) } - async fn get_price( - &self, - _request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - todo!() - } + // async fn create_agreement( + // &self, + // request: tonic::Request, + // ) -> std::result::Result, tonic::Status> { + // } + + // async fn cancel_agreement( + // &self, + // request: tonic::Request, + // ) -> std::result::Result, tonic::Status> { + // let CancelAgreementRequest { id, signed_voucher } = request.into_inner(); + // let uid = Uuid::from_str(&id).map_err(|_| { + // Into::::into(DipsError::from(anyhow!("failed to parse uuid"))) + // })?; + + // validate_and_cancel_agreement( + // self.agreement_store.clone(), + // &self.domain, + // uid, + // signed_voucher, + // self.cancel_voucher_time_tolerance, + // ) + // .await + // .map_err(Into::::into)?; + + // Ok(tonic::Response::new(AgreementCanellationResponse { + // uuid: uid.to_string(), + // })) + // } + // async fn get_agreement_by_id( + // &self, + // _request: tonic::Request, + // ) -> std::result::Result, tonic::Status> { + // todo!() + // } + + // async fn get_price( + // &self, + // _request: tonic::Request, + // ) -> std::result::Result, tonic::Status> { + // todo!() + // } } diff --git a/crates/service/src/database/dips.rs b/crates/service/src/database/dips.rs index b90bd367f..173750ab9 100644 --- a/crates/service/src/database/dips.rs +++ b/crates/service/src/database/dips.rs @@ -49,7 +49,7 @@ impl AgreementStore for PsqlAgreementStore { bs, protocol, agreement.voucher.service.as_slice(), - agreement.voucher.payee.as_slice(), + agreement.voucher.recipient.as_slice(), agreement.voucher.payer.as_slice(), now, now, diff --git a/crates/service/src/service.rs b/crates/service/src/service.rs index 9d20f0d90..70d060f5b 100644 --- a/crates/service/src/service.rs +++ b/crates/service/src/service.rs @@ -9,8 +9,8 @@ use axum::{extract::Request, serve, ServiceExt}; use clap::Parser; use indexer_config::{Config, DipsConfig, GraphNodeConfig, SubgraphConfig}; use indexer_dips::{ - proto::graphprotocol::indexer::dips::agreement_service_server::{ - AgreementService, AgreementServiceServer, + proto::indexer::graphprotocol::indexer::dips::dips_service_server::{ + DipsService, DipsServiceServer, }, server::DipsServer, }; @@ -165,9 +165,9 @@ pub async fn run() -> anyhow::Result<()> { .with_graceful_shutdown(shutdown_handler()) .await?) } -async fn start_dips_server(addr: SocketAddr, service: impl AgreementService) { +async fn start_dips_server(addr: SocketAddr, service: impl DipsService) { tonic::transport::Server::builder() - .add_service(AgreementServiceServer::new(service)) + .add_service(DipsServiceServer::new(service)) .serve(addr) .await .expect("unable to start dips grpc");