Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions doc/design-documents/drafts/async_api/assets/event_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) 2025 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

pub struct Listener<Service>
where
Service: service::Service,
{
listener: iceoryx2::port::listener::Listener<Service>,
io: BridgedFd<RawFdBridge<AsyncSelector>>,
}

impl<Service> Listener<Service>
where
Service: service::Service,
<Service::Event as iceoryx2_cal::event::Event>::Listener: FileDescriptorBased,
{
pub(crate) fn from(listener: iceoryx2::port::listener::Listener<Service>) -> Result<Self, CommonErrors> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, would it make sense to implement the TryFrom trait here? If not, it might be a good idea to call this function try_from, since it has the same signature like the one from the trait.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one of two you said makse sense. Since it's example I dont correct it for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pawelrutkaq We have iceoryx2 users that use semaphores instead of unix domain sockets for signaling mechanism - since unix domain sockets are non-certifiable and not allowed in mission-critical contexts.

As you see here, the Listener implements only file_descriptor() when the underlying concept is file descriptor based: https://github.com/eclipse-iceoryx/iceoryx2/blob/main/iceoryx2/src/port/listener.rs#L128

When a user now uses the Semaphore variant, async cannot be implemented since Semaphores alone do not support event multiplexing.

Currently, we already have the issue, that not all iceoryx2 code compiles (waitset breaks) when the semaphore configuration is used. But we must avoid adding more code that has the same issue.

One way of addressing this would be to introduce another service variant called ipc_async_threadsafe and define the event: https://github.com/eclipse-iceoryx/iceoryx2/blob/main/iceoryx2/src/service/ipc.rs#L53 as socket based.

But this will wonderfully clash with the C, C++ and Python language bindings - so it would be a service variant that is not cross-language compatible.

@elBoberido what is your opinion here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Semaphores are not events. So in case user have system where we dont have proper notification backend (hope we dont dicuss here QNX ;)) then the only thing user can do is polling or ? In that case, using impl for specific trait (Event - FileDescriptor or Event - Semaphore) we can implement different stategy. For semaphore it would be simply timeout based polling.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Semaphores we could implement a WaitSet that works with them and for async, a separate thread could be spawned to wait on this WaitSet and the use e.g. EventFDs to wake the async runtime.

Regarding the additional service variant. I think we need a solution that scales better than what we have today. But that is not a simple task and needs some investment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makse no sense really. Waitset is doing sleep (or what have you meant?) and will add new thread. The runtimes can do the same with one liner.

Also the runtimes has own IO multiplexers, so adding new one in seperate thread witll normally cause multiple thread hops to process a change instead processing in place by runtime. So I would not do it as default solution.

Copy link
Contributor Author

@pawelrutkaq pawelrutkaq Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we dicussed today: We can write specialized handling of API when events do use Semaphore or so. And the other option is what @elBoberido mentioned, once custom waitset will be there. This would be up to Runtime to choose an integration way for this.

// Safety:
// - This FD is owned by iceoryx2 listener and we don't close it on drop of RawFdBridge
// - The FD is kept along with listener so lifetime is take care of
// - Each Listener has its own FD so no sharing is done in iceoryx2 layer
let fd = unsafe { listener.file_descriptor().native_handle() };

Ok(Self {
listener,
io: BridgedFd::new_with_interest(RawFdBridge::from(fd)?, IoEventInterest::READABLE)?,
})
}

/// Returns the [`UniqueListenerId`] of the [`Listener`]
pub fn id(&self) -> UniqueListenerId {
self.listener.id()
}

/// Returns the deadline of the corresponding [`Service`](crate::service::Service).
pub fn deadline(&self) -> Option<Duration> {
self.listener.deadline()
}

/// Async wait for a new [`EventId`]. On error it returns [`ListenerWaitError`] is returned which describes
/// the error in detail.
pub async fn wait_one(&self) -> Result<EventId, ListenerWaitError> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarification: Do you expect that there is a one-to-one relation between sent samples and received events?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is Event part. so here we have one to one realtions. However second part of your questions is more into pub-sub. Here our idea is:

  • first implemetation - yes, each new produced sample fires event
  • later stage (optimization when rolled out) - one can limit number of events by implemeting special checks in pub-sub internals where we can fire event only if we know async subscriber as now awaiting new sample.

