Skip to content

Commit cc4ff1d

Browse files
committed
wip
1 parent 00832c3 commit cc4ff1d

File tree

3 files changed

+402
-135
lines changed

3 files changed

+402
-135
lines changed

executors/src/eoa/store.rs

Lines changed: 275 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,22 @@ impl EoaExecutorStore {
403403
None => format!("eoa_executor:health:{chain_id}:{eoa}"),
404404
}
405405
}
406+
407+
/// Name of the sorted set for completed transactions per EOA (for pruning)
408+
fn completed_transactions_per_eoa_key_name(&self, eoa: Address, chain_id: u64) -> String {
409+
match &self.namespace {
410+
Some(ns) => format!("{ns}:eoa_executor:completed:{chain_id}:{eoa}"),
411+
None => format!("eoa_executor:completed:{chain_id}:{eoa}"),
412+
}
413+
}
414+
415+
/// Name of the sorted set for completed transactions globally (for pruning)
416+
fn completed_transactions_global_key_name(&self) -> String {
417+
match &self.namespace {
418+
Some(ns) => format!("{ns}:eoa_executor:completed_global"),
419+
None => "eoa_executor:completed_global".to_string(),
420+
}
421+
}
406422
}
407423

408424
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -1417,6 +1433,8 @@ impl EoaExecutorStore {
14171433
let submitted_key = self.submitted_transactions_zset_name(eoa, chain_id);
14181434
let hash_to_id_key = self.transaction_hash_to_id_key_name(hash);
14191435
let tx_data_key = self.transaction_data_key_name(transaction_id);
1436+
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id);
1437+
let completed_global_key = self.completed_transactions_global_key_name();
14201438
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
14211439

