Skip to content

Commit 762e27f

Browse files
committed
fix(core, graph): simplify SQL query requirements
Only require block number columns in SQL queries, and try to load block hashes and timestamps from the source tables.
1 parent 82cf29a commit 762e27f

File tree

21 files changed

+1409
-773
lines changed

21 files changed

+1409
-773
lines changed

core/src/amp_subgraph/runner/data_processing.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -215,7 +215,7 @@ fn decode_block_timestamp(record_batches: &[StreamRecordBatch]) -> Result<DateTi
215215

216216
for record_batch in record_batches {
217217
match auto_block_timestamp_decoder(&record_batch.record_batch) {
218-
Ok(decoder) => {
218+
Ok((_, decoder)) => {
219219
return decoder
220220
.decode(0)
221221
.map_err(|e| Error::Deterministic(e))?

core/src/amp_subgraph/runner/data_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,7 @@ where
7070
}
7171

7272
for (j, table) in data_source.transformer.tables.iter().enumerate() {
73-
let query = table.query.with_block_range_filter(block_range);
73+
let query = table.query.build_with_block_range(block_range);
7474

7575
query_streams.push(cx.client.query(&cx.logger, query, None));
7676
query_streams_table_ptr.push((i, j));

graph/src/amp/codec/utils.rs

Lines changed: 47 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -10,61 +10,85 @@ use crate::amp::common::column_aliases;
1010

1111
pub fn auto_block_number_decoder<'a>(
1212
record_batch: &'a RecordBatch,
13-
) -> Result<Box<dyn Decoder<Option<BlockNumber>> + 'a>> {
14-
let column_index = column_index(record_batch, column_aliases::BLOCK_NUMBER)
15-
.context("failed to find block numbers column")?;
13+
) -> Result<(&'static str, Box<dyn Decoder<Option<BlockNumber>> + 'a>)> {
14+
let (&column_name, column_index) = find_column(record_batch, column_aliases::BLOCK_NUMBER)
15+
.with_context(|| {
16+
format!(
17+
"failed to find block numbers column; expected one of: {}",
18+
column_aliases::BLOCK_NUMBER.join(", ")
19+
)
20+
})?;
1621

1722
block_number_decoder(record_batch, column_index)
23+
.map(|decoder| (column_name, decoder))
24+
.with_context(|| format!("column '{column_name}' is not valid"))
1825
}
1926

2027
pub fn block_number_decoder<'a>(
2128
record_batch: &'a RecordBatch,
2229
column_index: usize,
2330
) -> Result<Box<dyn Decoder<Option<BlockNumber>> + 'a>> {
24-
column_decoder::<UInt64Array, BlockNumber>(record_batch, column_index)
31+
column_decoder::<UInt64Array, BlockNumber>(record_batch, column_index, false)
2532
}
2633

2734
pub fn auto_block_hash_decoder<'a>(
2835
record_batch: &'a RecordBatch,
29-
) -> Result<Box<dyn Decoder<Option<BlockHash>> + 'a>> {
30-
let column_index = column_index(record_batch, column_aliases::BLOCK_HASH)
31-
.context("failed to find block hashes column")?;
36+
) -> Result<(&'static str, Box<dyn Decoder<Option<BlockHash>> + 'a>)> {
37+
let (&column_name, column_index) = find_column(record_batch, column_aliases::BLOCK_HASH)
38+
.with_context(|| {
39+
format!(
40+
"failed to find block hashes column; expected one of: {}",
41+
column_aliases::BLOCK_HASH.join(", ")
42+
)
43+
})?;
3244

3345
block_hash_decoder(record_batch, column_index)
46+
.map(|decoder| (column_name, decoder))
47+
.with_context(|| format!("column '{column_name}' is not valid"))
3448
}
3549

3650
pub fn block_hash_decoder<'a>(
3751
record_batch: &'a RecordBatch,
3852
column_index: usize,
3953
) -> Result<Box<dyn Decoder<Option<BlockHash>> + 'a>> {
40-
column_decoder::<FixedSizeBinaryArray, BlockHash>(record_batch, column_index)
54+
column_decoder::<FixedSizeBinaryArray, BlockHash>(record_batch, column_index, false)
4155
}
4256

