Skip to content

Commit 9906f8d

Browse files
authored
fix subscribers list (rabbitmq-community#246)
Fixes: rabbitmq-community#241 This PR refactors the consumer/subscriber system to use numeric IDs instead of string names for internal tracking, addressing issues with subscriber list management. The changes enhance performance and eliminate bugs associated with string-based subscriber identification. - Converts subscriber tracking from string-based names to integer-based IDs for better performance and reliability - Updates MessageContext and EventContext to include stream information directly instead of requiring lookups - Adds validation for max_subscribers_by_connection limits and introduces new exception types Breaking changes ====== 1. Subscriber returns not the `subscriber_id`, but instead the`reference` The bug was here [Tag 0.3.1]: https://github.com/rabbitmq-community/rstream/blob/654a9ef23118d96098fe00861b3d661da7886030/rstream/consumer.py#L197-L203 Given two references with the same name the `subscriber = self._subscribers[reference]` is not consistent. With this PR the `_subscribers` is `[int, _Subscriber]` ```python self._subscribers: dict[int, _Subscriber] = {} ``` Where the int is the subscriber id that _must_ be unique for connection by protocol. 2. remove the `get_stream` function `message_context.consumer.get_stream(message_context.subscriber_name)` The `get_stream` is not needed anymore since the `stream` is now passed on the `message_context` and also `event_context` 3. `subscribe_name` is now optional ```python class MessageContext: consumer: Consumer stream: str subscriber_id: int subscriber_name: Optional[str] offset: int timestamp: int ``` --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent cdbae44 commit 9906f8d

23 files changed

+369
-125
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ install:
88
format:
99
which poetry
1010
poetry --version
11-
poetry run isort --skip venv .
11+
poetry run isort .
1212
poetry run black --exclude=venv .
1313
poetry run flake8 --exclude=venv,local_tests,docs/examples --max-line-length=120 --ignore=E203,W503,E701,E704
1414
poetry run mypy .

docs/examples/basic_consumers/basic_consumer.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ async def consume():
2424
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))
2525

2626
async def on_message(msg: AMQPMessage, message_context: MessageContext):
27-
stream = message_context.consumer.get_stream(message_context.subscriber_name)
28-
offset = message_context.offset
29-
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
27+
print(
28+
"Got message: {} from stream {}, offset {}".format(
29+
msg, message_context.stream, message_context.offset
30+
)
31+
)
3032

3133
await consumer.start()
3234
await consumer.subscribe(stream=STREAM, callback=on_message, decoder=amqp_decoder)

docs/examples/basic_consumers/basic_consumer_binary.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ async def consume():
2424
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))
2525

2626
async def on_message(msg: AMQPMessage, message_context: MessageContext):
27-
stream = message_context.consumer.get_stream(message_context.subscriber_name)
28-
offset = message_context.offset
29-
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
27+
print(
28+
"Got message: {} from stream {}, offset {}".format(
29+
msg, message_context.stream, message_context.offset
30+
)
31+
)
3032

3133
await consumer.start()
3234
await consumer.subscribe(stream=STREAM, callback=on_message)

docs/examples/basic_consumers/basic_consumer_offset_next.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@ async def consume():
2626
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))
2727

2828
async def on_message(msg: AMQPMessage, message_context: MessageContext):
29-
stream = message_context.consumer.get_stream(message_context.subscriber_name)
30-
offset = message_context.offset
31-
32-
print("Got message: {}".format(msg) + " from stream " + stream + " offset: " + str(offset))
29+
print(
30+
"Got message: {}".format(msg)
31+
+ " from stream "
32+
+ message_context.stream
33+
+ " offset: "
34+
+ str(message_context.offset)
35+
)
3336

3437
await consumer.start()
3538
await consumer.subscribe(

docs/examples/basic_consumers/basic_consumer_offset_offset.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ async def consume():
2626
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))
2727

2828
async def on_message(msg: AMQPMessage, message_context: MessageContext):
29-
stream = message_context.consumer.get_stream(message_context.subscriber_name)
30-
offset = message_context.offset
31-
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
29+
print(
30+
"Got message: {} from stream {}, offset {}".format(
31+
msg, message_context.stream, message_context.offset
32+
)
33+
)
3234

3335
await consumer.start()
3436
# Possible values of OffsetType are: FIRST (default), NEXT, LAST, TIMESTAMP and OFFSET

docs/examples/filtering/consumer_filtering.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ async def consume():
2828
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))
2929

3030
async def on_message(msg: AMQPMessage, message_context: MessageContext):
31-
stream = message_context.consumer.get_stream(message_context.subscriber_name)
32-
offset = message_context.offset
33-
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
31+
print(
32+
"Got message: {} from stream {}, offset {}".format(
33+
msg, message_context.stream, message_context.offset
34+
)
35+
)
3436
print("Application property: " + str(msg.application_properties[b"region"]))
3537
global cont
3638
cont = cont + 1

docs/examples/filtering/super_stream_consumer_filtering.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ async def consume():
2424
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))
2525

2626
async def on_message(msg: AMQPMessage, message_context: MessageContext):
27-
stream = message_context.consumer.get_stream(message_context.subscriber_name)
28-
offset = message_context.offset
29-
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
27+
print(
28+
"Got message: {} from stream {}, offset {}".format(
29+
msg, message_context.stream, message_context.offset
30+
)
31+
)
3032
print("Application property: " + str(msg.application_properties[b"region"]))
3133
global cont
3234
cont = cont + 1

docs/examples/manual_server_offset_tracking/consumer.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,24 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext):
2323
global lock
2424

2525
consumer = message_context.consumer
26-
stream = await message_context.consumer.stream(message_context.subscriber_name)
27-
offset = message_context.offset
2826

29-
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
27+
print(
28+
"Got message: {} from stream {}, offset {}".format(
29+
msg, message_context.stream, message_context.offset
30+
)
31+
)
3032

3133
# store the offset every 1000 messages received
3234
async with lock:
3335
cont = cont + 1
3436
# store the offset every 1000 messages received
3537
if cont % 1000 == 0:
36-
await consumer.store_offset(
37-
stream=stream, offset=offset, subscriber_name=message_context.subscriber_name
38-
)
38+
if message_context.subscriber_name is not None:
39+
await consumer.store_offset(
40+
stream=message_context.stream,
41+
offset=message_context.offset,
42+
subscriber_name=message_context.subscriber_name,
43+
)
3944

4045

4146
async def consume():

docs/examples/reliable_client/BestPracticesClient.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,11 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext):
183183
messages_consumed = messages_consumed + 1
184184
# some printf after some messages consumed in order to check that we are working...
185185
if (messages_consumed % 100000) == 0:
186-
stream = await message_context.consumer.stream(message_context.subscriber_name)
187-
offset = message_context.offset
188-
print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset))
186+
print(
187+
"Received message: {} from stream: {} - message offset: {}".format(
188+
msg, message_context.stream, message_context.offset
189+
)
190+
)
189191

190192

191193
async def publish(rabbitmq_configuration: dict):

docs/examples/reliable_client/appsettings.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
"Virtualhost": "/",
88
"LoadBalancer": true,
99
"SuperStream": true,
10-
"MaxPublishersByConnection": 256,
11-
"MaxSubscribersByConnection": 256,
10+
"MaxPublishersByConnection": 250,
11+
"MaxSubscribersByConnection": 250,
1212
"Producers": 3,
1313
"Consumers": 3,
1414
"DelayDuringSendMs":0,

0 commit comments

Comments
 (0)