Skip to content

UDP PUBSUB Mode Abnormal Message Reception & Pipe Status Detection Failure #2210

@MuNianCi

Description

@MuNianCi

Describe the bug

A clear and concise description of what the bug is.
In UDP-based NNG PUBSUB communication mode, two critical issues emerge after the SUB (Subscriber) program is restarted repeatedly:

  1. The SUB program's message reception from the PUB (Publisher) becomes abnormally slow (severe reception degradation); Packet capture with tcpdump reveals that the PUB is still sending messages to the closed client connections.
  2. The PUB program cannot reliably obtain the SUB's pipe closure status, even though the NNG_PIPE_EV_REM (pipe removal event) callback has been properly registered and implemented.

Expected behavior

A clear and concise description of what you expected to happen.
The SUB program should stably and quickly receive all messages from the PUB program, regardless of how many times the SUB program is restarted (no reception slowdown or abnormality).
The PUB program should immediately and accurately detect the SUB's pipe closure status via the registered NNG_PIPE_EV_REM event callback when the SUB program exits or restarts, enabling timely resource cleanup or reconnection logic.

Actual Behavior

Describe what occurred.
After the SUB program is restarted consecutively for 30 times, the SUB's message reception speed drops drastically—messages from the PUB are received with significant delays (several seconds to tens of seconds in extreme cases).
When the SUB program restarts (which triggers pipe disconnection), the PUB program's NNG_PIPE_EV_REM callback is not triggered as expected. The PUB maintains the "connected" state of the old SUB pipe and fails to recognize the pipe closure, leading to accumulated invalid pipe resources and further exacerbating the SUB's reception slowness.
The abnormality is persistent: once the 30-time restart threshold is reached, the SUB's slow reception and PUB's pipe status detection failure will not recover unless both PUB and SUB programs are fully restarted.

Environment Details

The latest Version : 2.0.0-dev

Operating system and version:
Ubuntu 24.04 LTS (x86_64, kernel 6.14.0-36-generic)

Compiler and language used:
GCC 13.3.0 (Ubuntu/Debian)

Language: Standard C11
Shared library: Shared library (libnng.so)

To Reproduce

PUB:
`#include <nng/nng.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdbool.h>

// Callback for NNG_PIPE_EV_REM event, when sub conneted this func never called.
void pipe_rem_callback(nng_pipe pipe, nng_pipe_ev ev, void *arg) {
if (ev == NNG_PIPE_EV_REM_POST) {
printf("[PUB] Pipe removed (SUB disconnected)\n");
}
}

int main() {
nng_socket pub_sock;
int rv;

nng_init(NULL);
// Create PUB socket
if ((rv = nng_pub0_open(&pub_sock)) != 0) {
    fprintf(stderr, "Failed to open PUB socket: %s\n", nng_strerror(rv));
    return 1;
}

// Set pipe event callback (NNG_PIPE_EV_REM)
if ((rv = nng_pipe_notify(pub_sock, NNG_PIPE_EV_REM_POST, pipe_rem_callback, NULL)) != 0) {
    fprintf(stderr, "Failed to register PIPE_EV_REM callback: %s\n", nng_strerror(rv));
    nng_socket_close(pub_sock);
    return 1;
}

// Bind to UDP address
if ((rv = nng_listen(pub_sock, "udp://0.0.0.0:8888", NULL, 0)) != 0) {
    fprintf(stderr, "Failed to bind PUB socket: %s\n", nng_strerror(rv));
    nng_socket_close(pub_sock);
    return 1;
}

printf("[PUB] Started, sending messages every 100ms...\n");

// Continuously send test messages
char msg_buf[64];
int msg_count = 0;
while (true) {
    snprintf(msg_buf, sizeof(msg_buf), "Test message #%d from PUB", ++msg_count);
    rv = nng_send(pub_sock, msg_buf, strlen(msg_buf) + 1, 0);
    if (rv != 0) {
        fprintf(stderr, "Failed to send message: %s\n", nng_strerror(rv));
        sleep(1);
        continue;
    }
    sleep(1); // Send 10 messages per second
}

nng_socket_close(pub_sock);
nng_fini();
return 0;

}`

SUB

`#include <nng/nng.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdbool.h>

int main() {
nng_socket sub_sock;
int rv;

nng_init(NULL);
// Create SUB socket
if ((rv = nng_sub0_open(&sub_sock)) != 0) {
    fprintf(stderr, "Failed to open SUB socket: %s\n", nng_strerror(rv));
    return 1;
}

// Subscribe to all messages (empty filter)
if ((rv = nng_sub0_socket_subscribe(sub_sock, "", 0)) != 0) {
    fprintf(stderr, "Failed to set subscription: %s\n", nng_strerror(rv));
    nng_socket_close(sub_sock);
    return 1;
}

// Dial PUB's UDP address
if ((rv = nng_dial(sub_sock, "udp://127.0.0.1:8888", NULL, 0)) != 0) {
    fprintf(stderr, "Failed to dial PUB socket: %s\n", nng_strerror(rv));
    nng_socket_close(sub_sock);
    return 1;
}

printf("[SUB] Started, receiving messages...\n");

// Receive messages for 5 seconds, then exit (simulate restart)
char   msg[128] = {0};
size_t msg_len;
while (true) {
memset(msg, 0x00, sizeof(msg));
    rv = nng_recv(sub_sock, msg, &msg_len, 0);
    if (rv != 0) {
        fprintf(stderr, "Failed to receive message: %s\n", nng_strerror(rv));
        usleep(100000);
        continue;
    }
    printf("[SUB] Received: %s (length: %zu)\n", msg, msg_len);
}

printf("[SUB] Exiting (simulate restart)\n");
nng_socket_close(sub_sock);
nng_fini();
return 0;

}`

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions