Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 24 additions & 6 deletions common/src/subgraph_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,23 @@ impl DeploymentDetails {
graph_node_status_url: &str,
graph_node_base_url: &str,
deployment: DeploymentId,
) -> Result<Self, anyhow::Error> {
Self::for_graph_node_url(
Url::parse(graph_node_status_url)?,
Url::parse(graph_node_base_url)?,
deployment,
)
}

pub fn for_graph_node_url(
graph_node_status_url: Url,
graph_node_base_url: Url,
deployment: DeploymentId,
) -> Result<Self, anyhow::Error> {
Ok(Self {
deployment: Some(deployment),
status_url: Some(Url::parse(graph_node_status_url)?),
query_url: Url::parse(graph_node_base_url)?
.join(&format!("subgraphs/id/{deployment}"))?,
status_url: Some(graph_node_status_url),
query_url: graph_node_base_url.join(&format!("subgraphs/id/{deployment}"))?,
query_auth_token: None,
})
}
Expand All @@ -111,12 +122,19 @@ impl DeploymentDetails {
query_url: &str,
query_auth_token: Option<String>,
) -> Result<Self, anyhow::Error> {
Ok(Self {
Ok(Self::for_query_url_with_token_url(
Url::parse(query_url)?,
query_auth_token,
))
}

pub fn for_query_url_with_token_url(query_url: Url, query_auth_token: Option<String>) -> Self {
Self {
deployment: None,
status_url: None,
query_url: Url::parse(query_url)?,
query_url,
query_auth_token,
})
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ serde.workspace = true
thegraph-core.workspace = true
tracing.workspace = true
bigdecimal = { workspace = true, features = ["serde"] }
bip39 = "2.0.0"
bip39 = { version = "2.0.0", features = ["rand"] }
figment = { version = "0.10.19", features = ["env", "toml"] }
serde_with = { version = "3.8.1", default-features = false }
serde_repr = "0.1.19"
Expand Down
4 changes: 2 additions & 2 deletions config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ pub struct IndexerConfig {
pub operator_mnemonic: Mnemonic,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
#[serde(untagged)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -300,7 +300,7 @@ pub struct SubgraphConfig {
pub syncing_interval_secs: Duration,
}

#[derive(Debug, Deserialize_repr, Clone)]
#[derive(Debug, Deserialize_repr, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq))]
#[repr(u64)]
pub enum TheGraphChainId {
Expand Down
10 changes: 9 additions & 1 deletion config/src/grt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@
use bigdecimal::{BigDecimal, ToPrimitive};
use serde::{de::Error, Deserialize};

#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Default)]
pub struct NonZeroGRT(u128);

impl NonZeroGRT {
pub fn new(value: u128) -> Result<Self, String> {
if value == 0 {
Err("GRT value cannot be represented as a u128 GRT wei value".into())
} else {
Ok(NonZeroGRT(value))
}
}

pub fn get_value(&self) -> u128 {
self.0
}
Expand Down
1 change: 1 addition & 0 deletions config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
mod config;
mod grt;

pub use bip39::Mnemonic;
pub use config::*;
pub use grt::*;
2 changes: 1 addition & 1 deletion service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl From<MainConfig> for Config {
recently_closed_allocation_buffer_seconds: 0,
},
graph_network: GraphNetworkConfig {
chain_id: value.blockchain.chain_id.clone() as u64,
chain_id: value.blockchain.chain_id as u64,
},
tap: TapConfig {
chain_id: value.blockchain.chain_id as u64,
Expand Down
110 changes: 60 additions & 50 deletions tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use indexer_common::prelude::{
escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient,
};
use indexer_config::{
Config, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig,
SubgraphConfig, SubgraphsConfig, TapConfig,
};
use ractor::concurrency::JoinHandle;
use ractor::{Actor, ActorRef};
use sender_account::SenderAccountConfig;

use crate::agent::sender_accounts_manager::{
SenderAccountsManagerArgs, SenderAccountsManagerMessage,
};
use crate::config::{
Config, EscrowSubgraph, Ethereum, IndexerInfrastructure, NetworkSubgraph, Tap,
};
use crate::{database, CONFIG, EIP_712_DOMAIN};
use sender_accounts_manager::SenderAccountsManager;

Expand All @@ -25,95 +25,105 @@ pub mod unaggregated_receipts;

pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {
let Config {
ethereum: Ethereum { indexer_address },
indexer_infrastructure:
IndexerInfrastructure {
graph_node_query_endpoint,
graph_node_status_endpoint,
..
},
postgres,
network_subgraph:
NetworkSubgraph {
network_subgraph_deployment,
network_subgraph_endpoint,
network_subgraph_auth_token,
allocation_syncing_interval_ms,
recently_closed_allocation_buffer_seconds,
indexer: IndexerConfig {
indexer_address, ..
},
graph_node:
GraphNodeConfig {
status_url: graph_node_status_endpoint,
query_url: graph_node_query_endpoint,
},
escrow_subgraph:
EscrowSubgraph {
escrow_subgraph_deployment,
escrow_subgraph_endpoint,
escrow_subgraph_auth_token,
escrow_syncing_interval_ms,
database,
subgraphs:
SubgraphsConfig {
network:
NetworkSubgraphConfig {
config:
SubgraphConfig {
query_url: network_query_url,
query_auth_token: network_query_auth_token,
deployment_id: network_deployment_id,
syncing_interval_secs: network_sync_interval,
},
recently_closed_allocation_buffer_secs: recently_closed_allocation_buffer,
},
escrow:
EscrowSubgraphConfig {
config:
SubgraphConfig {
query_url: escrow_query_url,
query_auth_token: escrow_query_auth_token,
deployment_id: escrow_deployment_id,
syncing_interval_secs: escrow_sync_interval,
},
},
},
tap:
Tap {
TapConfig {
// TODO: replace with a proper implementation once the gateway registry contract is ready
sender_aggregator_endpoints,
..
},
..
} = &*CONFIG;
let pgpool = database::connect(postgres).await;
let pgpool = database::connect(database.clone()).await;

let http_client = reqwest::Client::new();

let network_subgraph = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
network_subgraph_deployment
network_deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node(
graph_node_status_endpoint,
graph_node_query_endpoint,
DeploymentDetails::for_graph_node_url(
graph_node_status_endpoint.clone(),
graph_node_query_endpoint.clone(),
deployment,
)
})
.transpose()
.expect("Failed to parse graph node query endpoint and network subgraph deployment"),
DeploymentDetails::for_query_url_with_token(
network_subgraph_endpoint,
network_subgraph_auth_token.clone(),
)
.expect("Failed to parse network subgraph endpoint"),
DeploymentDetails::for_query_url_with_token_url(
network_query_url.clone(),
network_query_auth_token.clone(),
),
)));

let indexer_allocations = indexer_allocations(
network_subgraph,
*indexer_address,
Duration::from_millis(*allocation_syncing_interval_ms),
Duration::from_secs(*recently_closed_allocation_buffer_seconds),
*network_sync_interval,
*recently_closed_allocation_buffer,
);

let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
escrow_subgraph_deployment
escrow_deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node(
graph_node_status_endpoint,
graph_node_query_endpoint,
DeploymentDetails::for_graph_node_url(
graph_node_status_endpoint.clone(),
graph_node_query_endpoint.clone(),
deployment,
)
})
.transpose()
.expect("Failed to parse graph node query endpoint and escrow subgraph deployment"),
DeploymentDetails::for_query_url_with_token(
escrow_subgraph_endpoint,
escrow_subgraph_auth_token.clone(),
)
.expect("Failed to parse escrow subgraph endpoint"),
DeploymentDetails::for_query_url_with_token_url(
escrow_query_url.clone(),
escrow_query_auth_token.clone(),
),
)));

let escrow_accounts = escrow_accounts(
escrow_subgraph,
*indexer_address,
Duration::from_millis(*escrow_syncing_interval_ms),
*escrow_sync_interval,
false,
);

let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG)));

let args = SenderAccountsManagerArgs {
config: &CONFIG,
config,
domain_separator: EIP_712_DOMAIN.clone(),
pgpool,
indexer_allocations,
Expand Down
Loading
Loading