Skip to content

Commit 2833bee

Browse files
committed
Subgraph composition: Use block ptrs instead of full blocks in blockstream
1 parent 854383b commit 2833bee

File tree

16 files changed

+403
-102
lines changed

16 files changed

+403
-102
lines changed

chain/arweave/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
269269
}))
270270
}
271271

272-
async fn load_blocks_by_numbers(
272+
async fn load_block_ptrs_by_numbers(
273273
&self,
274274
_logger: Logger,
275275
_block_numbers: HashSet<BlockNumber>,

chain/cosmos/src/chain.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,15 +197,13 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
197197
) -> Result<Option<codec::Block>, Error> {
198198
panic!("Should never be called since not used by FirehoseBlockStream")
199199
}
200-
201-
async fn load_blocks_by_numbers(
200+
async fn load_block_ptrs_by_numbers(
202201
&self,
203202
_logger: Logger,
204203
_block_numbers: HashSet<BlockNumber>,
205204
) -> Result<Vec<Block>, Error> {
206-
unimplemented!()
205+
todo!()
207206
}
208-
209207
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
210208
unimplemented!()
211209
}

chain/ethereum/src/adapter.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::Error;
22
use ethabi::{Error as ABIError, Function, ParamType, Token};
33
use graph::blockchain::ChainIdentifier;
4+
use graph::blockchain::ExtendedBlockPtr;
45
use graph::components::subgraph::MappingError;
56
use graph::data::store::ethereum::call;
67
use graph::firehose::CallToFilter;
@@ -1109,12 +1110,12 @@ pub trait EthereumAdapter: Send + Sync + 'static {
11091110
block_hash: H256,
11101111
) -> Box<dyn Future<Item = LightEthereumBlock, Error = Error> + Send>;
11111112

