Skip to content

Commit 6aceb34

Browse files
authored
Merge pull request #15 from drift-labs/master
add pub/sub heartbeat channel
2 parents d6b57e4 + bda9c73 commit 6aceb34

File tree

3 files changed

+108
-18
lines changed

3 files changed

+108
-18
lines changed

src/swift_server.rs

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,75 @@ pub async fn process_order(
231231
}
232232
}
233233

234+
pub async fn send_heartbeat(server_params: &mut ServerParams) -> () {
235+
let hearbeat_time = unix_now_ms();
236+
let log_prefix = format!("[hearbeat: {hearbeat_time}]");
237+
238+
if let Some(kafka_producer) = &server_params.kafka_producer {
239+
match kafka_producer
240+
.send(
241+
FutureRecord::<String, String>::to(&"hearbeat").payload(&"love you".to_string()),
242+
Timeout::After(Duration::ZERO),
243+
)
244+
.await
245+
{
246+
Ok(_) => {
247+
log::trace!(target: "kafka", "{log_prefix}: Sent heartbeat");
248+
server_params
249+
.metrics
250+
.order_type_counter
251+
.with_label_values(&["_", "heartbeat"])
252+
.inc();
253+
254+
server_params
255+
.metrics
256+
.response_time_histogram
257+
.observe((unix_now_ms() - hearbeat_time) as f64);
258+
}
259+
Err((e, _)) => {
260+
log::error!(
261+
target: "kafka",
262+
"{log_prefix}: Failed to deliver heartbeat, error: {e:?}"
263+
);
264+
server_params.metrics.kafka_forward_fail_counter.inc();
265+
}
266+
}
267+
} else {
268+
let mut conn = match server_params.redis_pool.as_ref().unwrap().get().await {
269+
Ok(conn) => conn,
270+
Err(_) => {
271+
log::error!(target: "redis", "{log_prefix}: Obtaining redis connection failed");
272+
return;
273+
}
274+
};
275+
276+
match conn
277+
.publish::<String, String, i64>("heartbeat".to_string(), "love you".to_string())
278+
.await
279+
{
280+
Ok(_) => {
281+
log::trace!(target: "redis", "{log_prefix}: Sent redis heartbeat");
282+
server_params
283+
.metrics
284+
.order_type_counter
285+
.with_label_values(&["_", "heartbeat"])
286+
.inc();
287+
288+
server_params
289+
.metrics
290+
.response_time_histogram
291+
.observe((unix_now_ms() - hearbeat_time) as f64);
292+
}
293+
Err(e) => {
294+
log::error!(
295+
target: "redis",
296+
"{log_prefix}: Failed to deliver heartbeat, error: {e:?}"
297+
);
298+
}
299+
}
300+
}
301+
}
302+
234303
pub async fn health_check() -> impl axum::response::IntoResponse {
235304
axum::http::StatusCode::OK
236305
}
@@ -408,10 +477,25 @@ pub async fn start_server() {
408477
}
409478
});
410479

480+
let mut state_clone = state.clone();
481+
let send_heartbeat_loop = tokio::spawn(async move {
482+
let mut interval = tokio::time::interval(Duration::from_secs(2));
483+
484+
loop {
485+
interval.tick().await;
486+
send_heartbeat(&mut state_clone).await;
487+
}
488+
});
489+
411490
let axum_server = tokio::spawn(async { axum::serve(listener, app).await });
412491
let metrics_server = tokio::spawn(async { axum::serve(listener_metrics, metrics_app).await });
413492

414-
let _ = tokio::try_join!(rpc_sim_loop, axum_server, metrics_server);
493+
let _ = tokio::try_join!(
494+
rpc_sim_loop,
495+
axum_server,
496+
metrics_server,
497+
send_heartbeat_loop
498+
);
415499
}
416500

417501
/// Simple validation from program's `handle_signed_order_ix`

src/types/messages.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use base64::Engine;
1010
use drift_rs::types::{MarketType, SignedMsgOrderParamsMessage};
1111
use ed25519_dalek::{PublicKey, Signature, Verifier};
1212
use serde_json::json;
13-
use solana_sdk::{hash::Hash, pubkey::Pubkey};
13+
use solana_sdk::pubkey::Pubkey;
1414