Since my approach is to first do not touch iceoryx2 implemetation details.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pawelrutkaq I just skimmed over the comments - so this comment might be misplaced.

first implemetation - yes, each new produced sample fires event

This is an implementation flaw of iceoryx2 that will be fixed. The issue is here: #925 and we are currently writing a design doc for it.

The event is in the and a switch that is triggered when a state changes. If the same state changes multiple times, the switch is only triggered once until the counterpart resets it by handling all events.

This means for the following sequence, that only one event is emitted.

  1. Publish sample
  2. Notify sample-published (emits event notification)
  3. Publish sample
  4. Notify sample-published (does nothing)
  5. ....

When the listener is activated and acquires all events then the event is reseted and the event notified once again.

The reason for this behavior change is, that it is essential in a mission-critical context that no notification is lost. This can be best realized with some form of bitset - not a queue with a limited buffer size.

Copy link
Contributor Author

@pawelrutkaq pawelrutkaq Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not matter I think. So we are doing notify for each sample produced. Whether this send event, or already have send event does not matter, and is even better since we would send less events. At the end, some event was send to listener which will wake up and process all until it has no more events, correct? ;)
Speaking in code it means AsyncSubscriber doing receive().await:

  • check if there is sample, if there is, its done, next cal does same.
  • if no samples, wait for notification
  • repeat

Still i wonder what You will do, counter for each event value ? At the end, it will mean less OS calls -> better preformance in both async and non async I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As dicussed: No issue

self.io
.async_call(IoEventInterest::READABLE, |raw_fd| {
raw_fd.io_call(|fd| {
info!("Checking for Iceoryx event on fd: {}", fd);
self.wait_one_internal()
})
})
.await
.map_err(|_| ListenerWaitError::InternalFailure)
.and_then(|r| match r {
Ok(event) => Ok(event),
Err(e) => Err(e),
})
}

fn wait_one_internal(&self) -> IoResult<Result<EventId, ListenerWaitError>> {
loop {
match self.listener.try_wait_one() {
Ok(event) if event.is_some() => return Ok(Ok(event.unwrap())),
Ok(_) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the assumption is that when try_wait_one() returns nothing that there was an error?

This assumption would be wrong. One-to-one relations between sample and event notification will soon no longer be possible and events will have switch semantics - otherwise we have no way of dealing with limited queue buffer sizes like every mechanism like unix domain sockets has.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what that mean, we need to dicuss.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As dicussed: Not a problem, some notification will happen underneath and this is enought to get waked in async. The implemented need to take care before waiting that it consumed all events/samples etc.

// This is None, so there was and error, probably EAGAIN or EWOULDBLOCK
if std::io::Error::last_os_error().kind() == std::io::ErrorKind::WouldBlock {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the whole error handling based on errno. This is broken since iceoryx2 resets the errno variable always to zero when it is handled - like the POSIX standard recommends.

Copy link
Contributor Author

@pawelrutkaq pawelrutkaq Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is esentially needed in FD case that we get EWouldBlock in any runtime. Otherwise we dont know when to wait. And You are not clearing this specific error ;) Anyway this information shall be preserved or we need side API.

Copy link
Contributor Author

@pawelrutkaq pawelrutkaq Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We agreed to stick to a fact that try_receive will return Ok(None) only whne there would be EWOULDBLOCK error from os call in FD case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed: We may not need to do anything as per mentioned guarantee it will not block and it will do correct action (after reimplementation). I would leave still open for later Whether AsyncPublisher shall be created as proxy wrapper (or type alias) to Publisher to keep async API symetric only.

error!("Iceoryx listener would block, should do re-register!... {}", unsafe {
self.listener.file_descriptor().native_handle()
});
return Err(std::io::ErrorKind::WouldBlock.into());
} else {
panic!("Something went wrong!");
}
}
Err(ListenerWaitError::InterruptSignal) => {
continue;
}
Err(e) => {
error!("Error waiting for Iceoryx event: {}", e);
return Ok(Err(e));
}
}
}
}
}

impl<T> Drop for Listener<T>
where
T: service::Service,
{
fn drop(&mut self) {
// Leave the underlying fd open, as we don't own it and let iceoryx2 handle it
self.io.close_on_drop(false);
}
}

Loading
Loading