44 AddressHelper ,
55 AmqpMessage ,
66 ArgumentOutOfRangeException ,
7- BindingSpecification ,
87 Connection ,
98 ConnectionClosed ,
109 Environment ,
11- ExchangeSpecification ,
10+ OutcomeState ,
1211 QuorumQueueSpecification ,
1312 StreamSpecification ,
1413)
1514
1615from .http_requests import delete_all_connections
16+ from .utils import create_binding , publish_per_message
1717
1818
1919def test_publish_queue (connection : Connection ) -> None :
@@ -31,20 +31,117 @@ def test_publish_queue(connection: Connection) -> None:
3131 try :
3232 publisher = connection .publisher ("/queues/" + queue_name )
3333 status = publisher .publish (AmqpMessage (body = "test" ))
34- if status .ACCEPTED :
34+ if status .remote_state == OutcomeState .ACCEPTED :
35+ accepted = True
36+ except Exception :
37+ raised = True
38+
39+ if publisher is not None :
40+ publisher .close ()
41+
42+ management .delete_queue (queue_name )
43+ management .close ()
44+
45+ assert accepted is True
46+ assert raised is False
47+
48+
49+ """
50+ def test_publish_per_message(connection: Connection) -> None:
51+
52+ queue_name = "test-queue-1"
53+ queue_name_2 = "test-queue-2"
54+ management = connection.management()
55+
56+ management.declare_queue(QuorumQueueSpecification(name=queue_name))
57+ management.declare_queue(QuorumQueueSpecification(name=queue_name_2))
58+
59+ raised = False
60+
61+ publisher = None
62+ accepted = False
63+ accepted_2 = True
64+
65+ try:
66+ publisher = connection.publisher()
67+ status = publish_per_message(
68+ publisher, addr=AddressHelper.queue_address(queue_name)
69+ )
70+ if status.remote_state == OutcomeState.ACCEPTED:
3571 accepted = True
72+ status = publish_per_message(
73+ publisher, addr=AddressHelper.queue_address(queue_name_2)
74+ )
75+ if status.remote_state == OutcomeState.ACCEPTED:
76+ accepted_2 = True
3677 except Exception:
3778 raised = True
3879
3980 if publisher is not None:
4081 publisher.close()
4182
83+ purged_messages_queue_1 = management.purge_queue(queue_name)
84+ purged_messages_queue_2 = management.purge_queue(queue_name_2)
4285 management.delete_queue(queue_name)
86+ management.delete_queue(queue_name_2)
4387 management.close()
4488
4589 assert accepted is True
90+ assert accepted_2 is True
91+ assert purged_messages_queue_1 == 1
92+ assert purged_messages_queue_2 == 1
4693 assert raised is False
4794
95+ """
96+ """
97+ def test_publish_per_message_to_exchange(connection: Connection) -> None:
98+
99+ exchange_name = "test-bind-exchange-to-queue-exchange"
100+ queue_name = "test-bind-exchange-to-queue-queue"
101+ routing_key = "routing-key"
102+
103+ management = connection.management()
104+
105+ binding_exchange_queue_path = create_binding(
106+ management, exchange_name, queue_name, routing_key
107+ )
108+
109+ raised = False
110+
111+ publisher = None
112+ accepted = False
113+ accepted_2 = True
114+
115+ try:
116+ publisher = connection.publisher()
117+ status = publish_per_message(
118+ publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key)
119+ )
120+ if status.remote_state == OutcomeState.ACCEPTED:
121+ accepted = True
122+ status = publish_per_message(
123+ publisher, addr=AddressHelper.queue_address(queue_name)
124+ )
125+ if status.remote_state == OutcomeState.ACCEPTED:
126+ accepted_2 = True
127+ except Exception:
128+ raised = True
129+
130+ if publisher is not None:
131+ publisher.close()
132+
133+ purged_messages_queue = management.purge_queue(queue_name)
134+ management.unbind(binding_exchange_queue_path)
135+ management.delete_exchange(exchange_name)
136+ management.delete_queue(queue_name)
137+ management.close()
138+
139+ assert accepted is True
140+ assert accepted_2 is True
141+ assert purged_messages_queue == 2
142+ assert raised is False
143+ """
144+
48145
49146def test_publish_ssl (connection_ssl : Connection ) -> None :
50147
@@ -90,24 +187,39 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
90187 assert raised is True
91188
92189
190+ """
191+ def test_publish_per_message_to_invalid_destination(connection: Connection) -> None:
192+
193+ queue_name = "test-queue-1"
194+ raised = False
195+
196+ publisher = None
197+ message = AmqpMessage(body="test")
198+ message.to_address("/invalid_destination/" + queue_name)
199+
200+ try:
201+ publisher = connection.publisher()
202+ publisher.publish(message)
203+ except ArgumentOutOfRangeException:
204+ raised = True
205+ except Exception:
206+ raised = False
207+
208+ if publisher is not None:
209+ publisher.close()
210+
211+ assert raised is True
212+ """
213+
214+
93215def test_publish_exchange (connection : Connection ) -> None :
94216
95217 exchange_name = "test-exchange"
96218 queue_name = "test-queue"
97219 management = connection .management ()
98220 routing_key = "routing-key"
99221
100- management .declare_exchange (ExchangeSpecification (name = exchange_name ))
101-
102- management .declare_queue (QuorumQueueSpecification (name = queue_name ))
103-
104- management .bind (
105- BindingSpecification (
106- source_exchange = exchange_name ,
107- destination_queue = queue_name ,
108- binding_key = routing_key ,
109- )
110- )
222+ create_binding (management , exchange_name , queue_name , routing_key )
111223
112224 addr = AddressHelper .exchange_address (exchange_name , routing_key )
113225
0 commit comments