Skip to content

Commit 4335862

Browse files
author
DanielePalaia
committed
improving ack implementation
1 parent 03faa76 commit 4335862

File tree

5 files changed

+78
-27
lines changed

5 files changed

+78
-27
lines changed

examples/getting_started/main.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
from rabbitmq_amqp_python_client import (
33
BindingSpecification,
44
Connection,
5-
Delivery,
65
Event,
76
ExchangeSpecification,
87
Message,
8+
MessageAck,
99
MessagingHandler,
1010
QuorumQueueSpecification,
1111
exchange_address,
@@ -20,18 +20,19 @@ def __init__(self):
2020
self._count = 0
2121

2222
def on_message(self, event: Event):
23-
print("received message: " + event.message.body)
23+
print("received message: " + str(event.message.annotations))
2424

25-
dlv = event.delivery
25+
# annotations = {}
26+
# annotations[symbol('x-opt-string')] = 'x-test1'
27+
# MessageAck.requeue_with_annotations(event, annotations)
2628

27-
dlv.update(Delivery.ACCEPTED)
28-
dlv.settle()
29+
MessageAck.accept(event)
2930

3031
print("count " + str(self._count))
3132

3233
self._count = self._count + 1
3334

34-
if self._count == 100000:
35+
if self._count == 100:
3536
print("closing receiver")
3637
event.receiver.close()
3738
event.connection.close()
@@ -42,9 +43,6 @@ def on_connection_closed(self, event: Event):
4243
def on_link_closed(self, event: Event) -> None:
4344
print("link closed")
4445

45-
def on_rejected(self, event: Event) -> None:
46-
print("rejected")
47-
4846

4947
def create_connection() -> Connection:
5048
connection = Connection("amqp://guest:guest@localhost:5672/")
@@ -58,7 +56,7 @@ def main() -> None:
5856
exchange_name = "test-exchange"
5957
queue_name = "example-queue"
6058
routing_key = "routing-key"
61-
messages_to_publish = 100000
59+
messages_to_publish = 100
6260

6361
print("connection to amqp server")
6462
connection = create_connection()
@@ -121,14 +119,14 @@ def main() -> None:
121119
management.unbind(bind_name)
122120

123121
print("delete queue")
124-
management.delete_queue(queue_name)
122+
# management.delete_queue(queue_name)
125123

126124
print("delete exchange")
127125
management.delete_exchange(exchange_name)
128126

129127
print("closing connections")
130128
management.close()
131-
consumer.close()
129+
# consumer.close()
132130
print("after management closing")
133131
connection.close()
134132
print("after connection closing")

rabbitmq_amqp_python_client/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
ExchangeSpecification,
1010
)
1111
from .management import Management
12+
from .message_ack import MessageAck
1213
from .publisher import Publisher
14+
from .qpid.proton._data import symbol # noqa: E402
1315
from .qpid.proton._delivery import Delivery
1416
from .qpid.proton._events import Event
1517
from .qpid.proton._message import Message
@@ -46,4 +48,6 @@
4648
"MessagingHandler",
4749
"Event",
4850
"Delivery",
51+
"MessageAck",
52+
"symbol",
4953
]
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from typing import Dict
2+
3+
from .qpid.proton._data import PythonAMQPData
4+
from .qpid.proton._delivery import Delivery
5+
from .qpid.proton._events import Event
6+
7+
8+
class MessageAck:
9+
10+
@staticmethod
11+
def accept(event: Event) -> None:
12+
dlv = event.delivery
13+
dlv.update(Delivery.ACCEPTED)
14+
dlv.settle()
15+
16+
@staticmethod
17+
def discard(event: Event) -> None:
18+
dlv = event.delivery
19+
dlv.update(Delivery.REJECTED)
20+
dlv.settle()
21+
22+
@staticmethod
23+
def discard_with_annotations(
24+
event: Event, annotations: Dict[str, "PythonAMQPData"]
25+
) -> None:
26+
dlv = event.delivery
27+
dlv.local.annotations = annotations
28+
dlv.local.failed = True
29+
dlv.local.undeliverable = True
30+
31+
dlv.update(Delivery.MODIFIED)
32+
dlv.settle()
33+
34+
@staticmethod
35+
def requeue(event: Event) -> None:
36+
dlv = event.delivery
37+
dlv.update(Delivery.RELEASED)
38+
dlv.settle()
39+
40+
@staticmethod
41+
def requeue_with_annotations(
42+
event: Event, annotations: Dict[str, "PythonAMQPData"]
43+
) -> None:
44+
dlv = event.delivery
45+
if dlv.local.annotations is None:
46+
print("is none")
47+
dlv.local.annotations = annotations
48+
dlv.local.failed = False
49+
dlv.local.undeliverable = False
50+
51+
dlv.update(Delivery.MODIFIED)
52+
dlv.settle()

