Skip to content

Commit aaf11a1

Browse files
authored
Add NotificationsExt trait to await specific notifications. (#4834)
## Motivation The test-only `eventually` function waits in a loop with increasing delays until a condition is satisfied. But usually these conditions are satisfied exactly when the node processes a certain block and outputs the corresponding notification. ## Proposal Add a test-only `NotificationsExt` trait to simplify waiting for a particular notification. Use it instead of several `eventually` and `sync` calls in the tests. ## Test Plan Most tests were changed to use this instead of `eventually`. Only the ones waiting for the faucet weren't, because it currently doesn't support GraphQL subscriptions for notifications. ## Release Plan - These changes _could_ be backported to `testnet_conway`, to keep the diff small. ## Links - Closes #4718. - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent b452c36 commit aaf11a1

File tree

5 files changed

+309
-326
lines changed

5 files changed

+309
-326
lines changed

linera-service/src/cli_wrappers/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ mod wallet;
3434
use anyhow::Result;
3535
use async_trait::async_trait;
3636
pub use linera_faucet_client::Faucet;
37+
#[cfg(with_testing)]
38+
pub use wallet::NotificationsExt;
3739
pub use wallet::{ApplicationWrapper, ClientWrapper, FaucetService, NodeService, OnClientDrop};
3840

3941
/// The information needed to start a Linera net of a particular kind.

linera-service/src/cli_wrappers/wallet.rs

Lines changed: 129 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
marker::PhantomData,
99
mem,
1010
path::{Path, PathBuf},
11+
pin::Pin,
1112
process::Stdio,
1213
str::FromStr,
1314
sync,
@@ -23,7 +24,7 @@ use linera_base::{
2324
abi::ContractAbi,
2425
command::{resolve_binary, CommandExt},
2526
crypto::{CryptoHash, InMemorySigner},
26-
data_types::{Amount, Bytecode, Epoch},
27+
data_types::{Amount, BlockHeight, Bytecode, Epoch},
2728
identifiers::{
2829
Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
2930
},
@@ -43,6 +44,12 @@ use tokio::{
4344
sync::oneshot,
4445
task::JoinHandle,
4546
};
47+
#[cfg(with_testing)]
48+
use {
49+
futures::FutureExt as _,
50+
linera_core::worker::Reason,
51+
std::{collections::BTreeSet, future::Future},
52+
};
4653

4754
use crate::{
4855
cli::command::BenchmarkCommand,
@@ -1532,18 +1539,27 @@ impl NodeService {
15321539
.with_abi())
15331540
}
15341541

1535-
/// Obtains the hash of the `chain`'s tip block, as known by this node service.
1536-
pub async fn chain_tip_hash(&self, chain: ChainId) -> Result<Option<CryptoHash>> {
1537-
let query = format!(r#"query {{ block(chainId: "{chain}") {{ hash }} }}"#);
1542+
/// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1543+
pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1544+
let query = format!(
1545+
r#"query {{ block(chainId: "{chain}") {{
1546+
hash
1547+
block {{ header {{ height }} }}
1548+
}} }}"#
1549+
);
15381550

15391551
let mut response = self.query_node(&query).await?;
15401552

1541-
match mem::take(&mut response["block"]["hash"]) {
1542-
Value::Null => Ok(None),
1543-
Value::String(hash) => Ok(Some(
1553+
match (
1554+
mem::take(&mut response["block"]["hash"]),
1555+
mem::take(&mut response["block"]["block"]["header"]["height"]),
1556+
) {
1557+
(Value::Null, Value::Null) => Ok(None),
1558+
(Value::String(hash), Value::Number(height)) => Ok(Some((
15441559
hash.parse()
15451560
.context("Received an invalid hash {hash:?} for chain tip")?,
1546-
)),
1561+
BlockHeight(height.as_u64().unwrap()),
1562+
))),
15471563
invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
15481564
}
15491565
}
@@ -1552,7 +1568,7 @@ impl NodeService {
15521568
pub async fn notifications(
15531569
&self,
15541570
chain_id: ChainId,
1555-
) -> Result<impl Stream<Item = Result<Notification>>> {
1571+
) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
15561572
let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
15571573
let url = format!("ws://localhost:{}/ws", self.port);
15581574
let mut request = url.into_client_request()?;
@@ -1585,17 +1601,17 @@ impl NodeService {
15851601
}
15861602
});
15871603
websocket.send(query_json.to_string().into()).await?;
1588-
Ok(websocket
1589-
.map_err(anyhow::Error::from)
1590-
.and_then(|message| async {
1604+
Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1605+
|message| async {
15911606
let text = message.into_text()?;
15921607
let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
15931608
if let Some(errors) = value["payload"].get("errors") {
15941609
bail!("Notification subscription failed: {errors:?}");
15951610
}
15961611
serde_json::from_value(value["payload"]["data"]["notifications"].clone())
15971612
.context("Failed to deserialize notification")
1598-
}))
1613+
},
1614+
)))
15991615
}
16001616
}
16011617