4357
pub fn auto_block_timestamp_decoder<'a>(
4458
record_batch: &'a RecordBatch,
45-
) -> Result<Box<dyn Decoder<Option<DateTime<Utc>>> + 'a>> {
46-
let column_index = column_index(record_batch, column_aliases::BLOCK_TIMESTAMP)
47-
.context("failed to find block timestamps column")?;
59+
) -> Result<(&'static str, Box<dyn Decoder<Option<DateTime<Utc>>> + 'a>)> {
60+
let (&column_name, column_index) = find_column(record_batch, column_aliases::BLOCK_TIMESTAMP)
61+
.with_context(|| {
62+
format!(
63+
"failed to find block timestamps column; expected one of: {}",
64+
column_aliases::BLOCK_TIMESTAMP.join(", ")
65+
)
66+
})?;
4867

4968
block_timestamp_decoder(record_batch, column_index)
69+
.map(|decoder| (column_name, decoder))
70+
.with_context(|| format!("column '{column_name}' is not valid"))
5071
}
5172

5273
pub fn block_timestamp_decoder<'a>(
5374
record_batch: &'a RecordBatch,
5475
column_index: usize,
5576
) -> Result<Box<dyn Decoder<Option<DateTime<Utc>>> + 'a>> {
56-
column_decoder::<TimestampNanosecondArray, DateTime<Utc>>(record_batch, column_index)
77+
column_decoder::<TimestampNanosecondArray, DateTime<Utc>>(record_batch, column_index, false)
5778
}
5879

59-
pub fn column_index(
80+
pub fn find_column<T>(
6081
record_batch: &RecordBatch,
61-
column_names: impl IntoIterator<Item = impl AsRef<str>>,
62-
) -> Option<usize> {
82+
column_names: impl IntoIterator<Item = T>,
83+
) -> Option<(T, usize)>
84+
where
85+
T: AsRef<str>,
86+
{
6387
let schema_ref = record_batch.schema_ref();
6488

6589
for column_name in column_names {
6690
if let Some((column_index, _)) = schema_ref.column_with_name(column_name.as_ref()) {
67-
return Some(column_index);
91+
return Some((column_name, column_index));
6892
}
6993
}
7094

@@ -74,16 +98,22 @@ pub fn column_index(
7498
pub fn column_decoder<'a, T: 'static, U>(
7599
record_batch: &'a RecordBatch,
76100
column_index: usize,
101+
nullable: bool,
77102
) -> Result<Box<dyn Decoder<Option<U>> + 'a>>
78103
where
79104
T: Array,
80105
ArrayDecoder<'a, T>: Decoder<Option<U>>,
81106
{
82107
if column_index >= record_batch.num_columns() {
83-
bail!("column {column_index} does not exist");
108+
bail!("column does not exist");
84109
}
85110

86111
let array = record_batch.column(column_index);
112+
113+
if !nullable && array.is_nullable() {
114+
bail!("column must not have nullable values");
115+
}
116+
87117
let decoder = ArrayDecoder::<T>::new(array)?;
88118

89119
Ok(Box::new(decoder))

graph/src/amp/manifest/data_source/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ use arrow::datatypes::Schema;
88
use semver::Version;
99

1010
use crate::{
11-
amp::{common::Ident, sql::Query},
11+
amp::{common::Ident, sql::BlockRangeQueryBuilder},
1212
data::subgraph::SPEC_VERSION_1_5_0,
1313
};
1414

@@ -106,7 +106,7 @@ pub struct Table {
106106
/// The SQL query that executes on the Amp server.
107107
///
108108
/// The data resulting from this SQL query execution transforms into subgraph entities.
109-
pub query: Query,
109+
pub query: BlockRangeQueryBuilder,
110110

111111
/// The Arrow schema of this transformed table SQL query.
112112
///

0 commit comments

Comments
 (0)