@@ -8,6 +8,7 @@ use std::{
88 marker:: PhantomData ,
99 mem,
1010 path:: { Path , PathBuf } ,
11+ pin:: Pin ,
1112 process:: Stdio ,
1213 str:: FromStr ,
1314 sync,
@@ -23,7 +24,7 @@ use linera_base::{
2324 abi:: ContractAbi ,
2425 command:: { resolve_binary, CommandExt } ,
2526 crypto:: { CryptoHash , InMemorySigner } ,
26- data_types:: { Amount , Bytecode , Epoch } ,
27+ data_types:: { Amount , BlockHeight , Bytecode , Epoch } ,
2728 identifiers:: {
2829 Account , AccountOwner , ApplicationId , ChainId , IndexAndEvent , ModuleId , StreamId ,
2930 } ,
@@ -44,6 +45,12 @@ use tokio::{
4445 task:: JoinHandle ,
4546} ;
4647use tracing:: { error, info, warn} ;
48+ #[ cfg( with_testing) ]
49+ use {
50+ futures:: FutureExt as _,
51+ linera_core:: worker:: Reason ,
52+ std:: { collections:: BTreeSet , future:: Future } ,
53+ } ;
4754
4855use crate :: {
4956 cli:: command:: BenchmarkCommand ,
@@ -1309,6 +1316,28 @@ impl NodeService {
13091316 Ok ( serde_json:: from_value ( data[ "sync" ] . take ( ) ) ?)
13101317 }
13111318
1319+ pub async fn transfer (
1320+ & self ,
1321+ chain_id : ChainId ,
1322+ owner : AccountOwner ,
1323+ recipient : Account ,
1324+ amount : Amount ,
1325+ ) -> Result < CryptoHash > {
1326+ let json_owner = owner. to_value ( ) ;
1327+ let json_recipient = recipient. to_value ( ) ;
1328+ let query = format ! (
1329+ "mutation {{ transfer(\
1330+ chainId: \" {chain_id}\" , \
1331+ owner: {json_owner}, \
1332+ recipient: {json_recipient}, \
1333+ amount: \" {amount}\" ) \
1334+ }}"
1335+ ) ;
1336+ let data = self . query_node ( query) . await ?;
1337+ serde_json:: from_value ( data[ "transfer" ] . clone ( ) )
1338+ . context ( "missing transfer field in response" )
1339+ }
1340+
13121341 pub async fn balance ( & self , account : & Account ) -> Result < Amount > {
13131342 let chain = account. chain_id ;
13141343 let owner = account. owner ;
@@ -1518,18 +1547,27 @@ impl NodeService {
15181547 . with_abi ( ) )
15191548 }
15201549
1521- /// Obtains the hash of the `chain`'s tip block, as known by this node service.
1522- pub async fn chain_tip_hash ( & self , chain : ChainId ) -> Result < Option < CryptoHash > > {
1523- let query = format ! ( r#"query {{ block(chainId: "{chain}") {{ hash }} }}"# ) ;
1550+ /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1551+ pub async fn chain_tip ( & self , chain : ChainId ) -> Result < Option < ( CryptoHash , BlockHeight ) > > {
1552+ let query = format ! (
1553+ r#"query {{ block(chainId: "{chain}") {{
1554+ hash
1555+ block {{ header {{ height }} }}
1556+ }} }}"#
1557+ ) ;
15241558
15251559 let mut response = self . query_node ( & query) . await ?;
15261560
1527- match mem:: take ( & mut response[ "block" ] [ "hash" ] ) {
1528- Value :: Null => Ok ( None ) ,
1529- Value :: String ( hash) => Ok ( Some (
1561+ match (
1562+ mem:: take ( & mut response[ "block" ] [ "hash" ] ) ,
1563+ mem:: take ( & mut response[ "block" ] [ "block" ] [ "header" ] [ "height" ] ) ,
1564+ ) {
1565+ ( Value :: Null , Value :: Null ) => Ok ( None ) ,
1566+ ( Value :: String ( hash) , Value :: Number ( height) ) => Ok ( Some ( (
15301567 hash. parse ( )
15311568 . context ( "Received an invalid hash {hash:?} for chain tip" ) ?,
1532- ) ) ,
1569+ BlockHeight ( height. as_u64 ( ) . unwrap ( ) ) ,
1570+ ) ) ) ,
15331571 invalid_data => bail ! ( "Expected a tip hash string, but got {invalid_data:?} instead" ) ,
15341572 }
15351573 }
@@ -1538,7 +1576,7 @@ impl NodeService {
15381576 pub async fn notifications (
15391577 & self ,
15401578 chain_id : ChainId ,
1541- ) -> Result < impl Stream < Item = Result < Notification > > > {
1579+ ) -> Result < Pin < Box < impl Stream < Item = Result < Notification > > > > > {
15421580 let query = format ! ( "subscription {{ notifications(chainId: \" {chain_id}\" ) }}" , ) ;
15431581 let url = format ! ( "ws://localhost:{}/ws" , self . port) ;
15441582 let mut request = url. into_client_request ( ) ?;
@@ -1571,17 +1609,17 @@ impl NodeService {
15711609 }
15721610 } ) ;
15731611 websocket. send ( query_json. to_string ( ) . into ( ) ) . await ?;
1574- Ok ( websocket
1575- . map_err ( anyhow:: Error :: from)
1576- . and_then ( |message| async {
1612+ Ok ( Box :: pin ( websocket. map_err ( anyhow:: Error :: from) . and_then (
1613+ |message| async {
15771614 let text = message. into_text ( ) ?;
15781615 let value: Value = serde_json:: from_str ( & text) . context ( "invalid JSON" ) ?;
15791616 if let Some ( errors) = value[ "payload" ] . get ( "errors" ) {
15801617 bail ! ( "Notification subscription failed: {errors:?}" ) ;
15811618 }
15821619 serde_json:: from_value ( value[ "payload" ] [ "data" ] [ "notifications" ] . clone ( ) )
15831620 . context ( "Failed to deserialize notification" )
1584- } ) )
1621+ } ,
1622+ ) ) )
15851623 }
15861624}
15871625
@@ -1704,3 +1742,103 @@ impl<A> From<String> for ApplicationWrapper<A> {
17041742 }
17051743 }
17061744}
1745+
1746+ /// Returns the timeout for tests that wait for notifications, either read from the env
1747+ /// variable `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`, or the default value of 10 seconds.
1748+ #[ cfg( with_testing) ]
1749+ fn notification_timeout ( ) -> Duration {
1750+ const NOTIFICATION_TIMEOUT_MS_ENV : & str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS" ;
1751+ const NOTIFICATION_TIMEOUT_MS_DEFAULT : u64 = 10_000 ;
1752+
1753+ match env:: var ( NOTIFICATION_TIMEOUT_MS_ENV ) {
1754+ Ok ( var) => Duration :: from_millis ( var. parse ( ) . unwrap_or_else ( |error| {
1755+ panic ! ( "{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}" )
1756+ } ) ) ,
1757+ Err ( env:: VarError :: NotPresent ) => Duration :: from_millis ( NOTIFICATION_TIMEOUT_MS_DEFAULT ) ,
1758+ Err ( env:: VarError :: NotUnicode ( _) ) => {
1759+ panic ! ( "{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode" )
1760+ }
1761+ }
1762+ }
1763+
1764+ #[ cfg( with_testing) ]
1765+ pub trait NotificationsExt {
1766+ /// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
1767+ fn wait_for < T > (
1768+ & mut self ,
1769+ f : impl FnMut ( Notification ) -> Option < T > ,
1770+ ) -> impl Future < Output = Result < T > > ;
1771+
1772+ /// Waits for a `NewEvents` notification for the given block height. If no height is specified,
1773+ /// any height is accepted.
1774+ fn wait_for_events (
1775+ & mut self ,
1776+ expected_height : impl Into < Option < BlockHeight > > ,
1777+ ) -> impl Future < Output = Result < BTreeSet < StreamId > > > {
1778+ let expected_height = expected_height. into ( ) ;
1779+ self . wait_for ( move |notification| {
1780+ if let Reason :: NewBlock {
1781+ height,
1782+ event_streams,
1783+ ..
1784+ } = notification. reason
1785+ {
1786+ if expected_height. is_none_or ( |h| h == height) && !event_streams. is_empty ( ) {
1787+ return Some ( event_streams) ;
1788+ }
1789+ }
1790+ None
1791+ } )
1792+ }
1793+
1794+ /// Waits for a `NewBlock` notification for the given block height. If no height is specified,
1795+ /// any height is accepted.
1796+ fn wait_for_block (
1797+ & mut self ,
1798+ expected_height : impl Into < Option < BlockHeight > > ,
1799+ ) -> impl Future < Output = Result < CryptoHash > > {
1800+ let expected_height = expected_height. into ( ) ;
1801+ self . wait_for ( move |notification| {
1802+ if let Reason :: NewBlock { height, hash, .. } = notification. reason {
1803+ if expected_height. is_none_or ( |h| h == height) {
1804+ return Some ( hash) ;
1805+ }
1806+ }
1807+ None
1808+ } )
1809+ }
1810+
1811+ /// Waits for a `NewIncomingBundle` notification for the given sender chain and sender block
1812+ /// height. If no height is specified, any height is accepted.
1813+ fn wait_for_bundle (
1814+ & mut self ,
1815+ expected_origin : ChainId ,
1816+ expected_height : impl Into < Option < BlockHeight > > ,
1817+ ) -> impl Future < Output = Result < ( ) > > {
1818+ let expected_height = expected_height. into ( ) ;
1819+ self . wait_for ( move |notification| {
1820+ if let Reason :: NewIncomingBundle { height, origin } = notification. reason {
1821+ if expected_height. is_none_or ( |h| h == height) && origin == expected_origin {
1822+ return Some ( ( ) ) ;
1823+ }
1824+ }
1825+ None
1826+ } )
1827+ }
1828+ }
1829+
1830+ #[ cfg( with_testing) ]
1831+ impl < S : Stream < Item = Result < Notification > > > NotificationsExt for Pin < Box < S > > {
1832+ async fn wait_for < T > ( & mut self , mut f : impl FnMut ( Notification ) -> Option < T > ) -> Result < T > {
1833+ let mut timeout = Box :: pin ( linera_base:: time:: timer:: sleep ( notification_timeout ( ) ) ) . fuse ( ) ;
1834+ loop {
1835+ let notification = futures:: select! {
1836+ ( ) = timeout => bail!( "Timeout waiting for notification" ) ,
1837+ notification = self . next( ) . fuse( ) => notification. context( "Stream closed" ) ??,
1838+ } ;
1839+ if let Some ( t) = f ( notification) {
1840+ return Ok ( t) ;
1841+ }
1842+ }
1843+ }
1844+ }
0 commit comments