44 AddressHelper ,
55 AmqpMessage ,
66 ArgumentOutOfRangeException ,
7- BindingSpecification ,
87 Connection ,
98 ConnectionClosed ,
109 Environment ,
11- ExchangeSpecification ,
10+ OutcomeState ,
1211 QuorumQueueSpecification ,
1312 StreamSpecification ,
1413)
1514
1615from .http_requests import delete_all_connections
16+ from .utils import create_binding , publish_per_message
1717
1818
1919def test_publish_queue (connection : Connection ) -> None :
@@ -31,7 +31,7 @@ def test_publish_queue(connection: Connection) -> None:
3131 try :
3232 publisher = connection .publisher ("/queues/" + queue_name )
3333 status = publisher .publish (AmqpMessage (body = "test" ))
34- if status .ACCEPTED :
34+ if status .remote_state == OutcomeState . ACCEPTED :
3535 accepted = True
3636 except Exception :
3737 raised = True
@@ -46,6 +46,52 @@ def test_publish_queue(connection: Connection) -> None:
4646 assert raised is False
4747
4848
49+ def test_publish_per_message (connection : Connection ) -> None :
50+
51+ queue_name = "test-queue-1"
52+ queue_name_2 = "test-queue-2"
53+ management = connection .management ()
54+
55+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
56+ management .declare_queue (QuorumQueueSpecification (name = queue_name_2 ))
57+
58+ raised = False
59+
60+ publisher = None
61+ accepted = False
62+ accepted_2 = True
63+
64+ try :
65+ publisher = connection .publisher ()
66+ status = publish_per_message (
67+ publisher , addr = AddressHelper .queue_address (queue_name )
68+ )
69+ if status .remote_state == OutcomeState .ACCEPTED :
70+ accepted = True
71+ status = publish_per_message (
72+ publisher , addr = AddressHelper .queue_address (queue_name_2 )
73+ )
74+ if status .remote_state == OutcomeState .ACCEPTED :
75+ accepted_2 = True
76+ except Exception :
77+ raised = True
78+
79+ if publisher is not None :
80+ publisher .close ()
81+
82+ purged_messages_queue_1 = management .purge_queue (queue_name )
83+ purged_messages_queue_2 = management .purge_queue (queue_name_2 )
84+ management .delete_queue (queue_name )
85+ management .delete_queue (queue_name_2 )
86+ management .close ()
87+
88+ assert accepted is True
89+ assert accepted_2 is True
90+ assert purged_messages_queue_1 == 1
91+ assert purged_messages_queue_2 == 1
92+ assert raised is False
93+
94+
4995def test_publish_ssl (connection_ssl : Connection ) -> None :
5096
5197 queue_name = "test-queue"
@@ -90,24 +136,36 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90136 assert raised is True
91137
92138
139+ def test_publish_per_message_to_invalid_destination (connection : Connection ) -> None :
140+
141+ queue_name = "test-queue-1"
142+ raised = False
143+
144+ message = AmqpMessage (body = "test" )
145+ message .to_address ("/invalid_destination/" + queue_name )
146+ publisher = connection .publisher ()
147+
148+ try :
149+ publisher .publish (message )
150+ except ArgumentOutOfRangeException :
151+ raised = True
152+ except Exception :
153+ raised = False
154+
155+ if publisher is not None :
156+ publisher .close ()
157+
158+ assert raised is True
159+
160+
93161def test_publish_exchange (connection : Connection ) -> None :
94162
95163 exchange_name = "test-exchange"
96164 queue_name = "test-queue"
97165 management = connection .management ()
98166 routing_key = "routing-key"
99167
100- management .declare_exchange (ExchangeSpecification (name = exchange_name ))
101-
102- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
103-
104- management .bind (
105- BindingSpecification (
106- source_exchange = exchange_name ,
107- destination_queue = queue_name ,
108- binding_key = routing_key ,
109- )
110- )
168+ bind_name = create_binding (management , exchange_name , queue_name , routing_key )
111169
112170 addr = AddressHelper .exchange_address (exchange_name , routing_key )
113171
@@ -124,6 +182,7 @@ def test_publish_exchange(connection: Connection) -> None:
124182
125183 publisher .close ()
126184
185+ management .unbind (bind_name )
127186 management .delete_exchange (exchange_name )
128187 management .delete_queue (queue_name )
129188 management .close ()
@@ -265,3 +324,50 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
265324 for i in range (messages_to_send ):
266325
267326 publisher .publish (AmqpMessage (body = "test" ))
327+
328+
329+ def test_publish_per_message_exchange (connection : Connection ) -> None :
330+
331+ # exchange_name = "test-exchange-per-message"
332+ # queue_name = "test-queue-per-message"
333+ # management = connection.management()
334+ # routing_key = "routing-key-per-message"
335+
336+ # create_binding(management, exchange_name, queue_name, routing_key)
337+
338+ raised = False
339+
340+ # publisher = None
341+ # accepted = False
342+ # accepted_2 = True
343+
344+ """
345+ try:
346+ publisher = connection.publisher()
347+ status = publish_per_message(
348+ publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
349+ )
350+ if status.remote_state == OutcomeState.ACCEPTED:
351+ accepted = True
352+ status = publish_per_message(
353+ publisher, addr=AddressHelper.queue_address(queue_name)
354+ )
355+ if status.remote_state == OutcomeState.ACCEPTED:
356+ accepted_2 = True
357+ except Exception:
358+ raised = True
359+
360+ # if publisher is not None:
361+ publisher.close()
362+
363+ purged_messages_queue = management.purge_queue(queue_name)
364+ management.unbind(bind_name)
365+ management.delete_exchange(exchange_name)
366+ management.delete_queue(queue_name)
367+ management.close()
368+ """
369+
370+ # assert accepted is True
371+ # assert accepted_2 is True
372+ # assert purged_messages_queue == 2
373+ assert raised is False
0 commit comments