Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/pyth-lazer-agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/pyth-lazer-agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-lazer-agent"
version = "0.1.1"
version = "0.1.2"
edition = "2024"

[dependencies]
Expand Down
19 changes: 19 additions & 0 deletions apps/pyth-lazer-agent/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub struct RelayerRequest(pub http::Request<hyper::body::Incoming>);
const PUBLISHER_WS_URI: &str = "/v1/publisher";
const PUBLISHER_WS_URI_V2: &str = "/v2/publisher";

const READINESS_PROBE_PATH: &str = "/ready";
const LIVENESS_PROBE_PATH: &str = "/live";

pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()> {
let listener = TcpListener::bind(&config.listen_address).await?;
info!("listening on {:?}", &config.listen_address);
Expand Down Expand Up @@ -74,6 +77,22 @@ async fn request_handler(
let request_type = match path {
PUBLISHER_WS_URI => Request::PublisherV1,
PUBLISHER_WS_URI_V2 => Request::PublisherV2,
LIVENESS_PROBE_PATH => {
let response = Response::builder().status(StatusCode::OK);
return Ok(response.body(FullBody::default())?);
}
READINESS_PROBE_PATH => {
let status = if lazer_publisher
.is_ready
.load(std::sync::atomic::Ordering::Relaxed)
{
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
let response = Response::builder().status(status);
return Ok(response.body(FullBody::default())?);
}
_ => {
return Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down
10 changes: 9 additions & 1 deletion apps/pyth-lazer-agent/src/lazer_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use pyth_lazer_publisher_sdk::transaction::{
};
use solana_keypair::read_keypair_file;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::sync::broadcast;
use tokio::{
select,
Expand All @@ -25,6 +27,7 @@ use tracing::error;
#[derive(Clone)]
pub struct LazerPublisher {
sender: Sender<FeedUpdate>,
pub(crate) is_ready: Arc<AtomicBool>,
}

impl LazerPublisher {
Expand Down Expand Up @@ -66,11 +69,13 @@ impl LazerPublisher {
};

let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
let is_ready = Arc::new(AtomicBool::new(false));
for url in config.relayer_urls.iter() {
let mut task = RelayerSessionTask {
url: url.clone(),
token: authorization_token.clone(),
receiver: relayer_sender.subscribe(),
is_ready: is_ready.clone(),
};
tokio::spawn(async move { task.run().await });
}
Expand All @@ -84,7 +89,10 @@ impl LazerPublisher {
signing_key,
};
tokio::spawn(async move { task.run().await });
Self { sender }
Self {
sender,
is_ready: Arc::new(AtomicBool::new(false)),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha, created a new one after moving this earlier, this didn't work.

}
}

pub async fn push_feed_update(&self, feed_update: FeedUpdate) -> Result<()> {
Expand Down
9 changes: 9 additions & 0 deletions apps/pyth-lazer-agent/src/relayer_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use futures_util::{SinkExt, StreamExt};
use http::HeaderValue;
use protobuf::Message;
use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::net::TcpStream;
use tokio::select;
Expand Down Expand Up @@ -63,6 +65,7 @@ pub struct RelayerSessionTask {
pub url: Url,
pub token: String,
pub receiver: broadcast::Receiver<SignedLazerTransaction>,
pub is_ready: Arc<AtomicBool>,
}

impl RelayerSessionTask {
Expand Down Expand Up @@ -116,6 +119,9 @@ impl RelayerSessionTask {
ws_sender: relayer_ws_sender,
};

// If we have at least one successful connection, mark as ready.
self.is_ready.store(true, Ordering::Relaxed);

loop {
select! {
recv_result = self.receiver.recv() => {
Expand Down Expand Up @@ -174,6 +180,8 @@ mod tests {
Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::net::TcpListener;
use tokio::sync::{broadcast, mpsc};
use url::Url;
Expand Down Expand Up @@ -234,6 +242,7 @@ mod tests {
url: Url::parse("ws://127.0.0.1:12346").unwrap(),
token: "token1".to_string(),
receiver: relayer_receiver,
is_ready: Arc::new(AtomicBool::new(false)),
};
tokio::spawn(async move { relayer_session_task.run().await });
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
Expand Down