Skip to content

Commit f0552e5

Browse files
author
Stanisław Drozd
authored
pyth2wormhole-client: refactor into fully-async futures-based approach (#219)
* pyth2wormhole-client: refactor into fully-async futures-based approach commit-id:2ed35045 * p2w-client: Change inconsistent rpc constructor commit-id:cb3b2ff2 * p2w-client: Move job creation to a function, simplify comment commit-id:35328b38 * pyth2wormhole-client: Use get_multiple_accounts commit-id:7fc85157 * Implement a rate-limited mutex for RPC client commit-id:1a243063 * pyth2wormhole-client: only guard beginning new requests in RLMutex commit-id:d8251474 * pyth2wormhole-client: RLMutex: ensure the inner guard is not dropped commit-id:c3513f5e * pyth2wormhole-client: Clarify attestation_sched_futs comment commit-id:97033670 * pyth2wormhole-client: Use CommitmentConfig's native FromStr parsing commit-id:835d7125 * pyth2wormhole-client: doc comment typo commit-id:5ee388de * pyth2wormhole-client: move closures to their own async functions This makes the main.rs async attestation routines easier to read. commit-id:3565a744 * pyth2wormhole-client: fix merge typo * pyth2wormhole-client: Apply Tom's readability advice * pyth2wormhole-client reword attestation_sched_job() comment * pyth2wormhole-client: expand attestation_sched_job() comment * pyth2wormhole-client: e x p a n d the comment * Trigger CI * p2w-client/main.rs: correct missing awaits after merge
1 parent e76181a commit f0552e5

File tree

11 files changed

+532
-410
lines changed

11 files changed

+532
-410
lines changed

solana/pyth2wormhole/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

solana/pyth2wormhole/client/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ solana-sdk = "=1.10.13"
2929
solana-transaction-status = "=1.10.13"
3030
solitaire-client = {path = "../../solitaire/client"}
3131
solitaire = {path = "../../solitaire/program"}
32+
tokio = {version = "1", features = ["sync", "rt", "time"]}
33+
futures = "0.3.21"
3234

3335
[dev-dependencies]
3436
pyth-client = "0.5.0"

solana/pyth2wormhole/client/src/attestation_cfg.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,30 @@ pub struct SymbolGroup {
2626
pub symbols: Vec<P2WSymbol>,
2727
}
2828

29-
pub const fn DEFAULT_MIN_INTERVAL_SECS() -> u64 {
29+
pub const fn default_min_interval_secs() -> u64 {
3030
60
3131
}
3232

33+
pub const fn default_max_batch_jobs() -> usize {
34+
20
35+
}
36+
3337
/// Spontaneous attestation triggers. Attestation is triggered if any
3438
/// of the active conditions is met. Option<> fields can be
3539
/// de-activated with None. All conditions are inactive by default,
36-
/// except for min_interval_secs set to 1 minute.
40+
/// except for the non-Option ones.
3741
#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq)]
3842
pub struct AttestationConditions {
3943
/// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation.
40-
#[serde(default = "DEFAULT_MIN_INTERVAL_SECS")]
44+
#[serde(default = "default_min_interval_secs")]
4145
pub min_interval_secs: u64,
4246

47+
/// Limit concurrent attestation attempts per batch. This setting
48+
/// should act only as a failsafe cap on resource consumption and is
49+
/// best set well above the expected average number of jobs.
50+
#[serde(default = "default_max_batch_jobs")]
51+
pub max_batch_jobs: usize,
52+
4353
/// Trigger attestation if price changes by the specified percentage.
4454
#[serde(default)]
4555
pub price_changed_pct: Option<f64>,
@@ -51,7 +61,7 @@ pub struct AttestationConditions {
5161
}
5262

5363
/// Config entry for a Pyth product + price pair
54-
#[derive(Default, Debug, Deserialize, Serialize, PartialEq, Eq)]
64+
#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq, Eq)]
5565
pub struct P2WSymbol {
5666
/// User-defined human-readable name
5767
pub name: Option<String>,

solana/pyth2wormhole/client/src/batch_state.rs

Lines changed: 42 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
use futures::future::TryFutureExt;
12
use log::{
23
debug,
4+
trace,
35
warn,
46
};
5-
use solana_client::rpc_client::RpcClient;
7+
use solana_client::nonblocking::rpc_client::RpcClient;
68
use solana_sdk::signature::Signature;
79

810
use pyth_sdk_solana::state::PriceAccount;
@@ -16,17 +18,18 @@ use crate::{
1618
AttestationConditions,
1719
ErrBox,
1820
P2WSymbol,
21+
RLMutex,
1922
};
2023

24+
/// Runtime representation of a batch. It refers to the original group
25+
/// from the config.
2126
#[derive(Debug)]
2227
pub struct BatchState<'a> {
2328
pub group_name: String,
2429
pub symbols: &'a [P2WSymbol],
2530
pub last_known_symbol_states: Vec<Option<PriceAccount>>,
2631
pub conditions: AttestationConditions,
27-
status: BatchTxStatus,
28-
status_changed_at: Instant,
29-
pub last_success_at: Option<Instant>,
32+
pub last_job_finished_at: Instant,
3033
}
3134

3235
impl<'a> BatchState<'a> {
@@ -40,63 +43,51 @@ impl<'a> BatchState<'a> {
4043
symbols,
4144
conditions,
4245
last_known_symbol_states: vec![None; symbols.len()],
43-
status: BatchTxStatus::Sending { attempt_no: 1 },
44-
status_changed_at: Instant::now(),
45-
last_success_at: None,
46+
last_job_finished_at: Instant::now(),
4647
}
4748
}
48-
/// Ensure only set_status() alters the timestamp
49-
pub fn get_status_changed_at(&self) -> &Instant {
50-
&self.status_changed_at
51-
}
52-
pub fn get_status(&self) -> &BatchTxStatus {
53-
&self.status
54-
}
55-
56-
/// Ensure that status changes are accompanied by a timestamp bump
57-
pub fn set_status(&mut self, s: BatchTxStatus) {
58-
self.status_changed_at = Instant::now();
59-
self.status = s;
60-
}
6149

6250
/// Evaluate the configured attestation conditions for this
6351
/// batch. RPC is used to update last known state. Returns
6452
/// Some("<reason>") if any trigger condition was met. Only the
6553
/// first encountered condition is mentioned.
66-
pub fn should_resend(&mut self, c: &RpcClient) -> Option<String> {
54+
pub async fn should_resend(&mut self, c: &RpcClient) -> Option<String> {
6755
let mut ret = None;
6856

6957
let sym_count = self.symbols.len();
70-
let mut new_symbol_states: Vec<Option<PriceAccount>> = Vec::with_capacity(sym_count);
71-
for (idx, sym) in self.symbols.iter().enumerate() {
72-
let new_state = match c
73-
.get_account_data(&sym.price_addr)
74-
.map_err(|e| e.to_string())
75-
.and_then(|bytes| {
76-
pyth_sdk_solana::state::load_price_account(&bytes)
77-
.map(|state| state.clone())
78-
.map_err(|e| e.to_string())
79-
}) {
80-
Ok(state) => Some(state),
81-
Err(e) => {
82-
warn!(
83-
"Symbol {} ({}/{}): Could not look up state: {}",
84-
sym.name
85-
.as_ref()
86-
.unwrap_or(&format!("Unnamed product {}", sym.product_addr)),
87-
idx + 1,
88-
sym_count,
89-
e.to_string()
90-
);
91-
None
92-
}
93-
};
58+
let pubkeys: Vec<_> = self.symbols.iter().map(|s| s.price_addr).collect();
9459

95-
new_symbol_states.push(new_state);
96-
}
60+
// Always learn the current on-chain state for each symbol, use None values if lookup fails
61+
let mut new_symbol_states: Vec<Option<PriceAccount>> = match c
62+
.get_multiple_accounts(&pubkeys)
63+
.await
64+
{
65+
Ok(acc_opts) => {
66+
acc_opts
67+
.into_iter()
68+
.enumerate()
69+
.map(|(idx, opt)| {
70+
// Take each Some(acc), make it None and log on load_price_account() error
71+
opt.and_then(|acc| {
72+
pyth_sdk_solana::state::load_price_account(&acc.data)
73+
.cloned() // load_price_account() transmutes the data reference into another reference, and owning acc_opts is not enough
74+
.map_err(|e| {
75+
warn!("Could not parse symbol {}/{}: {}", idx, sym_count, e);
76+
e
77+
})
78+
.ok() // Err becomes None
79+
})
80+
})
81+
.collect()
82+
}
83+
Err(e) => {
84+
warn!("Could not look up any symbols on-chain: {}", e);
85+
vec![None; sym_count]
86+
}
87+
};
9788

9889
// min interval
99-
if self.get_status_changed_at().elapsed()
90+
if self.last_job_finished_at.elapsed()
10091
> Duration::from_secs(self.conditions.min_interval_secs)
10192
{
10293
ret = Some(format!(
@@ -154,7 +145,9 @@ impl<'a> BatchState<'a> {
154145
}
155146
}
156147

157-
// Update with newer state if a condition was met
148+
// Update with newer state only if a condition was met. We
149+
// don't want to shadow changes that may happen over a larger
150+
// period between state lookups.
158151
if ret.is_some() {
159152
for (old, new) in self
160153
.last_known_symbol_states
@@ -170,23 +163,3 @@ impl<'a> BatchState<'a> {
170163
return ret;
171164
}
172165
}
173-
174-
#[derive(Debug)]
175-
pub enum BatchTxStatus {
176-
Sending {
177-
attempt_no: usize,
178-
},
179-
Confirming {
180-
attempt_no: usize,
181-
signature: Signature,
182-
},
183-
Success {
184-
seqno: String,
185-
},
186-
FailedSend {
187-
last_err: ErrBox,
188-
},
189-
FailedConfirm {
190-
last_err: ErrBox,
191-
},
192-
}

solana/pyth2wormhole/client/src/cli.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! CLI options
22
33
use solana_program::pubkey::Pubkey;
4+
use solana_sdk::commitment_config::CommitmentConfig;
45
use std::path::PathBuf;
56

67
use clap::{
@@ -29,6 +30,14 @@ pub struct Cli {
2930
pub payer: String,
3031
#[clap(short, long, default_value = "http://localhost:8899")]
3132
pub rpc_url: String,
33+
#[clap(
34+
long = "rpc-interval",
35+
default_value = "150",
36+
help = "Rate-limiting minimum delay between RPC requests in milliseconds"
37+
)]
38+
pub rpc_interval_ms: u64,
39+
#[clap(long, default_value = "confirmed")]
40+
pub commitment: CommitmentConfig,
3241
#[clap(long)]
3342
pub p2w_addr: Pubkey,
3443
#[clap(subcommand)]
@@ -60,10 +69,18 @@ pub enum Action {
6069
#[clap(
6170
short = 'n',
6271
long = "--n-retries",
63-
help = "How many times to retry send_transaction() on each batch before flagging a failure.",
72+
help = "How many times to retry send_transaction() on each batch before flagging a failure. Only active outside daemon mode",
6473
default_value = "5"
6574
)]
6675
n_retries: usize,
76+
#[clap(
77+
short = 'i',
78+
long = "--retry-interval",
79+
help = "How long to wait between send_transaction
80+
retries. Only active outside daemon mode",
81+
default_value = "5"
82+
)]
83+
retry_interval_secs: u64,
6784
#[clap(
6885
short = 'd',
6986
long = "--daemon",
@@ -73,17 +90,10 @@ pub enum Action {
7390
#[clap(
7491
short = 't',
7592
long = "--timeout",
76-
help = "How many seconds to wait before giving up on get_transaction() for tx confirmation.",
77-
default_value = "40"
78-
)]
79-
conf_timeout_secs: u64,
80-
#[clap(
81-
short = 'i',
82-
long = "--rpc-interval",
83-
help = "How many milliseconds to wait between SOL RPC requests",
84-
default_value = "200"
93+
help = "How many seconds to wait before giving up on tx confirmation.",
94+
default_value = "20"
8595
)]
86-
rpc_interval_ms: u64,
96+
confirmation_timeout_secs: u64,
8797
},
8898
#[clap(about = "Retrieve a pyth2wormhole program's current settings")]
8999
GetConfig,

