Skip to content

Commit 66684cf

Browse files
committed
fix(core, graph): resolve metric conflicts, use quoted table references in sql
1 parent 38889a2 commit 66684cf

File tree

13 files changed

+150
-137
lines changed

13 files changed

+150
-137
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/amp_subgraph/metrics.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl Metrics {
3535
let stopwatch = StopwatchMetrics::new(
3636
logger.cheap_clone(),
3737
deployment.cheap_clone(),
38-
"amp-process",
38+
"process",
3939
metrics_registry.cheap_clone(),
4040
store.shard().to_string(),
4141
);
@@ -123,7 +123,11 @@ impl DeploymentHead {
123123
.new_int_gauge(
124124
"deployment_head",
125125
"Tracks the most recent block number processed by a deployment",
126-
const_labels,
126+
const_labels
127+
.into_iter()
128+
.map(|(k, v)| (k.to_string(), v.to_string()))
129+
// TODO: Pass the network from the Amp manifest
130+
.chain([("network".to_string(), "".to_string())]),
127131
)
128132
.expect("failed to register `deployment_head` gauge");
129133

@@ -139,7 +143,7 @@ impl DeploymentHead {
139143
}
140144
}
141145

142-
/// Tracks the target block number of a deployment.
146+
/// Tracks the maximum block number currently available for indexing within a deployment.
143147
pub(super) struct DeploymentTarget(IntGauge);
144148

145149
impl DeploymentTarget {
@@ -149,8 +153,8 @@ impl DeploymentTarget {
149153
) -> Self {
150154
let int_gauge = metrics_registry
151155
.new_int_gauge(
152-
"amp_deployment_target",
153-
"Tracks the target block number of a deployment",
156+
"deployment_target",
157+
"Tracks the maximum block number currently available for indexing within a deployment",
154158
const_labels,
155159
)
156160
.expect("failed to register `amp_deployment_target` gauge");
@@ -217,7 +221,7 @@ impl IndexingDuration {
217221
) -> Self {
218222
let int_counter = metrics_registry
219223
.new_int_counter(
220-
"amp_deployment_indexing_duration_seconds",
224+
"deployment_indexing_duration_seconds",
221225
"Tracks the total duration in seconds of deployment indexing",
222226
const_labels,
223227
)

core/src/amp_subgraph/monitor.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -439,13 +439,6 @@ impl Monitor {
439439
error!(logger, "Subgraph panicked";
440440
"error" => ?e
441441
);
442-
443-
// TODO: Maybe abort the entire process on panic and require a full graph-node restart.
444-
// Q: Should a bug that is triggered in a specific subgraph affect everything?
445-
// Q: How to make this failure loud enough so it is not missed?
446-
//
447-
// println!("Subgraph panicked");
448-
// std::process::abort();
449442
}
450443
Err(e) => {
451444
error!(logger, "Subgraph failed";

core/src/amp_subgraph/runner/data_processing.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ async fn process_record_batch_group<AC>(
133133
})?;
134134
}
135135

136+
let section = cx.metrics.stopwatch.start_section("as_modifications");
136137
let ModificationsAndCache {
137138
modifications,
138139
entity_lfu_cache,
@@ -142,7 +143,9 @@ async fn process_record_batch_group<AC>(
142143
.await
143144
.map_err(Error::from)
144145
.map_err(|e| e.context("failed to extract entity modifications from the state"))?;
146+
section.end();
145147

148+
let _section = cx.metrics.stopwatch.start_section("transact_block");
146149
let is_close_to_chain_head = latest_block.saturating_sub(block_number) <= 100;
147150

148151
cx.store

docs/amp-powered-subgraphs.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ Amp-powered subgraphs feature introduces the following new ENV variables:
553553
In addition to reporting updates to the existing `deployment_status`, `deployment_head`, `deployment_synced` and `deployment_blocks_processed_count`
554554
metrics, Amp-powered subgraphs feature introduces the following new metrics:
555555

556-
- `amp_deployment_target` – Tracks the target block number of a deployment.
557-
- `amp_deployment_indexing_duration_seconds` – Tracks the total duration in seconds of deployment indexing.
556+
- `deployment_target` – Tracks the maximum block number currently available for indexing within a deployment.
557+
- `deployment_indexing_duration_seconds` – Tracks the total duration in seconds of deployment indexing.
558558

559-
Additionally, the `deployment_sync_secs` is extended with a new `amp-process` stage and new sections specific to the Amp indexing process.
559+
Additionally, the `deployment_sync_secs` is extended with new sections specific to the Amp indexing process.

graph/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ alloy.workspace = true
9999
arrow-flight.workspace = true
100100
arrow.workspace = true
101101
half.workspace = true
102+
indoc.workspace = true
102103
lazy-regex.workspace = true
103104
sqlparser-latest.workspace = true
104105
tokio-util.workspace = true

graph/src/amp/manifest/data_source/raw.rs

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use alloy::{
77
use anyhow::anyhow;
88
use arrow::{array::RecordBatch, datatypes::Schema};
99
use futures03::future::try_join_all;
10+
use itertools::Itertools;
1011
use lazy_regex::regex_is_match;
1112
use semver::Version;
1213
use serde::Deserialize;
@@ -86,14 +87,7 @@ impl RawDataSource {
8687
.map_err(|e| e.source_context("invalid `source`"))?;
8788

8889
let transformer = transformer
89-
.resolve(
90-
&logger,
91-
link_resolver,
92-
amp_client,
93-
input_schema,
94-
&network,
95-
&source,
96-
)
90+
.resolve(&logger, link_resolver, amp_client, input_schema, &source)
9791
.await
9892
.map_err(|e| e.source_context("invalid `transformer`"))?;
9993

@@ -159,6 +153,12 @@ impl RawSource {
159153
}
160154
Self::validate_tables(&tables)?;
161155

156+
let dataset = normalize_sql_ident(&dataset);
157+
let tables = tables
158+
.into_iter()
159+
.map(|table| normalize_sql_ident(&table))
160+
.collect_vec();
161+
162162
let address = address.unwrap_or(Address::ZERO);
163163
let start_block = start_block.unwrap_or(BlockNumber::MIN);
164164
let end_block = end_block.unwrap_or(BlockNumber::MAX);
@@ -232,7 +232,6 @@ impl RawTransformer {
232232
link_resolver: &dyn LinkResolver,
233233
amp_client: &impl amp::Client,
234234
input_schema: Option<&InputSchema>,
235-
network: &str,
236235
source: &Source,
237236
) -> Result<Transformer, Error> {
238237
let Self {
@@ -248,7 +247,6 @@ impl RawTransformer {
248247
link_resolver,
249248
amp_client,
250249
input_schema,
251-
network,
252250
tables,
253251
source,
254252
&abis,
@@ -306,7 +304,6 @@ impl RawTransformer {
306304
link_resolver: &dyn LinkResolver,
307305
amp_client: &impl amp::Client,
308306
input_schema: Option<&InputSchema>,
309-
network: &str,
310307
tables: Vec<RawTable>,
311308
source: &Source,
312309
abis: &[Abi],
@@ -335,7 +332,6 @@ impl RawTransformer {
335332
link_resolver,
336333
amp_client,
337334
input_schema,
338-
network,
339335
source,
340336
abis,
341337
)
@@ -431,7 +427,6 @@ impl RawTable {
431427
link_resolver: &dyn LinkResolver,
432428
amp_client: &impl amp::Client,
433429
input_schema: Option<&InputSchema>,
434-
network: &str,
435430
source: &Source,
436431
abis: &[Abi],
437432
) -> Result<Table, Error> {
@@ -459,7 +454,6 @@ impl RawTable {
459454
logger,
460455
amp_client,
461456
input_schema,
462-
network,
463457
source,
464458
query,
465459
schema.clone(),
@@ -557,7 +551,6 @@ impl RawTable {
557551
logger: &Logger,
558552
amp_client: &impl amp::Client,
559553
input_schema: Option<&InputSchema>,
560-
network: &str,
561554
source: &Source,
562555
query: ValidQuery,
563556
schema: Schema,
@@ -586,15 +579,7 @@ impl RawTable {
586579
let context_sources_iter = source
587580
.tables
588581
.iter()
589-
.map(|table| (source.dataset.as_str(), table.as_str()))
590-
// TODO: Replace hardcoded values with schema metadata sources when available
591-
.chain(match network {
592-
"ethereum-mainnet" => vec![("edgeandnode/ethereum_mainnet", "blocks")],
593-
"base-mainnet" => vec![("edgeandnode/base_mainnet", "blocks")],
594-
"base-sepolia" => vec![("edgeandnode/base_sepolia", "blocks")],
595-
"arbitrum-one" => vec![("edgeandnode/arbitrum_one", "blocks")],
596-
_ => vec![],
597-
});
582+
.map(|table| (source.dataset.as_str(), table.as_str()));
598583

599584
for (dataset, table) in context_sources_iter {
600585
let context_logger = logger.new(slog::o!(
@@ -719,3 +704,10 @@ fn validate_ident(s: &str) -> Result<(), Error> {
719704
}
720705
Ok(())
721706
}
707+
708+
fn normalize_sql_ident(s: &str) -> String {
709+
match validate_ident(s) {
710+
Ok(()) => s.to_lowercase(),
711+
Err(_e) => sqlparser_latest::ast::Ident::with_quote('"', s).to_string(),
712+
}
713+
}

graph/src/amp/sql/query_builder/block_range_query.rs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -134,24 +134,25 @@ mod tests {
134134
let block_range_query = new_block_range_query(&query, block_number_column, &block_range);
135135

136136
assert_eq!(
137-
block_range_query,
137+
block_range_query.to_string(),
138138
parse_query(
139139
r#"
140-
WITH block_range_1164572571450379730 AS (
141-
SELECT * FROM "d" WHERE _block_num BETWEEN 0 AND 1000000
140+
WITH block_range_14621009630487609643 AS (
141+
SELECT * FROM d WHERE _block_num BETWEEN 0 AND 1000000
142142
),
143-
source_1164572571450379730 AS (
144-
SELECT a, b, c FROM block_range_1164572571450379730 AS d
143+
source_14621009630487609643 AS (
144+
SELECT a, b, c FROM block_range_14621009630487609643 AS d
145145
)
146146
SELECT
147-
source_1164572571450379730.*
147+
source_14621009630487609643.*
148148
FROM
149-
source_1164572571450379730
149+
source_14621009630487609643
150150
ORDER BY
151-
source_1164572571450379730.b
151+
source_14621009630487609643.b
152152
"#
153153
)
154-
.unwrap(),
154+
.unwrap()
155+
.to_string(),
155156
)
156157
}
157158

@@ -163,27 +164,28 @@ mod tests {
163164
let block_range_query = new_block_range_query(&query, block_number_column, &block_range);
164165

165166
assert_eq!(
166-
block_range_query,
167+
block_range_query.to_string(),
167168
parse_query(
168169
r#"
169-
WITH block_range_1164572571450379730 AS (
170-
SELECT * FROM "d" WHERE _block_num BETWEEN 0 AND 1000000
170+
WITH block_range_14621009630487609643 AS (
171+
SELECT * FROM d WHERE _block_num BETWEEN 0 AND 1000000
171172
),
172-
block_range_13063992259633584610 AS (
173-
SELECT * FROM "e" WHERE _block_num BETWEEN 0 AND 1000000
173+
block_range_12377422807768256314 AS (
174+
SELECT * FROM e WHERE _block_num BETWEEN 0 AND 1000000
174175
),
175-
source_13063992259633584610 AS (
176-
SELECT a, b, c FROM block_range_1164572571450379730 AS d JOIN block_range_13063992259633584610 AS e ON e.e = d.d
176+
source_12377422807768256314 AS (
177+
SELECT a, b, c FROM block_range_14621009630487609643 AS d JOIN block_range_12377422807768256314 AS e ON e.e = d.d
177178
)
178179
SELECT
179-
source_13063992259633584610.*
180+
source_12377422807768256314.*
180181
FROM
181-
source_13063992259633584610
182+
source_12377422807768256314
182183
ORDER BY
183-
source_13063992259633584610.b
184+
source_12377422807768256314.b
184185
"#
185186
)
186-
.unwrap(),
187+
.unwrap()
188+
.to_string(),
187189
)
188190
}
189191
}

0 commit comments

Comments
 (0)