apollo_mempool: impl FIFO mempool#12856
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
6a3d616 to
c2e542f
Compare
6c35d05 to
cbf9ccb
Compare
cbf9ccb to
301c306
Compare
c2e542f to
97080c0
Compare
301c306 to
9a9197a
Compare
ff66c0e to
bf8061c
Compare
97080c0 to
e5f0d51
Compare
bf8061c to
f848224
Compare
ayeletstarkware
left a comment
There was a problem hiding this comment.
@ayeletstarkware made 1 comment.
Reviewable status: 0 of 10 files reviewed, 1 unresolved discussion (waiting on matanl-starkware and ron-starkware).
e5f0d51 to
1ef60f5
Compare
88e6d0b to
b4775dc
Compare
1ef60f5 to
21674b7
Compare
b4775dc to
b1ff608
Compare
90b9b4e to
a837dfb
Compare
b1ff608 to
ce54e6e
Compare
a837dfb to
f8cb984
Compare
ce54e6e to
b5587a9
Compare
c97a9dc to
025783b
Compare
matanl-starkware
left a comment
There was a problem hiding this comment.
@matanl-starkware made 6 comments and resolved 8 discussions.
Reviewable status: 1 of 10 files reviewed, 19 unresolved discussions (waiting on ayeletstarkware and ron-starkware).
crates/apollo_mempool/src/fifo_transaction_queue.rs line 29 at r12 (raw file):
// Transactions that were returned by get_txs and may need rewind during commit. staged_tx_refs: Vec<TransactionReference>, }
I suggest to consider the following structure:
- A transaction with metadata
struct X {
tx: TransactionReference,
timestamp: Option,
original_block: Option, # Will be used to skip empty blocks
} - One data structure to hold all transactions (i.e. the "pool")
hash_to_tx: HashMap<Hash, X> - 2 "queues" for pending and staged txs:
pending: VecDequeue
staged: Vec
-- Alternative is to keep an enum "state" alongside the metadata, and change it from "Pending" to "Staged" to something else when the transaction moves between states.
Code quote:
pub struct FifoTransactionQueue {
// Queue of transaction hashes ordered by arrival time (FIFO).
queue: VecDeque<TransactionHash>,
// Map from transaction hash to transaction reference for efficient lookups.
hash_to_tx: HashMap<TransactionHash, TransactionReference>,
// Map from transaction hash to timestamp. Entries are kept after pop for potential rewind,
// and cleaned up after commit via remove_txs.
hash_to_timestamp: HashMap<TransactionHash, UnixTimestamp>,
// Last timestamp returned by get_timestamp_for_batch() - used to filter transactions in
// pop_ready_chunk.
last_returned_timestamp: Option<UnixTimestamp>,
// Transactions that were returned by get_txs and may need rewind during commit.
staged_tx_refs: Vec<TransactionReference>,
}crates/apollo_mempool/src/fifo_transaction_queue.rs line 49 at r12 (raw file):
grouped_by_address.entry(tx.address).or_default().push(*tx); grouped_by_address })
avoids moving the HashMap every iteration
Suggestion:
let mut grouped_by_address = HashMap::new();
for &tx in staged_tx_refs {
grouped_by_address.entry(tx.address).or_default().push(tx);
}
grouped_by_addresscrates/apollo_mempool/src/fifo_transaction_queue.rs line 85 at r12 (raw file):
.map(|(&address, _)| address) .collect();
if addresses_to_rewind is empty, may stop here.
crates/apollo_mempool/src/fifo_transaction_queue.rs line 111 at r12 (raw file):
for tx_hash in committed_tx_hashes { self.hash_to_timestamp.remove(&tx_hash); }
Consider having a single loop, skipping entries that rewound_hashes.contains(hash)
Code quote:
let committed_tx_hashes: Vec<TransactionHash> = self
.staged_tx_refs
.iter()
.filter(|tx_ref| !rewound_hashes.contains(&tx_ref.tx_hash))
.map(|tx_ref| tx_ref.tx_hash)
.collect();
for tx_hash in committed_tx_hashes {
self.hash_to_timestamp.remove(&tx_hash);
}crates/apollo_mempool/src/fifo_transaction_queue.rs line 114 at r12 (raw file):
} fn rewind_tx(&mut self, tx_ref: TransactionReference) -> TransactionHash {
It's like insert(), but pushed to front instead of back.
Consider combining into a single function, with a bool flag to determine the queue's end.
Code quote:
rewind_txcrates/apollo_mempool/src/fifo_transaction_queue.rs line 127 at r12 (raw file):
self.queue.push_front(tx_hash); self.hash_to_tx.insert(tx_hash, tx_ref); tx_hash
Why do we have to return it?
Code quote:
tx_hash025783b to
ecacb51
Compare
fa47fa1 to
1eab5c8
Compare
1eab5c8 to
8c4a2b6
Compare
ecacb51 to
060f077
Compare
ayeletstarkware
left a comment
There was a problem hiding this comment.
@ayeletstarkware made 6 comments and resolved 1 discussion.
Reviewable status: 1 of 10 files reviewed, 18 unresolved discussions (waiting on matanl-starkware and ron-starkware).
crates/apollo_mempool/src/fifo_transaction_queue.rs line 29 at r12 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
I suggest to consider the following structure:
- A transaction with metadata
struct X {
tx: TransactionReference,
timestamp: Option,
original_block: Option, # Will be used to skip empty blocks
}- One data structure to hold all transactions (i.e. the "pool")
hash_to_tx: HashMap<Hash, X>- 2 "queues" for pending and staged txs:
pending: VecDequeue
staged: Vec
-- Alternative is to keep an enum "state" alongside the metadata, and change it from "Pending" to "Staged" to something else when the transaction moves between states.
discussed offline, done
crates/apollo_mempool/src/fifo_transaction_queue.rs line 49 at r12 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
avoids moving the
HashMapevery iteration
Done.
crates/apollo_mempool/src/fifo_transaction_queue.rs line 85 at r12 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
if addresses_to_rewind is empty, may stop here.
Done.
crates/apollo_mempool/src/fifo_transaction_queue.rs line 111 at r12 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
Consider having a single loop, skipping entries that rewound_hashes.contains(hash)
Done.
crates/apollo_mempool/src/fifo_transaction_queue.rs line 114 at r12 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
It's like insert(), but pushed to front instead of back.
Consider combining into a single function, with a bool flag to determine the queue's end.
done
crates/apollo_mempool/src/fifo_transaction_queue.rs line 127 at r12 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
Why do we have to return it?
stale.
we need to return the hash in rewind_txs which is called by mempool.
mempool skips rewound txs when removing rejected txs.
matanl-starkware
left a comment
There was a problem hiding this comment.
@matanl-starkware reviewed 9 files and all commit messages, made 9 comments, and resolved 18 discussions.
Reviewable status: all files reviewed, 9 unresolved discussions (waiting on ayeletstarkware and ron-starkware).
crates/apollo_mempool/src/fifo_transaction_queue.rs line 100 at r14 (raw file):
self.staged_txs .iter() .rev()
Why reversing?
(IMO this function should not care about later insertion order)
Code quote:
.rev()crates/apollo_mempool/src/fifo_transaction_queue.rs line 108 at r14 (raw file):
}) .copied() .collect()
Suggestion:
Single filter and map tx.tx_reference to a short var
Code quote (i):
self.staged_txs
.iter()
.rev()
.filter(|tx| addresses_to_rewind.contains(&tx.tx_reference.address))
.filter(|tx| {
committed_nonces
.get(&tx.tx_reference.address)
.is_none_or(|&committed_nonce| tx.tx_reference.nonce >= committed_nonce)
})
.copied()
.collect()Code snippet (ii):
self.staged_txs
.iter()
.filter(|tx| {
let tx_ref = &tx.tx_reference;
if !addresses_to_rewind.contains(&tx_ref.address) {
return false;
}
committed_nonces
.get(&tx_ref.address)
.is_none_or(|&committed_nonce| tx_ref.nonce >= committed_nonce)
})
.copied()
.collect()crates/apollo_mempool/src/fifo_transaction_queue.rs line 111 at r14 (raw file):
} fn enqueue_tx(&mut self, tx: FifoTransaction, insert_to_front: bool) {
Consider having an Enum for better call-site readability
Code quote:
insert_to_front: boocrates/apollo_mempool/src/fifo_transaction_queue.rs line 149 at r14 (raw file):
}; let tx_timestamp = front_tx.timestamp;
Use a smaller block for front_tx existence to avoid the copied().
Code quote (i):
let Some(front_tx) = self.queue.front().copied() else {
break;
};
let tx_timestamp = front_tx.timestamp;Code snippet (ii):
let front_tx_timestamp = {
let Some(front_tx) = self.queue.front() else {
break;
};
front_tx.timestamp
};crates/apollo_mempool/src/fifo_transaction_queue.rs line 184 at r14 (raw file):
}); removed }
Please comment that 'return true' means that the address was removed
Code quote (i):
fn remove_by_address(&mut self, address: ContractAddress) -> bool {
let mut removed = false;
self.queue.retain(|tx| {
let keep = tx.tx_reference.address != address;
if !keep {
removed = true;
}
keep
});
self.staged_txs.retain(|tx| {
let keep = tx.tx_reference.address != address;
if !keep {
removed = true;
}
keep
});
removed
}Code snippet (ii):
fn remove_by_address(&mut self, address: ContractAddress) -> bool {
let len_before = self.queue.len() + self.staged_txs.len();
self.queue.retain(|tx| tx.tx_reference.address != address);
self.staged_txs.retain(|tx| tx.tx_reference.address != address);
let len_after = self.queue.len() + self.staged_txs.len();
len_before != len_after
}crates/apollo_mempool/src/fifo_transaction_queue.rs line 187 at r14 (raw file):
fn remove_txs(&mut self, txs: &[TransactionReference]) -> Vec<TransactionReference> { let tx_hashes: HashSet<TransactionHash> = txs.iter().map(|tx| tx.tx_hash).collect();
If you'd use a mutable HashSet here, you would be able to call tx_hashes.remove() instead of contains().
That would make the set smaller already on the first iteration.
Suggestion:
let mut tx_hashes: HashSet<_> = txs.iter().map(|tx| tx.tx_hash).collect();crates/apollo_mempool/src/fifo_transaction_queue.rs line 245 at r14 (raw file):
let txs_to_rewind = self.collect_txs_to_rewind(committed_nonces, rejected_tx_hashes); let rewound_hashes: IndexSet<TransactionHash> = txs_to_rewind .into_iter()
IMO this is the correct place to go in reverse order
Code quote:
.into_iter()crates/apollo_mempool/src/mempool.rs line 284 at r14 (raw file):
} pub fn get_timestamp(&mut self) -> UnixTimestamp {
We need to discuss this with some stakeholders.
We've added a dependency between Consensus->Batcher->Mempool regarding block creation.
Meaning that when Consensus needs to propose a new block, if either Batcher or Mempool is down, it will fail.
Unlike the "unable to batch" error, the timestamp error is much more severe and occurs earlier in the proposal process.
@ron-starkware FYI
Code quote:
pub fn get_timestamp060f077 to
8f292c7
Compare
ayeletstarkware
left a comment
There was a problem hiding this comment.
@ayeletstarkware made 9 comments.
Reviewable status: all files reviewed, 9 unresolved discussions (waiting on matanl-starkware and ron-starkware).
crates/apollo_mempool/src/fifo_transaction_queue.rs line 100 at r14 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
Why reversing?
(IMO this function should not care about later insertion order)
Done.
crates/apollo_mempool/src/fifo_transaction_queue.rs line 108 at r14 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
Suggestion:
Singlefilterand map tx.tx_reference to a short var
Done.
crates/apollo_mempool/src/fifo_transaction_queue.rs line 111 at r14 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
Consider having an Enum for better call-site readability
thought it's more complicated
but no problem, done
crates/apollo_mempool/src/fifo_transaction_queue.rs line 149 at r14 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
Use a smaller block for front_tx existence to avoid the copied().
Done.
crates/apollo_mempool/src/fifo_transaction_queue.rs line 184 at r14 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
Please comment that 'return true' means that the address was removed
Done.
crates/apollo_mempool/src/fifo_transaction_queue.rs line 187 at r14 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
If you'd use a mutable HashSet here, you would be able to call tx_hashes.remove() instead of contains().
That would make the set smaller already on the first iteration.
Done.
crates/apollo_mempool/src/fifo_transaction_queue.rs line 245 at r14 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
IMO this is the correct place to go in reverse order
Done.
crates/apollo_mempool/src/mempool.rs line 284 at r14 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
We need to discuss this with some stakeholders.
We've added a dependency between Consensus->Batcher->Mempool regarding block creation.
Meaning that when Consensus needs to propose a new block, if either Batcher or Mempool is down, it will fail.
Unlike the "unable to batch" error, the timestamp error is much more severe and occurs earlier in the proposal process.
@ron-starkware FYI
discussed offline, updated Ron as well :)
matanl-starkware
left a comment
There was a problem hiding this comment.
@matanl-starkware reviewed 1 file and all commit messages, made 1 comment, and resolved 9 discussions.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on ayeletstarkware and ron-starkware).
crates/apollo_mempool/src/fifo_transaction_queue.rs line 197 at r15 (raw file):
self.staged_txs.retain(|tx| { let tx_hash = tx.tx_reference.tx_hash; if tx_hashes.remove(&tx_hash) || removed_hashes.contains(&tx_hash) {
Why?
Anyhow removed_hashes is a Set...
Code quote:
|| removed_hashes.contains(&tx_hash)8f292c7 to
1f06daa
Compare
ayeletstarkware
left a comment
There was a problem hiding this comment.
@ayeletstarkware made 1 comment.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on matanl-starkware and ron-starkware).
crates/apollo_mempool/src/fifo_transaction_queue.rs line 197 at r15 (raw file):
Previously, matanl-starkware (Matan Lior) wrote…
Why?
Anyhow removed_hashes is a Set...
Done.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
1f06daa to
b0b64d4
Compare
ayeletstarkware
left a comment
There was a problem hiding this comment.
@ayeletstarkware resolved 1 discussion.
Reviewable status: 9 of 10 files reviewed, 1 unresolved discussion (waiting on matanl-starkware and ron-starkware).
matanl-starkware
left a comment
There was a problem hiding this comment.
@matanl-starkware reviewed 2 files and all commit messages, and resolved 1 discussion.
Reviewable status:complete! all files reviewed, all discussions resolved (waiting on ron-starkware).

Note
Medium Risk
Touches core mempool queuing/commit/rewind logic and introduces dynamic dispatch, so regressions could affect transaction ordering and removal semantics. Risk is mitigated by behavior-mode scoping plus substantial new FIFO coverage and updated integration tests.
Overview
Implements an Echonet-specific FIFO transaction queue and wires it into
Mempoolvia aBox<dyn TransactionQueueTrait>, selecting FIFO vs fee-priority behavior based onbehavior_mode.FIFO mode now gates
get_txsby a per-batch timestamp (fetched from the recorder and persisted across empty-queue periods), enqueues all transactions immediately (no declare delay / nonce gating), and rewinds staged transactions oncommit_blockby pushing them back to the front while preserving original order.The queue trait is extended with
RewindData+resolve_timestampand rewind now returns rewound hashes; mempool commit/rejection/expiry paths were adjusted to handle FIFO semantics, and extensive new FIFO-focused tests were added while enabling previously ignored recorder integration tests.Written by Cursor Bugbot for commit b0b64d4. This will update automatically on new commits. Configure here.