44 AddressHelper ,
55 AmqpMessage ,
66 ArgumentOutOfRangeException ,
7- BindingSpecification ,
87 Connection ,
98 ConnectionClosed ,
109 Environment ,
11- ExchangeSpecification ,
10+ OutcomeState ,
1211 QuorumQueueSpecification ,
1312 StreamSpecification ,
1413)
1514
1615from .http_requests import delete_all_connections
16+ from .utils import create_binding , publish_per_message
1717
1818
1919def test_publish_queue (connection : Connection ) -> None :
@@ -31,7 +31,7 @@ def test_publish_queue(connection: Connection) -> None:
3131 try :
3232 publisher = connection .publisher ("/queues/" + queue_name )
3333 status = publisher .publish (AmqpMessage (body = "test" ))
34- if status .ACCEPTED :
34+ if status .remote_state == OutcomeState . ACCEPTED :
3535 accepted = True
3636 except Exception :
3737 raised = True
@@ -46,6 +46,97 @@ def test_publish_queue(connection: Connection) -> None:
4646 assert raised is False
4747
4848
49+ def test_publish_per_message (connection : Connection ) -> None :
50+
51+ queue_name = "test-queue-1"
52+ queue_name_2 = "test-queue-2"
53+ management = connection .management ()
54+
55+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
56+ management .declare_queue (QuorumQueueSpecification (name = queue_name_2 ))
57+
58+ raised = False
59+
60+ publisher = None
61+ accepted = False
62+ accepted_2 = True
63+
64+ try :
65+ publisher = connection .publisher ()
66+ status = publish_per_message (
67+ publisher , addr = AddressHelper .queue_address (queue_name )
68+ )
69+ if status .remote_state == OutcomeState .ACCEPTED :
70+ accepted = True
71+ status = publish_per_message (
72+ publisher , addr = AddressHelper .queue_address (queue_name_2 )
73+ )
74+ if status .remote_state == OutcomeState .ACCEPTED :
75+ accepted_2 = True
76+ except Exception :
77+ raised = True
78+
79+ if publisher is not None :
80+ publisher .close ()
81+
82+ purged_messages_queue_1 = management .purge_queue (queue_name )
83+ purged_messages_queue_2 = management .purge_queue (queue_name_2 )
84+ management .delete_queue (queue_name )
85+ management .delete_queue (queue_name_2 )
86+ management .close ()
87+
88+ assert accepted is True
89+ assert accepted_2 is True
90+ assert purged_messages_queue_1 == 1
91+ assert purged_messages_queue_2 == 1
92+ assert raised is False
93+
94+
95+ def test_publish_per_message_to_exchange (connection : Connection ) -> None :
96+
97+ exchange_name = "test-exchange"
98+ queue_name = "test-queue"
99+ management = connection .management ()
100+ routing_key = "routing-key"
101+
102+ bind_name = create_binding (management , exchange_name , queue_name , routing_key )
103+
104+ raised = False
105+
106+ publisher = None
107+ # accepted = False
108+ accepted_2 = True
109+
110+ try :
111+ publisher = connection .publisher ()
112+ status = publish_per_message (
113+ publisher , addr = AddressHelper .exchange_address (exchange_name , routing_key )
114+ )
115+ if status .remote_state == OutcomeState .ACCEPTED :
116+ accepted = True
117+ status = publish_per_message (
118+ publisher , addr = AddressHelper .queue_address (queue_name )
119+ )
120+ if status .remote_state == OutcomeState .ACCEPTED :
121+ accepted_2 = True
122+ except Exception :
123+ raised = True
124+
125+ # if publisher is not None:
126+ publisher .close ()
127+
128+ purged_messages_queue = management .purge_queue (queue_name )
129+ management .unbind (bind_name )
130+ management .delete_exchange (exchange_name )
131+ management .delete_queue (queue_name )
132+ management .close ()
133+
134+ assert accepted is True
135+ assert accepted_2 is True
136+ assert purged_messages_queue == 1
137+ assert raised is False
138+
139+
49140def test_publish_ssl (connection_ssl : Connection ) -> None :
50141
51142 queue_name = "test-queue"
@@ -90,24 +181,37 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90181 assert raised is True
91182
92183
184+ def test_publish_per_message_to_invalid_destination (connection : Connection ) -> None :
185+
186+ queue_name = "test-queue-1"
187+ raised = False
188+
189+ message = AmqpMessage (body = "test" )
190+ message .to_address ("/invalid_destination/" + queue_name )
191+ publisher = connection .publisher ()
192+
193+ try :
194+ publisher .publish (message )
195+ except ArgumentOutOfRangeException :
196+ raised = True
197+ except Exception :
198+ raised = False
199+
200+ if publisher is not None :
201+ publisher .close ()
202+
203+ assert raised is True
204+
205+
206+ """
93207def test_publish_exchange(connection: Connection) -> None:
94208
95209 exchange_name = "test-exchange"
96210 queue_name = "test-queue"
97211 management = connection.management()
98212 routing_key = "routing-key"
99213
100- management .declare_exchange (ExchangeSpecification (name = exchange_name ))
101-
102- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
103-
104- management .bind (
105- BindingSpecification (
106- source_exchange = exchange_name ,
107- destination_queue = queue_name ,
108- binding_key = routing_key ,
109- )
110- )
214+ bind_name = create_binding(management, exchange_name, queue_name, routing_key)
111215
112216 addr = AddressHelper.exchange_address(exchange_name, routing_key)
113217
@@ -124,12 +228,14 @@ def test_publish_exchange(connection: Connection) -> None:
124228
125229 publisher.close()
126230
231+ management.unbind(bind_name)
127232 management.delete_exchange(exchange_name)
128233 management.delete_queue(queue_name)
129234 management.close()
130235
131236 assert accepted is True
132237 assert raised is False
238+ """
133239
134240
135241def test_publish_purge (connection : Connection ) -> None :
0 commit comments