Skip to content

Commit f42bbca

Browse files
ebmifamlegner
andauthored
fix: wait for correct RPC epoch state when processing epoch events (cherry-pick #3005) (#3013)
fix: wait for correct RPC epoch state when processing epoch events (#3005) Co-authored-by: Markus Legner <mlegner@users.noreply.github.com>
1 parent 718b91a commit f42bbca

File tree

1 file changed

+60
-1
lines changed

1 file changed

+60
-1
lines changed

crates/walrus-service/src/node.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ use sui_macros::{fail_point_arg, fail_point_async};
4646
use sui_types::{base_types::ObjectID, event::EventID};
4747
use system_events::{CompletableHandle, EVENT_ID_FOR_CHECKPOINT_EVENTS, EventHandle};
4848
use thread_pool::{BoundedThreadPool, ThreadPoolBuilder};
49+
#[cfg(not(any(test, msim)))]
50+
use tokio::time::sleep;
4951
use tokio::{
5052
select,
5153
sync::{Notify, RwLock, watch},
@@ -132,7 +134,7 @@ use walrus_storage_node_client::{
132134
StoredOnNodeStatus,
133135
},
134136
};
135-
use walrus_sui::client::FixedSystemParameters;
137+
use walrus_sui::{client::FixedSystemParameters, types::move_structs::EpochState};
136138
use walrus_utils::metrics::{Registry, TaskMonitorFamily, monitored_scope};
137139

138140
use self::{
@@ -1646,13 +1648,18 @@ impl StorageNode {
16461648
let _scope = monitored_scope::monitored_scope(
16471649
"ProcessEvent::EpochChangeEvent::EpochParametersSelected",
16481650
);
1651+
self.wait_for_epoch_state(event.next_epoch.saturating_sub(1), |state| {
1652+
matches!(state, EpochState::NextParamsSelected(_))
1653+
})
1654+
.await?;
16491655
self.handle_epoch_parameters_selected(event);
16501656
event_handle.mark_as_complete();
16511657
}
16521658
EpochChangeEvent::EpochChangeStart(event) => {
16531659
let _scope = monitored_scope::monitored_scope(
16541660
"ProcessEvent::EpochChangeEvent::EpochChangeStart",
16551661
);
1662+
self.wait_for_epoch_state(event.epoch, |_| true).await?;
16561663
fail_point_async!("epoch_change_start_entry");
16571664
self.process_epoch_change_start_event(blob_event_processor, event_handle, &event)
16581665
.await?;
@@ -1661,6 +1668,13 @@ impl StorageNode {
16611668
let _scope = monitored_scope::monitored_scope(
16621669
"ProcessEvent::EpochChangeEvent::EpochChangeDone",
16631670
);
1671+
self.wait_for_epoch_state(event.epoch, |state| {
1672+
matches!(
1673+
state,
1674+
EpochState::EpochChangeDone(_) | EpochState::NextParamsSelected(_)
1675+
)
1676+
})
1677+
.await?;
16641678
self.process_epoch_change_done_event(&event).await?;
16651679
event_handle.mark_as_complete();
16661680
}
@@ -1680,6 +1694,51 @@ impl StorageNode {
16801694
Ok(())
16811695
}
16821696

1697+
/// Repeatedly checks until the current Sui epoch state matches the expectation.
1698+
///
1699+
/// Returns `Ok(())` if the current epoch is equal to the `expected_epoch` and the
1700+
/// `state_matches` function returns `true` or the current epoch is greater than the
1701+
/// `expected_epoch` (irrespective of the state).
1702+
#[cfg(not(any(test, msim)))]
1703+
async fn wait_for_epoch_state(
1704+
&self,
1705+
expected_epoch: Epoch,
1706+
state_matches: impl Fn(&EpochState) -> bool,
1707+
) -> anyhow::Result<()> {
1708+
const EPOCH_STATE_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
1709+
const EPOCH_STATE_WAIT_SLEEP: Duration = Duration::from_millis(100);
1710+
let deadline = Instant::now() + EPOCH_STATE_WAIT_TIMEOUT;
1711+
while Instant::now() < deadline {
1712+
self.inner.contract_service.flush_cache().await;
1713+
let Ok((epoch, state)) = self.inner.contract_service.get_epoch_and_state().await else {
1714+
tracing::warn!("failed to get current epoch and state");
1715+
continue;
1716+
};
1717+
if epoch == expected_epoch && state_matches(&state) || epoch > expected_epoch {
1718+
return Ok(());
1719+
}
1720+
tracing::debug!(
1721+
expected_epoch,
1722+
current_epoch = epoch,
1723+
current_state = ?state,
1724+
"waiting for expected epoch state",
1725+
);
1726+
sleep(EPOCH_STATE_WAIT_SLEEP).await;
1727+
}
1728+
bail!("timed out after waiting for expected epoch state")
1729+
}
1730+
1731+
#[cfg(any(test, msim))]
1732+
#[allow(clippy::unused_async)]
1733+
async fn wait_for_epoch_state(
1734+
&self,
1735+
_expected_epoch: Epoch,
1736+
_state_matches: impl Fn(&EpochState) -> bool,
1737+
) -> anyhow::Result<()> {
1738+
tracing::info!("waiting for epoch state is not supported in tests, skipping");
1739+
Ok(())
1740+
}
1741+
16831742
/// Handles the epoch parameters selected event.
16841743
///
16851744
/// This function cancels the scheduled voting end and initiates the epoch change.

0 commit comments

Comments
 (0)