Subscriber messages leak when subscribing in one thread and publishing in another #4811

@mangodan2003

Description

version: 4.3.5

In our application we are experiencing memory leaks over time. This sometimes seems to happen when a new message is received on a socket before we have finished dealing with the previous one. It does not happen if the process only subscribes to messages published elsewhere. In our application the process both subscribes to messages from other processes and publishes messages to them. To keep this example self-contained and as short as possible, it subscribes to its own messages, but this is not a prerequisite for the bug.

The reproducer below shows memory use ramping up to about 1 GiB while the message count is below 50, that is, while we take longer to handle each message than the interval at which messages arrive. Once we "catch up", some of that memory is freed, but a lot of it remains allocated.
If we never sleep, the memory footprint of the process is initially as expected (a great deal lower), but in our application it sporadically increases as time goes by. I suspect this is a result of scheduling: sometimes a new message can be received before the previous one has been processed.
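As a sanity check on the peak figure, here is a rough back-of-the-envelope sketch of how much data can be queued while the subscriber is in its slow phase. The function name and parameters are hypothetical and simply mirror the constants in the reproducer. It assumes that no messages are dropped and ignores the time spent sending and receiving. It only addresses the peak, not why memory stays allocated afterwards.

```c
// Hypothetical helper, not part of the reproducer: estimates how many bytes
// are still queued at the moment the subscriber stops sleeping.
// Assumes the publisher sends one message per send interval, the subscriber
// consumes one message per slow iteration, and nothing is dropped.
unsigned long long estimate_backlog_bytes(unsigned long long message_size,
                                          unsigned long long send_interval_us,
                                          unsigned long long recv_interval_us,
                                          unsigned long long slow_iterations) {
    // Total duration of the subscriber's slow phase
    unsigned long long slow_phase_us = recv_interval_us * slow_iterations;
    // Messages published during that phase
    unsigned long long sent = slow_phase_us / send_interval_us;
    // Messages consumed during that phase (one per iteration)
    unsigned long long received = slow_iterations;
    unsigned long long queued = sent > received ? sent - received : 0;
    return queued * message_size;
}
```

With the reproducer's values (`MESSAGE_SIZE` of 2764800 bytes, a 10000 us send sleep, a 100000 us receive sleep, 50 slow iterations) this gives 450 queued messages, or 1244160000 bytes, roughly 1.16 GiB. That is in the same range as the observed peak, so the peak itself looks consistent with a backlog of undelivered messages.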

Commenting out the sleep in the receive loop reproduces the real-world leak over time. It may be necessary to reduce the sleep in the send loop.

zmq_pub_sub.c:

#include <stdio.h>
#include <stdint.h>   // uint8_t, int64_t
#include <errno.h>    // EAGAIN
#include <assert.h>
#include <signal.h>
#include <unistd.h>
#include <zmq.h>
#include <pthread.h>

// A basic standalone zmq publisher and subscriber that receives the messages that it sends.
// This is a successful attempt at reproducing a memory leak that occurs in our application.
// Simply subscribing to messages published by another process DOES NOT reproduce the problem.
// Publishing in another thread of THE SAME PROCESS is a mandatory requirement for the problem to occur.

#define SOCKET_PATH "ipc:///tmp/zmq_pub_sub_test.sock"
// Using this size purely as it was the size of the messages published in our application
#define MESSAGE_SIZE 2764800

// Written from the signal handler and read by both threads
volatile sig_atomic_t stopping = 0;


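// Publisher thread: binds a PUB socket and sends one message every 10 ms.
// Uses the pthread start-routine signature so no function pointer cast is required.
void *send_messages(void *arg) {
    (void)arg; // unused

    void *context = zmq_ctx_new();
    void *publisher = zmq_socket(context, ZMQ_PUB);
    int rc = zmq_bind(publisher, SOCKET_PATH);
    assert(rc == 0);

    while(!stopping) {
        zmq_msg_t msg;
        rc = zmq_msg_init_size(&msg, MESSAGE_SIZE);
        assert(rc == 0);
        rc = zmq_sendmsg(publisher, &msg, 0);
        if(rc == -1) {
            printf("error sending message: %s\n", zmq_strerror(zmq_errno()));
            // A failed send leaves the message with the caller, so release it before bailing out
            zmq_msg_close(&msg);
            break;
        }
        zmq_msg_close(&msg);
        usleep(10000);
    }
    zmq_close(publisher);
    zmq_ctx_destroy(context);
    stopping = 1;
    return NULL;
}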


void signal_handler(int signal) {
    stopping = 1;
}


int main()
{
    void *context = zmq_ctx_new();
    void *subscriber = zmq_socket(context, ZMQ_SUB);
    int rc = zmq_connect(subscriber, SOCKET_PATH);
    assert(rc == 0);
    rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(rc == 0);

    size_t count=0;

    // signal handler to allow stopping with ctrl+c
    signal(SIGINT, signal_handler);

    // Spin up a thread to send messages to the subscriber
    pthread_t thread;
    pthread_create(&thread, NULL, (void *(*)(void *))send_messages, NULL);


    while(!stopping) {
        int64_t moreParts = 0;
        size_t more_size = sizeof moreParts;
        do {
            zmq_msg_t msg;
            int rc = zmq_msg_init(&msg);
            assert(rc == 0);
            // In our application,  zmq::msg_t::init_size  sometimes allocates memory twice for a single message and only one instance is freed.
            // By publishing and subscribing in the same process we have reproduced the same issue!
            rc = zmq_recvmsg(subscriber, &msg, ZMQ_DONTWAIT);
            if (rc == -1 && zmq_errno() == EAGAIN) {
                zmq_msg_close(&msg);
                break;
            }
            assert(rc != -1);

            // Our application copies the data to a Python list of bytes objects here

            printf("received %zu bytes.\n", zmq_msg_size(&msg));
            // This is where memory should be freed; in our application there is occasionally a leak here,
            // and consistently a leak if the callback hasn't returned by the time we receive the next message.
            // We suspect this happens sporadically in our application due to process scheduling variance, i.e. sometimes a callback hasn't completed before the next message is received
            zmq_msg_close(&msg);
        
            rc = zmq_getsockopt(subscriber, ZMQ_RCVMORE, &moreParts, &more_size);
            assert(rc == 0);
        } while (moreParts);


        // Our application calls the handler callback here
        // Causing a backlog of messages by taking too long to handle them is a reliable way to reproduce a leak
        if(count++ < 50) {
             usleep(100000);
        }
        // Once we reach 50 and stop sleeping we catch up, but many messages have been double allocated and are not freed
    }

    stopping = 1;
    pthread_join(thread, NULL);

    printf("stopping\n");
    zmq_close(subscriber);
    zmq_ctx_destroy(context);

    return 0;
}

Build with:
gcc -o zmq_pub_sub zmq_pub_sub.c -lzmq -pthread
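
To watch the footprint while the reproducer runs, one option is to log the resident set size from inside the process. The helper below is a minimal sketch and is not part of the original reproducer: the function name is hypothetical, and it is Linux-only because it reads `/proc/self/statm`.

```c
#include <stdio.h>
#include <unistd.h>

// Hypothetical helper: returns the resident set size of the calling process
// in KiB, or -1 on failure. Linux-only: the second field of /proc/self/statm
// is the number of resident pages.
long current_rss_kib(void) {
    FILE *f = fopen("/proc/self/statm", "r");
    if (f == NULL) {
        return -1;
    }
    long total_pages = 0;
    long resident_pages = 0;
    int fields = fscanf(f, "%ld %ld", &total_pages, &resident_pages);
    fclose(f);
    if (fields != 2) {
        return -1;
    }
    // Convert pages to KiB using the system page size
    long page_size = sysconf(_SC_PAGESIZE);
    if (page_size <= 0) {
        return -1;
    }
    return resident_pages * (page_size / 1024);
}
```

Calling it once per iteration of the receive loop, for example `printf("rss: %ld KiB\n", current_rss_kib());`, should make it easier to see whether memory falls back after the backlog drains. Note that resident set size also reflects allocator behaviour, so memory the allocator has not yet returned to the OS will still show up here.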
