Skip to content

Commit c008d3e

Browse files
committed
chore: dipper client
1 parent d903ebf commit c008d3e

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

crates/dips/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use std::{str::FromStr, sync::Arc};
55

6+
use proto::gateway::graphprotocol::gateway::dips::dipper_service_client::DipperServiceClient;
67
use thegraph_core::alloy::{
78
core::primitives::Address,
89
hex::ToHexExt,
@@ -19,6 +20,7 @@ pub mod store;
1920

2021
use store::AgreementStore;
2122
use thiserror::Error;
23+
use tonic::transport::Channel;
2224
use uuid::Uuid;
2325

2426
/// The Arbitrum One (mainnet) chain ID (eip155).
@@ -151,6 +153,8 @@ pub enum DipsError {
151153
AgreementCancelled,
152154
#[error("invalid voucher: {0}")]
153155
InvalidVoucher(String),
156+
#[error("connection error: {0}")]
157+
ConnectionError(#[from] tonic::transport::Error),
154158
}
155159

156160
// TODO: send back messages
@@ -328,7 +332,7 @@ pub async fn validate_and_cancel_agreement(
328332
pub async fn collect_indexing_agreements(
329333
store: Arc<dyn AgreementStore>,
330334
allocation_id: Address,
331-
_dipper_grpc_url: &str,
335+
_dipper_client: &DipperServiceClient<Channel>,
332336
) -> Result<(), DipsError> {
333337
let agreements = store
334338
.get_by_last_allocation_id(allocation_id.encode_hex())

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use anyhow::Context;
1212
use bigdecimal::{num_bigint::ToBigInt, ToPrimitive};
1313
use futures::{stream, StreamExt};
1414
use indexer_dips::{
15-
collect_indexing_agreements, database::PsqlAgreementStore, store::AgreementStore,
15+
collect_indexing_agreements, database::PsqlAgreementStore,
16+
proto::gateway::graphprotocol::gateway::dips::dipper_service_client::DipperServiceClient,
17+
store::AgreementStore, DipsError,
1618
};
1719
use indexer_monitor::{EscrowAccounts, SubgraphClient};
1820
use indexer_query::{
@@ -324,6 +326,8 @@ pub struct State {
324326
config: &'static SenderAccountConfig,
325327
/// Indexing agreement store
326328
agreement_store: Arc<dyn AgreementStore>,
329+
/// Client for the dipper grpc server
330+
dipper_client: Option<DipperServiceClient<Channel>>,
327331
}
328332

329333
/// Configuration derived from config.toml
@@ -836,6 +840,15 @@ impl Actor for SenderAccount {
836840
let agreement_store = Arc::new(PsqlAgreementStore {
837841
pool: pgpool.clone(),
838842
});
843+
844+
let dipper_client = if let Some(ref url) = config.dipper_grpc_url {
845+
let endpoint = tonic::transport::Endpoint::from_str(url)
846+
.map_err(DipsError::ConnectionError)?
847+
.connect_lazy();
848+
Some(DipperServiceClient::new(endpoint))
849+
} else {
850+
None
851+
};
839852
let state = State {
840853
prefix,
841854
sender_fee_tracker: SenderFeeTracker::new(config.rav_request_buffer),
@@ -858,6 +871,7 @@ impl Actor for SenderAccount {
858871
backoff_info: BackoffInfo::default(),
859872
config,
860873
agreement_store,
874+
dipper_client,
861875
};
862876

863877
stream::iter(allocation_ids)
@@ -1069,12 +1083,12 @@ impl Actor for SenderAccount {
10691083
if let Some(sender_handle) = ActorRef::<SenderAllocationMessage>::where_is(
10701084
state.format_sender_allocation(&allocation_id.address()),
10711085
) {
1072-
if let Some(ref dipper_grpc_url) = state.config.dipper_grpc_url {
1086+
if let Some(ref dipper_client) = state.dipper_client {
10731087
tracing::trace!(%allocation_id, "SenderAccount checking for indexing agreements to collect");
10741088
match collect_indexing_agreements(
10751089
state.agreement_store.clone(),
10761090
allocation_id.address(),
1077-
dipper_grpc_url,
1091+
dipper_client,
10781092
)
10791093
.await
10801094
{

0 commit comments

Comments
 (0)