Skip to content

Commit 27cbcdd

Browse files
authored
delete 'shallow' fh eth blks on rpc+ingestor start (#4790)
* delete 'shallow' fh eth blks on rpc+ingestor start when launching grpah-node with an ingestor on an eth chain with an RPC provider (i.e. not firehose), this will delete the 'shallow' blocks in chainX.blocks, so they do not cause issues to the RPC provider. It only runs on ethereum chains where firehose is not used and when ingestor is active. * cleanup shallowblocks only within reorg threshold, remove nulldata indexes (too costly) * also remove cursor on non-firehose ethereum chains * delete_shallowblocks: check for overflow, use double reorg_threshold for safety * add comment with github pr link regarding cleanup_shallow_blocks * Only remove shallow blocks if cursor was removed * minor style improvements * more minor style refinement in block_store --------- Co-authored-by: Stéphane Duchesneau <[email protected]>
1 parent aeacbeb commit 27cbcdd

File tree

5 files changed

+109
-44
lines changed

5 files changed

+109
-44
lines changed

node/src/main.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,18 @@ async fn main() {
379379
Some(&eth_firehose_only_networks)
380380
};
381381

382+
if !opt.disable_block_ingestor && eth_networks.networks.len() != 0 {
383+
let eth_network_names = Vec::from_iter(eth_networks.networks.keys());
384+
let fh_only = match eth_firehose_only_networks {
385+
Some(firehose_only) => Some(Vec::from_iter(firehose_only.networks.keys())),
386+
None => None,
387+
};
388+
network_store
389+
.block_store()
390+
.cleanup_ethereum_shallow_blocks(eth_network_names, fh_only)
391+
.unwrap();
392+
}
393+
382394
let ethereum_chains = ethereum_networks_as_chains(
383395
&mut blockchain_map,
384396
&logger,

store/postgres/migrations/2023-06-22_050000_index_ethereum_nulldata/down.sql

Lines changed: 0 additions & 17 deletions
This file was deleted.

store/postgres/migrations/2023-06-22_050000_index_ethereum_nulldata/up.sql

Lines changed: 0 additions & 26 deletions
This file was deleted.

store/postgres/src/block_store.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
use graph::{
88
blockchain::ChainIdentifier,
99
components::store::BlockStore as BlockStoreTrait,
10-
prelude::{error, warn, BlockNumber, BlockPtr, Logger, ENV_VARS},
10+
prelude::{error, info, warn, BlockNumber, BlockPtr, Logger, ENV_VARS},
1111
};
1212
use graph::{constraint_violation, prelude::CheapClone};
1313
use graph::{
@@ -442,6 +442,44 @@ impl BlockStore {
442442
Ok(())
443443
}
444444

445+
// cleanup_ethereum_shallow_blocks will delete cached blocks previously produced by firehose on
446+
// an ethereum chain that is not currently configured to use firehose provider.
447+
//
448+
// This is to prevent an issue where firehose stores "shallow" blocks (with null data) in `chainX.blocks`
449+
// table but RPC provider requires those blocks to be full.
450+
//
451+
// - This issue only affects ethereum chains.
452+
// - This issue only happens when switching providers from firehose back to RPC. it is gated by
453+
// the presence of a cursor in the public.ethereum_networks table for a chain configured without firehose.
454+
// - Only the shallow blocks close to HEAD need to be deleted, the older blocks don't need data.
455+
// - Deleting everything or creating an index on empty data would cause too much performance
456+
// hit on graph-node startup.
457+
//
458+
// Discussed here: https://github.com/graphprotocol/graph-node/pull/4790
459+
pub fn cleanup_ethereum_shallow_blocks(
460+
&self,
461+
ethereum_networks: Vec<&String>,
462+
firehose_only_networks: Option<Vec<&String>>,
463+
) -> Result<(), StoreError> {
464+
for store in self.stores.read().unwrap().values() {
465+
if !ethereum_networks.contains(&&store.chain) {
466+
continue;
467+
};
468+
if let Some(fh_nets) = firehose_only_networks.clone() {
469+
if fh_nets.contains(&&store.chain) {
470+
continue;
471+
};
472+
}
473+
474+
if let Some(head_block) = store.remove_cursor(&&store.chain)? {
475+
let lower_bound = head_block.saturating_sub(ENV_VARS.reorg_threshold * 2);
476+
info!(&self.logger, "Removed cursor for non-firehose chain, now cleaning shallow blocks"; "network" => &store.chain, "lower_bound" => lower_bound);
477+
store.cleanup_shallow_blocks(lower_bound)?;
478+
}
479+
}
480+
Ok(())
481+
}
482+
445483
fn truncate_block_caches(&self) -> Result<(), StoreError> {
446484
for store in self.stores.read().unwrap().values() {
447485
store.truncate_block_cache()?

store/postgres/src/chain_store.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,52 @@ mod data {
418418
Ok(())
419419
}
420420

421+
pub(super) fn cleanup_shallow_blocks(
422+
&self,
423+
conn: &PgConnection,
424+
lowest_block: i32,
425+
) -> Result<(), StoreError> {
426+
let table_name = match &self {
427+
Storage::Shared => ETHEREUM_BLOCKS_TABLE_NAME,
428+
Storage::Private(Schema { blocks, .. }) => &blocks.qname,
429+
};
430+
conn.batch_execute(&format!(
431+
"delete from {} WHERE number >= {} AND data->'block'->'data' = 'null'::jsonb;",
432+
table_name, lowest_block,
433+
))?;
434+
Ok(())
435+
}
436+
437+
pub(super) fn remove_cursor(
438+
&self,
439+
conn: &PgConnection,
440+
chain: &str,
441+
) -> Result<Option<BlockNumber>, StoreError> {
442+
use diesel::dsl::not;
443+
use public::ethereum_networks::dsl::*;
444+
445+
match update(
446+
ethereum_networks
447+
.filter(name.eq(chain))
448+
.filter(not(head_block_cursor.is_null())),
449+
)
450+
.set(head_block_cursor.eq(None as Option<String>))
451+
.returning(head_block_number)
452+
.get_result::<Option<i64>>(conn)
453+
.optional()
454+
{
455+
Ok(res) => match res {
456+
Some(opt_num) => match opt_num {
457+
Some(num) => Ok(Some(num as i32)),
458+
None => Ok(None),
459+
},
460+
None => Ok(None),
461+
},
462+
Err(e) => Err(e),
463+
}
464+
.map_err(Into::into)
465+
}
466+
421467
/// Insert a block. If the table already contains a block with the
422468
/// same hash, then overwrite that block since it may be adding
423469
/// transaction receipts. If `overwrite` is `true`, overwrite a
@@ -1553,6 +1599,18 @@ impl ChainStore {
15531599
.delete_blocks_by_hash(&conn, &self.chain, block_hashes)
15541600
}
15551601

1602+
pub fn cleanup_shallow_blocks(&self, lowest_block: i32) -> Result<(), StoreError> {
1603+
let conn = self.get_conn()?;
1604+
self.storage.cleanup_shallow_blocks(&conn, lowest_block)?;
1605+
Ok(())
1606+
}
1607+
1608+
// remove_cursor delete the chain_store cursor and return true if it was present
1609+
pub fn remove_cursor(&self, chain: &str) -> Result<Option<BlockNumber>, StoreError> {
1610+
let conn = self.get_conn()?;
1611+
self.storage.remove_cursor(&conn, chain)
1612+
}
1613+
15561614
pub fn truncate_block_cache(&self) -> Result<(), StoreError> {
15571615
let conn = self.get_conn()?;
15581616
self.storage.truncate_block_cache(&conn)?;

0 commit comments

Comments
 (0)