Skip to content

Commit 3576eb2

Browse files
authored
Amp-powered subgraphs (#6218)
* feat(graph): add Nozzle Flight service client
* feat(graph): add Nozzle stream aggregator
* feat(graph): add Nozzle data decoder
* feat(graph): add SQL query parser, resolver and validator
* feat(graph): use a new identifier type in Nozzle related modules
* feat(graph): add Nozzle Subgraph schema generation
* feat(graph): add Nozzle Subgraph manifest
* feat(graph): add reorg handling to the Nozzle FlightClient
* feat(graph, core): extend SubgraphInstanceManager trait
* feat(core, graph, node): allow multiple subgraph instance managers
* fix(graph): update deterministic error patterns in Nozzle Flight client
* feat(graph): add Nozzle related ENV variables
* fix(graph): make block range filter return a new query
* feat(graph): add decoding utilities
* fix(graph): use decoding utilities in the stream aggregator
* feat(graph): add more details to Nozzle data sources
* feat(core, graph, node): add Nozzle subgraph deployment
* feat(graph): add a dedicated Nozzle manifest resolver
* feat(node): add shutdown token
* feat(core, graph): add Nozzle subgraph runner
* chore(all): rename Nozzle to Amp
* fix(graph): produce consistent query hashes for logging
* fix(core, graph): simplify SQL query requirements
  Only require block number columns and try to load block hashes and timestamps from the source tables
* chore(graph): fix typos
* fix(graph): use nozzle-resume header name
* fix(graph): extend common column aliases
* fix(core, graph): use named streams in the stream aggregator
* fix(core, graph): simplify working with identifiers
* fix(graph): validate query output column names
* fix(graph): support all versions of the Amp server
* fix(graph): extend the list of common column aliases
* test(graph): add decoder unit-tests
* feat(core, graph): add Amp subgraph metrics
* fix(graph): allow more complex dataset and table names
* fix(graph): remove CTE name requirements
* fix(graph, node): add option to authenticate Flight service requests
* fix(graph): update temporary predefined list of source context tables
* docs: add docs for Amp-powered subgraphs
* chore(core): reuse existing metric names
* fix(core, graph): minor adjustments after rebase
* fix(core, graph): resolve metric conflicts, use quoted table references in sql
1 parent cbf55c5 commit 3576eb2

File tree

98 files changed

+13418
-431
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+13418
-431
lines changed

Cargo.lock

Lines changed: 1292 additions & 46 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,20 @@ substreams = "=0.6.0"
104104
substreams-entity-change = "2"
105105
substreams-near-core = "=0.10.2"
106106
rand = { version = "0.9.2", features = ["os_rng"] }
107+
prometheus = "0.14.0"
108+
109+
# Dependencies related to Amp subgraphs
110+
ahash = "0.8.11"
111+
alloy = { version = "1.0.12", default-features = false, features = ["json-abi", "serde"] }
112+
arrow = { version = "=55.0.0" }
113+
arrow-flight = { version = "=55.0.0", features = ["flight-sql-experimental"] }
114+
futures = "0.3.31"
115+
half = "2.7.1"
116+
indoc = "2.0.7"
117+
lazy-regex = "3.4.1"
118+
parking_lot = "0.12.4"
119+
sqlparser-latest = { version = "0.57.0", package = "sqlparser", features = ["visitor"] }
120+
tokio-util = "0.7.15"
107121

108122
# Incremental compilation on Rust 1.58 causes an ICE on build. As soon as graph node builds again, these can be removed.
109123
[profile.test]

chain/ethereum/src/runtime/runtime_adapter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
182182
create_host_fns(abis, archive, call_cache, eth_adapters, eth_call_gas)
183183
}
184184
data_source::DataSource::Offchain(_) => vec![],
185+
data_source::DataSource::Amp(_) => vec![],
185186
};
186187

187188
Ok(host_fns)

core/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,19 @@ tower = { git = "https://github.com/tower-rs/tower.git", features = ["full"] }
1919
thiserror = { workspace = true }
2020
anyhow = "1.0"
2121

22+
# Dependencies related to Amp subgraphs
23+
alloy.workspace = true
24+
arrow.workspace = true
25+
chrono.workspace = true
26+
futures.workspace = true
27+
indoc.workspace = true
28+
itertools.workspace = true
29+
parking_lot.workspace = true
30+
prometheus.workspace = true
31+
slog.workspace = true
32+
strum.workspace = true
33+
tokio-util.workspace = true
34+
2235
[dev-dependencies]
2336
tower-test = { git = "https://github.com/tower-rs/tower.git" }
2437
wiremock = "0.6.5"

