Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ path = "examples/udp_link.rs"
name = "link-client"
path = "examples/link_client.rs"

[[example]]
name = "channel-server"
path = "examples/channel_server.rs"

[[example]]
name = "kaonic-client"
path = "examples/kaonic_client.rs"
Expand All @@ -76,3 +80,7 @@ path = "examples/kaonic_tcp_mesh.rs"
[[example]]
name = "kaonic-mesh"
path = "examples/kaonic_mesh.rs"

[[example]]
name = "channel-client"
path = "examples/channel_client.rs"
74 changes: 74 additions & 0 deletions examples/channel_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use rand_core::OsRng;

use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{Duration, sleep};

use reticulum::channel::{Message, WrappedLink};
use reticulum::destination::DestinationName;
use reticulum::identity::PrivateIdentity;
use reticulum::iface::tcp_client::TcpClient;
use reticulum::iface::tcp_server::TcpServer;
use reticulum::transport::{Transport, TransportConfig};

mod channel_util;
use channel_util::ExampleMessage;


#[tokio::main]
async fn main() {
env_logger::Builder::from_env(
env_logger::Env::default().default_filter_or("trace")
).init();

let mut transport = Transport::new(TransportConfig::default());

let client_addr = transport
.iface_manager()
.lock()
.await
.spawn(TcpClient::new("127.0.0.1:4242"), TcpClient::spawn);

let identity = PrivateIdentity::new_from_name("link-example");

let in_destination = transport
.add_destination(
identity,
DestinationName::new("example_utilities", "linkexample")
)
.await;

transport
.send_direct(client_addr, in_destination.lock().await.announce(OsRng, None).unwrap())
.await;

tokio::spawn(async move {
let recv = transport.recv_announces();
let mut recv = recv.await;
let arc_transport = Arc::new(Mutex::new(transport));

let link = if let Ok(announce) = recv.recv().await {
arc_transport.lock().await.link(
announce.destination.lock().await.desc
).await
} else {
log::error!("Could not establish link, is the server running?");
return;
};

let mut wrapped = WrappedLink::<ExampleMessage>::new(link).await;
log::info!("channel created");

let message = ExampleMessage::new_text("foo");

while wrapped.get_channel().send(&message, &arc_transport).await.is_err() {
log::info!("Sending message: Channel not ready, retrying....");
sleep(Duration::from_secs(1)).await;
}

log::info!("message successfully sent over channel");
});

let _ = tokio::signal::ctrl_c().await;
}

118 changes: 118 additions & 0 deletions examples/channel_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::broadcast::error::TryRecvError;

use rand_core::OsRng;
use reticulum::channel::WrappedLink;
use reticulum::destination::{DestinationName, SingleInputDestination};
use reticulum::destination::link::{Link, LinkEvent, LinkStatus};
use reticulum::hash::AddressHash;
use reticulum::identity::PrivateIdentity;
use reticulum::iface::tcp_server::TcpServer;
use reticulum::transport::{Transport, TransportConfig};

mod channel_util;
use channel_util::ExampleMessage;

#[tokio::main]
async fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("debug")).init();

log::info!(">>> TCP SERVER FOR CHANNEL EXAMPLE <<<");

let id = PrivateIdentity::new_from_name("link-example");
let mut transport = Transport::new(TransportConfig::new("server", &id, true));
log::trace!("transport instantiated");

let dest = transport.add_destination(
id,
DestinationName::new("example_utilities", "linkexample")
).await;

let _ = transport.iface_manager().lock().await.spawn(
TcpServer::new("0.0.0.0:4242", transport.iface_manager()),
TcpServer::spawn);

let mut announce_recv = transport.recv_announces().await;
let mut out_link_events = transport.out_link_events();
let mut in_link_events = transport.in_link_events();

let mut links = HashMap::<AddressHash, Arc<tokio::sync::Mutex<WrappedLink<ExampleMessage>>>>::new();
let mut in_links = vec![];