solana/pyth2wormhole/client/src/lib.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
pub mod attestation_cfg;
22
pub mod batch_state;
3+
pub mod util;
34

45
use borsh::{
56
BorshDeserialize,
67
BorshSerialize,
78
};
8-
use solana_client::rpc_client::RpcClient;
9+
use solana_client::nonblocking::rpc_client::RpcClient;
910
use solana_program::{
1011
hash::Hash,
1112
instruction::{
@@ -51,19 +52,24 @@ use pyth2wormhole::{
5152
migrate::MigrateAccounts,
5253
set_config::SetConfigAccounts,
5354
AttestData,
54-
Pyth2WormholeConfig,
5555
};
5656

57+
pub use pyth2wormhole::Pyth2WormholeConfig;
58+
5759
pub use attestation_cfg::{
5860
AttestationConditions,
5961
AttestationConfig,
6062
P2WSymbol,
6163
};
62-
pub use batch_state::{
63-
BatchState,
64-
BatchTxStatus,
64+
pub use batch_state::BatchState;
65+
pub use util::{
66+
RLMutex,
67+
RLMutexGuard,
6568
};
6669

70+
/// Future-friendly version of solitaire::ErrBox
71+
pub type ErrBoxSend = Box<dyn std::error::Error + Send + Sync>;
72+
6773
pub fn gen_init_tx(
6874
payer: Keypair,
6975
p2w_addr: Pubkey,
@@ -159,14 +165,17 @@ pub fn gen_migrate_tx(
159165
}
160166

161167
/// Get the current config account data for given p2w program address
162-
pub fn get_config_account(
168+
pub async fn get_config_account(
163169
rpc_client: &RpcClient,
164170
p2w_addr: &Pubkey,
165171
) -> Result<Pyth2WormholeConfig, ErrBox> {
166172
let p2w_config_addr = P2WConfigAccount::<{ AccountState::Initialized }>::key(None, p2w_addr);
167173

168174
let config = Pyth2WormholeConfig::try_from_slice(
169-
rpc_client.get_account_data(&p2w_config_addr)?.as_slice(),
175+
rpc_client
176+
.get_account_data(&p2w_config_addr)
177+
.await?
178+
.as_slice(),
170179
)?;
171180

172181
Ok(config)
@@ -181,7 +190,7 @@ pub fn gen_attest_tx(
181190
symbols: &[P2WSymbol],
182191
wh_msg: &Keypair,
183192
latest_blockhash: Hash,
184-
) -> Result<Transaction, ErrBox> {
193+
) -> Result<Transaction, ErrBoxSend> {
185194
let emitter_addr = P2WEmitter::key(None, &p2w_addr);
186195

187196
let seq_addr = Sequence::key(
@@ -193,11 +202,11 @@ pub fn gen_attest_tx(
193202

194203
let p2w_config_addr = P2WConfigAccount::<{ AccountState::Initialized }>::key(None, &p2w_addr);
195204
if symbols.len() > p2w_config.max_batch_size as usize {
196-
return Err(format!(
205+
return Err((format!(
197206
"Expected up to {} symbols for batch, {} were found",
198207
p2w_config.max_batch_size,
199208
symbols.len()
200-
)
209+
))
201210
.into());
202211
}
203212
// Initial attest() accounts
@@ -267,7 +276,14 @@ pub fn gen_attest_tx(
267276
},
268277
);
269278

270-
let ix = Instruction::new_with_bytes(p2w_addr, ix_data.try_to_vec()?.as_slice(), acc_metas);
279+
let ix = Instruction::new_with_bytes(
280+
p2w_addr,
281+
ix_data
282+
.try_to_vec()
283+
.map_err(|e| -> ErrBoxSend { Box::new(e) })?
284+
.as_slice(),
285+
acc_metas,
286+
);
271287

272288
let tx_signed = Transaction::new_signed_with_payer::<Vec<&Keypair>>(
273289
&[ix],

0 commit comments

Comments
 (0)