Skip to content

Commit ef03637

Browse files
chore: remove chain tip from mempool subscription (#1771)
1 parent e0b168a commit ef03637

File tree

7 files changed

+28
-86
lines changed

7 files changed

+28
-86
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
- Replaced NTX Builder's in-memory state management with SQLite-backed persistence; account states, notes, and transaction effects are now stored in the database and inflight state is purged on startup ([#1662](https://github.com/0xMiden/node/pull/1662)).
2929
- [BREAKING] Reworked `miden-remote-prover`, removing the `worker`/`proxy` distinction and simplifying to a `worker` with a request queue ([#1688](https://github.com/0xMiden/node/pull/1688)).
3030
- [BREAKING] Renamed `NoteRoot` protobuf message used in `GetNoteScriptByRoot` gRPC endpoints into `NoteScriptRoot` ([#1722](https://github.com/0xMiden/node/pull/1722)).
31+
- Removed `chain_tip` requirement from mempool subscription request ([#1771](https://github.com/0xMiden/node/pull/1771)).
3132

3233
### Fixes
3334

crates/block-producer/src/mempool/mod.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -586,17 +586,9 @@ impl Mempool {
586586
/// Creates a subscription to [`MempoolEvent`] which will be emitted in the order they occur.
587587
///
588588
/// Only emits events which occurred after the current committed block.
589-
///
590-
/// # Errors
591-
///
592-
/// Returns an error if the provided chain tip does not match the mempool's chain tip. This
593-
/// prevents desync between the caller's view of the world and the mempool's event stream.
594589
#[instrument(target = COMPONENT, name = "mempool.subscribe", skip_all)]
595-
pub fn subscribe(
596-
&mut self,
597-
chain_tip: BlockNumber,
598-
) -> Result<mpsc::Receiver<MempoolEvent>, BlockNumber> {
599-
self.subscription.subscribe(chain_tip)
590+
pub fn subscribe(&mut self) -> mpsc::Receiver<MempoolEvent> {
591+
self.subscription.subscribe()
600592
}
601593

602594
// STATS & INSPECTION

crates/block-producer/src/mempool/subscription.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,22 +43,7 @@ impl SubscriptionProvider {
4343
/// Creates a new [`MempoolEvent`] subscription.
4444
///
4545
/// This replaces any existing subscription.
46-
///
47-
/// # Errors
48-
///
49-
/// Returns an error if the provided chain tip does not match the provider's. The error
50-
/// value contains the provider's chain tip.
51-
///
52-
/// This prevents desync between the subscribers view of the world and the mempool's event
53-
/// stream.
54-
pub fn subscribe(
55-
&mut self,
56-
chain_tip: BlockNumber,
57-
) -> Result<mpsc::Receiver<MempoolEvent>, BlockNumber> {
58-
if self.chain_tip != chain_tip {
59-
return Err(self.chain_tip);
60-
}
61-
46+
pub fn subscribe(&mut self) -> mpsc::Receiver<MempoolEvent> {
6247
// We should leave enough space to at least send the uncommitted events (plus some extra).
6348
let capacity = self.inflight_txs.len().mul(2).max(1024);
6449
let (tx, rx) = mpsc::channel(capacity);
@@ -74,7 +59,7 @@ impl SubscriptionProvider {
7459
Self::send_event(&mut self.subscription, tx.clone());
7560
}
7661

77-
Ok(rx)
62+
rx
7863
}
7964

8065
pub(super) fn transaction_added(&mut self, tx: &AuthenticatedTransaction) {

crates/block-producer/src/server/mod.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -414,22 +414,9 @@ impl api_server::Api for BlockProducerRpcServer {
414414

415415
async fn mempool_subscription(
416416
&self,
417-
request: tonic::Request<proto::block_producer::MempoolSubscriptionRequest>,
417+
_request: tonic::Request<()>,
418418
) -> Result<tonic::Response<Self::MempoolSubscriptionStream>, tonic::Status> {
419-
let chain_tip = BlockNumber::from(request.into_inner().chain_tip);
420-
421-
let subscription =
422-
self.mempool
423-
.lock()
424-
.await
425-
.lock()
426-
.await
427-
.subscribe(chain_tip)
428-
.map_err(|mempool_tip| {
429-
tonic::Status::invalid_argument(format!(
430-
"Mempool's chain tip {mempool_tip} does not match request's {chain_tip}"
431-
))
432-
})?;
419+
let subscription = self.mempool.lock().await.lock().await.subscribe();
433420
let subscription = ReceiverStream::new(subscription);
434421

435422
Ok(tonic::Response::new(MempoolEventSubscription { inner: subscription }))

crates/ntx-builder/src/clients/block_producer.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use miden_node_proto::clients::{BlockProducerClient as InnerBlockProducerClient,
55
use miden_node_proto::domain::mempool::MempoolEvent;
66
use miden_node_proto::generated::{self as proto};
77
use miden_node_utils::FlattenResult;
8-
use miden_protocol::block::BlockNumber;
98
use miden_protocol::transaction::ProvenTransaction;
109
use miden_tx::utils::Serializable;
1110
use tokio_stream::StreamExt;
@@ -61,11 +60,10 @@ impl BlockProducerClient {
6160
#[instrument(target = COMPONENT, name = "ntx.block_producer.client.subscribe_to_mempool", skip_all, err)]
6261
pub async fn subscribe_to_mempool_with_retry(
6362
&self,
64-
chain_tip: BlockNumber,
6563
) -> Result<impl TryStream<Ok = MempoolEvent, Error = Status> + Send + 'static, Status> {
6664
let mut retry_counter = 0;
6765
loop {
68-
match self.subscribe_to_mempool(chain_tip).await {
66+
match self.subscribe_to_mempool().await {
6967
Err(err) if err.code() == tonic::Code::Unavailable => {
7068
// Exponential backoff with base 500ms and max 30s.
7169
let backoff = Duration::from_millis(500)
@@ -89,11 +87,8 @@ impl BlockProducerClient {
8987

9088
async fn subscribe_to_mempool(
9189
&self,
92-
chain_tip: BlockNumber,
9390
) -> Result<impl TryStream<Ok = MempoolEvent, Error = Status> + Send + 'static, Status> {
94-
let request =
95-
proto::block_producer::MempoolSubscriptionRequest { chain_tip: chain_tip.as_u32() };
96-
let stream = self.client.clone().mempool_subscription(request).await?;
91+
let stream = self.client.clone().mempool_subscription(()).await?;
9792

9893
let stream = stream
9994
.into_inner()

crates/ntx-builder/src/lib.rs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -204,29 +204,19 @@ impl NtxBuilderConfig {
204204
let store = StoreClient::new(self.store_url.clone());
205205
let block_producer = BlockProducerClient::new(self.block_producer_url.clone());
206206

207-
let (chain_tip_header, chain_mmr, mempool_events) = loop {
208-
let (chain_tip_header, chain_mmr) = store
209-
.get_latest_blockchain_data_with_retry()
210-
.await?
211-
.context("store should contain a latest block")?;
212-
213-
match block_producer
214-
.subscribe_to_mempool_with_retry(chain_tip_header.block_num())
215-
.await
216-
{
217-
Ok(subscription) => {
218-
let stream: MempoolEventStream = Box::pin(subscription.into_stream());
219-
break (chain_tip_header, chain_mmr, stream);
220-
},
221-
Err(status) if status.code() == tonic::Code::InvalidArgument => {
222-
tracing::warn!(
223-
err = %status,
224-
"mempool subscription failed due to chain tip desync, retrying"
225-
);
226-
},
227-
Err(err) => return Err(err).context("failed to subscribe to mempool events"),
228-
}
229-
};
207+
// Subscribe to mempool first to ensure we don't miss any events. The subscription
208+
// replays all inflight transactions, so the subscriber's state is fully reconstructed.
209+
let subscription = block_producer
210+
.subscribe_to_mempool_with_retry()
211+
.await
212+
.map_err(|err| anyhow::anyhow!(err))
213+
.context("failed to subscribe to mempool events")?;
214+
let mempool_events: MempoolEventStream = Box::pin(subscription.into_stream());
215+
216+
let (chain_tip_header, chain_mmr) = store
217+
.get_latest_blockchain_data_with_retry()
218+
.await?
219+
.context("store should contain a latest block")?;
230220

231221
// Store the chain tip in the DB.
232222
db.upsert_chain_state(chain_tip_header.block_num(), chain_tip_header.clone())

proto/proto/internal/block_producer.proto

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,27 +35,19 @@ service Api {
3535

3636
// Subscribe to mempool events.
3737
//
38-
// The request will be rejected if the caller and the mempool disagree on the current chain tip.
39-
// This prevents potential desync issues. The caller can resolve this by resync'ing its chain state.
38+
// The event stream will contain all events after the current chain tip. This includes all
39+
// currently inflight events that have not yet been committed to the chain.
4040
//
41-
// The event stream will contain all events after the chain tip. This includes all currently inflight
42-
// events that have not yet been committed to the chain.
43-
//
44-
// Currently only a single active subscription is supported. Subscription requests will cancel the active
45-
// subscription, if any.
46-
rpc MempoolSubscription(MempoolSubscriptionRequest) returns (stream MempoolEvent) {}
41+
// Currently only a single active subscription is supported. Subscription requests will cancel
42+
// the active subscription, if any.
43+
rpc MempoolSubscription(google.protobuf.Empty) returns (stream MempoolEvent) {}
4744
}
4845

4946
// MEMPOOL SUBSCRIPTION
5047
// ================================================================================================
5148

5249
// Request to subscribe to mempool events.
53-
message MempoolSubscriptionRequest {
54-
// The caller's current chain height.
55-
//
56-
// Request will be rejected if this does not match the mempool's current view.
57-
fixed32 chain_tip = 1;
58-
}
50+
message MempoolSubscriptionRequest {}
5951

6052
// Event from the mempool.
6153
message MempoolEvent {

0 commit comments

Comments
 (0)