14221440
// Remove this hash:id from submitted
@@ -1430,6 +1448,10 @@ impl EoaExecutorStore {
14301448
pipeline.hset(&tx_data_key, "completed_at", now);
14311449
pipeline.hset(&tx_data_key, "receipt", receipt);
14321450
pipeline.hset(&tx_data_key, "status", "confirmed");
1451+
1452+
// Add to completed transactions tracking for pruning
1453+
pipeline.zadd(&completed_eoa_key, transaction_id, now);
1454+
pipeline.zadd(&completed_global_key, transaction_id, now);
14331455
})
14341456
.await
14351457
}
@@ -1484,7 +1506,7 @@ impl EoaExecutorStore {
14841506
eoa: Address,
14851507
chain_id: u64,
14861508
worker_id: &str,
1487-
failures: Vec<crate::eoa::worker::TransactionFailure>,
1509+
failures: Vec<crate::eoa::worker::TransactionReplacement>,
14881510
) -> Result<(), TransactionStoreError> {
14891511
if failures.is_empty() {
14901512
return Ok(());
@@ -1532,6 +1554,8 @@ impl EoaExecutorStore {
15321554

15331555
self.with_lock_check(eoa, chain_id, worker_id, |pipeline| {
15341556
let submitted_key = self.submitted_transactions_zset_name(eoa, chain_id);
1557+
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id);
1558+
let completed_global_key = self.completed_transactions_global_key_name();
15351559
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
15361560

15371561
for success in &successes {
@@ -1548,6 +1572,10 @@ impl EoaExecutorStore {
15481572
pipeline.hset(&tx_data_key, "completed_at", now);
15491573
pipeline.hset(&tx_data_key, "receipt", &success.receipt_data);
15501574
pipeline.hset(&tx_data_key, "status", "confirmed");
1575+
1576+
// Add to completed transactions tracking for pruning
1577+
pipeline.zadd(&completed_eoa_key, &success.transaction_id, now);
1578+
pipeline.zadd(&completed_global_key, &success.transaction_id, now);
15511579
}
15521580
})
15531581
.await
@@ -1713,6 +1741,197 @@ impl EoaExecutorStore {
17131741

17141742
Ok(())
17151743
}
1744+
1745+
/// Get count of submitted transactions awaiting confirmation
1746+
pub async fn get_submitted_transactions_count(
1747+
&self,
1748+
eoa: Address,
1749+
chain_id: u64,
1750+
) -> Result<u64, TransactionStoreError> {
1751+
let submitted_key = self.submitted_transactions_zset_name(eoa, chain_id);
1752+
let mut conn = self.redis.clone();
1753+
1754+
let count: u64 = conn.zcard(&submitted_key).await?;
1755+
Ok(count)
1756+
}
1757+
1758+
/// Fail a transaction that's in the pending queue
1759+
pub async fn fail_pending_transaction(
1760+
&self,
1761+
eoa: Address,
1762+
chain_id: u64,
1763+
worker_id: &str,
1764+
transaction_id: &str,
1765+
failure_reason: &str,
1766+
) -> Result<(), TransactionStoreError> {
1767+
self.with_lock_check(eoa, chain_id, worker_id, |pipeline| {
1768+
let pending_key = self.pending_transactions_list_name(eoa, chain_id);
1769+
let tx_data_key = self.transaction_data_key_name(transaction_id);
1770+
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id);
1771+
let completed_global_key = self.completed_transactions_global_key_name();
1772+
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
1773+
1774+
// Remove from pending queue
1775+
pipeline.lrem(&pending_key, 0, transaction_id);
1776+
1777+
// Update transaction data with failure
1778+
pipeline.hset(&tx_data_key, "completed_at", now);
1779+
pipeline.hset(&tx_data_key, "failure_reason", failure_reason);
1780+
pipeline.hset(&tx_data_key, "status", "failed");
1781+
1782+
// Add to completed transactions tracking for pruning
1783+
pipeline.zadd(&completed_eoa_key, transaction_id, now);
1784+
pipeline.zadd(&completed_global_key, transaction_id, now);
1785+
})
1786+
.await
1787+
}
1788+
1789+
/// Fail a transaction that's in the borrowed state (we know the nonce)
1790+
pub async fn fail_borrowed_transaction(
1791+
&self,
1792+
eoa: Address,
1793+
chain_id: u64,
1794+
worker_id: &str,
1795+
transaction_id: &str,
1796+
nonce: u64,
1797+
failure_reason: &str,
1798+
) -> Result<(), TransactionStoreError> {
1799+
self.with_lock_check(eoa, chain_id, worker_id, |pipeline| {
1800+
let borrowed_key = self.borrowed_transactions_hashmap_name(eoa, chain_id);
1801+
let tx_data_key = self.transaction_data_key_name(transaction_id);
1802+
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id);
1803+
let completed_global_key = self.completed_transactions_global_key_name();
1804+
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
1805+
1806+
// Remove from borrowed state using the known nonce
1807+
pipeline.hdel(&borrowed_key, nonce.to_string());
1808+
1809+
// Update transaction data with failure
1810+
pipeline.hset(&tx_data_key, "completed_at", now);
1811+
pipeline.hset(&tx_data_key, "failure_reason", failure_reason);
1812+
pipeline.hset(&tx_data_key, "status", "failed");
1813+
1814+
// Add to completed transactions tracking for pruning
1815+
pipeline.zadd(&completed_eoa_key, transaction_id, now);
1816+
pipeline.zadd(&completed_global_key, transaction_id, now);
1817+
})
1818+
.await
1819+
}
1820+
1821+
/// Prune old completed transactions for a specific EOA if it exceeds the cap
1822+
pub async fn prune_completed_transactions_for_eoa(
1823+
&self,
1824+
eoa: Address,
1825+
chain_id: u64,
1826+
max_per_eoa: u64,
1827+
batch_size: u64,
1828+
) -> Result<u64, TransactionStoreError> {
1829+
let completed_eoa_key = self.completed_transactions_per_eoa_key_name(eoa, chain_id);
1830+
let completed_global_key = self.completed_transactions_global_key_name();
1831+
let mut conn = self.redis.clone();
1832+
1833+
// Check current count
1834+
let current_count: u64 = conn.zcard(&completed_eoa_key).await?;
1835+
if current_count <= max_per_eoa {
1836+
return Ok(0); // No pruning needed
1837+
}
1838+
1839+
let to_remove = current_count - max_per_eoa;
1840+
let batch_to_remove = to_remove.min(batch_size);
1841+
1842+
// Get oldest transactions (lowest scores)
1843+
let oldest_transactions: Vec<String> = conn
1844+
.zrange(&completed_eoa_key, 0, (batch_to_remove - 1) as isize)
1845+
.await?;
1846+
1847+
if oldest_transactions.is_empty() {
1848+
return Ok(0);
1849+
}
1850+
1851+
// Remove transaction data and tracking
1852+
for transaction_id in &oldest_transactions {
1853+
let tx_data_key = self.transaction_data_key_name(transaction_id);
1854+
let _: () = conn.del(&tx_data_key).await?;
1855+
}
1856+
1857+
// Remove from tracking sets
1858+
for transaction_id in &oldest_transactions {
1859+
let _: () = conn.zrem(&completed_eoa_key, transaction_id).await?;
1860+
let _: () = conn.zrem(&completed_global_key, transaction_id).await?;
1861+
}
1862+
1863+
Ok(oldest_transactions.len() as u64)
1864+
}
1865+
1866+
/// Prune old completed transactions globally if it exceeds the global cap
1867+
pub async fn prune_completed_transactions_globally(
1868+
&self,
1869+
max_global: u64,
1870+
batch_size: u64,
1871+
) -> Result<u64, TransactionStoreError> {
1872+
let completed_global_key = self.completed_transactions_global_key_name();
1873+
let mut conn = self.redis.clone();
1874+
1875+
// Check current count
1876+
let current_count: u64 = conn.zcard(&completed_global_key).await?;
1877+
if current_count <= max_global {
1878+
return Ok(0); // No pruning needed
1879+
}
1880+
1881+
let to_remove = current_count - max_global;
1882+
let batch_to_remove = to_remove.min(batch_size);
1883+
1884+
// Get oldest transactions (lowest scores)
1885+
let oldest_transactions: Vec<String> = conn
1886+
.zrange(&completed_global_key, 0, (batch_to_remove - 1) as isize)
1887+
.await?;
1888+
1889+
if oldest_transactions.is_empty() {
1890+
return Ok(0);
1891+
}
1892+
1893+
// Remove transaction data
1894+
for transaction_id in &oldest_transactions {
1895+
let tx_data_key = self.transaction_data_key_name(transaction_id);
1896+
let _: () = conn.del(&tx_data_key).await?;
1897+
}
1898+
1899+
// Remove from global tracking
1900+
for transaction_id in &oldest_transactions {
1901+
let _: () = conn.zrem(&completed_global_key, transaction_id).await?;
1902+
}
1903+
1904+
// Also remove from per-EOA tracking sets (we need to scan for these)
1905+
// Note: This is less efficient but necessary to keep consistency
1906+
self.remove_transactions_from_all_eoa_tracking(&oldest_transactions)
1907+
.await?;
1908+
1909+
Ok(oldest_transactions.len() as u64)
1910+
}
1911+
1912+
/// Helper to remove transactions from all EOA tracking sets
1913+
async fn remove_transactions_from_all_eoa_tracking(
1914+
&self,
1915+
transaction_ids: &[String],
1916+
) -> Result<(), TransactionStoreError> {
1917+
let mut conn = self.redis.clone();
1918+
let pattern = match &self.namespace {
1919+
Some(ns) => format!("{ns}:eoa_executor:completed:*"),
1920+
None => "eoa_executor:completed:*".to_string(),
1921+
};
1922+
1923+
// Get all EOA-specific completed transaction keys
1924+
let keys: Vec<String> = conn.keys(&pattern).await?;
1925+
1926+
// Remove transactions from each EOA tracking set
1927+
for key in keys {
1928+
for transaction_id in transaction_ids {
1929+
let _: () = conn.zrem(&key, transaction_id).await?;
1930+
}
1931+
}
1932+
1933+
Ok(())
1934+
}
17161935
}
17171936

