Skip to content

Commit 0496481

Browse files
committed
Split relayer sessions into separate threads
1 parent d27384d commit 0496481

File tree

1 file changed

+140
-101
lines changed

1 file changed

+140
-101
lines changed

src/agent/services/lazer_exporter.rs

Lines changed: 140 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use {
33
anyhow::{
44
Result,
55
anyhow,
6+
bail,
67
},
78
futures_util::{
89
SinkExt,
@@ -24,6 +25,11 @@ use {
2425
},
2526
tokio::{
2627
net::TcpStream,
28+
select,
29+
sync::mpsc::{
30+
self,
31+
Receiver,
32+
},
2733
task::JoinHandle,
2834
},
2935
tokio_tungstenite::{
@@ -42,6 +48,8 @@ use {
4248
url::Url,
4349
};
4450

51+
pub const RELAYER_CHANNEL_CAPACITY: usize = 1000;
52+
4553
#[derive(Clone, Debug, Deserialize)]
4654
pub struct Config {
4755
pub history_url: Url,
@@ -56,21 +64,21 @@ fn default_publish_interval() -> Duration {
5664
Duration::from_millis(200)
5765
}
5866

59-
struct RelayerSender {
60-
ws_senders: Vec<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>>,
67+
struct RelayerWsSession {
68+
ws_sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>,
6169
}
6270

63-
impl RelayerSender {
64-
async fn send_price_update(
71+
impl RelayerWsSession {
72+
async fn send_transaction(
6573
&mut self,
6674
signed_lazer_transaction: &SignedLazerTransaction,
6775
) -> Result<()> {
6876
tracing::debug!("price_update: {:?}", signed_lazer_transaction);
6977
let buf = signed_lazer_transaction.write_to_bytes()?;
70-
for sender in self.ws_senders.iter_mut() {
71-
sender.send(TungsteniteMessage::from(buf.clone())).await?;
72-
sender.flush().await?;
73-
}
78+
self.ws_sender
79+
.send(TungsteniteMessage::from(buf.clone()))
80+
.await?;
81+
self.ws_sender.flush().await?;
7482
Ok(())
7583
}
7684
}
@@ -88,31 +96,80 @@ async fn connect_to_relayer(
8896
let headers = req.headers_mut();
8997
headers.insert(
9098
"Authorization",
91-
HeaderValue::from_str(&format!("Bearer {}", token))?,
99+
HeaderValue::from_str(&format!("Bearer {token}"))?,
92100
);
93101
let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
94102
Ok(ws_stream.split())
95103
}
96104

97-
async fn connect_to_relayers(
98-
config: &Config,
99-
) -> Result<(
100-
RelayerSender,
101-
Vec<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
102-
)> {
103-
let mut relayer_senders = Vec::new();
104-
let mut relayer_receivers = Vec::new();
105-
for url in config.relayer_urls.clone() {
106-
let (relayer_sender, relayer_receiver) =
107-
connect_to_relayer(url, &config.authorization_token).await?;
108-
relayer_senders.push(relayer_sender);
109-
relayer_receivers.push(relayer_receiver);
105+
struct RelayerSessionTask {
106+
// connection state
107+
url: Url,
108+
token: String,
109+
receiver: Receiver<SignedLazerTransaction>,
110+
}
111+
112+
impl RelayerSessionTask {
113+
pub async fn run(&mut self) {
114+
let mut failure_count = 0;
115+
let retry_duration = Duration::from_secs(1);
116+
117+
loop {
118+
match self.run_relayer_connection().await {
119+
Ok(()) => {
120+
tracing::info!("relayer session graceful shutdown");
121+
return;
122+
}
123+
Err(e) => {
124+
failure_count += 1;
125+
tracing::error!(
126+
"relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}",
127+
e,
128+
failure_count,
129+
retry_duration
130+
);
131+
tokio::time::sleep(retry_duration).await;
132+
}
133+
}
134+
}
135+
}
136+
137+
pub async fn run_relayer_connection(&mut self) -> Result<()> {
138+
// Establish relayer connection
139+
// Relayer will drop the connection if no data received in 5s
140+
let (relayer_ws_sender, mut relayer_ws_receiver) =
141+
connect_to_relayer(self.url.clone(), &self.token).await?;
142+
let mut relayer_ws_session = RelayerWsSession {
143+
ws_sender: relayer_ws_sender,
144+
};
145+
146+
loop {
147+
select! {
148+
Some(transaction) = self.receiver.recv() => {
149+
if let Err(e) = relayer_ws_session.send_transaction(&transaction).await
150+
{
151+
tracing::error!("Error publishing transaction to Lazer relayer: {e:?}");
152+
bail!("Failed to publish transaction to Lazer relayer: {e:?}");
153+
}
154+
}
155+
// Handle messages from the relayers, such as errors if we send a bad update
156+
msg = relayer_ws_receiver.next() => {
157+
match msg {
158+
Some(Ok(msg)) => {
159+
tracing::debug!("Received message from relayer: {msg:?}");
160+
}
161+
Some(Err(e)) => {
162+
tracing::error!("Error receiving message from at relayer: {e:?}");
163+
}
164+
None => {
165+
tracing::error!("relayer connection closed");
166+
bail!("relayer connection closed");
167+
}
168+
}
169+
}
170+
}
171+
}
110172
}
111-
let sender = RelayerSender {
112-
ws_senders: relayer_senders,
113-
};
114-
tracing::info!("connected to relayers: {:?}", config.relayer_urls);
115-
Ok((sender, relayer_receivers))
116173
}
117174

118175
// TODO: This is copied from history-service; move to Lazer protocol sdk.
@@ -156,10 +213,26 @@ async fn fetch_symbols(history_url: &Url) -> Result<Vec<SymbolResponse>> {
156213

157214
#[instrument(skip(config, state))]
158215
pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>> {
159-
let handles = vec![tokio::spawn(lazer_exporter::lazer_exporter(
216+
let mut handles = vec![];
217+
let mut relayer_senders = vec![];
218+
219+
for url in config.relayer_urls.iter() {
220+
let (sender, receiver) = mpsc::channel(RELAYER_CHANNEL_CAPACITY);
221+
let mut task = RelayerSessionTask {
222+
url: url.clone(),
223+
token: config.authorization_token.to_owned(),
224+
receiver,
225+
};
226+
handles.push(tokio::spawn(async move { task.run().await }));
227+
relayer_senders.push(sender);
228+
}
229+
230+
handles.push(tokio::spawn(lazer_exporter::lazer_exporter(
160231
config.clone(),
161232
state,
162-
))];
233+
relayer_senders,
234+
)));
235+
163236
handles
164237
}
165238

@@ -170,20 +243,19 @@ mod lazer_exporter {
170243
services::lazer_exporter::{
171244
Config,
172245
SymbolResponse,
173-
connect_to_relayers,
174246
fetch_symbols,
175247
},
176248
state::local::LocalStore,
177249
},
178250
anyhow::{
179251
Context,
252+
Result,
180253
bail,
181254
},
182255
ed25519_dalek::{
183256
Signer,
184257
SigningKey,
185258
},
186-
futures_util::StreamExt,
187259
protobuf::{
188260
Message,
189261
MessageField,
@@ -209,44 +281,44 @@ mod lazer_exporter {
209281
std::{
210282
collections::HashMap,
211283
sync::Arc,
212-
time::Duration,
213284
},
214-
tokio_stream::StreamMap,
285+
tokio::sync::mpsc::Sender,
215286
};
216287

217-
pub async fn lazer_exporter<S>(config: Config, state: Arc<S>)
218-
where
219-
S: LocalStore,
220-
S: Send + Sync + 'static,
221-
{
222-
let mut failure_count = 0;
223-
let retry_duration = Duration::from_secs(1);
224-
225-
loop {
226-
match run(&config, state.clone()).await {
227-
Ok(()) => {
228-
tracing::info!("lazer_exporter graceful shutdown");
229-
return;
230-
}
231-
Err(e) => {
232-
failure_count += 1;
233-
tracing::error!(
234-
"lazer_exporter failed with error: {:?}, failure_count: {}; retrying in {:?}",
235-
e,
236-
failure_count,
237-
retry_duration
238-
);
239-
tokio::time::sleep(retry_duration).await;
240-
}
288+
fn get_signing_key(config: &Config) -> Result<SigningKey> {
289+
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
290+
let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) {
291+
Ok(k) => k,
292+
Err(e) => {
293+
tracing::error!(
294+
error = ?e,
295+
publish_keypair_path = config.publish_keypair_path.display().to_string(),
296+
"Reading publish keypair returned an error. ",
297+
);
298+
bail!("Reading publish keypair returned an error. ");
241299
}
242-
}
300+
};
301+
302+
SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
303+
.context("Failed to create signing key from keypair")
243304
}
244305

245-
async fn run<S>(config: &Config, state: Arc<S>) -> anyhow::Result<()>
246-
where
306+
pub async fn lazer_exporter<S>(
307+
config: Config,
308+
state: Arc<S>,
309+
relayer_senders: Vec<Sender<SignedLazerTransaction>>,
310+
) where
247311
S: LocalStore,
248312
S: Send + Sync + 'static,
249313
{
314+
let signing_key = match get_signing_key(&config) {
315+
Ok(signing_key) => signing_key,
316+
Err(e) => {
317+
tracing::error!("lazer_exporter signing key failure: {e:?}");
318+
return;
319+
}
320+
};
321+
250322
// TODO: Re-fetch on an interval?
251323
let lazer_symbols: HashMap<pyth_sdk::Identifier, SymbolResponse> =
252324
match fetch_symbols(&config.history_url).await {
@@ -265,33 +337,10 @@ mod lazer_exporter {
265337
.collect(),
266338
Err(e) => {
267339
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
268-
bail!("Failed to fetch Lazer symbols: {e:?}");
340+
return;
269341
}
270342
};
271343

272-
// Establish relayer connections
273-
// Relayer will drop the connection if no data received in 5s
274-
let (mut relayer_sender, relayer_receivers) = connect_to_relayers(config).await?;
275-
let mut stream_map = StreamMap::new();
276-
for (i, receiver) in relayer_receivers.into_iter().enumerate() {
277-
stream_map.insert(config.relayer_urls[i].clone(), receiver);
278-
}
279-
280-
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
281-
let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) {
282-
Ok(k) => k,
283-
Err(e) => {
284-
tracing::error!(
285-
error = ?e,
286-
publish_keypair_path = config.publish_keypair_path.display().to_string(),
287-
"Reading publish keypair returned an error. ",
288-
);
289-
bail!("Reading publish keypair returned an error. ");
290-
}
291-
};
292-
293-
let signing_key = SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
294-
.context("Failed to create signing key from keypair")?;
295344
let mut publish_interval = tokio::time::interval(config.publish_interval_duration);
296345

297346
loop {
@@ -356,23 +405,13 @@ mod lazer_exporter {
356405
payload: Some(buf),
357406
special_fields: Default::default(),
358407
};
359-
if let Err(e) = relayer_sender.send_price_update(&signed_lazer_transaction).await {
360-
tracing::error!("Error publishing update to Lazer relayer: {e:?}");
361-
bail!("Failed to publish update to Lazer relayer: {e:?}");
362-
}
363-
}
364-
// Handle messages from the relayers, such as errors if we send a bad update
365-
mapped_msg = stream_map.next() => {
366-
match mapped_msg {
367-
Some((relayer_url, Ok(msg))) => {
368-
tracing::debug!("Received message from relayer at {relayer_url}: {msg:?}");
369-
}
370-
Some((relayer_url, Err(e))) => {
371-
tracing::error!("Error receiving message from at relayer {relayer_url}: {e:?}");
372-
}
373-
None => {
374-
tracing::error!("relayer connection closed");
375-
bail!("relayer connection closed");
408+
for relayer_sender in relayer_senders.iter() {
409+
if let Err(e) = relayer_sender
410+
.send(signed_lazer_transaction.clone())
411+
.await
412+
{
413+
tracing::error!("Error sending transaction to Lazer relayer session: {e:?}");
414+
// TODO: Under what circumstances would the channel be hosed and is it worth retry?
376415
}
377416
}
378417
}

0 commit comments

Comments
 (0)