loop {
match announce_recv.try_recv() {
Ok(announce) => {
let len = links.len();
let destination = announce.destination.lock().await;
let link = match links.get(&destination.desc.address_hash) {
Some(link) => link.clone(),
None => {
let link = transport.link(destination.desc).await;
log::trace!("wl");
let link = Arc::new(
tokio::sync::Mutex::new(
WrappedLink::<ExampleMessage>::new(link).await
)
);
links.insert(destination.desc.address_hash, link.clone());
link
}
};
log::trace!("{} to {} links", len, links.len());
},
Err(error) => {
if error != TryRecvError::Empty {
log::info!("Announce channel error: {}", error);
}
}
}

match out_link_events.try_recv() {
Ok(link_event) => {
match link_event.event {
LinkEvent::Activated => log::info!("link {} activated", link_event.id),
LinkEvent::Closed => log::info!("link {} closed", link_event.id),
LinkEvent::Data(payload) => log::error!("link {} data payload: {}", link_event.id,
std::str::from_utf8(payload.as_slice())
.map(str::to_string)
.unwrap_or_else(|_| format!("{:?}", payload.as_slice()))),
};
out_link_events.resubscribe();
},
Err(error) => {
if error != TryRecvError::Empty {
log::info!("out_link_events channel error: {}", error);
}
}
}

if let Ok(link_event) = in_link_events.try_recv() {
let id = link_event.id;
match link_event.event {
LinkEvent::Activated => {
if let Some(link) = transport.find_in_link(&id).await {
let wrapped = WrappedLink::<ExampleMessage>::new(link).await;
let mut incoming = wrapped.subscribe();
in_links.push(wrapped);
log::info!("in-link {} activated, wrapped", id);
tokio::spawn(async move {
while let Ok(message) = incoming.recv().await {
log::info!("received message on {}: {}", id, message);
}
});
} else {
log::info!("Got activate for {}, but not found", id);
}
}
_ => {}
}
}

transport.send_announce(&dest, None).await;

tokio::time::sleep(Duration::from_secs(1)).await;
}
}
107 changes: 107 additions & 0 deletions examples/channel_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use std::fmt;
use std::time::{SystemTime, UNIX_EPOCH};
use reticulum::channel::{ChannelError, Message, MessageType, PackedMessage};


fn now() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
}


fn unpack_timestamp(bytes: &[u8]) -> u64 {
u64::from_be_bytes(bytes.try_into().unwrap()) & 0x3ffffffff
}


#[derive(Clone)]
struct TextPayload {
text: String,
timestamp: u64
}


impl TextPayload {
fn new(text: String) -> Self {
Self { text, timestamp: now() }
}
}


impl TextPayload {
fn pack(&self) -> Vec<u8> {
// Packing format mimicks that of Python Reticulum, so the
// channel example can be tested against the Channel.py example
// in the reference implementation too.

let mut raw = Vec::with_capacity(self.text.len() + 12);

raw.extend_from_slice(&[0x92, 0xa3]);
raw.extend_from_slice(self.text.as_bytes());

raw.extend_from_slice(&[0xd7, 0xff]);
raw.extend_from_slice(&self.timestamp.to_be_bytes());

raw
}

fn unpack(raw: &[u8]) -> Result<Self, ChannelError> {
if raw.len() <= 12 {
return Err(ChannelError::Misc)
}

match String::from_utf8(raw[2..raw.len()-10].to_vec()) {
Ok(text) => {
let mut payload = TextPayload::new(text);
payload.timestamp = unpack_timestamp(&raw[raw.len()-8..]);
Ok(payload)
},
Err(_) => {
Err(ChannelError::Misc)
}
}
}
}


const MESSAGE_TYPE_TEXT: MessageType = 0x0101;


#[derive(Clone)]
pub enum ExampleMessage {
Text(TextPayload)
}


impl ExampleMessage {
pub fn new_text(text: &str) -> Self {
Self::Text(TextPayload::new(text.to_string()))
}
}


impl fmt::Display for ExampleMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Text(t) => write!(f, "Text at {}: {}", t.timestamp, t.text)
}
}
}


impl Message for ExampleMessage {
fn pack(&self) -> PackedMessage {
match self {
Self::Text(t) => PackedMessage::new(t.pack(), MESSAGE_TYPE_TEXT)
}
}

fn unpack(packed: PackedMessage) -> Result<Self, ChannelError> {
let message_type = packed.message_type();

match message_type {
MESSAGE_TYPE_TEXT =>
Ok(Self::Text(TextPayload::unpack(&packed.payload())?)),
_ => Err(ChannelError::InvalidMessageType)
}
}
}
Loading