-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Feat/graphman/clear stale call cache #6186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,6 +83,7 @@ pub use data::Storage; | |
|
||
/// Encapsulate access to the blocks table for a chain. | ||
mod data { | ||
use crate::diesel::dsl::IntervalDsl; | ||
use diesel::sql_types::{Array, Binary, Bool, Nullable}; | ||
use diesel::{connection::SimpleConnection, insert_into}; | ||
use diesel::{delete, prelude::*, sql_query}; | ||
|
@@ -104,7 +105,7 @@ mod data { | |
use graph::prelude::transaction_receipt::LightTransactionReceipt; | ||
use graph::prelude::web3::types::H256; | ||
use graph::prelude::{ | ||
serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, StoreError, | ||
serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, Logger, StoreError, | ||
}; | ||
use std::collections::HashMap; | ||
use std::convert::TryFrom; | ||
|
@@ -1398,6 +1399,126 @@ mod data { | |
} | ||
} | ||
|
||
pub fn clear_stale_call_cache( | ||
&self, | ||
conn: &mut PgConnection, | ||
logger: &Logger, | ||
ttl_days: i32, | ||
) -> Result<(), Error> { | ||
// Delete cache entries in batches since there could be thousands of cache entries per contract | ||
let mut total_deleted = 0; | ||
let batch_size = 5000; | ||
|
||
match self { | ||
Storage::Shared => { | ||
use public::eth_call_cache as cache; | ||
use public::eth_call_meta as meta; | ||
|
||
let stale_contracts = meta::table | ||
.select(meta::contract_address) | ||
.filter( | ||
meta::accessed_at | ||
.lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), | ||
) | ||
.get_results::<Vec<u8>>(conn)?; | ||
|
||
if stale_contracts.is_empty() { | ||
return Ok(()); | ||
} | ||
|
||
loop { | ||
let next_batch = cache::table | ||
.select(cache::id) | ||
.filter(cache::contract_address.eq_any(&stale_contracts)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be better to batch by contracts rather than cache entries? Im wondering because the number of stale_contracts itself can be very huge, in our production i just checked and the number for >7days is 9624844. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is this case, probably both have to be batched There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Btw is that number in the public There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no its just for ethereum in the private schema There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll look into adjusting the queries to make them more efficient, but In general I doubt this approach is preferable, even if optimized for such a number of records. Depending on the avg number of call cache records per contract, it could take days to clean up this many. |
||
.limit(batch_size as i64) | ||
.get_results::<Vec<u8>>(conn)?; | ||
let deleted_count = | ||
diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) | ||
.execute(conn)?; | ||
|
||
total_deleted += deleted_count; | ||
|
||
if deleted_count < batch_size { | ||
dimitrovmaksim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
break; | ||
} | ||
} | ||
|
||
graph::slog::info!( | ||
logger, | ||
"Cleaned call cache: deleted {} entries for {} contracts", | ||
total_deleted, | ||
stale_contracts.len() | ||
); | ||
|
||
diesel::delete( | ||
meta::table.filter(meta::contract_address.eq_any(&stale_contracts)), | ||
) | ||
.execute(conn)?; | ||
|
||
Ok(()) | ||
} | ||
Storage::Private(Schema { | ||
call_cache, | ||
call_meta, | ||
.. | ||
}) => { | ||
let select_query = format!( | ||
"SELECT contract_address FROM {} \ | ||
WHERE accessed_at < CURRENT_DATE - interval '{} days'", | ||
call_meta.qname, ttl_days | ||
); | ||
|
||
#[derive(QueryableByName)] | ||
struct ContractAddress { | ||
#[diesel(sql_type = Bytea)] | ||
contract_address: Vec<u8>, | ||
} | ||
|
||
let all_stale_contracts: Vec<Vec<u8>> = sql_query(select_query) | ||
.load::<ContractAddress>(conn)? | ||
.into_iter() | ||
.map(|row| row.contract_address) | ||
.collect(); | ||
|
||
if all_stale_contracts.is_empty() { | ||
graph::slog::info!(logger, "Cleaned call cache: no stale entries found"); | ||
dimitrovmaksim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return Ok(()); | ||
} | ||
|
||
loop { | ||
let delete_cache_query = format!( | ||
"DELETE FROM {} WHERE id IN ( | ||
SELECT id FROM {} | ||
WHERE contract_address = ANY($1) | ||
LIMIT {} | ||
)", | ||
call_cache.qname, call_cache.qname, batch_size | ||
); | ||
|
||
let deleted_count = sql_query(delete_cache_query) | ||
.bind::<Array<Bytea>, _>(&all_stale_contracts) | ||
.execute(conn)?; | ||
|
||
total_deleted += deleted_count; | ||
|
||
if deleted_count < batch_size { | ||
dimitrovmaksim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
break; | ||
} | ||
} | ||
|
||
let delete_meta_query = format!( | ||
"DELETE FROM {} WHERE contract_address = ANY($1)", | ||
call_meta.qname | ||
); | ||
sql_query(delete_meta_query) | ||
.bind::<Array<Bytea>, _>(&all_stale_contracts) | ||
.execute(conn)?; | ||
|
||
Ok(()) | ||
} | ||
} | ||
} | ||
|
||
pub(super) fn update_accessed_at( | ||
&self, | ||
conn: &mut PgConnection, | ||
|
@@ -2508,6 +2629,12 @@ impl ChainStoreTrait for ChainStore { | |
Ok(()) | ||
} | ||
|
||
async fn clear_stale_call_cache(&self, ttl_days: i32) -> Result<(), Error> { | ||
let conn = &mut *self.get_conn()?; | ||
self.storage | ||
.clear_stale_call_cache(conn, &self.logger, ttl_days) | ||
} | ||
|
||
async fn transaction_receipts_in_block( | ||
&self, | ||
block_hash: &H256, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't allow negative values here