Skip to content

Commit e71e401

Browse files
dvushCopilot
andauthored
feat: priortized orderpool (#69)
* feat: priortized orderpool * adds a prioritized orderpool * adds an example stage using prioritized orderpool (unfinished) * ci * Apply suggestions from code review Co-authored-by: Copilot <[email protected]> * move orderpool utils to a separate module * minimize diff --------- Co-authored-by: Copilot <[email protected]>
1 parent 3e57981 commit e71e401

File tree

8 files changed

+1086
-1
lines changed

8 files changed

+1086
-1
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ serde = "1.0"
111111
jsonrpsee = "0.26.0"
112112
parking_lot = "0.12"
113113
metrics = "0.24.0"
114+
priority-queue = "2.0.0"
115+
114116

115117
# Alloy dependencies
116118
alloy-origin = { version = "1.0.37", package = "alloy", features = [

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ pub mod pool;
2525
/// Common steps library
2626
pub mod steps;
2727

28+
/// Orderpool utils
29+
pub mod orderpool2;
30+
2831
/// Externally available test utilities
2932
#[cfg(any(test, feature = "test-utils"))]
3033
pub mod test_utils;

src/orderpool2/mod.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use {crate::alloy::primitives::Address, std::hash::Hash};
2+
3+
pub mod prioritized_pool;
4+
pub mod sim_tree;
5+
6+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
7+
pub struct AccountNonce {
8+
pub account: Address,
9+
pub nonce: u64,
10+
}
11+
12+
impl AccountNonce {
13+
#[must_use]
14+
pub fn with_nonce(self, nonce: u64) -> Self {
15+
Self {
16+
account: self.account,
17+
nonce,
18+
}
19+
}
20+
}
21+
22+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23+
pub struct BundleNonce {
24+
pub address: Address,
25+
pub nonce: u64,
26+
pub optional: bool,
27+
}
28+
29+
pub trait OrderpoolOrder {
30+
type ID: Hash + Eq;
31+
fn id(&self) -> Self::ID;
32+
fn nonces(&self) -> Vec<BundleNonce>;
33+
}
34+
35+
pub trait OrderpoolNonceSource {
36+
type NonceError;
37+
fn nonce(&self, address: &Address) -> Result<u64, Self::NonceError>;
38+
}
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/// Prioritized orderpool is used by block building thread to iterate
2+
/// over all bundles using some ordering and include them while keeping
3+
/// track of onchain nonces.
4+
///
5+
/// Usage:
6+
/// 1. Create `PrioritizedOrderpool` and incrementally fill it with bundles
7+
/// using `insert_order`
8+
/// 2. Clone it before starting block building run
9+
/// 3. Pop bundles using `pop_order` until it returns None and try to
10+
/// include them
11+
/// 4. Update onchain nonces after each successful commit using
12+
/// `update_onchain_nonces`
13+
use {
14+
crate::alloy::primitives::Address,
15+
priority_queue::PriorityQueue,
16+
std::collections::{HashMap, HashSet, hash_map::Entry},
17+
};
18+
19+
use super::{AccountNonce, BundleNonce, OrderpoolNonceSource, OrderpoolOrder};
20+
21+
pub mod step;
22+
#[cfg(test)]
23+
mod tests;
24+
25+
pub trait PrioritizedOrderpoolPriority: Ord + Clone + Send + Sync {
26+
type Order;
27+
fn new(order: &Self::Order) -> Self;
28+
}
29+
30+
#[derive(Debug, Clone)]
31+
pub struct PrioritizedOrderpool<Priority, Order: OrderpoolOrder> {
32+
/// Ready (all nonce matching (or not matched but optional)) to execute
33+
/// orders sorted
34+
main_queue: PriorityQueue<Order::ID, Priority>,
35+
/// For each account we store all the orders from `main_queue` which contain
36+
/// a tx from this account. Since the orders belong to `main_queue` these
37+
/// are orders ready to execute. As soon as we execute an order from
38+
/// `main_queue` all orders for all the accounts the order used
39+
/// (`order.nonces()`) could get invalidated (if tx is not optional).
40+
main_queue_nonces: HashMap<Address, Vec<Order::ID>>,
41+
42+
/// Up to date "onchain" nonces for the current block we are building.
43+
/// Special care must be taken to keep this in sync.
44+
onchain_nonces: HashMap<Address, u64>,
45+
46+
/// Orders waiting for an account to reach a particular nonce.
47+
pending_orders: HashMap<AccountNonce, Vec<Order::ID>>,
48+
/// Id -> order for all orders we manage. Carefully maintained by
49+
/// remove/insert
50+
orders: HashMap<Order::ID, Order>,
51+
}
52+
53+
impl<Priority: Ord, Order: OrderpoolOrder> Default
54+
for PrioritizedOrderpool<Priority, Order>
55+
{
56+
fn default() -> Self {
57+
Self {
58+
main_queue: PriorityQueue::new(),
59+
main_queue_nonces: HashMap::default(),
60+
onchain_nonces: HashMap::default(),
61+
pending_orders: HashMap::default(),
62+
orders: HashMap::default(),
63+
}
64+
}
65+
}
66+
67+
impl<Priority, Order> PrioritizedOrderpool<Priority, Order>
68+
where
69+
Priority: PrioritizedOrderpoolPriority<Order = Order>,
70+
Order: OrderpoolOrder,
71+
{
72+
/// Removes order from the pool
73+
/// # Panics
74+
/// Panics if implementation has a bug
75+
pub fn pop_order(&mut self) -> Option<Order> {
76+
let (id, _) = self.main_queue.pop()?;
77+
78+
let order = self
79+
.remove_popped_order(&id)
80+
.expect("order from prio queue not found in block orders");
81+
Some(order)
82+
}
83+
84+
/// Clean up after some order was removed from `main_queue`
85+
fn remove_popped_order(&mut self, id: &Order::ID) -> Option<Order> {
86+
let order = self.remove_from_orders(id)?;
87+
for BundleNonce { address, .. } in order.nonces() {
88+
match self.main_queue_nonces.entry(address) {
89+
Entry::Occupied(mut entry) => {
90+
entry.get_mut().retain(|id| *id != order.id());
91+
}
92+
Entry::Vacant(_) => {}
93+
}
94+
}
95+
Some(order)
96+
}
97+
98+
/// Updates orderpool with changed nonces
99+
/// if order updates onchain nonce from n -> n + 2, we get n + 2 as an
100+
/// arguments here
101+
/// # Panics
102+
/// Panics if implementation has a bug
103+
pub fn update_onchain_nonces<NonceSource: OrderpoolNonceSource>(
104+
&mut self,
105+
new_nonces: &[AccountNonce],
106+
nonce_source: &NonceSource,
107+
) -> Result<(), NonceSource::NonceError> {
108+
let mut invalidated_orders: HashSet<Order::ID> = HashSet::default();
109+
for new_nonce in new_nonces {
110+
self
111+
.onchain_nonces
112+
.insert(new_nonce.account, new_nonce.nonce);
113+
114+
if let Some(orders) = self.main_queue_nonces.remove(&new_nonce.account) {
115+
invalidated_orders.extend(orders.into_iter());
116+
}
117+
}
118+
119+
for order_id in invalidated_orders {
120+
// check if order can still be valid because of optional nonces
121+
self.main_queue.remove(&order_id);
122+
let order = self
123+
.remove_popped_order(&order_id)
124+
.expect("order from prio queue not found in block orders");
125+
let mut valid = true;
126+
let mut valid_nonces = 0;
127+
for BundleNonce {
128+
nonce,
129+
address,
130+
optional,
131+
} in order.nonces()
132+
{
133+
let onchain_nonce = self.nonce(&address, nonce_source)?;
134+
if onchain_nonce > nonce && !optional {
135+
valid = false;
136+
break;
137+
} else if onchain_nonce == nonce {
138+
valid_nonces += 1;
139+
}
140+
}
141+
let retain_order = valid && valid_nonces > 0;
142+
if retain_order {
143+
self.insert_order(order, nonce_source)?;
144+
}
145+
}
146+
147+
for new_nonce in new_nonces {
148+
if let Some(pending) = self.pending_orders.remove(new_nonce) {
149+
let orders = pending
150+
.iter()
151+
.filter_map(|id| self.remove_from_orders(id))
152+
.collect::<Vec<_>>();
153+
for order in orders {
154+
self.insert_order(order, nonce_source)?;
155+
}
156+
}
157+
}
158+
Ok(())
159+
}
160+
161+
fn remove_from_orders(&mut self, id: &Order::ID) -> Option<Order> {
162+
self.orders.remove(id)
163+
}
164+
165+
fn nonce<NonceSource: OrderpoolNonceSource>(
166+
&mut self,
167+
address: &Address,
168+
nonce_source: &NonceSource,
169+
) -> Result<u64, NonceSource::NonceError> {
170+
match self.onchain_nonces.entry(*address) {
171+
Entry::Occupied(entry) => Ok(*entry.get()),
172+
Entry::Vacant(entry) => {
173+
let nonce = nonce_source.nonce(address)?;
174+
entry.insert(nonce);
175+
Ok(nonce)
176+
}
177+
}
178+
}
179+
180+
pub fn insert_order<NonceSource: OrderpoolNonceSource>(
181+
&mut self,
182+
order: Order,
183+
nonce_source: &NonceSource,
184+
) -> Result<(), NonceSource::NonceError> {
185+
if self.orders.contains_key(&order.id()) {
186+
return Ok(());
187+
}
188+
let mut pending_nonces = Vec::new();
189+
for BundleNonce {
190+
nonce,
191+
address,
192+
optional,
193+
} in order.nonces()
194+
{
195+
let onchain_nonce = self.nonce(&address, nonce_source)?;
196+
if onchain_nonce > nonce && !optional {
197+
// order can't be included because of nonce
198+
return Ok(());
199+
}
200+
if onchain_nonce < nonce && !optional {
201+
pending_nonces.push(AccountNonce {
202+
account: address,
203+
nonce,
204+
});
205+
}
206+
}
207+
if pending_nonces.is_empty() {
208+
self.main_queue.push(order.id(), Priority::new(&order));
209+
for nonce in order.nonces() {
210+
self
211+
.main_queue_nonces
212+
.entry(nonce.address)
213+
.or_default()
214+
.push(order.id());
215+
}
216+
} else {
217+
for pending_nonce in pending_nonces {
218+
let pending = self.pending_orders.entry(pending_nonce).or_default();
219+
if !pending.contains(&order.id()) {
220+
pending.push(order.id());
221+
}
222+
}
223+
}
224+
self.orders.insert(order.id(), order);
225+
Ok(())
226+
}
227+
228+
pub fn remove_order(&mut self, id: &Order::ID) -> Option<Order> {
229+
// we don't remove from pending because pending will clean itself
230+
if self.main_queue.remove(id).is_some() {
231+
self.remove_popped_order(id);
232+
}
233+
self.remove_from_orders(id)
234+
}
235+
}

0 commit comments

Comments
 (0)