Skip to content

Commit 64f316d

Browse files
author
Felix Obenhuber
committed
feat(anng): migrate to NNG v2 API
Major update to support NNG v2.0: API changes: - Use typed nng_err enum instead of raw u32 error constants - Rename socket functions: nng_close → nng_socket_close, nng_send_aio → nng_socket_send, nng_recv_aio → nng_socket_recv - Update nng_strerror signature to take nng_err enum - Add NNG initialization via nng_init() (required by NNG 2.0) Dependency changes: - Update nng-sys from 0.3.0 to 0.4.0-v2pre.1 - Remove build.rs version detection (no longer needed) - Remove #[cfg(nng_110)] gates as NNG 2.0 is now baseline Error handling: - Use nng_err::NNG_* variants with pattern guards - Format transport errors directly (NNG doesn't resolve them) - Consistent 'err' naming in match patterns Test updates: - Adjust error kind expectations for NNG 2 behavior - Update inproc address format expectations - Enable pubsub tests unconditionally
1 parent 28cbe87 commit 64f316d

File tree

18 files changed

+330
-322
lines changed

18 files changed

+330
-322
lines changed

.github/workflows/check.yml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,17 @@ jobs:
136136
# --feature-powerset runs for every combination of features
137137
- name: cargo hack nng
138138
run: cargo hack -p nng --feature-powerset check
139-
- name: cargo hack anng
140-
run: cargo hack -p anng --feature-powerset check
141139
- name: remove NNG v1
142140
run: sudo apt-get remove -y libnng1 libnng-dev
143141
- name: build and install NNG v2 for nng-sys
144142
run: .github/scripts/install-nng.sh
143+
- name: cargo hack anng
144+
run: cargo hack -p anng --feature-powerset check
145+
env:
146+
# add /usr/local/include to the C compiler search path for NNG headers
147+
CFLAGS: -I/usr/local/include
148+
# add /usr/local/lib to the linker search path for NNG libraries
149+
RUSTFLAGS: -L native=/usr/local/lib
145150
- name: cargo hack nng-sys
146151
run: cargo hack -p nng-sys --feature-powerset check --skip source-update-bindings
147152
env:

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

anng/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ tracing = "0.1.41"
2727
lazy_static = { version = "1.5.0", optional = true }
2828

2929
[dependencies.nng-sys]
30-
version = "0.3.0"
31-
# TODO(flxo): use path dependency once anng is updated to use nng-sys v0.4
32-
# path = "../nng-sys"
30+
version = "0.4.0-v2pre.1"
31+
path = "../nng-sys"
3332
default-features = false
3433

3534
[features]

anng/build.rs

Lines changed: 0 additions & 24 deletions
This file was deleted.

anng/src/aio.rs

Lines changed: 139 additions & 127 deletions
Large diffs are not rendered by default.

anng/src/context.rs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use crate::{
33
aio::{Aio, AioError, ImplicationOnMessage},
44
message::Message,
55
};
6-
use core::{fmt, mem::MaybeUninit, num::NonZeroU32, ptr::NonNull};
6+
use core::{fmt, mem::MaybeUninit, ptr::NonNull};
7+
use nng_sys::nng_err;
78

