Skip to content

Commit c713798

Browse files
committed
use DriaMessage for heartbeat
1 parent 96a7936 commit c713798

File tree

1 file changed

+25
-15
lines changed

1 file changed

+25
-15
lines changed

compute/src/reqres/heartbeat.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1-
use std::time::Duration;
2-
31
use dkn_p2p::libp2p::{request_response::OutboundRequestId, PeerId};
4-
use dkn_workflows::{Model, ModelProvider};
52
use eyre::{eyre, Result};
3+
use serde::{Deserialize, Serialize};
4+
use std::time::Duration;
65
use uuid::Uuid;
76

87
use super::IsResponder;
9-
use serde::{Deserialize, Serialize};
108

11-
use crate::DriaComputeNode;
9+
use crate::{utils::DriaMessage, DriaComputeNode};
1210

1311
pub struct HeartbeatRequester;
1412

@@ -18,8 +16,8 @@ pub struct HeartbeatRequest {
1816
pub(crate) heartbeat_id: Uuid,
1917
/// Deadline for the heartbeat request, in nanoseconds.
2018
pub(crate) deadline: chrono::DateTime<chrono::Utc>,
21-
/// Models available in the node.
22-
pub(crate) models: Vec<(ModelProvider, Model)>,
19+
/// Model names available in the node.
20+
pub(crate) models: Vec<String>,
2321
/// Number of tasks in the channel currently, `single` and `batch`.
2422
pub(crate) pending_tasks: [usize; 2],
2523
}
@@ -39,13 +37,16 @@ pub struct HeartbeatResponse {
3937
}
4038

4139
impl IsResponder for HeartbeatRequester {
42-
type Request = HeartbeatRequest;
43-
type Response = HeartbeatResponse;
40+
type Request = DriaMessage; // TODO: HeartbeatRequest;
41+
type Response = DriaMessage; // TODO: HeartbeatResponse;
4442
}
4543

4644
/// Any acknowledged heartbeat that is older than this duration is considered dead.
4745
const HEARTBEAT_DEADLINE_SECS: Duration = Duration::from_secs(20);
4846

47+
/// Topic for the [`DriaMessage`].
48+
const HEARTBEAT_TOPIC: &str = "heartbeat";
49+
4950
impl HeartbeatRequester {
5051
pub(crate) async fn send_heartbeat(
5152
node: &mut DriaComputeNode,
@@ -57,16 +58,23 @@ impl HeartbeatRequester {
5758
let heartbeat_request = HeartbeatRequest {
5859
heartbeat_id: uuid,
5960
deadline,
60-
models: node.config.workflows.models.clone(),
61+
models: node
62+
.config
63+
.workflows
64+
.models
65+
.iter()
66+
.map(|m| m.1.to_string())
67+
.collect(),
6168
pending_tasks: node.get_pending_task_count(),
6269
};
6370

71+
let heartbeat_message = node.new_message(
72+
serde_json::to_vec(&heartbeat_request).expect("should be serializable"),
73+
HEARTBEAT_TOPIC,
74+
);
6475
let request_id = node
6576
.p2p
66-
.request(
67-
peer_id,
68-
serde_json::to_vec(&heartbeat_request).expect("should be serializable"),
69-
)
77+
.request(peer_id, heartbeat_message.to_bytes()?)
7078
.await?;
7179

7280
// add it to local heartbeats set
@@ -78,8 +86,10 @@ impl HeartbeatRequester {
7886
/// Handles the heartbeat request received from the network.
7987
pub(crate) async fn handle_ack(
8088
node: &mut DriaComputeNode,
81-
res: HeartbeatResponse,
89+
ack_message: DriaMessage,
8290
) -> Result<()> {
91+
let res = ack_message.parse_payload::<HeartbeatResponse>()?;
92+
8393
if let Some(deadline) = node.heartbeats.remove(&res.heartbeat_id) {
8494
if let Some(err) = res.error {
8595
Err(eyre!("Heartbeat was not acknowledged: {}", err))

0 commit comments

Comments
 (0)