1515
#[derive(AnchorDeserialize, AnchorSerialize, Debug, Clone, InitSpace, Copy, Default, PartialEq)]
1616
pub struct SignedOrderParamsMessageWithPrefix {
@@ -241,10 +241,6 @@ pub struct BorshBuf<const N: usize> {
241241
}
242242

243243
impl<const N: usize> BorshBuf<N> {
244-
/// Get the SHA256 digest of the buffer
245-
pub fn sha256_digest(&self) -> Hash {
246-
solana_sdk::hash::hash(self.data())
247-
}
248244
/// Deserialize the buffer as `T`
249245
pub fn deserialize<T: anchor_lang::AnchorDeserialize>(&self) -> Result<T, std::io::Error> {
250246
T::deserialize(&mut self.data())
@@ -290,7 +286,9 @@ where
290286
}
291287

292288
/// Deserialize base64 str as fixed size byte array with Borsh helpers
293-
pub fn base64_to_borsh_buf<'de, D, const N: usize>(deserializer: D) -> Result<BorshBuf<N>, D::Error>
289+
pub fn _base64_to_borsh_buf<'de, D, const N: usize>(
290+
deserializer: D,
291+
) -> Result<BorshBuf<N>, D::Error>
294292
where
295293
D: serde::Deserializer<'de>,
296294
{

src/ws_server.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ async fn subscribe_kafka_consumer(
599599
topics_prefix: &str,
600600
) {
601601
kafka_consumer
602-
.subscribe(&[topics_prefix])
602+
.subscribe(&[topics_prefix, &"heartbeat"])
603603
.context("Failed to subscribe to topics")
604604
.expect("subscribed topic prefix");
605605

@@ -626,9 +626,11 @@ async fn subscribe_kafka_consumer(
626626
.expect("topic channel exists");
627627

628628
if (tx.receiver_count() as u32) < 1 {
629-
// TODO: consumer got the message but it won't be forwarded since no one's listening??
630-
// do we want to have something where it replays recent message on connect?
631-
log::error!(target: "kafka", "No receiver found for topic: {topic}, order message lost!");
629+
if topic != "heartbeat" {
630+
log::warn!(target: "kafka", "No receiver found for topic: {topic}, order message lost!");
631+
} else {
632+
log::debug!(target: "kafka", "Received heartbeat message");
633+
}
632634
continue;
633635
}
634636
let payload: &[u8] = message.payload().context("Failed to get payload").unwrap();
@@ -728,9 +730,11 @@ async fn subscribe_redis_pubsub(
728730
.expect("topic channel exists");
729731

730732
if (tx.receiver_count() as u32) < 1 {
731-
// TODO: consumer got the message but it won't be forwarded since no one's listening??
732-
// do we want to have something where it replays recent message on connect?
733-
log::error!(target: "ws", "No receiver found for topic: {topic}, order message lost!");
733+
if topic != "heartbeat" {
734+
log::warn!(target: "ws", "No receiver found for topic: {topic}, order message lost!");
735+
} else {
736+
log::debug!(target: "ws", "Received heartbeat message");
737+
}
734738
continue;
735739
}
736740
let payload: &[u8] = message.get_payload_bytes();
@@ -793,8 +797,12 @@ pub async fn start_server() {
793797

794798
// Set up the server with the server params
795799
let subscriptions = DashMap::new();
796-
let mut topic_names: Vec<String> = vec![];
800+
let mut topic_names: Vec<String> = vec!["heartbeat".to_string()];
797801
for market in &perp_market_accounts {
802+
if market.symbol().contains("BET") {
803+
log::info!("Skipping BET market");
804+
continue;
805+
}
798806
let topic = format!(
799807
"swift_orders_{}_{}",
800808
market.market_type(),
@@ -1299,7 +1307,7 @@ mod test {
12991307
let mut ws_conn = WsConnection::new(Pubkey::new_unique());
13001308
ws_conn.authenticated = true;
13011309

1302-
ws_conn
1310+
let _ = ws_conn
13031311
.spawn_handler(&mut client_rx, &mut client_tx, Box::leak(Box::default()))
13041312
.await;
13051313

@@ -1330,7 +1338,7 @@ mod test {
13301338
let mut client_rx = vec![];
13311339
let mut client_tx = stream::iter([Ok(Message::Close(None))]);
13321340
let ws_conn = WsConnection::new(Pubkey::new_unique());
1333-
ws_conn
1341+
let _ = ws_conn
13341342
.spawn_handler(&mut client_rx, &mut client_tx, Box::leak(Box::default()))
13351343
.await;
13361344
}
@@ -1344,7 +1352,7 @@ mod test {
13441352
let _ = ws_conn.send_message(WsMessage::heartbeat());
13451353
let _ = ws_conn.send_message(WsMessage::heartbeat());
13461354
let _ = ws_conn.send_message(WsMessage::heartbeat());
1347-
ws_conn
1355+
let _ = ws_conn
13481356
.spawn_handler(&mut client_rx, &mut client_tx, Box::leak(Box::default()))
13491357
.await;
13501358
assert!(client_rx.len() >= 3);

0 commit comments

Comments
 (0)