89
impl<Protocol: crate::protocols::SupportsContext> Socket<Protocol> {
910
/// Creates a new context for concurrent operations on this socket.
@@ -26,10 +27,10 @@ impl<Protocol: crate::protocols::SupportsContext> Socket<Protocol> {
2627
let errno = unsafe { nng_sys::nng_ctx_open(context.as_mut_ptr(), self.socket) };
2728
match u32::try_from(errno).expect("errno is never negative") {
2829
0 => {}
29-
nng_sys::NNG_ENOMEM => {
30+
x if x == nng_err::NNG_ENOMEM as u32 => {
3031
panic!("OOM");
3132
}
32-
nng_sys::NNG_ENOTSUP => {
33+
x if x == nng_err::NNG_ENOTSUP as u32 => {
3334
// the SupportsContext trait bound ensures this method is only callable
3435
// on protocols that support contexts, so this error should be impossible.
3536
unreachable!("protocol supports contexts per SupportsContext trait bound");
@@ -131,33 +132,27 @@ impl<Protocol> Context<'_, Protocol> {
131132
let msg = unsafe { Message::from_raw_unchecked(msg) };
132133
Ok(Some(msg))
133134
}
134-
nng_sys::NNG_EAGAIN => Ok(None),
135-
nng_sys::NNG_ECLOSED => {
135+
x if x == nng_err::NNG_EAGAIN as u32 => Ok(None),
136+
x if x == nng_err::NNG_ECLOSED as u32 => {
136137
unreachable!("socket is still open since we have a reference to it");
137138
}
138-
nng_sys::NNG_EINVAL => {
139+
x if x == nng_err::NNG_EINVAL as u32 => {
139140
unreachable!("flags are valid for the call");
140141
}
141-
nng_sys::NNG_ENOMEM => {
142+
x if x == nng_err::NNG_ENOMEM as u32 => {
142143
panic!("OOM");
143144
}
144-
err @ nng_sys::NNG_ENOTSUP => {
145+
x if x == nng_err::NNG_ENOTSUP as u32 => {
145146
// protocol does not support receiving
146-
Err(AioError::Operation(
147-
NonZeroU32::try_from(err).expect("statically checked to be >0"),
148-
))
147+
Err(AioError::from_nng_err(nng_err::NNG_ENOTSUP))
149148
}
150-
err @ nng_sys::NNG_ESTATE => {
149+
x if x == nng_err::NNG_ESTATE as u32 => {
151150
// protocol does not support receiving in its current state
152-
Err(AioError::Operation(
153-
NonZeroU32::try_from(err).expect("statically checked to be >0"),
154-
))
151+
Err(AioError::from_nng_err(nng_err::NNG_ESTATE))
155152
}
156-
err @ nng_sys::NNG_ETIMEDOUT => {
153+
x if x == nng_err::NNG_ETIMEDOUT as u32 => {
157154
// likely due to a protocol-level timeout (like surveys)
158-
Err(AioError::Operation(
159-
NonZeroU32::try_from(err).expect("statically checked to be >0"),
160-
))
155+
Err(AioError::from_nng_err(nng_err::NNG_ETIMEDOUT))
161156
}
162157
errno => {
163158
unreachable!(

anng/src/lib.rs

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,8 @@ use crate::{
100100
aio::{Aio, ImplicationOnMessage},
101101
context::Context,
102102
};
103-
use core::{
104-
ffi::{CStr, c_int},
105-
fmt,
106-
marker::PhantomData,
107-
num::NonZeroU32,
108-
ptr::NonNull,
109-
};
103+
use core::{ffi::CStr, fmt, marker::PhantomData, ptr::NonNull};
104+
use nng_sys::nng_err;
110105
use std::io;
111106

112107
mod aio;
@@ -262,7 +257,7 @@ impl<Protocol> fmt::Debug for ContextfulSocket<'_, Protocol> {
262257
impl<Protocol> Drop for Socket<Protocol> {
263258
fn drop(&mut self) {
264259
// SAFETY: socket is valid and not already closed (socket is live until `self` drops).
265-
crate::block_in_place(|| unsafe { nng_sys::nng_close(self.socket) });
260+
crate::block_in_place(|| unsafe { nng_sys::nng_socket_close(self.socket) });
266261
}
267262
}
268263

@@ -288,7 +283,7 @@ impl<Protocol> Socket<Protocol> {
288283
// SAFETY: socket is valid and not closed (socket is live until `self` drops),
289284
// AIO is valid and not busy (per `Aio` busy state invariant), and
290285
// message has been set on AIO (just above).
291-
unsafe { nng_sys::nng_send_aio(self.socket, self.aio.as_ptr()) };
286+
unsafe { nng_sys::nng_socket_send(self.socket, self.aio.as_ptr()) };
292287
// the above started an async operation (makes AIO busy).
293288
// we call wait() to preserve the Aio busy invariant.
294289
match self.aio.wait(ImplicationOnMessage::Sent).await {
@@ -326,33 +321,27 @@ impl<Protocol> Socket<Protocol> {
326321
let msg = unsafe { Message::from_raw_unchecked(msg) };
327322
Ok(Some(msg))
328323
}
329-
nng_sys::NNG_EAGAIN => Ok(None),
330-
nng_sys::NNG_ECLOSED => {
324+
x if x == nng_err::NNG_EAGAIN as u32 => Ok(None),
325+
x if x == nng_err::NNG_ECLOSED as u32 => {
331326
unreachable!("socket is still open since we have a reference to it");
332327
}
333-
nng_sys::NNG_EINVAL => {
328+
x if x == nng_err::NNG_EINVAL as u32 => {
334329
unreachable!("flags are valid for the call");
335330
}
336-
nng_sys::NNG_ENOMEM => {
331+
x if x == nng_err::NNG_ENOMEM as u32 => {
337332
panic!("OOM");
338333
}
339-
err @ nng_sys::NNG_ENOTSUP => {
334+
x if x == nng_err::NNG_ENOTSUP as u32 => {
340335
// protocol does not support receiving
341-
Err(AioError::Operation(
342-
NonZeroU32::try_from(err).expect("statically checked to be >0"),
343-
))
336+
Err(AioError::from_nng_err(nng_err::NNG_ENOTSUP))
344337
}
345-
err @ nng_sys::NNG_ESTATE => {
338+
x if x == nng_err::NNG_ESTATE as u32 => {
346339
// protocol does not support receiving in its current state
347-
Err(AioError::Operation(
348-
NonZeroU32::try_from(err).expect("statically checked to be >0"),
349-
))
340+
Err(AioError::from_nng_err(nng_err::NNG_ESTATE))
350341
}
351-
err @ nng_sys::NNG_ETIMEDOUT => {
342+
x if x == nng_err::NNG_ETIMEDOUT as u32 => {
352343
// likely due to a protocol-level timeout (like surveys)
353-
Err(AioError::Operation(
354-
NonZeroU32::try_from(err).expect("statically checked to be >0"),
355-
))
344+
Err(AioError::from_nng_err(nng_err::NNG_ETIMEDOUT))
356345
}
357346
errno => {
358347
unreachable!("nng_recvmsg documentation claims errno {errno} is never returned");
@@ -370,7 +359,7 @@ impl<Protocol> Socket<Protocol> {
370359

371360
// SAFETY: socket is valid and not closed (socket is live until `self` drops), and
372361
// AIO is valid and not busy (per `Aio` busy state invariant).
373-
unsafe { nng_sys::nng_recv_aio(self.socket, self.aio.as_ptr()) };
362+
unsafe { nng_sys::nng_socket_recv(self.socket, self.aio.as_ptr()) };
374363
// the above started an async operation (which makes the AIO busy), so we must eventually
375364
// call wait() later to preserve the Aio busy invariant.
376365
//
@@ -527,7 +516,7 @@ impl<'socket, Protocol> ContextfulSocket<'socket, Protocol> {
527516
}
528517
}
529518

530-
fn nng_strerror(errno: c_int) -> &'static CStr {
519+
fn nng_strerror(errno: nng_sys::nng_err) -> &'static CStr {
531520
// SAFETY: nng_strerror has no additional safety requirements.
532521
let raw = unsafe { nng_sys::nng_strerror(errno) };
533522
// SAFETY: nng_strerror returns a valid null-terminated string.
@@ -538,8 +527,8 @@ fn nng_strerror(errno: c_int) -> &'static CStr {
538527
cstr
539528
}
540529

541-
fn nng_errno_to_string(errno: c_int) -> String {
542-
nng_strerror(errno).to_string_lossy().into_owned()
530+
fn nng_err_to_string(err: nng_sys::nng_err) -> String {
531+
nng_strerror(err).to_string_lossy().into_owned()
543532
}
544533

545534
/// Helper function that calls `tokio::task::block_in_place` when appropriate.

0 commit comments

Comments
 (0)