Skip to content

Commit 17e9e61

Browse files
graph,node,store: Add option to remove stale call_cache in graphman
Signed-off-by: Maksim Dimitrov <[email protected]>
1 parent 647be7c commit 17e9e61

File tree

5 files changed

+161
-3
lines changed

5 files changed

+161
-3
lines changed

graph/src/blockchain/mock.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,9 @@ impl ChainStore for MockChainStore {
571571
async fn clear_call_cache(&self, _from: BlockNumber, _to: BlockNumber) -> Result<(), Error> {
572572
unimplemented!()
573573
}
574+
async fn clear_stale_call_cache(&self, _ttl_days: i32) -> Result<(), Error> {
575+
unimplemented!()
576+
}
574577
fn chain_identifier(&self) -> Result<ChainIdentifier, Error> {
575578
unimplemented!()
576579
}

graph/src/components/store/traits.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,9 @@ pub trait ChainStore: ChainHeadStore {
598598
/// Clears call cache of the chain for the given `from` and `to` block number.
599599
async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error>;
600600

601+
/// Clears stale call cache entries for the given TTL in days.
602+
async fn clear_stale_call_cache(&self, ttl_days: i32) -> Result<(), Error>;
603+
601604
/// Return the chain identifier for this store.
602605
fn chain_identifier(&self) -> Result<ChainIdentifier, Error>;
603606

node/src/bin/manager.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,14 +555,18 @@ pub enum ChainCommand {
555555
pub enum CallCacheCommand {
556556
/// Remove the call cache of the specified chain.
557557
///
558-
/// Either remove entries in the range `--from` and `--to`, or remove
559-
/// the entire cache with `--remove-entire-cache`. Removing the entire
558+
/// Either remove entries in the range `--from` and `--to`,
559+
/// remove the cache for contracts that have not been accessed for the specified duration --ttl_days,
560+
/// or remove the entire cache with `--remove-entire-cache`. Removing the entire
560561
/// cache can reduce indexing performance significantly and should
561562
/// generally be avoided.
562563
Remove {
563564
/// Remove the entire cache
564565
#[clap(long, conflicts_with_all = &["from", "to"])]
565566
remove_entire_cache: bool,
567+
/// Remove the cache for contracts that have not been accessed in the last <TTL_DAYS> days
568+
#[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"])]
569+
ttl_days: Option<i32>,
566570
/// Starting block number
567571
#[clap(long, short, conflicts_with = "remove-entire-cache", requires = "to")]
568572
from: Option<i32>,
@@ -1472,8 +1476,17 @@ async fn main() -> anyhow::Result<()> {
14721476
from,
14731477
to,
14741478
remove_entire_cache,
1479+
ttl_days,
14751480
} => {
14761481
let chain_store = ctx.chain_store(&chain_name)?;
1482+
if let Some(ttl_days) = ttl_days {
1483+
return commands::chain::clear_stale_call_cache(
1484+
chain_store,
1485+
ttl_days,
1486+
)
1487+
.await;
1488+
}
1489+
14771490
if !remove_entire_cache && from.is_none() && to.is_none() {
14781491
bail!("you must specify either --from and --to or --remove-entire-cache");
14791492
}

node/src/manager/commands/chain.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,18 @@ pub async fn clear_call_cache(
8181
Ok(())
8282
}
8383

84+
pub async fn clear_stale_call_cache(
85+
chain_store: Arc<ChainStore>,
86+
ttl_days: i32,
87+
) -> Result<(), Error> {
88+
println!(
89+
"Removing stale entries from the call cache for `{}`",
90+
chain_store.chain
91+
);
92+
chain_store.clear_stale_call_cache(ttl_days).await?;
93+
Ok(())
94+
}
95+
8496
pub async fn info(
8597
primary: ConnectionPool,
8698
store: Arc<BlockStore>,

store/postgres/src/chain_store.rs

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub use data::Storage;
8383

8484
/// Encapuslate access to the blocks table for a chain.
8585
mod data {
86+
use crate::diesel::dsl::IntervalDsl;
8687
use diesel::sql_types::{Array, Binary, Bool, Nullable};
8788
use diesel::{connection::SimpleConnection, insert_into};
8889
use diesel::{delete, prelude::*, sql_query};
@@ -104,7 +105,7 @@ mod data {
104105
use graph::prelude::transaction_receipt::LightTransactionReceipt;
105106
use graph::prelude::web3::types::H256;
106107
use graph::prelude::{
107-
serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, StoreError,
108+
serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, Logger, StoreError,
108109
};
109110
use std::collections::HashMap;
110111
use std::convert::TryFrom;
@@ -1398,6 +1399,126 @@ mod data {
13981399
}
13991400
}
14001401

1402+
pub fn clear_stale_call_cache(
1403+
&self,
1404+
conn: &mut PgConnection,
1405+
logger: &Logger,
1406+
ttl_days: i32,
1407+
) -> Result<(), Error> {
1408+
// Delete cache entries in batches since there could be thousands of cache entries per contract
1409+
let mut total_deleted = 0;
1410+
let batch_size = 5000;
1411+
1412+
match self {
1413+
Storage::Shared => {
1414+
use public::eth_call_cache as cache;
1415+
use public::eth_call_meta as meta;
1416+
1417+
let stale_contracts = meta::table
1418+
.select(meta::contract_address)
1419+
.filter(
1420+
meta::accessed_at
1421+
.lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())),
1422+
)
1423+
.get_results::<Vec<u8>>(conn)?;
1424+
1425+
if stale_contracts.is_empty() {
1426+
return Ok(());
1427+
}
1428+
1429+
loop {
1430+
let next_batch = cache::table
1431+
.select(cache::id)
1432+
.filter(cache::contract_address.eq_any(&stale_contracts))
1433+
.limit(batch_size as i64)
1434+
.get_results::<Vec<u8>>(conn)?;
1435+
let deleted_count =
1436+
diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch)))
1437+
.execute(conn)?;
1438+
1439+
total_deleted += deleted_count;
1440+
1441+
if deleted_count < batch_size {
1442+
break;
1443+
}
1444+
}
1445+
1446+
graph::slog::info!(
1447+
logger,
1448+
"Cleaned call cache: deleted {} entries for {} contracts",
1449+
total_deleted,
1450+
stale_contracts.len()
1451+
);
1452+
1453+
diesel::delete(
1454+
meta::table.filter(meta::contract_address.eq_any(&stale_contracts)),
1455+
)
1456+
.execute(conn)?;
1457+
1458+
Ok(())
1459+
}
1460+
Storage::Private(Schema {
1461+
call_cache,
1462+
call_meta,
1463+
..
1464+
}) => {
1465+
let select_query = format!(
1466+
"SELECT contract_address FROM {} \
1467+
WHERE accessed_at < CURRENT_DATE - interval '{} days'",
1468+
call_meta.qname, ttl_days
1469+
);
1470+
1471+
#[derive(QueryableByName)]
1472+
struct ContractAddress {
1473+
#[diesel(sql_type = Bytea)]
1474+
contract_address: Vec<u8>,
1475+
}
1476+
1477+
let all_stale_contracts: Vec<Vec<u8>> = sql_query(select_query)
1478+
.load::<ContractAddress>(conn)?
1479+
.into_iter()
1480+
.map(|row| row.contract_address)
1481+
.collect();
1482+
1483+
if all_stale_contracts.is_empty() {
1484+
graph::slog::info!(logger, "Cleaned call cache: no stale entries found");
1485+
return Ok(());
1486+
}
1487+
1488+
loop {
1489+
let delete_cache_query = format!(
1490+
"DELETE FROM {} WHERE id IN (
1491+
SELECT id FROM {}
1492+
WHERE contract_address = ANY($1)
1493+
LIMIT {}
1494+
)",
1495+
call_cache.qname, call_cache.qname, batch_size
1496+
);
1497+
1498+
let deleted_count = sql_query(delete_cache_query)
1499+
.bind::<Array<Bytea>, _>(&all_stale_contracts)
1500+
.execute(conn)?;
1501+
1502+
total_deleted += deleted_count;
1503+
1504+
if deleted_count < batch_size {
1505+
break;
1506+
}
1507+
}
1508+
1509+
let delete_meta_query = format!(
1510+
"DELETE FROM {} WHERE contract_address = ANY($1)",
1511+
call_meta.qname
1512+
);
1513+
sql_query(delete_meta_query)
1514+
.bind::<Array<Bytea>, _>(&all_stale_contracts)
1515+
.execute(conn)?;
1516+
1517+
Ok(())
1518+
}
1519+
}
1520+
}
1521+
14011522
pub(super) fn update_accessed_at(
14021523
&self,
14031524
conn: &mut PgConnection,
@@ -2508,6 +2629,12 @@ impl ChainStoreTrait for ChainStore {
25082629
Ok(())
25092630
}
25102631

2632+
async fn clear_stale_call_cache(&self, ttl_days: i32) -> Result<(), Error> {
2633+
let conn = &mut *self.get_conn()?;
2634+
self.storage
2635+
.clear_stale_call_cache(conn, &self.logger, ttl_days)
2636+
}
2637+
25112638
async fn transaction_receipts_in_block(
25122639
&self,
25132640
block_hash: &H256,

0 commit comments

Comments
 (0)