core/src/amp_subgraph/manager.rs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
use std::sync::Arc;
2+
3+
use alloy::primitives::BlockNumber;
4+
use anyhow::Context;
5+
use async_trait::async_trait;
6+
use graph::{
7+
amp,
8+
components::{
9+
link_resolver::{LinkResolver, LinkResolverContext},
10+
metrics::MetricsRegistry,
11+
store::{DeploymentLocator, SubgraphStore},
12+
subgraph::SubgraphInstanceManager,
13+
},
14+
env::EnvVars,
15+
log::factory::LoggerFactory,
16+
prelude::CheapClone,
17+
};
18+
use slog::{debug, error};
19+
use tokio_util::sync::CancellationToken;
20+
21+
use super::{runner, Metrics, Monitor};
22+
23+
/// Manages Amp subgraph runner futures.
24+
///
25+
/// Creates and schedules Amp subgraph runner futures for execution on demand.
26+
/// Also handles stopping previously started Amp subgraph runners.
27+
pub struct Manager<SS, NC> {
28+
logger_factory: LoggerFactory,
29+
metrics_registry: Arc<MetricsRegistry>,
30+
env_vars: Arc<EnvVars>,
31+
monitor: Monitor,
32+
subgraph_store: Arc<SS>,
33+
link_resolver: Arc<dyn LinkResolver>,
34+
amp_client: Arc<NC>,
35+
}
36+
37+
impl<SS, NC> Manager<SS, NC>
38+
where
39+
SS: SubgraphStore,
40+
NC: amp::Client,
41+
{
42+
/// Creates a new Amp subgraph manager.
43+
pub fn new(
44+
logger_factory: &LoggerFactory,
45+
metrics_registry: Arc<MetricsRegistry>,
46+
env_vars: Arc<EnvVars>,
47+
cancel_token: &CancellationToken,
48+
subgraph_store: Arc<SS>,
49+
link_resolver: Arc<dyn LinkResolver>,
50+
amp_client: Arc<NC>,
51+
) -> Self {
52+
let logger = logger_factory.component_logger("AmpSubgraphManager", None);
53+
let logger_factory = logger_factory.with_parent(logger);
54+
55+
let monitor = Monitor::new(&logger_factory, cancel_token);
56+
57+
Self {
58+
logger_factory,
59+
metrics_registry,
60+
env_vars,
61+
monitor,
62+
subgraph_store,
63+
link_resolver,
64+
amp_client,
65+
}
66+
}
67+
}
68+
69+
#[async_trait]
70+
impl<SS, NC> SubgraphInstanceManager for Manager<SS, NC>
71+
where
72+
SS: SubgraphStore,
73+
NC: amp::Client + Send + Sync + 'static,
74+
{
75+
async fn start_subgraph(
76+
self: Arc<Self>,
77+
deployment: DeploymentLocator,
78+
stop_block: Option<i32>,
79+
) {
80+
let manager = self.cheap_clone();
81+
82+
self.monitor.start(
83+
deployment.cheap_clone(),
84+
Box::new(move |cancel_token| {
85+
Box::pin(async move {
86+
let logger = manager.logger_factory.subgraph_logger(&deployment);
87+
88+
let store = manager
89+
.subgraph_store
90+
.cheap_clone()
91+
.writable(logger.cheap_clone(), deployment.id, Vec::new().into())
92+
.await
93+
.context("failed to create writable store")?;
94+
95+
let metrics = Metrics::new(
96+
&logger,
97+
manager.metrics_registry.cheap_clone(),
98+
store.cheap_clone(),
99+
deployment.hash.cheap_clone(),
100+
);
101+
102+
let link_resolver = manager
103+
.link_resolver
104+
.for_manifest(&deployment.hash.to_string())
105+
.context("failed to create link resolver")?;
106+
107+
let manifest_bytes = link_resolver
108+
.cat(
109+
&LinkResolverContext::new(&deployment.hash, &logger),
110+
&deployment.hash.to_ipfs_link(),
111+
)
112+
.await
113+
.context("failed to load subgraph manifest")?;
114+
115+
let raw_manifest = serde_yaml::from_slice(&manifest_bytes)
116+
.context("failed to parse subgraph manifest")?;
117+
118+
let mut manifest = amp::Manifest::resolve::<graph_chain_ethereum::Chain, _>(
119+
&logger,
120+
manager.link_resolver.cheap_clone(),
121+
manager.amp_client.cheap_clone(),
122+
manager.env_vars.max_spec_version.cheap_clone(),
123+
deployment.hash.cheap_clone(),
124+
raw_manifest,
125+
)
126+
.await?;
127+
128+
if let Some(stop_block) = stop_block {
129+
for data_source in manifest.data_sources.iter_mut() {
130+
data_source.source.end_block = stop_block as BlockNumber;
131+
}
132+
}
133+
134+
store
135+
.start_subgraph_deployment(&logger)
136+
.await
137+
.context("failed to start subgraph deployment")?;
138+
139+
let runner_context = runner::Context::new(
140+
&logger,
141+
&manager.env_vars.amp,
142+
manager.amp_client.cheap_clone(),
143+
store,
144+
deployment.hash.cheap_clone(),
145+
manifest,
146+
metrics,
147+
);
148+
149+
let runner_result = runner::new_runner(runner_context, cancel_token).await;
150+
151+
match manager.subgraph_store.stop_subgraph(&deployment).await {
152+
Ok(()) => {
153+
debug!(logger, "Subgraph writer stopped");
154+
}
155+
Err(e) => {
156+
error!(logger, "Failed to stop subgraph writer";
157+
"e" => ?e
158+
);
159+
}
160+
}
161+
162+
runner_result
163+
})
164+
}),
165+
);
166+
}
167+
168+
async fn stop_subgraph(&self, deployment: DeploymentLocator) {
169+
self.monitor.stop(deployment);
170+
}
171+
}

0 commit comments

Comments
 (0)