@@ -1727,3 +1743,103 @@ impl<A> From<String> for ApplicationWrapper<A> {
17271743
}
17281744
}
17291745
}
1746+
1747+
/// Returns the timeout for tests that wait for notifications, either read from the env
1748+
/// variable `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`, or the default value of 10 seconds.
1749+
#[cfg(with_testing)]
1750+
fn notification_timeout() -> Duration {
1751+
const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
1752+
const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
1753+
1754+
match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
1755+
Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
1756+
panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
1757+
})),
1758+
Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
1759+
Err(env::VarError::NotUnicode(_)) => {
1760+
panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
1761+
}
1762+
}
1763+
}
1764+
1765+
#[cfg(with_testing)]
1766+
pub trait NotificationsExt {
1767+
/// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
1768+
fn wait_for<T>(
1769+
&mut self,
1770+
f: impl FnMut(Notification) -> Option<T>,
1771+
) -> impl Future<Output = Result<T>>;
1772+
1773+
/// Waits for a `NewEvents` notification for the given block height. If no height is specified,
1774+
/// any height is accepted.
1775+
fn wait_for_events(
1776+
&mut self,
1777+
expected_height: impl Into<Option<BlockHeight>>,
1778+
) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
1779+
let expected_height = expected_height.into();
1780+
self.wait_for(move |notification| {
1781+
if let Reason::NewEvents {
1782+
height,
1783+
event_streams,
1784+
..
1785+
} = notification.reason
1786+
{
1787+
if expected_height.is_none_or(|h| h == height) {
1788+
return Some(event_streams);
1789+
}
1790+
}
1791+
None
1792+
})
1793+
}
1794+
1795+
/// Waits for a `NewBlock` notification for the given block height. If no height is specified,
1796+
/// any height is accepted.
1797+
fn wait_for_block(
1798+
&mut self,
1799+
expected_height: impl Into<Option<BlockHeight>>,
1800+
) -> impl Future<Output = Result<CryptoHash>> {
1801+
let expected_height = expected_height.into();
1802+
self.wait_for(move |notification| {
1803+
if let Reason::NewBlock { height, hash, .. } = notification.reason {
1804+
if expected_height.is_none_or(|h| h == height) {
1805+
return Some(hash);
1806+
}
1807+
}
1808+
None
1809+
})
1810+
}
1811+
1812+
/// Waits for a `NewIncomingBundle` notification for the given sender chain and sender block
1813+
/// height. If no height is specified, any height is accepted.
1814+
fn wait_for_bundle(
1815+
&mut self,
1816+
expected_origin: ChainId,
1817+
expected_height: impl Into<Option<BlockHeight>>,
1818+
) -> impl Future<Output = Result<()>> {
1819+
let expected_height = expected_height.into();
1820+
self.wait_for(move |notification| {
1821+
if let Reason::NewIncomingBundle { height, origin } = notification.reason {
1822+
if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
1823+
return Some(());
1824+
}
1825+
}
1826+
None
1827+
})
1828+
}
1829+
}
1830+
1831+
#[cfg(with_testing)]
1832+
impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
1833+
async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
1834+
let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
1835+
loop {
1836+
let notification = futures::select! {
1837+
() = timeout => bail!("Timeout waiting for notification"),
1838+
notification = self.next().fuse() => notification.context("Stream closed")??,
1839+
};
1840+
if let Some(t) = f(notification) {
1841+
return Ok(t);
1842+
}
1843+
}
1844+
}
1845+
}

linera-service/src/util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ pub fn parse_millis_delta(s: &str) -> Result<TimeDelta, ParseIntError> {
166166
Ok(TimeDelta::from_millis(s.parse()?))
167167
}
168168

169-
/// Checks the condition five times with increasing delays. Returns true if it is met.
169+
/// Checks the condition five times with increasing delays. Returns `true` if it is met.
170170
#[cfg(with_testing)]
171171
pub async fn eventually<F>(condition: impl Fn() -> F) -> bool
172172
where

0 commit comments

Comments
 (0)