tests/conftest.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
from rabbitmq_amqp_python_client import (
44
Connection,
5-
Delivery,
65
Event,
6+
MessageAck,
77
MessagingHandler,
88
)
99

@@ -44,10 +44,11 @@ def __str__(self) -> str:
4444
class MyMessageHandlerAccept(MessagingHandler):
4545

4646
def __init__(self):
47-
super().__init__(auto_accept=True, auto_settle=True)
47+
super().__init__(auto_accept=False, auto_settle=False)
4848
self._received = 0
4949

5050
def on_message(self, event: Event):
51+
MessageAck.accept(event)
5152
self._received = self._received + 1
5253
if self._received == 1000:
5354
event.connection.close()
@@ -77,16 +78,14 @@ def on_rejected(self, event: Event) -> None:
7778
print("rejected")
7879

7980

80-
class MyMessageHandlerReject(MessagingHandler):
81+
class MyMessageHandlerDiscard(MessagingHandler):
8182

8283
def __init__(self):
83-
super().__init__(auto_accept=False, auto_settle=True)
84+
super().__init__(auto_accept=False, auto_settle=False)
8485
self._received = 0
8586

8687
def on_message(self, event: Event):
87-
dlv = event.delivery
88-
dlv.update(Delivery.REJECTED)
89-
dlv.settle()
88+
MessageAck.discard(event)
9089
self._received = self._received + 1
9190
if self._received == 1000:
9291
event.connection.close()
@@ -96,13 +95,11 @@ def on_message(self, event: Event):
9695
class MyMessageHandlerRequeue(MessagingHandler):
9796

9897
def __init__(self):
99-
super().__init__(auto_accept=False, auto_settle=True)
98+
super().__init__(auto_accept=False, auto_settle=False)
10099
self._received = 0
101100

102101
def on_message(self, event: Event):
103-
dlv = event.delivery
104-
dlv.update(Delivery.RELEASED)
105-
dlv.settle()
102+
MessageAck.requeue(event)
106103
self._received = self._received + 1
107104
if self._received == 1000:
108105
event.connection.close()

tests/test_consumer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
from .conftest import (
99
ConsumerTestException,
1010
MyMessageHandlerAccept,
11+
MyMessageHandlerDiscard,
1112
MyMessageHandlerNoack,
12-
MyMessageHandlerReject,
1313
MyMessageHandlerRequeue,
1414
create_connection,
1515
)
@@ -132,10 +132,10 @@ def test_consumer_async_queue_noack(connection: Connection) -> None:
132132
assert message_count > 0
133133

134134

135-
def test_consumer_async_queue_with_reject(connection: Connection) -> None:
135+
def test_consumer_async_queue_with_discard(connection: Connection) -> None:
136136
messages_to_send = 1000
137137

138-
queue_name = "test-queue_async_reject"
138+
queue_name = "test-queue_async_discard"
139139

140140
management = connection.management()
141141

@@ -155,7 +155,7 @@ def test_consumer_async_queue_with_reject(connection: Connection) -> None:
155155
connection_consumer = create_connection()
156156

157157
consumer = connection_consumer.consumer(
158-
addr_queue, handler=MyMessageHandlerReject()
158+
addr_queue, handler=MyMessageHandlerDiscard()
159159
)
160160

161161
try:

0 commit comments

Comments
 (0)