Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
142 commits
Select commit Hold shift + click to select a range
09a7c34
links() and transports() in info()
milyin Dec 5, 2025
a6577c6
cargo fmt
milyin Dec 5, 2025
cc43776
accessors to private fileds
milyin Dec 5, 2025
25022af
removed no_run, tested docs
milyin Dec 5, 2025
2685fb7
events first implementation
milyin Dec 5, 2025
7e6a860
connectivity event handling separated
milyin Dec 5, 2025
04f42f0
use weaksession
milyin Dec 5, 2025
41e36de
cargo fmt
milyin Dec 6, 2025
5a0f9c7
clippy fix
milyin Dec 8, 2025
5a6db0a
added cancellation token to transportevents
milyin Dec 9, 2025
1102aa9
cancellation token support
milyin Dec 9, 2025
e9d018c
cancellation tests
milyin Dec 9, 2025
5ad9e9f
test sync fixes
milyin Dec 9, 2025
7c5edff
use statements
milyin Dec 9, 2025
c90ea58
clippy fix
milyin Dec 9, 2025
ccb958b
Merge branch 'main' into connectivity_api2
milyin Dec 9, 2025
5f1bbea
crgo lock
milyin Dec 9, 2025
a231238
doc updates
milyin Dec 9, 2025
53974df
filtering by transport
milyin Dec 9, 2025
5b9469d
get_links corrected
milyin Dec 9, 2025
a4039cc
events methods shortened
milyin Dec 9, 2025
949cc15
renamed to _listemer
milyin Dec 10, 2025
ae52a53
Merge branch 'main' into connectivity_api2
milyin Dec 12, 2025
6bdea98
info transport builder separated, cancellation removed
milyin Dec 12, 2025
71e1caf
info links builder separated, cancellation removed
milyin Dec 12, 2025
c49f803
Listener objects
milyin Dec 12, 2025
a9e243d
id is u32
milyin Dec 12, 2025
9ec45ca
transport events listener moved to session
milyin Dec 12, 2025
13ce863
runtime removed from info()
milyin Dec 12, 2025
e6cdc04
rustfmt
milyin Dec 12, 2025
60f6bee
background added to transporteventlistener
milyin Dec 12, 2025
aff5b1d
background added to transport events listener
milyin Dec 13, 2025
52afbec
test refactor
milyin Dec 13, 2025
5f8e792
session open moved to common
milyin Dec 13, 2025
ca0f8f2
test shortened
milyin Dec 13, 2025
47dba25
useless test removed
milyin Dec 13, 2025
5248e87
rustdoc build fix
milyin Dec 14, 2025
1f5d3a0
unstable use added
milyin Dec 14, 2025
4c5ac1b
clippy fix
milyin Dec 14, 2025
f91e6fb
clipy fix
milyin Dec 14, 2025
8693507
cargo fmt
milyin Dec 14, 2025
189680f
derive debug restored
milyin Dec 14, 2025
a77761a
unnecessary is_ methods removed
milyin Dec 14, 2025
74a04ba
name correctionj
milyin Dec 14, 2025
1ee3518
use added
milyin Dec 14, 2025
2f1c0e2
use removed
milyin Dec 14, 2025
85de9b0
session-based links events listener
milyin Dec 14, 2025
e35360d
rustfmt
milyin Dec 14, 2025
44b902e
use added
milyin Dec 14, 2025
295734c
z_info example
milyin Dec 14, 2025
7e07a10
print in info uses debug
milyin Dec 14, 2025
dbc47d0
fields added, reliability is calculated
milyin Dec 15, 2025
c33044d
no_run added in doctests to avoid hanging CI
milyin Dec 15, 2025
4428eea
doctrests fixes
milyin Dec 15, 2025
4b3ade2
Merge branch 'main' into connectivity_api2
milyin Dec 15, 2025
b8ba7b2
write to state in line to avoid explicit drop
milyin Dec 16, 2025
94bf7b8
Minor doc fixes
OlivierHecart Dec 16, 2025
2f3d15b
multicast field added
milyin Dec 16, 2025
fe1268e
fileter by transport
milyin Dec 17, 2025
94ede78
adminspace handler refactor
milyin Dec 18, 2025
93d29a6
cleaned up IRuntime
milyin Dec 18, 2025
0e78fab
test fix
milyin Dec 18, 2025
eb2cdd9
fixed crash on get `@/**`
milyin Dec 18, 2025
d871642
Merge branch 'adminspace_fix' into connectivity_api_multicast_fix
milyin Dec 18, 2025
103fe2d
small refactor
milyin Dec 19, 2025
7117581
ke refactor
milyin Dec 19, 2025
bcc3e28
publishing connectivity
milyin Dec 23, 2025
6bf6b88
test connectivity adminspace
milyin Dec 23, 2025
4e2cee9
asserrt_json_field function
milyin Dec 23, 2025
e366db6
check all fields
milyin Dec 23, 2025
bdff01d
test priorities and reliability fields
milyin Dec 23, 2025
cca2e1e
assert json macro improved
milyin Dec 23, 2025
ca87031
test simplified
milyin Dec 23, 2025
5a7a8e2
clippy fix
milyin Dec 23, 2025
fb38a99
subscribrers test added
milyin Dec 23, 2025
63d1cfa
refactor subscriber
milyin Dec 23, 2025
e56eb83
expect one sample
milyin Dec 23, 2025
5eb4242
simplified subscriber recv
milyin Dec 23, 2025
2ea3ee4
cargo fmt
milyin Dec 23, 2025
a1034c5
clippy fix
milyin Dec 24, 2025
6c0f04e
Merge branch 'main' into connectivity_adminspace_test
milyin Dec 24, 2025
69d84e5
Merge branch 'connectivity_api_multicast_fix' into connectivity_api2
milyin Dec 24, 2025
b41ffa2
Merge branch 'connectivity_adminspace_test' into connectivity_api2
milyin Dec 24, 2025
00a83bc
rustfmt
milyin Dec 24, 2025
412b630
special json structures for compativility with old adminspace
milyin Dec 28, 2025
d80d5f8
implemented new link publishing
milyin Dec 28, 2025
a4a49a2
closed session test
milyin Dec 28, 2025
3ba163c
cargo fmt
milyin Dec 28, 2025
edab3ff
clippy fix: no internal unstable now, code needed for adminspace
milyin Dec 28, 2025
799075b
cargo fmt
milyin Dec 28, 2025
2591163
priorities, reloability are none if transport !qos
milyin Dec 28, 2025
914b609
fix bug: lock session state for too much time
milyin Jan 1, 2026
5049869
memory leak fix
milyin Jan 4, 2026
26f4978
unstable macro doc update
milyin Jan 6, 2026
3d03f11
remove `internal_config` feature (#1939)
wyfo Jul 3, 2025
109a649
removed internal config mentions
milyin Jan 6, 2026
3724827
cargo fmt
milyin Jan 6, 2026
1faf310
clippy --stable fixes
milyin Jan 6, 2026
6ecf9a8
documentation link fix
milyin Jan 6, 2026
c54d917
add_unstable_warning common method
milyin Jan 6, 2026
f7f5504
blank lines added
milyin Jan 6, 2026
f27493b
rustfmt
milyin Jan 6, 2026
1f8701e
internal validation
milyin Jan 6, 2026
8374217
doc link fix
milyin Jan 6, 2026
e60db7a
cargo fmt
milyin Jan 6, 2026
c750577
Merge branch 'main' into unstable_documentation_fix
milyin Jan 6, 2026
c98bd8c
clippy fix
milyin Jan 6, 2026
76f4007
undoed all changes related to doumentation unstable_doc macro
milyin Jan 6, 2026
10acfca
removed macro for internal_config
milyin Jan 6, 2026
fb32d63
documentation related fixes undoed
milyin Jan 6, 2026
de5c51b
updated to latest code
milyin Jan 6, 2026
cdb5be4
correction made shorter
milyin Jan 6, 2026
c62be80
unnecessary "unstable" removed
milyin Jan 6, 2026
5f52b7c
clippy stable fix
milyin Jan 6, 2026
492bce6
Merge branch 'main' into internal_config_removal
milyin Jan 8, 2026
8e1f885
Revert "undoed all changes related to doumentation unstable_doc macro"
milyin Jan 8, 2026
2834859
cargo check fix
milyin Jan 8, 2026
07f865e
rustfmt
milyin Jan 8, 2026
8e55cab
extra lines corrected
milyin Jan 8, 2026
af61f91
Merge branch 'main' into connectivity_api2
milyin Jan 8, 2026
72ca73f
Merge branch 'unstable_documentation_fix' into connectivity_api2
milyin Jan 8, 2026
f9254bd
transport/link documentation fix
milyin Jan 8, 2026
0ed00f2
rustfmt
milyin Jan 8, 2026
5d1d705
fix after internal_config removal
milyin Jan 8, 2026
c7c5bc6
fix after internal_config removal
milyin Jan 8, 2026
5125fff
Merge branch 'main' into connectivity_api2
milyin Jan 9, 2026
93675c2
Merge branch 'main' into connectivity_api2
milyin Jan 12, 2026
40f799b
unnecessary method transport_zid removed
milyin Jan 13, 2026
1e52736
"empty" methods added for zenoh-c
milyin Jan 13, 2026
97a106b
mutable access to transport/link added
milyin Jan 13, 2026
b2e8a9e
collect all events
milyin Jan 14, 2026
886056d
code shortened
milyin Jan 14, 2026
64a6254
Merge branch 'main' into connectivity_api2
milyin Jan 14, 2026
7fd073e
cargo fmt
milyin Jan 14, 2026
fc6381b
third session
milyin Jan 15, 2026
cc44ccf
multilink test
milyin Jan 15, 2026
a4f4f2c
tests simplified
milyin Jan 15, 2026
93a42ad
concurrent del_link fix, unnecessary box removed
milyin Jan 15, 2026
8154f43
claude report removed
milyin Jan 15, 2026
52f0e6a
restored box after clippy warning
milyin Jan 15, 2026
9196943
removed box keeping the clippy happy
milyin Jan 15, 2026
f91c0c5
Merge branch 'main' into connectivity_duplication_event
milyin Jan 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions io/zenoh-transport/src/unicast/universal/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,6 @@ impl TransportUnicastUniversal {
}

pub(crate) async fn del_link(&self, link: Link) -> ZResult<()> {
enum Target {
Transport,
Link(Box<TransportLinkUnicastUniversal>),
}

// Try to remove the link
let target = {
let mut guard = zwrite!(self.links);
Expand All @@ -177,16 +172,17 @@ impl TransportUnicastUniversal {
}) {
let is_last = guard.len() == 1;
if is_last {
// even if closing the whole transport, still need to remove the link from the list
// because multiple concurrent del_link calls could be staying on this guard
*guard = vec![].into_boxed_slice();
// Close the whole transport
drop(guard);
Target::Transport
None
} else {
// Remove the link
let mut links = guard.to_vec();
let stl = links.remove(index);
*guard = links.into_boxed_slice();
drop(guard);
Target::Link(stl.into())
Some(stl)
}
} else {
bail!(
Expand All @@ -204,8 +200,8 @@ impl TransportUnicastUniversal {
}

match target {
Target::Transport => self.delete().await,
Target::Link(stl) => stl.close().await,
None => self.delete().await,
Some(stl) => stl.close().await,
}
}

Expand Down
49 changes: 49 additions & 0 deletions zenoh/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,52 @@ pub async fn close_session(peer01: Session, peer02: Session) {
println!("[ ][02d] Closing peer02 session");
ztimeout!(peer02.close()).unwrap();
}

/// Open sessions configured for multilink (max_links > 1)
#[allow(dead_code)]
pub async fn open_session_multilink(
listen_endpoints: &[&str],
connect_endpoints: &[&str],
) -> (Session, Session) {
// Session1: listener with multilink enabled
let mut config1 = zenoh_config::Config::default();
config1
.listen
.endpoints
.set(
listen_endpoints
.iter()
.map(|e| e.parse().unwrap())
.collect::<Vec<_>>(),
)
.unwrap();
config1.scouting.multicast.set_enabled(Some(false)).unwrap();
config1
.transport
.unicast
.set_max_links(listen_endpoints.len())
.unwrap();
let session1 = ztimeout!(zenoh::open(config1)).unwrap();

// Session2: connector with multilink enabled
let mut config2 = zenoh_config::Config::default();
config2
.connect
.endpoints
.set(
connect_endpoints
.iter()
.map(|e| e.parse().unwrap())
.collect::<Vec<_>>(),
)
.unwrap();
config2.scouting.multicast.set_enabled(Some(false)).unwrap();
config2
.transport
.unicast
.set_max_links(connect_endpoints.len())
.unwrap();
let session2 = ztimeout!(zenoh::open(config2)).unwrap();

(session1, session2)
}
148 changes: 96 additions & 52 deletions zenoh/tests/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,28 @@ mod common;
#[cfg(feature = "unstable")]
mod tests {
use std::{
fmt::Debug,
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};

use zenoh::sample::SampleKind;

use crate::common::{
close_session, open_session_connect, open_session_listen, open_session_unicast,
close_session, open_session_connect, open_session_listen, open_session_multilink,
open_session_unicast,
};

async fn collect_events<T: Debug>(events: &flume::Receiver<T>, timeout: Duration) -> Vec<T> {
let mut collected = Vec::new();
while let Ok(event) = tokio::time::timeout(timeout, events.recv_async()).await {
let event = event.expect("Channel closed");
println!("{:?}", event);
collected.push(event);
}
collected
}

const SLEEP: Duration = Duration::from_millis(100);

/// Test that transports() returns an iterator of Transport objects
Expand Down Expand Up @@ -108,39 +120,27 @@ mod tests {
.await;

let session2 = open_session_connect(&["tcp/127.0.0.1:17450"]).await;

// Wait for connection to establish
tokio::time::sleep(SLEEP).await;

// Should receive transport opened event with SampleKind::Put
let event = tokio::time::timeout(Duration::from_secs(5), events.recv_async())
.await
.expect("Timeout waiting for transport event")
.expect("Channel closed");

assert_eq!(
event.kind(),
zenoh::sample::SampleKind::Put,
"Event kind should be Put for opened transport"
// Collect transport opened events - should be exactly 1 Put
let put_events = collect_events(&events, Duration::from_millis(200)).await;
assert!(
put_events.len() == 1 && put_events[0].kind() == SampleKind::Put,
"Expected exactly 1 Put event, got {} events",
put_events.len()
);
println!("Transport opened: {}", event.transport().zid());

// Close session2 to trigger transport close event
session2.close().await.unwrap();
tokio::time::sleep(SLEEP).await;

// Should receive transport closed event with SampleKind::Delete
let event = tokio::time::timeout(Duration::from_secs(5), events.recv_async())
.await
.expect("Timeout waiting for transport close event")
.expect("Channel closed");

assert_eq!(
event.kind(),
zenoh::sample::SampleKind::Delete,
"Event kind should be Delete for closed transport"
// Collect transport closed events - should be exactly 1 Delete
let delete_events = collect_events(&events, Duration::from_millis(200)).await;
assert!(
delete_events.len() == 1 && delete_events[0].kind() == SampleKind::Delete,
"Expected exactly 1 Delete event, got {} events",
delete_events.len()
);
println!("Transport closed");

session1.close().await.unwrap();
}
Expand All @@ -160,45 +160,89 @@ mod tests {
.with(flume::bounded(32))
.await;

// Connect two sessions
let session2 = open_session_connect(&["tcp/127.0.0.1:17451"]).await;

// Wait for connection to establish
let session3 = open_session_connect(&["tcp/127.0.0.1:17451"]).await;
tokio::time::sleep(SLEEP).await;

// Should receive link added event with SampleKind::Put
let event = tokio::time::timeout(Duration::from_secs(5), events.recv_async())
.await
.expect("Timeout waiting for link event")
.expect("Channel closed");
// Collect link added events - should be exactly 2 Put
let put_events = collect_events(&events, Duration::from_millis(200)).await;
assert!(
put_events.len() == 2 && put_events.iter().all(|e| e.kind() == SampleKind::Put),
"Expected exactly 2 Put events, got {} events",
put_events.len()
);

assert_eq!(
event.kind(),
zenoh::sample::SampleKind::Put,
"Event kind should be Put for added link"
// Close session2 (first transport's last link)
session2.close().await.unwrap();
tokio::time::sleep(SLEEP).await;

// Collect link removed events - should be exactly 1 Delete
let delete_events = collect_events(&events, Duration::from_millis(200)).await;
assert!(
delete_events.len() == 1 && delete_events[0].kind() == SampleKind::Delete,
"First close: expected exactly 1 Delete event, got {:?} events",
delete_events.len()
);
println!(
"Link added: {} -> {} (transport: {})",
event.link().src(),
event.link().dst(),
event.link().zid()

// Close session3 (second transport's last link)
session3.close().await.unwrap();
tokio::time::sleep(SLEEP).await;

// Collect link removed events - should be exactly 1 Delete
let delete_events = collect_events(&events, Duration::from_millis(200)).await;
assert!(
delete_events.len() == 1 && delete_events[0].kind() == SampleKind::Delete,
"Second close: expected exactly 1 Delete event, got {} events",
delete_events.len()
);

// Close session2 to trigger link removal event
session2.close().await.unwrap();
session1.close().await.unwrap();
}

/// Test link events with multilink transport (multiple links in same transport)
/// This tests is_last=false (first link) vs is_last=true (last link) in del_link()
#[cfg(feature = "transport_multilink")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_link_events_multilink() {
zenoh_util::init_log_from_env_or("error");

let endpoints = &["tcp/127.0.0.1:17470", "tcp/127.0.0.1:17471"];
let (session1, session2) = open_session_multilink(endpoints, endpoints).await;

tokio::time::sleep(SLEEP).await;

// Should receive link removed event with SampleKind::Delete
let event = tokio::time::timeout(Duration::from_secs(5), events.recv_async())
.await
.expect("Timeout waiting for link removal event")
.expect("Channel closed");
// Verify we have 2 links in 1 transport
let transports: Vec<_> = session1.info().transports().await.collect();
assert_eq!(transports.len(), 1, "Should have exactly 1 transport");

let links: Vec<_> = session1.info().links().await.collect();
assert_eq!(
event.kind(),
zenoh::sample::SampleKind::Delete,
"Event kind should be Delete for removed link"
links.len(),
2,
"Should have exactly 2 links in multilink transport"
);

// Subscribe to link events
let events = session1
.info()
.link_events_listener()
.history(false)
.with(flume::bounded(32))
.await;

// Close session2 - this closes both links
session2.close().await.unwrap();
tokio::time::sleep(SLEEP).await;

// Collect delete events - should be exactly 2 (one per link)
let delete_events = collect_events(&events, Duration::from_millis(200)).await;
assert!(
delete_events.len() == 2
&& delete_events.iter().all(|e| e.kind() == SampleKind::Delete),
"Expected exactly 2 Delete events (one per link), got {} events",
delete_events.len()
);
println!("Link removed");

session1.close().await.unwrap();
}
Expand Down
Loading