Skip to content

Commit 30f4183

Browse files
committed
feat: Implement TransactionOrchestrator with RBF and optimistic nonces
1 parent 08d378a commit 30f4183

File tree

3 files changed

+512
-0
lines changed

3 files changed

+512
-0
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from .orchestrator import (
2+
TransactionOrchestrator,
3+
TransactionMetadata,
4+
TransactionStatus,
5+
RBFPolicy,
6+
OptimisticNonceManager
7+
)
Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
2+
import asyncio
3+
import time
4+
import uuid
5+
from dataclasses import dataclass, field
6+
from enum import Enum
7+
from typing import Any, Dict, List, Optional, Union
8+
9+
from bittensor.core.async_subtensor import AsyncSubtensor
10+
from bittensor.utils.btlogging import logging
11+
12+
class TransactionStatus(Enum):
13+
QUEUED = "QUEUED"
14+
SUBMITTED = "SUBMITTED"
15+
IN_BLOCK = "IN_BLOCK"
16+
FINALIZED = "FINALIZED"
17+
STUCK = "STUCK"
18+
FAILED = "FAILED"
19+
REPLACED = "REPLACED"
20+
21+
def generate_uuid() -> str:
22+
return str(uuid.uuid4())
23+
24+
@dataclass
25+
class TransactionMetadata:
26+
"""
27+
Immutable tracking object for a specific user intent.
28+
"""
29+
call_module: str
30+
call_function: str
31+
call_params: Dict[str, Any]
32+
33+
# Internal State
34+
id: str = field(default_factory=generate_uuid)
35+
assigned_nonce: Optional[int] = None
36+
created_at: float = field(default_factory=time.time)
37+
38+
# RBF Tracking
39+
current_tip: int = 0
40+
extrinsic_hashes: List[str] = field(default_factory=list)
41+
status: TransactionStatus = TransactionStatus.QUEUED
42+
last_submit_at: float = 0.0
43+
retry_count: int = 0
44+
45+
# Error tracking
46+
last_error: Optional[str] = None
47+
expiration_block: Optional[int] = None
48+
49+
class OptimisticNonceManager:
50+
"""
51+
Manages the local nonce state to allow for high-throughput submission
52+
without querying the chain for every transaction.
53+
"""
54+
def __init__(self, subtensor: AsyncSubtensor, wallet_address: str):
55+
self.subtensor = subtensor
56+
self.wallet_address = wallet_address
57+
self.local_nonce: Optional[int] = None
58+
self._lock = asyncio.Lock()
59+
60+
async def sync(self):
61+
"""
62+
Force synchronization with the chain state.
63+
"""
64+
async with self._lock:
65+
# We use the underlying substrate interface to get the nonce directly
66+
# This avoids any caching that might be in higher layers if they exist
67+
# though async_subtensor usually wraps this directly.
68+
self.local_nonce = await self.subtensor.get_account_nonce(self.wallet_address)
69+
logging.debug(f"Nonce synced to: {self.local_nonce}")
70+
71+
async def get_next_nonce(self) -> int:
72+
"""
73+
Returns the next nonce and increments the local counter.
74+
"""
75+
async with self._lock:
76+
if self.local_nonce is None:
77+
await self.sync()
78+
79+
nonce = self.local_nonce
80+
self.local_nonce += 1
81+
return nonce
82+
83+
async def reset(self):
84+
"""
85+
Resets the local nonce to None, forcing a sync on next request.
86+
"""
87+
async with self._lock:
88+
self.local_nonce = None
89+
90+
@dataclass
91+
class RBFPolicy:
92+
"""
93+
Configuration for Replace-By-Fee logic.
94+
"""
95+
base_tip: int = 0
96+
increment_type: str = "percentage" # "linear" or "percentage"
97+
increment_value: float = 0.15 # 15% increase or fixed RAO amount
98+
epsilon: int = 1000 # Minimum increment in RAO
99+
max_tip: int = 1_000_000_000 # 1 TAO safety limit (example)
100+
stuck_timeout: float = 24.0 # Seconds before considering a tx stuck
101+
102+
def calculate_next_tip(self, current_tip: int) -> int:
103+
if self.increment_type == "percentage":
104+
next_tip = int(current_tip * (1 + self.increment_value))
105+
# If current tip is 0, percentage won't work, so apply epsilon or base
106+
if next_tip == current_tip:
107+
next_tip += self.epsilon
108+
else:
109+
next_tip = current_tip + int(self.increment_value)
110+
111+
# Ensure we always bump by at least epsilon if the calculation resulted in less
112+
if next_tip < current_tip + self.epsilon:
113+
next_tip = current_tip + self.epsilon
114+
115+
return min(next_tip, self.max_tip)
116+
117+
class TransactionOrchestrator:
118+
"""
119+
Stateful engine to manage transaction lifecycles, nonces, and RBF.
120+
"""
121+
def __init__(
122+
self,
123+
wallet: "bittensor_wallet.Wallet",
124+
subtensor: Optional[AsyncSubtensor] = None,
125+
config: Optional[RBFPolicy] = None
126+
):
127+
self.wallet = wallet
128+
self.subtensor = subtensor or AsyncSubtensor()
129+
self.config = config or RBFPolicy()
130+
131+
self.nonce_manager = OptimisticNonceManager(self.subtensor, self.wallet.hotkey.ss58_address)
132+
self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue() # Stores TransactionMetadata
133+
self.active_transactions: Dict[str, TransactionMetadata] = {} # id -> metadata
134+
135+
self._worker_task: Optional[asyncio.Task] = None
136+
self._monitor_task: Optional[asyncio.Task] = None
137+
self._running = False
138+
139+
async def start(self):
140+
"""
141+
Initializes the connection and starts the worker loops.
142+
"""
143+
if self._running:
144+
return
145+
146+
if not self.subtensor.substrate:
147+
await self.subtensor.initialize()
148+
149+
await self.nonce_manager.sync()
150+
151+
self._running = True
152+
self._worker_task = asyncio.create_task(self._process_queue())
153+
self._monitor_task = asyncio.create_task(self._monitor_pool())
154+
logging.info("TransactionOrchestrator started.")
155+
156+
async def stop(self):
157+
self._running = False
158+
if self._worker_task:
159+
self._worker_task.cancel()
160+
if self._monitor_task:
161+
self._monitor_task.cancel()
162+
try:
163+
await self._worker_task
164+
await self._monitor_task
165+
except asyncio.CancelledError:
166+
pass
167+
168+
async def submit_extrinsic(
169+
self,
170+
call_module: str,
171+
call_function: str,
172+
call_params: Dict[str, Any],
173+
wait_for_finalization: bool = False, # Kept for API compatibility, but usually False for high throughput
174+
wait_for_inclusion: bool = False
175+
) -> TransactionMetadata:
176+
"""
177+
Submits an intent to the orchestrator.
178+
"""
179+
tx = TransactionMetadata(
180+
call_module=call_module,
181+
call_function=call_function,
182+
call_params=call_params,
183+
current_tip=self.config.base_tip
184+
)
185+
186+
# Assign nonce immediately to preserve order of python calls
187+
tx.assigned_nonce = await self.nonce_manager.get_next_nonce()
188+
189+
self.active_transactions[tx.id] = tx
190+
await self.queue.put((tx.assigned_nonce, tx)) # Priority queue by nonce
191+
192+
logging.info(f"Queued transaction {call_module}.{call_function} with nonce {tx.assigned_nonce}")
193+
194+
if wait_for_inclusion or wait_for_finalization:
195+
await self._wait_for_status(tx, wait_for_finalization)
196+
197+
return tx
198+
199+
async def get_transaction_status(self, tx_id: str) -> Optional[TransactionStatus]:
200+
if tx_id in self.active_transactions:
201+
return self.active_transactions[tx_id].status
202+
return None
203+
204+
async def _wait_for_status(self, tx: TransactionMetadata, finalization: bool):
205+
"""
206+
Helper to block until inclusion or finalization.
207+
"""
208+
while True:
209+
if tx.status == TransactionStatus.FAILED:
210+
raise Exception(f"Transaction failed: {tx.last_error}")
211+
212+
if finalization:
213+
if tx.status == TransactionStatus.FINALIZED:
214+
return
215+
else:
216+
if tx.status in [TransactionStatus.IN_BLOCK, TransactionStatus.FINALIZED]:
217+
return
218+
219+
await asyncio.sleep(1)
220+
221+
async def _process_queue(self):
222+
"""
223+
Worker loop that signs and submits transactions.
224+
"""
225+
while self._running:
226+
try:
227+
_, tx = await self.queue.get()
228+
229+
if tx.status in [TransactionStatus.FAILED, TransactionStatus.FINALIZED]:
230+
self.queue.task_done()
231+
continue
232+
233+
try:
234+
# Compose call
235+
call = await self.subtensor.substrate.compose_call(
236+
call_module=tx.call_module,
237+
call_function=tx.call_function,
238+
call_params=tx.call_params
239+
)
240+
241+
# Sign extrinsic
242+
# accessing internal substrate interface for explicit nonce/tip control
243+
extrinsic = await self.subtensor.substrate.create_signed_extrinsic(
244+
call=call,
245+
keypair=self.wallet.hotkey,
246+
nonce=tx.assigned_nonce,
247+
tip=tx.current_tip
248+
)
249+
250+
# Submit
251+
# We don't wait for inclusion here, we just fire it off.
252+
try:
253+
receipt = await self.subtensor.substrate.submit_extrinsic(
254+
extrinsic,
255+
wait_for_inclusion=False,
256+
wait_for_finalization=False
257+
)
258+
# Handle varied return types from different substrate interface versions
259+
tx_hash = receipt.extrinsic_hash if hasattr(receipt, 'extrinsic_hash') else str(receipt)
260+
261+
tx.extrinsic_hashes.append(tx_hash)
262+
tx.status = TransactionStatus.SUBMITTED
263+
tx.last_submit_at = time.time()
264+
logging.info(f"Submitted {tx.id} (Nonce: {tx.assigned_nonce}) Hash: {tx_hash}")
265+
266+
except Exception as e:
267+
# Handle immediate submission errors (e.g. priority too low, invalid nonce)
268+
error_str = str(e)
269+
if "1014" in error_str or "Priority is too low" in error_str:
270+
logging.warning(f"Priority too low for {tx.id}. Triggering RBF.")
271+
await self._trigger_rbf(tx)
272+
elif "Invalid Nonce" in error_str or "Transaction is outdated" in error_str:
273+
logging.error(f"Nonce misalignment for {tx.id}. Triggering re-sync.")
274+
await self.nonce_manager.reset()
275+
# Re-queue? If nonce is invalid, we might need to re-assign nonce.
276+
# But that breaks the sequence.
277+
# Simpler approach: Fail this, and let user retry?
278+
# Or drift detection: sync nonce, and if actual nonce > tx.nonce, then this tx is already done or dead.
279+
# If actual nonce < tx.nonce, we are in future.
280+
# For now, mark FAILED to stop loop.
281+
tx.status = TransactionStatus.FAILED
282+
tx.last_error = error_str
283+
else:
284+
logging.error(f"Submission error for {tx.id}: {e}")
285+
tx.status = TransactionStatus.FAILED
286+
tx.last_error = str(e)
287+
288+
except Exception as e:
289+
logging.error(f"Fatal error processing {tx.id}: {e}")
290+
tx.status = TransactionStatus.FAILED
291+
tx.last_error = str(e)
292+
293+
finally:
294+
self.queue.task_done()
295+
296+
except asyncio.CancelledError:
297+
break
298+
except Exception as e:
299+
logging.error(f"Worker loop error: {e}")
300+
await asyncio.sleep(1)
301+
302+
async def _monitor_pool(self):
303+
"""
304+
Monitors submitted transactions for stuckness.
305+
"""
306+
while self._running:
307+
try:
308+
# Iterate over copy of values to avoid modification issues
309+
current_time = time.time()
310+
for tx in list(self.active_transactions.values()):
311+
if tx.status == TransactionStatus.SUBMITTED:
312+
# Check timeout
313+
if current_time - tx.last_submit_at > self.config.stuck_timeout:
314+
logging.warning(f"Transaction {tx.id} stuck. Triggering RBF.")
315+
await self._trigger_rbf(tx)
316+
else:
317+
# Optional: Check chain status if we want faster feedback than just timeout
318+
# But that's expensive. relying on timeout is standard for RBF.
319+
pass
320+
321+
await asyncio.sleep(5) # Check every 5 seconds
322+
except asyncio.CancelledError:
323+
break
324+
except Exception as e:
325+
logging.error(f"Monitor loop error: {e}")
326+
await asyncio.sleep(5)
327+
328+
async def _trigger_rbf(self, tx: TransactionMetadata):
329+
"""
330+
Calculates new tip and re-queues the transaction.
331+
"""
332+
new_tip = self.config.calculate_next_tip(tx.current_tip)
333+
if new_tip > self.config.max_tip:
334+
logging.error(f"Max tip reached for {tx.id}. Cannot RBF.")
335+
# Don't fail it yet, maybe it will eventually pass.
336+
# Or mark as STUCK
337+
tx.status = TransactionStatus.STUCK
338+
return
339+
340+
tx.current_tip = new_tip
341+
tx.retry_count += 1
342+
logging.info(f"RBF: Increasing tip to {tx.current_tip} for {tx.id}")
343+
344+
# Push back to queue.
345+
# Since queue is priority queue by nonce, and we use the same nonce,
346+
# it will be processed in correct order relative to other nonces,
347+
# but we want it processed ASAP.
348+
await self.queue.put((tx.assigned_nonce, tx))

0 commit comments

Comments
 (0)