99 ConnectionClosed ,
1010 Environment ,
1111 ExchangeSpecification ,
12+ OutcomeState ,
1213 QuorumQueueSpecification ,
1314 StreamSpecification ,
1415)
1516
1617from .http_requests import delete_all_connections
18+ from .utils import create_binding , publish_per_message
1719
1820
1921def test_publish_queue (connection : Connection ) -> None :
@@ -31,18 +33,112 @@ def test_publish_queue(connection: Connection) -> None:
3133 try :
3234 publisher = connection .publisher ("/queues/" + queue_name )
3335 status = publisher .publish (AmqpMessage (body = "test" ))
34- if status .ACCEPTED :
36+ if status .remote_state == OutcomeState .ACCEPTED :
37+ accepted = True
38+ except Exception :
39+ raised = True
40+
41+ if publisher is not None :
42+ publisher .close ()
43+
44+ management .delete_queue (queue_name )
45+ management .close ()
46+
47+ assert accepted is True
48+ assert raised is False
49+
50+
51+ def test_publish_per_message (connection : Connection ) -> None :
52+
53+ queue_name = "test-queue-1"
54+ queue_name_2 = "test-queue-2"
55+ management = connection .management ()
56+
57+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
58+ management .declare_queue (QuorumQueueSpecification (name = queue_name_2 ))
59+
60+ raised = False
61+
62+ publisher = None
63+ accepted = False
64+ accepted_2 = True
65+
66+ try :
67+ publisher = connection .publisher ()
68+ status = publish_per_message (
69+ publisher , addr = AddressHelper .queue_address (queue_name )
70+ )
71+ if status .remote_state == OutcomeState .ACCEPTED :
72+ accepted = True
73+ status = publish_per_message (
74+ publisher , addr = AddressHelper .queue_address (queue_name_2 )
75+ )
76+ if status .remote_state == OutcomeState .ACCEPTED :
77+ accepted_2 = True
78+ except Exception :
79+ raised = True
80+
81+ if publisher is not None :
82+ publisher .close ()
83+
84+ purged_messages_queue_1 = management .purge_queue (queue_name )
85+ purged_messages_queue_2 = management .purge_queue (queue_name_2 )
86+ management .delete_queue (queue_name )
87+ management .delete_queue (queue_name_2 )
88+ management .close ()
89+
90+ assert accepted is True
91+ assert accepted_2 is True
92+ assert purged_messages_queue_1 == 1
93+ assert purged_messages_queue_2 == 1
94+ assert raised is False
95+
96+
97+ def test_publish_per_message_to_exchange (connection : Connection ) -> None :
98+
99+ exchange_name = "test-bind-exchange-to-queue-exchange"
100+ queue_name = "test-bind-exchange-to-queue-queue"
101+ routing_key = "routing-key"
102+
103+ management = connection .management ()
104+
105+ binding_exchange_queue_path = create_binding (
106+ management , exchange_name , queue_name , routing_key
107+ )
108+
109+ raised = False
110+
111+ publisher = None
112+ accepted = False
113+ accepted_2 = True
114+
115+ try :
116+ publisher = connection .publisher ()
117+ status = publish_per_message (
118+ publisher , addr = AddressHelper .exchange_address (exchange_name , routing_key )
119+ )
120+ if status .remote_state == OutcomeState .ACCEPTED :
35121 accepted = True
122+ status = publish_per_message (
123+ publisher , addr = AddressHelper .queue_address (queue_name )
124+ )
125+ if status .remote_state == OutcomeState .ACCEPTED :
126+ accepted_2 = True
36127 except Exception :
37128 raised = True
38129
39130 if publisher is not None :
40131 publisher .close ()
41132
133+ purged_messages_queue = management .purge_queue (queue_name )
134+ management .unbind (binding_exchange_queue_path )
135+ management .delete_exchange (exchange_name )
42136 management .delete_queue (queue_name )
43137 management .close ()
44138
45139 assert accepted is True
140+ assert accepted_2 is True
141+ assert purged_messages_queue == 2
46142 assert raised is False
47143
48144
@@ -90,24 +186,37 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90186 assert raised is True
91187
92188
189+ def test_publish_per_message_to_invalid_destination (connection : Connection ) -> None :
190+
191+ queue_name = "test-queue-1"
192+ raised = False
193+
194+ publisher = None
195+ message = AmqpMessage (body = "test" )
196+ message .to_address ("/invalid_destination/" + queue_name )
197+
198+ try :
199+ publisher = connection .publisher ()
200+ publisher .publish (message )
201+ except ArgumentOutOfRangeException :
202+ raised = True
203+ except Exception :
204+ raised = False
205+
206+ if publisher is not None :
207+ publisher .close ()
208+
209+ assert raised is True
210+
211+
93212def test_publish_exchange (connection : Connection ) -> None :
94213
95214 exchange_name = "test-exchange"
96215 queue_name = "test-queue"
97216 management = connection .management ()
98217 routing_key = "routing-key"
99218
100- management .declare_exchange (ExchangeSpecification (name = exchange_name ))
101-
102- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
103-
104- management .bind (
105- BindingSpecification (
106- source_exchange = exchange_name ,
107- destination_queue = queue_name ,
108- binding_key = routing_key ,
109- )
110- )
219+ create_binding (management , exchange_name , queue_name , routing_key )
111220
112221 addr = AddressHelper .exchange_address (exchange_name , routing_key )
113222
0 commit comments