Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 24 additions & 6 deletions common/src/subgraph_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,23 @@ impl DeploymentDetails {
graph_node_status_url: &str,
graph_node_base_url: &str,
deployment: DeploymentId,
) -> Result<Self, anyhow::Error> {
Self::for_graph_node_url(
Url::parse(graph_node_status_url)?,
Url::parse(graph_node_base_url)?,
deployment,
)
}

pub fn for_graph_node_url(
graph_node_status_url: Url,
graph_node_base_url: Url,
deployment: DeploymentId,
) -> Result<Self, anyhow::Error> {
Ok(Self {
deployment: Some(deployment),
status_url: Some(Url::parse(graph_node_status_url)?),
query_url: Url::parse(graph_node_base_url)?
.join(&format!("subgraphs/id/{deployment}"))?,
status_url: Some(graph_node_status_url),
query_url: graph_node_base_url.join(&format!("subgraphs/id/{deployment}"))?,
query_auth_token: None,
})
}
Expand All @@ -111,12 +122,19 @@ impl DeploymentDetails {
query_url: &str,
query_auth_token: Option<String>,
) -> Result<Self, anyhow::Error> {
Ok(Self {
Ok(Self::for_query_url_with_token_url(
Url::parse(query_url)?,
query_auth_token,
))
}

pub fn for_query_url_with_token_url(query_url: Url, query_auth_token: Option<String>) -> Self {
Self {
deployment: None,
status_url: None,
query_url: Url::parse(query_url)?,
query_url,
query_auth_token,
})
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ serde.workspace = true
thegraph-core.workspace = true
tracing.workspace = true
bigdecimal = { workspace = true, features = ["serde"] }
bip39 = "2.0.0"
bip39 = { version = "2.0.0", features = ["rand"] }
figment = { version = "0.10.19", features = ["env", "toml"] }
serde_with = { version = "3.8.1", default-features = false }
serde_repr = "0.1.19"
Expand Down
4 changes: 2 additions & 2 deletions config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ pub struct IndexerConfig {
pub operator_mnemonic: Mnemonic,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
#[serde(untagged)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -300,7 +300,7 @@ pub struct SubgraphConfig {
pub syncing_interval_secs: Duration,
}

#[derive(Debug, Deserialize_repr, Clone)]
#[derive(Debug, Deserialize_repr, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq))]
#[repr(u64)]
pub enum TheGraphChainId {
Expand Down
10 changes: 9 additions & 1 deletion config/src/grt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@
use bigdecimal::{BigDecimal, ToPrimitive};
use serde::{de::Error, Deserialize};

#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Default)]
pub struct NonZeroGRT(u128);

impl NonZeroGRT {
pub fn new(value: u128) -> Result<Self, String> {
if value == 0 {
Err("GRT value cannot be represented as a u128 GRT wei value".into())
} else {
Ok(NonZeroGRT(value))
}
}

pub fn get_value(&self) -> u128 {
self.0
}
Expand Down
1 change: 1 addition & 0 deletions config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
mod config;
mod grt;

pub use bip39::Mnemonic;
pub use config::*;
pub use grt::*;
2 changes: 1 addition & 1 deletion service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl From<MainConfig> for Config {
recently_closed_allocation_buffer_seconds: 0,
},
graph_network: GraphNetworkConfig {
chain_id: value.blockchain.chain_id.clone() as u64,
chain_id: value.blockchain.chain_id as u64,
},
tap: TapConfig {
chain_id: value.blockchain.chain_id as u64,
Expand Down
110 changes: 60 additions & 50 deletions tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use indexer_common::prelude::{
escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient,
};
use indexer_config::{
Config, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig,
SubgraphConfig, SubgraphsConfig, TapConfig,
};
use ractor::concurrency::JoinHandle;
use ractor::{Actor, ActorRef};
use sender_account::SenderAccountConfig;

use crate::agent::sender_accounts_manager::{
SenderAccountsManagerArgs, SenderAccountsManagerMessage,
};
use crate::config::{
Config, EscrowSubgraph, Ethereum, IndexerInfrastructure, NetworkSubgraph, Tap,
};
use crate::{database, CONFIG, EIP_712_DOMAIN};
use sender_accounts_manager::SenderAccountsManager;

Expand All @@ -25,95 +25,105 @@ pub mod unaggregated_receipts;

pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {
let Config {
ethereum: Ethereum { indexer_address },
indexer_infrastructure:
IndexerInfrastructure {
graph_node_query_endpoint,
graph_node_status_endpoint,
..
},
postgres,
network_subgraph:
NetworkSubgraph {
network_subgraph_deployment,
network_subgraph_endpoint,
network_subgraph_auth_token,
allocation_syncing_interval_ms,
recently_closed_allocation_buffer_seconds,
indexer: IndexerConfig {
indexer_address, ..
},
graph_node:
GraphNodeConfig {
status_url: graph_node_status_endpoint,
query_url: graph_node_query_endpoint,
},
escrow_subgraph:
EscrowSubgraph {
escrow_subgraph_deployment,
escrow_subgraph_endpoint,
escrow_subgraph_auth_token,
escrow_syncing_interval_ms,
database,
subgraphs:
SubgraphsConfig {
network:
NetworkSubgraphConfig {
config:
SubgraphConfig {
query_url: network_query_url,
query_auth_token: network_query_auth_token,
deployment_id: network_deployment_id,
syncing_interval_secs: network_sync_interval,
},
recently_closed_allocation_buffer_secs: recently_closed_allocation_buffer,
},
escrow:
EscrowSubgraphConfig {
config:
SubgraphConfig {
query_url: escrow_query_url,
query_auth_token: escrow_query_auth_token,
deployment_id: escrow_deployment_id,
syncing_interval_secs: escrow_sync_interval,
},
},
},
tap:
Tap {
TapConfig {
// TODO: replace with a proper implementation once the gateway registry contract is ready
sender_aggregator_endpoints,
..
},
..
} = &*CONFIG;
let pgpool = database::connect(postgres).await;
let pgpool = database::connect(database.clone()).await;

let http_client = reqwest::Client::new();

let network_subgraph = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
network_subgraph_deployment
network_deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node(
graph_node_status_endpoint,
graph_node_query_endpoint,
DeploymentDetails::for_graph_node_url(
graph_node_status_endpoint.clone(),
graph_node_query_endpoint.clone(),
deployment,
)
})
.transpose()
.expect("Failed to parse graph node query endpoint and network subgraph deployment"),
DeploymentDetails::for_query_url_with_token(
network_subgraph_endpoint,
network_subgraph_auth_token.clone(),
)
.expect("Failed to parse network subgraph endpoint"),
DeploymentDetails::for_query_url_with_token_url(
network_query_url.clone(),
network_query_auth_token.clone(),
),
)));

let indexer_allocations = indexer_allocations(
network_subgraph,
*indexer_address,
Duration::from_millis(*allocation_syncing_interval_ms),
Duration::from_secs(*recently_closed_allocation_buffer_seconds),
*network_sync_interval,
*recently_closed_allocation_buffer,
);

let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
escrow_subgraph_deployment
escrow_deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node(
graph_node_status_endpoint,
graph_node_query_endpoint,
DeploymentDetails::for_graph_node_url(
graph_node_status_endpoint.clone(),
graph_node_query_endpoint.clone(),
deployment,
)
})
.transpose()
.expect("Failed to parse graph node query endpoint and escrow subgraph deployment"),
DeploymentDetails::for_query_url_with_token(
escrow_subgraph_endpoint,
escrow_subgraph_auth_token.clone(),
)
.expect("Failed to parse escrow subgraph endpoint"),
DeploymentDetails::for_query_url_with_token_url(
escrow_query_url.clone(),
escrow_query_auth_token.clone(),
),
)));

let escrow_accounts = escrow_accounts(
escrow_subgraph,
*indexer_address,
Duration::from_millis(*escrow_syncing_interval_ms),
*escrow_sync_interval,
false,
);

let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG)));

let args = SenderAccountsManagerArgs {
config: &CONFIG,
config,
domain_separator: EIP_712_DOMAIN.clone(),
pgpool,
indexer_allocations,
Expand Down
Loading
Loading