Skip to content

Commit bfecfa4

Browse files
committed
fix: Route invalidation is not hat-local
Also merges 8d059ff9
2 parents ce4ee62 + 8d059ff commit bfecfa4

File tree

14 files changed

+331
-86
lines changed

14 files changed

+331
-86
lines changed

commons/zenoh-protocol/src/zenoh/query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ pub mod flag {
8080
pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow
8181
}
8282

83-
#[derive(Debug, Clone, PartialEq, Eq)]
83+
#[derive(Debug, Default, Clone, PartialEq, Eq)]
8484
pub struct Query {
8585
pub consolidation: ConsolidationMode,
8686
pub parameters: String,

examples/examples/z_info.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,15 @@ async fn main() {
6060
let transport_events = info
6161
.transport_events_listener()
6262
.history(false) // Don't repeat transports we already printed
63-
.await;
63+
.await
64+
.expect("Failed to declare transport events listener");
6465

6566
// Set up link events listener (using default handler)
6667
let link_events = info
6768
.link_events_listener()
6869
.history(false) // Don't repeat links we already printed
69-
.await;
70+
.await
71+
.expect("Failed to declare link events listener");
7072

7173
// Listen for events until CTRL-C
7274
loop {

zenoh/src/api/builders/info_links.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ impl std::fmt::Debug for LinkEventsListenerInner {
148148
/// .link_events_listener()
149149
/// .history(true)
150150
/// .with(flume::bounded(32))
151-
/// .await;
151+
/// .await
152+
/// .expect("Failed to declare link events listener");
152153
///
153154
/// while let Ok(event) = listener.recv_async().await {
154155
/// match event.kind() {
@@ -181,7 +182,8 @@ impl<Handler> LinkEventsListener<Handler> {
181182
/// let listener = session.info()
182183
/// .link_events_listener()
183184
/// .with(flume::bounded(32))
184-
/// .await;
185+
/// .await
186+
/// .expect("Failed to declare link events listener");
185187
/// listener.undeclare().await.unwrap();
186188
/// # }
187189
/// ```
@@ -300,7 +302,8 @@ impl<Handler> IntoFuture for LinkEventsListenerUndeclaration<Handler> {
300302
/// .link_events_listener()
301303
/// .history(true)
302304
/// .with(flume::bounded(32))
303-
/// .await;
305+
/// .await
306+
/// .expect("Failed to declare link events listener");
304307
///
305308
/// while let Ok(event) = listener.recv_async().await {
306309
/// match event.kind() {
@@ -437,7 +440,7 @@ where
437440
Handler: IntoHandler<LinkEvent> + Send,
438441
Handler::Handler: Send,
439442
{
440-
type To = LinkEventsListener<Handler::Handler>;
443+
type To = ZResult<LinkEventsListener<Handler::Handler>>;
441444
}
442445

443446
#[zenoh_macros::unstable]
@@ -448,19 +451,20 @@ where
448451
{
449452
fn wait(self) -> Self::To {
450453
let (callback, handler) = self.handler.into_handler();
451-
let state = self
452-
.session
453-
.declare_transport_links_listener_inner(callback, self.history, self.transport)
454-
.expect("Failed to declare link events listener");
454+
let state = self.session.declare_transport_links_listener_inner(
455+
callback,
456+
self.history,
457+
self.transport,
458+
)?;
455459

456-
LinkEventsListener {
460+
Ok(LinkEventsListener {
457461
inner: LinkEventsListenerInner {
458462
session: self.session.clone(),
459463
id: state.id,
460464
undeclare_on_drop: true,
461465
},
462466
handler,
463-
}
467+
})
464468
}
465469
}
466470

zenoh/src/api/builders/info_transport.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ impl std::fmt::Debug for TransportEventsListenerInner {
116116
/// .transport_events_listener()
117117
/// .history(true)
118118
/// .with(flume::bounded(32))
119-
/// .await;
119+
/// .await
120+
/// .expect("Failed to declare transport events listener");
120121
///
121122
/// while let Ok(event) = listener.recv_async().await {
122123
/// match event.kind() {
@@ -147,7 +148,8 @@ impl<Handler> TransportEventsListener<Handler> {
147148
/// let listener = session.info()
148149
/// .transport_events_listener()
149150
/// .with(flume::bounded(32))
150-
/// .await;
151+
/// .await
152+
/// .expect("Failed to declare transport events listener");
151153
/// listener.undeclare().await.unwrap();
152154
/// # }
153155
/// ```
@@ -263,7 +265,8 @@ impl<Handler> IntoFuture for TransportEventsListenerUndeclaration<Handler> {
263265
/// .transport_events_listener()
264266
/// .history(true)
265267
/// .with(flume::bounded(32))
266-
/// .await;
268+
/// .await
269+
/// .expect("Failed to declare transport events listener");
267270
///
268271
/// while let Ok(event) = events.recv_async().await {
269272
/// match event.kind() {
@@ -378,7 +381,7 @@ where
378381
Handler: IntoHandler<TransportEvent> + Send,
379382
Handler::Handler: Send,
380383
{
381-
type To = TransportEventsListener<Handler::Handler>;
384+
type To = ZResult<TransportEventsListener<Handler::Handler>>;
382385
}
383386

384387
#[zenoh_macros::unstable]
@@ -391,17 +394,16 @@ where
391394
let (callback, handler) = self.handler.into_handler();
392395
let state = self
393396
.session
394-
.declare_transport_events_listener_inner(callback, self.history)
395-
.expect("Failed to declare transport events listener");
397+
.declare_transport_events_listener_inner(callback, self.history)?;
396398

397-
TransportEventsListener {
399+
Ok(TransportEventsListener {
398400
inner: TransportEventsListenerInner {
399401
session: self.session.clone(),
400402
id: state.id,
401403
undeclare_on_drop: true,
402404
},
403405
handler,
404-
}
406+
})
405407
}
406408
}
407409

zenoh/src/api/info.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ impl SessionInfo {
167167
/// .transport_events_listener()
168168
/// .history(true)
169169
/// .with(flume::bounded(32))
170-
/// .await;
170+
/// .await
171+
/// .expect("Failed to declare transport events listener");
171172
///
172173
/// while let Ok(event) = events.recv_async().await {
173174
/// match event.kind() {
@@ -195,7 +196,8 @@ impl SessionInfo {
195196
/// .link_events_listener()
196197
/// .history(true)
197198
/// .with(flume::bounded(32))
198-
/// .await;
199+
/// .await
200+
/// .expect("Failed to declare link events listener");
199201
///
200202
/// while let Ok(event) = listener.recv_async().await {
201203
/// match event.kind() {

zenoh/src/net/routing/dispatcher/face.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ impl Primitives for Face {
566566
let src_fid = ctx.src_face.id;
567567

568568
for mut res in hats[region].unregister_face_subscriptions(ctx.reborrow()) {
569-
disable_matches_data_routes(&mut res, &region);
569+
disable_matches_data_routes(ctx.tables, &mut res);
570570

571571
let mut remaining = hats
572572
.values_mut()
@@ -585,7 +585,7 @@ impl Primitives for Face {
585585
}
586586

587587
for mut res in hats[region].unregister_face_queryables(ctx.reborrow()) {
588-
disable_matches_query_routes(&mut res, &region);
588+
disable_matches_query_routes(ctx.tables, &mut res);
589589

590590
let remaining = hats
591591
.iter()

zenoh/src/net/routing/dispatcher/pubsub.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::net::routing::{
3232
dispatcher::{
3333
face::Face,
3434
local_resources::{LocalResourceInfoTrait, LocalResources},
35+
tables::TablesData,
3536
},
3637
hat::{BaseContext, SendDeclare},
3738
router::{get_or_set_route, Direction, RouteBuilder},
@@ -113,7 +114,8 @@ impl Face {
113114
.reduce(|_, _| SubscriberInfo);
114115

115116
hats[region].propagate_subscription(ctx.reborrow(), res.clone(), other_info);
116-
disable_matches_data_routes(&mut res, &region);
117+
118+
disable_matches_data_routes(ctx.tables, &mut res);
117119
}
118120

119121
drop(wtables);
@@ -188,7 +190,7 @@ impl Face {
188190
if let Some(mut res) =
189191
hats[region].unregister_subscription(ctx.reborrow(), id, res.clone(), node_id)
190192
{
191-
disable_matches_data_routes(&mut res, &region);
193+
disable_matches_data_routes(ctx.tables, &mut res);
192194

193195
let mut remaining = tables
194196
.hats
@@ -208,16 +210,29 @@ impl Face {
208210
}
209211
}
210212

211-
/// Disables data routes of the given regions's hat.
213+
/// Disables data routes for the given [`Resource`].
214+
///
215+
/// ## Note
212216
///
213-
/// A subscription declaration or undeclaration should in theory only invalidate data routes of its owner hat.
214-
pub(crate) fn disable_matches_data_routes(res: &mut Arc<Resource>, region: &Region) {
217+
/// **Changes in data/query routes are not hat-local**. For example, a north peer hat has routes for data
218+
/// that originate from south-bound remotes but has no routes for data that originate in its north
219+
/// region, thus a change in a broker's data routes affects the routes of the north peer hat.
220+
pub(crate) fn disable_matches_data_routes(_tables: &mut TablesData, res: &mut Arc<Resource>) {
215221
if res.ctx.is_some() {
216-
get_mut_unchecked(res).context_mut().hats[region].disable_data_routes();
222+
for hat in get_mut_unchecked(res).context_mut().hats.values_mut() {
223+
hat.disable_data_routes();
224+
}
225+
217226
for match_ in &res.context().matches {
218227
let mut match_ = match_.upgrade().unwrap();
219228
if !Arc::ptr_eq(&match_, res) {
220-
get_mut_unchecked(&mut match_).context_mut().hats[region].disable_data_routes();
229+
for hat in get_mut_unchecked(&mut match_)
230+
.context_mut()
231+
.hats
232+
.values_mut()
233+
{
234+
hat.disable_data_routes();
235+
}
221236
}
222237
}
223238
}

0 commit comments

Comments
 (0)