Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 47 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions crates/monitor/src/deployment_to_allocation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;
use thegraph_core::{Address, DeploymentId};
use tokio::sync::watch::Receiver;

use indexer_allocation::Allocation;
use indexer_watcher::map_watcher;

/// Watcher of indexer allocation
/// returning a map of subgraph deployment to allocation id
pub fn deployment_to_allocation(
indexer_allocations_rx: Receiver<HashMap<Address, Allocation>>,
) -> Receiver<HashMap<DeploymentId, Address>> {
map_watcher(indexer_allocations_rx, move |allocation| {
allocation
.iter()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use into_iter() here to get rid of deref in *address?

.map(|(address, allocation)| (allocation.subgraph_deployment.id, *address))
.collect()
})
}

#[cfg(test)]
mod tests {
use tokio::sync::watch;

use super::deployment_to_allocation;

#[tokio::test]
async fn test_deployment_to_allocation() {
let allocations = test_assets::INDEXER_ALLOCATIONS.clone();
let allocations_watcher = watch::channel(allocations.clone()).1;
let deployment = deployment_to_allocation(allocations_watcher);

let deployments = deployment.borrow();
// one of the allocation id point to the same subgraph
assert_eq!(deployments.len(), 3);
// check if all allocations point to the subgraph id
for (key, val) in deployments.iter() {
assert_eq!(allocations.get(val).unwrap().subgraph_deployment.id, *key);
}
}
}
2 changes: 2 additions & 0 deletions crates/monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
mod allocations;
mod attestation;
mod client;
mod deployment_to_allocation;
mod dispute_manager;
mod escrow_accounts;

pub use crate::{
allocations::indexer_allocations,
attestation::attestation_signers,
client::{DeploymentDetails, SubgraphClient},
deployment_to_allocation::deployment_to_allocation,
dispute_manager::dispute_manager,
escrow_accounts::{escrow_accounts, EscrowAccounts, EscrowAccountsError},
};
6 changes: 5 additions & 1 deletion crates/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ tap_core.workspace = true
uuid.workspace = true
alloy.workspace = true
tower_governor = "0.4.0"
tower-http = { version = "0.5.2", features = [
tower-http = { version = "0.6.2", features = [
"auth",
"cors",
"normalize-path",
Expand All @@ -53,10 +53,14 @@ axum-extra = { version = "0.9.3", features = [
tokio-util = "0.7.10"
cost-model = { git = "https://github.com/graphprotocol/agora", rev = "3ed34ca" }
bip39.workspace = true
tower = "0.5.1"
pin-project = "1.1.7"

[dev-dependencies]
hex-literal = "0.4.1"
test-assets = { path = "../test-assets" }
tower-test = "0.4.0"
tokio-test = "0.4.4"

[build-dependencies]
build-info-build = { version = "0.0.39", default-features = false }
60 changes: 59 additions & 1 deletion crates/service/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,70 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use alloy::primitives::Address;
use anyhow::Error;
use axum::response::{IntoResponse, Response};
use axum::{
response::{IntoResponse, Response},
Json,
};
use indexer_monitor::EscrowAccountsError;
use reqwest::StatusCode;
use serde::Serialize;
use thegraph_core::DeploymentId;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum IndexerServiceError {
#[error("Issues with provided receipt: {0}")]
ReceiptError(#[from] tap_core::Error),
#[error("No attestation signer found for allocation `{0}`")]
NoSignerForAllocation(Address),
#[error("Invalid request body: {0}")]
InvalidRequest(anyhow::Error),
#[error("Error while processing the request: {0}")]
ProcessingError(SubgraphServiceError),
#[error("No valid receipt or free query auth token provided")]
Unauthorized,
#[error("Invalid free query auth token")]
InvalidFreeQueryAuthToken,
#[error("Failed to sign attestation")]
FailedToSignAttestation,

#[error("There was an error while accessing escrow account: {0}")]
EscrowAccount(#[from] EscrowAccountsError),
}

impl IntoResponse for IndexerServiceError {
fn into_response(self) -> Response {
use IndexerServiceError::*;

#[derive(Serialize)]
struct ErrorResponse {
message: String,
}

let status = match self {
Unauthorized => StatusCode::UNAUTHORIZED,

NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR,

ReceiptError(_)
| InvalidRequest(_)
| InvalidFreeQueryAuthToken
| EscrowAccount(_)
| ProcessingError(_) => StatusCode::BAD_REQUEST,
};
tracing::error!(%self, "An IndexerServiceError occoured.");
(
status,
Json(ErrorResponse {
message: self.to_string(),
}),
)
.into_response()
}
}

#[derive(Debug, Error)]
pub enum SubgraphServiceError {
#[error("Invalid status query: {0}")]
Expand Down
2 changes: 2 additions & 0 deletions crates/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
mod cli;
mod database;
mod error;
mod metrics;
mod middleware;
mod routes;
pub mod service;
mod tap;
Expand Down
39 changes: 39 additions & 0 deletions crates/service/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use lazy_static::lazy_static;
use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec};

lazy_static! {
/// Metric registered in global registry for
/// indexer query handler
///
/// Labels: "deployment", "allocation", "sender"
pub static ref HANDLER_HISTOGRAM: HistogramVec = register_histogram_vec!(
"indexer_query_handler_seconds",
"Histogram for default indexer query handler",
&["deployment", "allocation", "sender"]
).unwrap();

/// Metric registered in global registry for
/// Failed queries to handler
///
/// Labels: "deployment"
pub static ref HANDLER_FAILURE: CounterVec = register_counter_vec!(
"indexer_query_handler_failed_total",
"Failed queries to handler",
&["deployment"]
).unwrap();

/// Metric registered in global registry for
/// Failed receipt checks
///
/// Labels: "deployment", "allocation", "sender"
pub static ref FAILED_RECEIPT: CounterVec = register_counter_vec!(
"indexer_receipt_failed_total",
"Failed receipt checks",
&["deployment", "allocation", "sender"]
)
.unwrap();

}
Loading
Loading