1112-
async fn load_blocks_by_numbers(
1113+
async fn load_block_ptrs_by_numbers(
11131114
&self,
11141115
_logger: Logger,
11151116
_chain_store: Arc<dyn ChainStore>,
11161117
_block_numbers: HashSet<BlockNumber>,
1117-
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send>;
1118+
) -> Box<dyn Stream<Item = Arc<ExtendedBlockPtr>, Error = Error> + Send>;
11181119

11191120
/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
11201121
/// May use the `chain_store` as a cache.

chain/ethereum/src/chain.rs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use anyhow::{Context, Error};
33
use graph::blockchain::client::ChainClient;
44
use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms};
55
use graph::blockchain::{
6-
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper,
7-
TriggersAdapterSelector,
6+
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, ExtendedBlockPtr,
7+
TriggerFilterWrapper, TriggersAdapterSelector,
88
};
99
use graph::components::network_provider::ChainName;
1010
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
@@ -156,6 +156,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
156156
unified_api_version: UnifiedMappingApiVersion,
157157
) -> Result<Box<dyn BlockStream<Chain>>> {
158158
let requirements = filter.chain_filter.node_capabilities();
159+
let is_using_subgraph_composition = !source_subgraph_stores.is_empty();
159160
let adapter = TriggersAdapterWrapper::new(
160161
chain
161162
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
@@ -181,20 +182,32 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
181182
// This is ok because Celo blocks are always final. And we _need_ to do this because
182183
// some events appear only in eth_getLogs but not in transaction receipts.
183184
// See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
184-
let chain_id = match chain.chain_client().as_ref() {
185+
let reorg_threshold = match chain.chain_client().as_ref() {
185186
ChainClient::Rpc(adapter) => {
186-
adapter
187+
let chain_id = adapter
187188
.cheapest()
188189
.await
189190
.ok_or(anyhow!("unable to get eth adapter for chan_id call"))?
190191
.chain_id()
191-
.await?
192+
.await?;
193+
194+
if CELO_CHAIN_IDS.contains(&chain_id) {
195+
0
196+
} else {
197+
chain.reorg_threshold
198+
}
192199
}
193-
_ => panic!("expected rpc when using polling blockstream"),
200+
_ if is_using_subgraph_composition => chain.reorg_threshold,
201+
_ => panic!(
202+
"expected rpc when using polling blockstream : {}",
203+
is_using_subgraph_composition
204+
),
194205
};
195-
let reorg_threshold = match CELO_CHAIN_IDS.contains(&chain_id) {
196-
false => chain.reorg_threshold,
197-
true => 0,
206+
207+
let max_block_range_size = if is_using_subgraph_composition {
208+
ENV_VARS.max_block_range_size * 10
209+
} else {
210+
ENV_VARS.max_block_range_size
198211
};
199212

200213
Ok(Box::new(PollingBlockStream::new(
@@ -207,7 +220,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
207220
start_blocks,
208221
reorg_threshold,
209222
logger,
210-
ENV_VARS.max_block_range_size,
223+
max_block_range_size,
211224
ENV_VARS.target_triggers_per_block_range,
212225
unified_api_version,
213226
subgraph_current_block,
@@ -617,6 +630,8 @@ pub enum BlockFinality {
617630

618631
// If a block may still be reorged, we need to work with more local data.
619632
NonFinal(EthereumBlockWithCalls),
633+
634+
Ptr(Arc<ExtendedBlockPtr>),
620635
}
621636

622637
impl Default for BlockFinality {
@@ -630,6 +645,7 @@ impl BlockFinality {
630645
match self {
631646
BlockFinality::Final(block) => block,
632647
BlockFinality::NonFinal(block) => &block.ethereum_block.block,
648+
BlockFinality::Ptr(_) => unreachable!("light_block called on HeaderOnly"),
633649
}
634650
}
635651
}
@@ -639,6 +655,7 @@ impl<'a> From<&'a BlockFinality> for BlockPtr {
639655
match block {
640656
BlockFinality::Final(b) => BlockPtr::from(&**b),
641657
BlockFinality::NonFinal(b) => BlockPtr::from(&b.ethereum_block),
658+
BlockFinality::Ptr(b) => BlockPtr::new(b.hash.clone(), b.number),
642659
}
643660
}
644661
}
@@ -648,13 +665,17 @@ impl Block for BlockFinality {
648665
match self {
649666
BlockFinality::Final(block) => block.block_ptr(),
650667
BlockFinality::NonFinal(block) => block.ethereum_block.block.block_ptr(),
668+
BlockFinality::Ptr(block) => BlockPtr::new(block.hash.clone(), block.number),
651669
}
652670
}
653671

654672
fn parent_ptr(&self) -> Option<BlockPtr> {
655673
match self {
656674
BlockFinality::Final(block) => block.parent_ptr(),
657675
BlockFinality::NonFinal(block) => block.ethereum_block.block.parent_ptr(),
676+
BlockFinality::Ptr(block) => {
677+
Some(BlockPtr::new(block.parent_hash.clone(), block.number - 1))
678+
}
658679
}
659680
}
660681

@@ -687,13 +708,15 @@ impl Block for BlockFinality {
687708
json::to_value(eth_block)
688709
}
689710
BlockFinality::NonFinal(block) => json::to_value(&block.ethereum_block),
711+
BlockFinality::Ptr(_) => Ok(json::Value::Null),
690712
}
691713
}
692714

693715
fn timestamp(&self) -> BlockTime {
694716
let ts = match self {
695717
BlockFinality::Final(block) => block.timestamp,
696718
BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp,
719+
BlockFinality::Ptr(block) => block.timestamp,
697720
};
698721
let ts = i64::try_from(ts.as_u64()).unwrap();
699722
BlockTime::since_epoch(ts, 0)
@@ -735,7 +758,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
735758
.await
736759
}
737760

738-
async fn load_blocks_by_numbers(
761+
async fn load_block_ptrs_by_numbers(
739762
&self,
740763
logger: Logger,
741764
block_numbers: HashSet<BlockNumber>,
@@ -749,9 +772,9 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
749772
.await?;
750773

751774
let blocks = adapter
752-
.load_blocks_by_numbers(logger, self.chain_store.clone(), block_numbers)
775+
.load_block_ptrs_by_numbers(logger, self.chain_store.clone(), block_numbers)
753776
.await
754-
.map(|block| BlockFinality::Final(block))
777+
.map(|block| BlockFinality::Ptr(block))
755778
.collect()
756779
.compat()
757780
.await?;
@@ -812,6 +835,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
812835
triggers.append(&mut parse_block_triggers(&filter.block, full_block));
813836
Ok(BlockWithTriggers::new(block, triggers, logger))
814837
}
838+
BlockFinality::Ptr(_) => unreachable!("triggers_in_block called on HeaderOnly"),
815839
}
816840
}
817841

chain/ethereum/src/env.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub struct EnvVars {
3333
/// Set by the environment variable `ETHEREUM_BLOCK_BATCH_SIZE`. The
3434
/// default value is 10 blocks.
3535
pub block_batch_size: usize,
36+
/// Set by the environment variable `ETHEREUM_BLOCK_PTR_BATCH_SIZE`. The
37+
/// default value is 100 blocks.
38+
pub block_ptr_batch_size: usize,
3639
/// Maximum number of blocks to request in each chunk.
3740
///
3841
/// Set by the environment variable `GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE`.
@@ -116,6 +119,7 @@ impl From<Inner> for EnvVars {
116119
trace_stream_step_size: x.trace_stream_step_size,
117120
max_event_only_range: x.max_event_only_range,
118121
block_batch_size: x.block_batch_size,
122+
block_ptr_batch_size: x.block_ptr_batch_size,
119123
max_block_range_size: x.max_block_range_size,
120124
json_rpc_timeout: Duration::from_secs(x.json_rpc_timeout_in_secs),
121125
block_receipts_check_timeout: Duration::from_secs(
@@ -160,6 +164,8 @@ struct Inner {
160164
max_event_only_range: BlockNumber,
161165
#[envconfig(from = "ETHEREUM_BLOCK_BATCH_SIZE", default = "10")]
162166
block_batch_size: usize,
167+
#[envconfig(from = "ETHEREUM_BLOCK_PTR_BATCH_SIZE", default = "100")]
168+
block_ptr_batch_size: usize,
163169
#[envconfig(from = "GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE", default = "2000")]
164170
max_block_range_size: BlockNumber,
165171
#[envconfig(from = "GRAPH_ETHEREUM_JSON_RPC_TIMEOUT", default = "180")]

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered};
22
use graph::blockchain::client::ChainClient;
33
use graph::blockchain::BlockHash;
44
use graph::blockchain::ChainIdentifier;
5+
use graph::blockchain::ExtendedBlockPtr;
56

67
use graph::components::transaction_receipt::LightTransactionReceipt;
78
use graph::data::store::ethereum::call;
@@ -783,11 +784,11 @@ impl EthereumAdapter {
783784
}
784785

785786
/// Request block pointers by number through JSON-RPC.
786-
fn load_blocks_by_numbers_rpc(
787+
fn load_block_ptrs_by_numbers_rpc(
787788
&self,
788789
logger: Logger,
789790
numbers: Vec<BlockNumber>,
790-
) -> impl Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send {
791+
) -> impl Stream<Item = Arc<ExtendedBlockPtr>, Error = Error> + Send {
791792
let web3 = self.web3.clone();
792793

793794
stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| {
@@ -798,27 +799,37 @@ impl EthereumAdapter {
798799
.run(move || {
799800
Box::pin(
800801
web3.eth()
801-
.block_with_txs(BlockId::Number(Web3BlockNumber::Number(
802-
number.into(),
803-
))),
802+
.block(BlockId::Number(Web3BlockNumber::Number(number.into()))),
804803
)
805804
.compat()
806805
.from_err::<Error>()
807806
.and_then(move |block| {
808-
block.map(Arc::new).ok_or_else(|| {
809-
anyhow::anyhow!(
810-
"Ethereum node did not find block with number {:?}",
811-
number
812-
)
813-
})
807+
block
808+
.map(|block| {
809+
let ptr = ExtendedBlockPtr::try_from((
810+
block.hash,
811+
block.number,
812+
block.parent_hash,
813+
block.timestamp,
814+
))
815+
.unwrap();
816+
817+
Arc::new(ptr)
818+
})
819+
.ok_or_else(|| {
820+
anyhow::anyhow!(
821+
"Ethereum node did not find block with number {:?}",
822+
number
823+
)
824+
})
814825
})
815826
.compat()
816827
})
817828
.boxed()
818829
.compat()
819830
.from_err()
820831
}))
821-
.buffered(ENV_VARS.block_batch_size)
832+
.buffered(ENV_VARS.block_ptr_batch_size)
822833
}
823834

824835
/// Request block ptrs for numbers through JSON-RPC.
@@ -1690,27 +1701,27 @@ impl EthereumAdapterTrait for EthereumAdapter {
16901701
}
16911702

16921703
/// Load Ethereum block pointers in bulk by number, returning results as they come back as a Stream.
1693-
async fn load_blocks_by_numbers(
1704+
async fn load_block_ptrs_by_numbers(
16941705
&self,
16951706
logger: Logger,
16961707
chain_store: Arc<dyn ChainStore>,
16971708
block_numbers: HashSet<BlockNumber>,
1698-
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send> {
1699-
let blocks_map: BTreeMap<i32, Vec<json::Value>> = chain_store
1709+
) -> Box<dyn Stream<Item = Arc<ExtendedBlockPtr>, Error = Error> + Send> {
1710+
let blocks_map = chain_store
17001711
.cheap_clone()
1701-
.blocks_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
1712+
.block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
17021713
.await
17031714
.map_err(|e| {
17041715
error!(&logger, "Error accessing block cache {}", e);
17051716
e
17061717
})
17071718
.unwrap_or_default();
17081719

1709-
let mut blocks: Vec<Arc<LightEthereumBlock>> = blocks_map
1720+
let mut blocks: Vec<Arc<ExtendedBlockPtr>> = blocks_map
17101721
.into_iter()
17111722
.filter_map(|(_number, values)| {
17121723
if values.len() == 1 {
1713-
json::from_value(values[0].clone()).ok()
1724+
Arc::new(values[0].clone()).into()
17141725
} else {
17151726
None
17161727
}
@@ -1719,7 +1730,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
17191730

17201731
let missing_blocks: Vec<i32> = block_numbers
17211732
.into_iter()
1722-
.filter(|&number| !blocks.iter().any(|block| block.number() == number))
1733+
.filter(|&number| !blocks.iter().any(|block| block.block_number() == number))
17231734
.collect();
17241735

17251736
if !missing_blocks.is_empty() {
@@ -1731,20 +1742,9 @@ impl EthereumAdapterTrait for EthereumAdapter {
17311742
}
17321743

17331744
Box::new(
1734-
self.load_blocks_by_numbers_rpc(logger.clone(), missing_blocks)
1745+
self.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_blocks)
17351746
.collect()
17361747
.map(move |new_blocks| {
1737-
let upsert_blocks: Vec<_> = new_blocks
1738-
.iter()
1739-
.map(|block| BlockFinality::Final(block.clone()))
1740-
.collect();
1741-
let block_refs: Vec<_> = upsert_blocks
1742-
.iter()
1743-
.map(|block| block as &dyn graph::blockchain::Block)
1744-
.collect();
1745-
if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) {
1746-
error!(logger, "Error writing to block cache {}", e);
1747-
}
17481748
blocks.extend(new_blocks);
17491749
blocks.sort_by_key(|block| block.number);
17501750
stream::iter_ok(blocks)
@@ -2028,6 +2028,9 @@ pub(crate) async fn get_calls(
20282028
calls: Some(calls),
20292029
}))
20302030
}
2031+
BlockFinality::Ptr(_) => {
2032+
unreachable!("get_calls called with BlockFinality::Ptr")
2033+
}
20312034
}
20322035
}
20332036

@@ -2209,6 +2212,11 @@ async fn filter_call_triggers_from_unsuccessful_transactions(
22092212
"this function should not be called when dealing with non-final blocks"
22102213
)
22112214
}
2215+
BlockFinality::Ptr(_block) => {
2216+
unreachable!(
2217+
"this function should not be called when dealing with header-only blocks"
2218+
)
2219+
}
22122220
}
22132221
};
22142222

chain/near/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
325325
panic!("Should never be called since not used by FirehoseBlockStream")
326326
}
327327

328-
async fn load_blocks_by_numbers(
328+
async fn load_block_ptrs_by_numbers(
329329
&self,
330330
_logger: Logger,
331331
_block_numbers: HashSet<BlockNumber>,

0 commit comments

Comments
 (0)