Skip to content

Commit 9b29d60

Browse files
committed
sui-rpc: concurrently poll execution and checkpoint stream futures
In order to be a bit more efficent, and ensure that we avoid missing the transaction in the checkpoint stream, poll both the execution and checkpont stream futures concurrently.
1 parent aef7655 commit 9b29d60

File tree

1 file changed

+87
-77
lines changed

1 file changed

+87
-77
lines changed

crates/sui-rpc/src/client/transaction_execution.rs

Lines changed: 87 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -85,98 +85,108 @@ impl Client {
8585
request: impl tonic::IntoRequest<ExecuteTransactionRequest>,
8686
timeout: Duration,
8787
) -> Result<Response<ExecuteTransactionResponse>, ExecuteAndWaitError> {
88-
// Subscribe to checkpoint stream before execution to avoid missing the transaction.
89-
// Uses minimal read mask for efficiency since we only nee digest confirmation.
90-
// Once server-side filtering is available, we should filter by transaction digest to
91-
// further reduce bandwidth.
92-
let mut checkpoint_stream = match self
93-
.subscription_client()
94-
.subscribe_checkpoints(SubscribeCheckpointsRequest::default().with_read_mask(
95-
FieldMask::from_str("transactions.digest,sequence_number,summary.timestamp"),
96-
))
97-
.await
98-
{
99-
Ok(stream) => stream.into_inner(),
100-
Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
101-
};
88+
let request = request.into_request();
10289

10390
// Calculate digest from the input transaction to avoid relying on response read mask
104-
let request = request.into_request();
105-
let transaction = match request.get_ref().transaction_opt() {
106-
Some(tx) => tx,
107-
None => return Err(ExecuteAndWaitError::MissingTransaction),
108-
};
91+
let transaction_digest = {
92+
let transaction = match request.get_ref().transaction_opt() {
93+
Some(tx) => tx,
94+
None => return Err(ExecuteAndWaitError::MissingTransaction),
95+
};
10996

110-
let executed_txn_digest = match sui_sdk_types::Transaction::try_from(transaction) {
111-
Ok(tx) => tx.digest().to_string(),
112-
Err(e) => return Err(ExecuteAndWaitError::ProtoConversionError(e)),
97+
match sui_sdk_types::Transaction::try_from(transaction) {
98+
Ok(tx) => tx.digest().to_string(),
99+
Err(e) => return Err(ExecuteAndWaitError::ProtoConversionError(e)),
100+
}
113101
};
114102

115-
let mut response = match self.execution_client().execute_transaction(request).await {
116-
Ok(resp) => resp,
117-
Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
118-
};
103+
let mut this = self.clone();
104+
let checkpoint_info_future = async {
105+
// Subscribe to checkpoint stream before execution to avoid missing the transaction.
106+
// Uses minimal read mask for efficiency since we only need digest confirmation.
107+
// Once server-side filtering is available, we should filter by transaction digest to
108+
// further reduce bandwidth.
109+
let mut checkpoint_stream = match this
110+
.subscription_client()
111+
.subscribe_checkpoints(SubscribeCheckpointsRequest::default().with_read_mask(
112+
FieldMask::from_str("transactions.digest,sequence_number,summary.timestamp"),
113+
))
114+
.await
115+
{
116+
Ok(stream) => stream.into_inner(),
117+
Err(e) => return Ok(Err(e)),
118+
};
119119

120-
// First query the fullnode directly to see if it already has the txn. This is to handle
121-
// the case where an already executed transaction is sent multiple times
122-
if let Ok(resp) = self
123-
.ledger_client()
124-
.get_transaction(
125-
GetTransactionRequest::default()
126-
.with_digest(&executed_txn_digest)
127-
.with_read_mask(FieldMask::from_str("digest,checkpoint,timestamp")),
128-
)
129-
.await
130-
&& resp.get_ref().transaction().checkpoint_opt().is_some()
131-
{
132-
let checkpoint = resp.get_ref().transaction().checkpoint();
133-
let timestamp = resp.get_ref().transaction().timestamp;
134-
response
135-
.get_mut()
136-
.transaction_mut()
137-
.set_checkpoint(checkpoint);
138-
response.get_mut().transaction_mut().timestamp = timestamp;
139-
return Ok(response);
140-
}
120+
// First query the fullnode directly to see if it already has the txn. This is to handle
121+
// the case where an already executed transaction is sent multiple times
122+
if let Ok(resp) = this
123+
.ledger_client()
124+
.get_transaction(
125+
GetTransactionRequest::default()
126+
.with_digest(&transaction_digest)
127+
.with_read_mask(FieldMask::from_str("digest,checkpoint,timestamp")),
128+
)
129+
.await
130+
&& resp.get_ref().transaction().checkpoint_opt().is_some()
131+
{
132+
let checkpoint = resp.get_ref().transaction().checkpoint();
133+
let timestamp = resp.get_ref().transaction().timestamp;
134+
return Ok(Ok((checkpoint, timestamp)));
135+
}
141136

142-
// Wait for the transaction to appear in a checkpoint, at which point indexes will have been
143-
// updated.
144-
let timeout_future = tokio::time::sleep(timeout);
145-
let checkpoint_future = async {
146-
while let Some(response) = checkpoint_stream.try_next().await? {
147-
let checkpoint = response.checkpoint();
137+
// Wait for the transaction to appear in a checkpoint, at which point indexes will have been
138+
// updated.
139+
let checkpoint_future = async {
140+
while let Some(response) = checkpoint_stream.try_next().await? {
141+
let checkpoint = response.checkpoint();
148142

149-
for tx in checkpoint.transactions() {
150-
let digest = tx.digest();
143+
for tx in checkpoint.transactions() {
144+
let digest = tx.digest();
151145

152-
if digest == executed_txn_digest {
153-
return Ok((checkpoint.sequence_number(), checkpoint.summary().timestamp));
146+
if digest == transaction_digest {
147+
return Ok((
148+
checkpoint.sequence_number(),
149+
checkpoint.summary().timestamp,
150+
));
151+
}
154152
}
155153
}
156-
}
157-
Err(tonic::Status::aborted(
158-
"checkpoint stream ended unexpectedly",
159-
))
154+
Err(tonic::Status::aborted(
155+
"checkpoint stream ended unexpectedly",
156+
))
157+
};
158+
159+
tokio::time::timeout(timeout, checkpoint_future).await
160160
};
161161

162-
tokio::select! {
163-
result = checkpoint_future => {
164-
match result {
165-
Ok((checkpoint, timestamp)) => {
166-
response
167-
.get_mut()
168-
.transaction_mut()
169-
.set_checkpoint(checkpoint);
170-
response.get_mut().transaction_mut().timestamp = timestamp;
171-
Ok(response)
172-
}
173-
Err(e) => Err(ExecuteAndWaitError::CheckpointStreamError { response, error: e })
174-
}
175-
},
176-
_ = timeout_future => {
177-
Err(ExecuteAndWaitError::CheckpointTimeout ( response))
162+
let execution_future = async { self.execution_client().execute_transaction(request).await };
163+
164+
let (response, checkpoint_info) =
165+
futures::future::join(execution_future, checkpoint_info_future).await;
166+
167+
let mut response = match response {
168+
Ok(response) => response,
169+
Err(e) => return Err(ExecuteAndWaitError::RpcError(e)),
170+
};
171+
172+
match checkpoint_info {
173+
Ok(Ok((checkpoint, timestamp))) => {
174+
response
175+
.get_mut()
176+
.transaction_mut()
177+
.set_checkpoint(checkpoint);
178+
response.get_mut().transaction_mut().timestamp = timestamp;
178179
}
180+
Ok(Err(status)) => {
181+
return Err(ExecuteAndWaitError::CheckpointStreamError {
182+
response,
183+
error: status,
184+
});
185+
}
186+
Err(_timeout) => return Err(ExecuteAndWaitError::CheckpointTimeout(response)),
179187
}
188+
189+
Ok(response)
180190
}
181191

182192
/// Retrieves the current reference gas price from the latest epoch information.

0 commit comments

Comments
 (0)