Skip to content

Commit 5b9bf23

Browse files
authored
Merge pull request #98 from merutak/recv-same-channel
in recv, make sure all frames are read from the same channel
2 parents 3e73ff8 + aea11bc commit 5b9bf23

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

Modules/_librabbitmq/connection.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,6 +1305,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
13051305
amqp_connection_state_t conn, int piggyback)
13061306
{
13071307
amqp_frame_t frame;
1308+
amqp_channel_t cur_channel = 0;
13081309
amqp_basic_deliver_t *deliver;
13091310
amqp_basic_properties_t *props;
13101311
Py_ssize_t body_target;
@@ -1333,6 +1334,8 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
13331334
if (frame.frame_type != AMQP_FRAME_METHOD) continue;
13341335
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe;
13351336

1337+
cur_channel = frame.channel;
1338+
13361339
delivery_info = PyDict_New();
13371340
deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded;
13381341
/* need consumer tag for later.
@@ -1351,7 +1354,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
13511354
}
13521355

13531356
Py_BEGIN_ALLOW_THREADS;
1354-
retval = amqp_simple_wait_frame(conn, &frame);
1357+
retval = cur_channel == 0 ?
1358+
amqp_simple_wait_frame(conn, &frame) :
1359+
amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame);
13551360
Py_END_ALLOW_THREADS;
13561361
if (retval < 0) break;
13571362

@@ -1361,6 +1366,9 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
13611366
goto finally;
13621367
}
13631368

1369+
/* if piggybacked, 'channel' is still 0 at this point */
1370+
cur_channel = frame.channel;
1371+
13641372
/* channel */
13651373
channel = PyInt_FromLong((unsigned long)frame.channel);
13661374

@@ -1374,7 +1382,7 @@ PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p,
13741382

13751383
for (i = 0; body_received < body_target; i++) {
13761384
Py_BEGIN_ALLOW_THREADS;
1377-
retval = amqp_simple_wait_frame(conn, &frame);
1385+
retval = amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame);
13781386
Py_END_ALLOW_THREADS;
13791387
if (retval < 0) break;
13801388

0 commit comments

Comments
 (0)