44 AddressHelper ,
55 AmqpMessage ,
66 ArgumentOutOfRangeException ,
7- BindingSpecification ,
87 Connection ,
98 ConnectionClosed ,
109 Environment ,
11- ExchangeSpecification ,
10+ OutcomeState ,
1211 QuorumQueueSpecification ,
1312 StreamSpecification ,
1413)
1514
1615from .http_requests import delete_all_connections
16+ from .utils import create_binding , publish_per_message
1717
1818
1919def test_publish_queue (connection : Connection ) -> None :
@@ -31,18 +31,64 @@ def test_publish_queue(connection: Connection) -> None:
3131 try :
3232 publisher = connection .publisher ("/queues/" + queue_name )
3333 status = publisher .publish (AmqpMessage (body = "test" ))
34- if status .ACCEPTED :
34+ if status .remote_state == OutcomeState .ACCEPTED :
35+ accepted = True
36+ except Exception :
37+ raised = True
38+
39+ if publisher is not None :
40+ publisher .close ()
41+
42+ management .delete_queue (queue_name )
43+ management .close ()
44+
45+ assert accepted is True
46+ assert raised is False
47+
48+
49+ def test_publish_per_message (connection : Connection ) -> None :
50+
51+ queue_name = "test-queue-1"
52+ queue_name_2 = "test-queue-2"
53+ management = connection .management ()
54+
55+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
56+ management .declare_queue (QuorumQueueSpecification (name = queue_name_2 ))
57+
58+ raised = False
59+
60+ publisher = None
61+ accepted = False
62+ accepted_2 = True
63+
64+ try :
65+ publisher = connection .publisher ()
66+ status = publish_per_message (
67+ publisher , addr = AddressHelper .queue_address (queue_name )
68+ )
69+ if status .remote_state == OutcomeState .ACCEPTED :
3570 accepted = True
71+ status = publish_per_message (
72+ publisher , addr = AddressHelper .queue_address (queue_name_2 )
73+ )
74+ if status .remote_state == OutcomeState .ACCEPTED :
75+ accepted_2 = True
3676 except Exception :
3777 raised = True
3878
3979 if publisher is not None :
4080 publisher .close ()
4181
82+ purged_messages_queue_1 = management .purge_queue (queue_name )
83+ purged_messages_queue_2 = management .purge_queue (queue_name_2 )
4284 management .delete_queue (queue_name )
85+ management .delete_queue (queue_name_2 )
4386 management .close ()
4487
4588 assert accepted is True
89+ assert accepted_2 is True
90+ assert purged_messages_queue_1 == 1
91+ assert purged_messages_queue_2 == 1
4692 assert raised is False
4793
4894
@@ -90,24 +136,36 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90136 assert raised is True
91137
92138
139+ def test_publish_per_message_to_invalid_destination (connection : Connection ) -> None :
140+
141+ queue_name = "test-queue-1"
142+ raised = False
143+
144+ message = AmqpMessage (body = "test" )
145+ message .to_address ("/invalid_destination/" + queue_name )
146+ publisher = connection .publisher ()
147+
148+ try :
149+ publisher .publish (message )
150+ except ArgumentOutOfRangeException :
151+ raised = True
152+ except Exception :
153+ raised = False
154+
155+ if publisher is not None :
156+ publisher .close ()
157+
158+ assert raised is True
159+
160+
93161def test_publish_exchange (connection : Connection ) -> None :
94162
95163 exchange_name = "test-exchange"
96164 queue_name = "test-queue"
97165 management = connection .management ()
98166 routing_key = "routing-key"
99167
100- management .declare_exchange (ExchangeSpecification (name = exchange_name ))
101-
102- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
103-
104- management .bind (
105- BindingSpecification (
106- source_exchange = exchange_name ,
107- destination_queue = queue_name ,
108- binding_key = routing_key ,
109- )
110- )
168+ bind_name = create_binding (management , exchange_name , queue_name , routing_key )
111169
112170 addr = AddressHelper .exchange_address (exchange_name , routing_key )
113171
@@ -124,6 +182,7 @@ def test_publish_exchange(connection: Connection) -> None:
124182
125183 publisher .close ()
126184
185+ management .unbind (bind_name )
127186 management .delete_exchange (exchange_name )
128187 management .delete_queue (queue_name )
129188 management .close ()
@@ -265,3 +324,49 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
265324 for i in range (messages_to_send ):
266325
267326 publisher .publish (AmqpMessage (body = "test" ))
327+
328+
329+ def test_publish_per_message_exchange (connection : Connection ) -> None :
330+
331+ exchange_name = "test-exchange-per-message"
332+ queue_name = "test-queue-per-message"
333+ management = connection .management ()
334+ routing_key = "routing-key-per-message"
335+
336+ bind_name = create_binding (management , exchange_name , queue_name , routing_key )
337+
338+ raised = False
339+
340+ publisher = None
341+ # accepted = False
342+ accepted_2 = False
343+
344+ try :
345+ publisher = connection .publisher ()
346+ # status = publish_per_message(
347+ # publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
348+ # )
349+ # if status.remote_state == OutcomeState.ACCEPTED:
350+ # accepted = True
351+ status = publish_per_message (
352+ publisher , addr = AddressHelper .queue_address (queue_name )
353+ )
354+ if status .remote_state == OutcomeState .ACCEPTED :
355+ accepted_2 = True
356+ except Exception :
357+ raised = True
358+
359+ # if publisher is not None:
360+ publisher .close ()
361+
362+ purged_messages_queue = management .purge_queue (queue_name )
363+ management .unbind (bind_name )
364+ management .delete_exchange (exchange_name )
365+ management .delete_queue (queue_name )
366+
367+ management .close ()
368+
369+ # assert accepted is True
370+ assert accepted_2 is True
371+ assert purged_messages_queue == 1
372+ assert raised is False
0 commit comments