Skip to content

Commit 2df58ab

Browse files
committed
chore(all): rename Nozzle to Amp
1 parent 4c747b9 commit 2df58ab

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

72 files changed

+508
-420
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ substreams-entity-change = "2"
9898
substreams-near-core = "=0.10.2"
9999
rand = { version = "0.9.2", features = ["os_rng"] }
100100

101-
# Dependencies related to Nozzle Subgraphs
101+
# Dependencies related to Amp subgraphs
102102
ahash = "0.8.11"
103103
alloy = { version = "1.0.12", default-features = false, features = ["json-abi", "serde"] }
104104
arrow = { version = "=55.0.0" }

chain/ethereum/src/runtime/runtime_adapter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
182182
create_host_fns(abis, archive, call_cache, eth_adapters, eth_call_gas)
183183
}
184184
data_source::DataSource::Offchain(_) => vec![],
185-
data_source::DataSource::Nozzle(_) => vec![],
185+
data_source::DataSource::Amp(_) => vec![],
186186
};
187187

188188
Ok(host_fns)
Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use alloy::primitives::BlockNumber;
44
use anyhow::Context;
55
use async_trait::async_trait;
66
use graph::{
7+
amp,
78
components::{
89
link_resolver::{LinkResolver, LinkResolverContext},
910
metrics::MetricsRegistry,
@@ -12,44 +13,43 @@ use graph::{
1213
},
1314
env::EnvVars,
1415
log::factory::LoggerFactory,
15-
nozzle,
1616
prelude::CheapClone,
1717
};
1818
use slog::{debug, error};
1919
use tokio_util::sync::CancellationToken;
2020

2121
use super::{runner, Metrics, Monitor};
2222

23-
/// Manages Nozzle subgraph runner futures.
23+
/// Manages Amp subgraph runner futures.
2424
///
25-
/// Creates and schedules Nozzle subgraph runner futures for execution on demand.
26-
/// Also handles stopping previously started Nozzle subgraph runners.
25+
/// Creates and schedules Amp subgraph runner futures for execution on demand.
26+
/// Also handles stopping previously started Amp subgraph runners.
2727
pub struct Manager<SS, NC> {
2828
logger_factory: LoggerFactory,
2929
metrics_registry: Arc<MetricsRegistry>,
3030
env_vars: Arc<EnvVars>,
3131
monitor: Monitor,
3232
subgraph_store: Arc<SS>,
3333
link_resolver: Arc<dyn LinkResolver>,
34-
nozzle_client: Arc<NC>,
34+
amp_client: Arc<NC>,
3535
}
3636

3737
impl<SS, NC> Manager<SS, NC>
3838
where
3939
SS: SubgraphStore,
40-
NC: nozzle::Client,
40+
NC: amp::Client,
4141
{
42-
/// Creates a new Nozzle subgraph manager.
42+
/// Creates a new Amp subgraph manager.
4343
pub fn new(
4444
logger_factory: &LoggerFactory,
4545
metrics_registry: Arc<MetricsRegistry>,
4646
env_vars: Arc<EnvVars>,
4747
cancel_token: &CancellationToken,
4848
subgraph_store: Arc<SS>,
4949
link_resolver: Arc<dyn LinkResolver>,
50-
nozzle_client: Arc<NC>,
50+
amp_client: Arc<NC>,
5151
) -> Self {
52-
let logger = logger_factory.component_logger("NozzleSubgraphManager", None);
52+
let logger = logger_factory.component_logger("AmpSubgraphManager", None);
5353
let logger_factory = logger_factory.with_parent(logger);
5454

5555
let monitor = Monitor::new(&logger_factory, cancel_token);
@@ -61,7 +61,7 @@ where
6161
monitor,
6262
subgraph_store,
6363
link_resolver,
64-
nozzle_client,
64+
amp_client,
6565
}
6666
}
6767
}
@@ -70,7 +70,7 @@ where
7070
impl<SS, NC> SubgraphInstanceManager for Manager<SS, NC>
7171
where
7272
SS: SubgraphStore,
73-
NC: nozzle::Client + Send + Sync + 'static,
73+
NC: amp::Client + Send + Sync + 'static,
7474
{
7575
async fn start_subgraph(
7676
self: Arc<Self>,
@@ -115,10 +115,10 @@ where
115115
let raw_manifest = serde_yaml::from_slice(&manifest_bytes)
116116
.context("failed to parse subgraph manifest")?;
117117

118-
let mut manifest = nozzle::Manifest::resolve::<graph_chain_ethereum::Chain, _>(
118+
let mut manifest = amp::Manifest::resolve::<graph_chain_ethereum::Chain, _>(
119119
&logger,
120120
manager.link_resolver.cheap_clone(),
121-
manager.nozzle_client.cheap_clone(),
121+
manager.amp_client.cheap_clone(),
122122
manager.env_vars.max_spec_version.cheap_clone(),
123123
deployment.hash.cheap_clone(),
124124
raw_manifest,
@@ -138,8 +138,8 @@ where
138138

139139
let runner_context = runner::Context::new(
140140
&logger,
141-
&manager.env_vars.nozzle,
142-
manager.nozzle_client.cheap_clone(),
141+
&manager.env_vars.amp,
142+
manager.amp_client.cheap_clone(),
143143
store,
144144
deployment.hash.cheap_clone(),
145145
manifest,
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ impl Metrics {
2626
let stopwatch = StopwatchMetrics::new(
2727
logger.cheap_clone(),
2828
deployment,
29-
"nozzle-process",
29+
"amp-process",
3030
metrics_registry,
3131
store.shard().to_string(),
3232
);
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ impl Monitor {
9292
/// A new cancel token is derived from the `cancel_token` and only the derived token is used by the
9393
/// subgraph monitor and its background process.
9494
pub(super) fn new(logger_factory: &LoggerFactory, cancel_token: &CancellationToken) -> Self {
95-
let logger = logger_factory.component_logger("NozzleSubgraphMonitor", None);
95+
let logger = logger_factory.component_logger("AmpSubgraphMonitor", None);
9696
let logger_factory = Arc::new(logger_factory.with_parent(logger));
9797

9898
// A derived token makes sure it is not possible to accidentally cancel the parent token
@@ -221,7 +221,7 @@ impl Monitor {
221221
_ = cancel_token.cancelled() => {
222222
debug!(logger, "Stopping command processor");
223223

224-
// All active Subgraphs will shutdown gracefully
224+
// All active subgraphs will shutdown gracefully
225225
// because their cancel tokens are derived from this cancelled token.
226226
return;
227227
}

core/src/nozzle_subgraph/runner/context.rs renamed to core/src/amp_subgraph/runner/context.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,21 @@ use std::sync::Arc;
22

33
use alloy::primitives::{BlockHash, BlockNumber};
44
use graph::{
5+
amp::{log::Logger as _, Codec, Manifest},
56
cheap_clone::CheapClone,
67
components::store::WritableStore,
78
data::subgraph::DeploymentHash,
8-
env::NozzleEnv,
9-
nozzle::{log::Logger as _, Codec, Manifest},
9+
env::AmpEnv,
1010
util::backoff::ExponentialBackoff,
1111
};
1212
use slog::Logger;
1313

1414
use super::Compat;
15-
use crate::nozzle_subgraph::Metrics;
15+
use crate::amp_subgraph::Metrics;
1616

17-
pub(in super::super) struct Context<NC> {
17+
pub(in super::super) struct Context<AC> {
1818
pub(super) logger: Logger,
19-
pub(super) client: Arc<NC>,
19+
pub(super) client: Arc<AC>,
2020
pub(super) store: Arc<dyn WritableStore>,
2121
pub(super) max_buffer_size: usize,
2222
pub(super) max_block_range: usize,
@@ -27,17 +27,17 @@ pub(in super::super) struct Context<NC> {
2727
pub(super) codec: Codec,
2828
}
2929

30-
impl<NC> Context<NC> {
30+
impl<AC> Context<AC> {
3131
pub(in super::super) fn new(
3232
logger: &Logger,
33-
env: &NozzleEnv,
34-
client: Arc<NC>,
33+
env: &AmpEnv,
34+
client: Arc<AC>,
3535
store: Arc<dyn WritableStore>,
3636
deployment: DeploymentHash,
3737
manifest: Manifest,
3838
metrics: Metrics,
3939
) -> Self {
40-
let logger = logger.component("NozzleSubgraphRunner");
40+
let logger = logger.component("AmpSubgraphRunner");
4141
let backoff = ExponentialBackoff::new(env.query_retry_min_delay, env.query_retry_max_delay);
4242
let codec = Codec::new(manifest.schema.cheap_clone());
4343

core/src/nozzle_subgraph/runner/data_processing.rs renamed to core/src/amp_subgraph/runner/data_processing.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,20 @@ use anyhow::anyhow;
55
use arrow::array::RecordBatch;
66
use chrono::{DateTime, Utc};
77
use graph::{
8-
blockchain::block_stream::FirehoseCursor,
9-
cheap_clone::CheapClone,
10-
components::store::{EntityCache, ModificationsAndCache},
11-
nozzle::{
8+
amp::{
129
codec::{utils::auto_block_timestamp_decoder, DecodeOutput, DecodedEntity, Decoder},
1310
stream_aggregator::{RecordBatchGroup, RecordBatchGroups, StreamRecordBatch},
1411
},
12+
blockchain::block_stream::FirehoseCursor,
13+
cheap_clone::CheapClone,
14+
components::store::{EntityCache, ModificationsAndCache},
1515
};
1616
use slog::{debug, trace};
1717

1818
use super::{data_stream::TablePtr, Compat, Context, Error};
1919

20-
pub(super) async fn process_record_batch_groups<NC>(
21-
cx: &mut Context<NC>,
20+
pub(super) async fn process_record_batch_groups<AC>(
21+
cx: &mut Context<AC>,
2222
mut entity_cache: EntityCache,
2323
record_batch_groups: RecordBatchGroups,
2424
stream_table_ptr: Arc<[TablePtr]>,
@@ -72,8 +72,8 @@ pub(super) async fn process_record_batch_groups<NC>(
7272
Ok(entity_cache)
7373
}
7474

75-
async fn process_record_batch_group<NC>(
76-
cx: &mut Context<NC>,
75+
async fn process_record_batch_group<AC>(
76+
cx: &mut Context<AC>,
7777
mut entity_cache: EntityCache,
7878
block_number: BlockNumber,
7979
block_hash: BlockHash,
@@ -146,8 +146,8 @@ async fn process_record_batch_group<NC>(
146146
))
147147
}
148148

149-
async fn process_record_batch<NC>(
150-
cx: &mut Context<NC>,
149+
async fn process_record_batch<AC>(
150+
cx: &mut Context<AC>,
151151
entity_cache: &mut EntityCache,
152152
block_number: BlockNumber,
153153
record_batch: RecordBatch,

core/src/nozzle_subgraph/runner/data_stream.rs renamed to core/src/amp_subgraph/runner/data_stream.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,25 @@ use futures::{
77
StreamExt, TryStreamExt,
88
};
99
use graph::{
10-
cheap_clone::CheapClone,
11-
nozzle::{
10+
amp::{
1211
manifest::DataSource,
1312
stream_aggregator::{RecordBatchGroups, StreamAggregator},
1413
Client,
1514
},
15+
cheap_clone::CheapClone,
1616
};
1717
use slog::{debug, warn};
1818

1919
use super::{Context, Error};
2020

2121
pub(super) type TablePtr = (usize, usize);
2222

23-
pub(super) fn new_data_stream<NC>(
24-
cx: &Context<NC>,
23+
pub(super) fn new_data_stream<AC>(
24+
cx: &Context<AC>,
2525
latest_block: BlockNumber,
2626
) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>>
2727
where
28-
NC: Client,
28+
AC: Client,
2929
{
3030
let logger = cx.logger.new(slog::o!("process" => "new_data_stream"));
3131

@@ -118,8 +118,8 @@ where
118118
merged_data_stream
119119
}
120120

121-
fn next_block_ranges<NC>(
122-
cx: &Context<NC>,
121+
fn next_block_ranges<AC>(
122+
cx: &Context<AC>,
123123
latest_queried_block: Option<BlockNumber>,
124124
latest_block: BlockNumber,
125125
) -> HashMap<usize, RangeInclusive<BlockNumber>> {
@@ -148,8 +148,8 @@ fn next_block_ranges<NC>(
148148
.collect()
149149
}
150150

151-
fn next_block_range<NC>(
152-
cx: &Context<NC>,
151+
fn next_block_range<AC>(
152+
cx: &Context<AC>,
153153
data_source: &DataSource,
154154
latest_queried_block: Option<BlockNumber>,
155155
latest_block: BlockNumber,

0 commit comments

Comments (0)