99 ConnectionClosed ,
1010 Environment ,
1111 ExchangeSpecification ,
12+ OutcomeState ,
1213 QuorumQueueSpecification ,
1314 StreamSpecification ,
1415)
1516
1617from .http_requests import delete_all_connections
18+ from .utils import create_binding , publish_per_message
1719
1820
1921def test_publish_queue (connection : Connection ) -> None :
@@ -31,7 +33,7 @@ def test_publish_queue(connection: Connection) -> None:
3133 try :
3234 publisher = connection .publisher ("/queues/" + queue_name )
3335 status = publisher .publish (AmqpMessage (body = "test" ))
34- if status .ACCEPTED :
36+ if status .remote_state == OutcomeState . ACCEPTED :
3537 accepted = True
3638 except Exception :
3739 raised = True
@@ -46,6 +48,97 @@ def test_publish_queue(connection: Connection) -> None:
4648 assert raised is False
4749
4850
51+ def test_publish_per_message (connection : Connection ) -> None :
52+
53+ queue_name = "test-queue-1"
54+ queue_name_2 = "test-queue-2"
55+ management = connection .management ()
56+
57+ management .declare_queue (QuorumQueueSpecification (name = queue_name ))
58+ management .declare_queue (QuorumQueueSpecification (name = queue_name_2 ))
59+
60+ raised = False
61+
62+ publisher = None
63+ accepted = False
64+ accepted_2 = True
65+
66+ try :
67+ publisher = connection .publisher ()
68+ status = publish_per_message (
69+ publisher , addr = AddressHelper .queue_address (queue_name )
70+ )
71+ if status .remote_state == OutcomeState .ACCEPTED :
72+ accepted = True
73+ status = publish_per_message (
74+ publisher , addr = AddressHelper .queue_address (queue_name_2 )
75+ )
76+ if status .remote_state == OutcomeState .ACCEPTED :
77+ accepted_2 = True
78+ except Exception :
79+ raised = True
80+
81+ if publisher is not None :
82+ publisher .close ()
83+
84+ purged_messages_queue_1 = management .purge_queue (queue_name )
85+ purged_messages_queue_2 = management .purge_queue (queue_name_2 )
86+ management .delete_queue (queue_name )
87+ management .delete_queue (queue_name_2 )
88+ management .close ()
89+
90+ assert accepted is True
91+ assert accepted_2 is True
92+ assert purged_messages_queue_1 == 1
93+ assert purged_messages_queue_2 == 1
94+ assert raised is False
95+
96+
97+ def test_publish_per_message_to_exchange (connection : Connection ) -> None :
98+
99+ exchange_name = "test-exchange"
100+ queue_name = "test-queue"
101+ management = connection .management ()
102+ routing_key = "routing-key"
103+
104+ bind_name = create_binding (management , exchange_name , queue_name , routing_key )
105+
106+ raised = False
107+
108+ publisher = None
109+ # accepted = False
110+ accepted_2 = True
111+
112+ try :
113+ publisher = connection .publisher ()
114+ # status = publish_per_message(
115+ # publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
116+ # )
117+ # if status.remote_state == OutcomeState.ACCEPTED:
118+ # accepted = True
119+ status = publish_per_message (
120+ publisher , addr = AddressHelper .queue_address (queue_name )
121+ )
122+ if status .remote_state == OutcomeState .ACCEPTED :
123+ accepted_2 = True
124+ except Exception :
125+ raised = True
126+
127+ # if publisher is not None:
128+ publisher .close ()
129+
130+ # purged_messages_queue = management.purge_queue(queue_name)
131+ management .unbind (bind_name )
132+ management .delete_exchange (exchange_name )
133+ management .delete_queue (queue_name )
134+ management .close ()
135+
136+ # assert accepted is True
137+ assert accepted_2 is True
138+ # assert purged_messages_queue == 1
139+ assert raised is False
140+
141+
49142def test_publish_ssl (connection_ssl : Connection ) -> None :
50143
51144 queue_name = "test-queue"
@@ -90,24 +183,37 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90183 assert raised is True
91184
92185
186+ def test_publish_per_message_to_invalid_destination (connection : Connection ) -> None :
187+
188+ queue_name = "test-queue-1"
189+ raised = False
190+
191+ message = AmqpMessage (body = "test" )
192+ message .to_address ("/invalid_destination/" + queue_name )
193+ publisher = connection .publisher ()
194+
195+ try :
196+ publisher .publish (message )
197+ except ArgumentOutOfRangeException :
198+ raised = True
199+ except Exception :
200+ raised = False
201+
202+ if publisher is not None :
203+ publisher .close ()
204+
205+ assert raised is True
206+
207+
208+ """
93209def test_publish_exchange(connection: Connection) -> None:
94210
95211 exchange_name = "test-exchange"
96212 queue_name = "test-queue"
97213 management = connection.management()
98214 routing_key = "routing-key"
99215
100- management .declare_exchange (ExchangeSpecification (name = exchange_name ))
101-
102- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
103-
104- management .bind (
105- BindingSpecification (
106- source_exchange = exchange_name ,
107- destination_queue = queue_name ,
108- binding_key = routing_key ,
109- )
110- )
216+ bind_name = create_binding(management, exchange_name, queue_name, routing_key)
111217
112218 addr = AddressHelper.exchange_address(exchange_name, routing_key)
113219
@@ -124,12 +230,14 @@ def test_publish_exchange(connection: Connection) -> None:
124230
125231 publisher.close()
126232
233+ management.unbind(bind_name)
127234 management.delete_exchange(exchange_name)
128235 management.delete_queue(queue_name)
129236 management.close()
130237
131238 assert accepted is True
132239 assert raised is False
240+ """
133241
134242
135243def test_publish_purge (connection : Connection ) -> None :
0 commit comments