17181937
// Additional error types
@@ -1974,7 +2193,7 @@ impl<'a> ScopedEoaExecutorStore<'a> {
19742193
/// Efficiently batch fail and requeue multiple transactions
19752194
pub async fn batch_fail_and_requeue_transactions(
19762195
&self,
1977-
failures: Vec<crate::eoa::worker::TransactionFailure>,
2196+
failures: Vec<crate::eoa::worker::TransactionReplacement>,
19782197
) -> Result<(), TransactionStoreError> {
19792198
self.store
19802199
.batch_fail_and_requeue_transactions(self.eoa, self.chain_id, &self.worker_id, failures)
@@ -2153,4 +2372,58 @@ impl<'a> ScopedEoaExecutorStore<'a> {
21532372
) -> Result<Option<TransactionData>, TransactionStoreError> {
21542373
self.store.get_transaction_data(transaction_id).await
21552374
}
2375+
2376+
/// Get count of submitted transactions awaiting confirmation
2377+
pub async fn get_submitted_transactions_count(&self) -> Result<u64, TransactionStoreError> {
2378+
self.store
2379+
.get_submitted_transactions_count(self.eoa, self.chain_id)
2380+
.await
2381+
}
2382+
2383+
/// Fail a transaction that's in the pending queue
2384+
pub async fn fail_pending_transaction(
2385+
&self,
2386+
transaction_id: &str,
2387+
failure_reason: &str,
2388+
) -> Result<(), TransactionStoreError> {
2389+
self.store
2390+
.fail_pending_transaction(
2391+
self.eoa,
2392+
self.chain_id,
2393+
&self.worker_id,
2394+
transaction_id,
2395+
failure_reason,
2396+
)
2397+
.await
2398+
}
2399+
2400+
/// Fail a transaction that's in the borrowed state (we know the nonce)
2401+
pub async fn fail_borrowed_transaction(
2402+
&self,
2403+
transaction_id: &str,
2404+
nonce: u64,
2405+
failure_reason: &str,
2406+
) -> Result<(), TransactionStoreError> {
2407+
self.store
2408+
.fail_borrowed_transaction(
2409+
self.eoa,
2410+
self.chain_id,
2411+
&self.worker_id,
2412+
transaction_id,
2413+
nonce,
2414+
failure_reason,
2415+
)
2416+
.await
2417+
}
2418+
2419+
/// Prune old completed transactions for this EOA if it exceeds the cap
2420+
pub async fn prune_completed_transactions(
2421+
&self,
2422+
max_per_eoa: u64,
2423+
batch_size: u64,
2424+
) -> Result<u64, TransactionStoreError> {
2425+
self.store
2426+
.prune_completed_transactions_for_eoa(self.eoa, self.chain_id, max_per_eoa, batch_size)
2427+
.await
2428+
}
21562429
}

0 commit comments

Comments
 (0)