Skip to content

Commit 26e3575

Browse files
committed
added data transform
1 parent aebbaa2 commit 26e3575

File tree

5 files changed

+110
-34
lines changed

5 files changed

+110
-34
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ ollama-workflows = { git = "https://github.com/andthattoo/ollama-workflows", rev
4848

4949
# peer-to-peer
5050
libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "be2ed55", features = [
51+
# libp2p = { version = "0.54.1", features = [
5152
"dcutr",
5253
"ping",
5354
"relay",
@@ -64,7 +65,7 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "be2ed5
6465
"kad",
6566
] }
6667

67-
libp2p-identity = { version = "0.2.9", features = ["secp256k1", "ed25519"] }
68+
libp2p-identity = { version = "0.2.9", features = ["secp256k1"] }
6869
tracing = { version = "0.1.40" }
6970
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
7071

src/p2p/behaviour.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ fn create_gossipsub_behavior(author: PeerId) -> gossipsub::Behaviour {
9292

9393
/// We accept permissive validation mode, meaning that we accept all messages
9494
/// and check their fields based on whether they exist or not.
95-
const VALIDATION_MODE: ValidationMode = ValidationMode::Permissive;
95+
const VALIDATION_MODE: ValidationMode = ValidationMode::None;
9696

9797
/// Gossip cache TTL in seconds
9898
const GOSSIP_TTL_SECS: u64 = 100;
@@ -101,7 +101,7 @@ fn create_gossipsub_behavior(author: PeerId) -> gossipsub::Behaviour {
101101
const MESSAGE_CAPACITY: usize = 100;
102102

103103
/// Max transmit size for payloads 256 KB
104-
const MAX_TRANSMIT_SIZE: usize = 262144;
104+
const MAX_TRANSMIT_SIZE: usize = 256 << 10;
105105

106106
/// Max IHAVE length, this is much lower than the default
107107
/// because we don't need historic messages at all

src/p2p/client.rs

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl P2PClient {
7575
}) {
7676
log::info!("Dialling peer: {}", addr);
7777
swarm.dial(addr.clone()).map_err(|e| e.to_string())?;
78-
log::info!("Adding address to Kademlia routing table");
78+
log::info!("Adding {} to Kademlia routing table", addr);
7979
swarm
8080
.behaviour_mut()
8181
.kademlia
@@ -269,38 +269,16 @@ impl P2PClient {
269269
) {
270270
match result {
271271
Ok(GetClosestPeersOk { peers, .. }) => {
272-
if !peers.is_empty() {
273-
log::debug!(
274-
"Kademlia: Query finished with {} closest peers.",
275-
peers.len()
276-
);
277-
for peer in peers {
278-
log::debug!("Gossipsub: Adding peer {0}", peer.peer_id);
279-
self.swarm
280-
.behaviour_mut()
281-
.gossipsub
282-
.add_explicit_peer(&peer.peer_id);
283-
}
284-
} else {
285-
log::warn!("Kademlia: Query finished with no closest peers.");
286-
}
272+
log::info!(
273+
"Kademlia: Query finished with {} closest peers.",
274+
peers.len()
275+
);
287276
}
288277
Err(GetClosestPeersError::Timeout { peers, .. }) => {
289-
if !peers.is_empty() {
290-
log::debug!(
291-
"Kademlia: Query timed out with {} closest peers.",
292-
peers.len()
293-
);
294-
for peer in peers {
295-
log::info!("Gossipsub: Adding peer {0}", peer.peer_id);
296-
self.swarm
297-
.behaviour_mut()
298-
.gossipsub
299-
.add_explicit_peer(&peer.peer_id);
300-
}
301-
} else {
302-
log::warn!("Kademlia: Query timed out with no closest peers.");
303-
}
278+
log::info!(
279+
"Kademlia: Query timed out with {} closest peers.",
280+
peers.len()
281+
);
304282
}
305283
}
306284
}

src/p2p/data_transform.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use libp2p::gossipsub::{DataTransform, Message, RawMessage, TopicHash};
2+
use std::io::{Error, ErrorKind};
3+
use std::time::{SystemTime, UNIX_EPOCH};
4+
5+
/// A `DataTransform` implementation that adds & checks a timestamp to the message.
6+
pub struct TTLDataTransform {
7+
/// Time-to-live, e.g. obtained from some `duration.as_secs()`.
8+
ttl_secs: u64,
9+
}
10+
11+
impl TTLDataTransform {
12+
const MID_SIZE: usize = 8;
13+
14+
#[inline(always)]
15+
fn get_time_secs(&self) -> u64 {
16+
SystemTime::now()
17+
.duration_since(UNIX_EPOCH)
18+
.unwrap()
19+
.as_secs()
20+
}
21+
}
22+
23+
impl DataTransform for TTLDataTransform {
24+
fn inbound_transform(&self, mut raw_message: RawMessage) -> Result<Message, Error> {
25+
// check length
26+
if raw_message.data.len() < Self::MID_SIZE {
27+
return Err(Error::new(ErrorKind::InvalidInput, "Message too short"));
28+
}
29+
30+
// parse time
31+
let raw_data = raw_message.data.split_off(Self::MID_SIZE);
32+
let msg_time = u64::from_be_bytes(raw_message.data[0..Self::MID_SIZE].try_into().unwrap());
33+
34+
// check ttl
35+
if msg_time + self.ttl_secs < self.get_time_secs() {
36+
return Err(Error::new(ErrorKind::InvalidInput, "Message expired"));
37+
}
38+
39+
Ok(Message {
40+
source: raw_message.source,
41+
data: raw_data,
42+
sequence_number: raw_message.sequence_number,
43+
topic: raw_message.topic,
44+
})
45+
}
46+
47+
fn outbound_transform(
48+
&self,
49+
_topic: &TopicHash,
50+
data: Vec<u8>,
51+
) -> Result<Vec<u8>, std::io::Error> {
52+
let msg_time = self.get_time_secs().to_be_bytes();
53+
54+
// prepend time bytes to the data
55+
let mut transformed_data = Vec::with_capacity(Self::MID_SIZE + data.len());
56+
transformed_data.extend_from_slice(&msg_time);
57+
transformed_data.extend_from_slice(&data);
58+
59+
Ok(transformed_data)
60+
}
61+
}
62+
63+
#[cfg(test)]
64+
mod tests {
65+
use std::time::Duration;
66+
67+
use super::*;
68+
69+
#[test]
70+
fn test_ttl_data_transform() {
71+
let data = vec![1, 2, 3, 4, 5];
72+
let ttl_secs = Duration::from_secs(100).as_secs();
73+
let ttl_data_transform = TTLDataTransform { ttl_secs };
74+
let topic = TopicHash::from_raw("topic");
75+
76+
// outbound transform
77+
let transformed_data = ttl_data_transform
78+
.outbound_transform(&topic, data.clone())
79+
.unwrap();
80+
81+
// inbound transform
82+
let raw_message = RawMessage {
83+
source: Default::default(),
84+
data: transformed_data,
85+
sequence_number: None,
86+
topic,
87+
signature: Default::default(),
88+
key: Default::default(),
89+
validated: false,
90+
};
91+
let message = ttl_data_transform.inbound_transform(raw_message).unwrap();
92+
93+
assert_eq!(message.data, data);
94+
}
95+
}

src/p2p/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,5 @@ pub use message::P2PMessage;
1212

1313
mod available_nodes;
1414
pub use available_nodes::AvailableNodes;
15+
16+
mod data_transform;

0 commit comments

Comments
 (0)