File tree Expand file tree Collapse file tree 6 files changed +50
-9
lines changed
rabbitmq_amqp_python_client Expand file tree Collapse file tree 6 files changed +50
-9
lines changed Original file line number Diff line number Diff line change @@ -9,6 +9,13 @@ This library is in early stages of development. It is meant to be used with Rabb
99- poetry install: resolves and install dependencies
1010- poetry run pytest: run the tests
1111
12+ ## Installation
13+
14+ The client is distributed via [ ` PIP ` ] ( https://pypi.org/project/rabbitmq-amqp-python-client/ ) :
15+ ``` bash
16+ pip install rabbitmq-amqp-python-client
17+ ```
18+
1219## Getting Started
1320
1421An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
Original file line number Diff line number Diff line change @@ -81,7 +81,7 @@ def main() -> None:
8181 exchange_name = "test-exchange"
8282 queue_name = "example-queue"
8383 routing_key = "routing-key"
84- messages_to_publish = 100
84+ messages_to_publish = 100000
8585
8686 print ("connection to amqp server" )
8787 connection = create_connection ()
@@ -120,7 +120,13 @@ def main() -> None:
120120
121121 # publish 10 messages
122122 for i in range (messages_to_publish ):
123- publisher .publish (Message (body = "test" ))
123+ status = publisher .publish (Message (body = "test" ))
124+ if status .ACCEPTED :
125+ print ("message accepted" )
126+ elif status .RELEASED :
127+ print ("message not routed" )
128+ elif status .REJECTED :
129+ print ("message not rejected" )
124130
125131 publisher .close ()
126132
Original file line number Diff line number Diff line change 5858 "ArgumentOutOfRangeException" ,
5959 "SslConfigurationContext" ,
6060 "ClientCert" ,
61+ "Delivery" ,
6162]
Original file line number Diff line number Diff line change @@ -21,6 +21,21 @@ def test(self, link: Link) -> bool:
2121 return bool (link .is_sender )
2222
2323
24+ class SenderOptionUnseattle (LinkOption ): # type: ignore
25+ def __init__ (self , addr : str ):
26+ self ._addr = addr
27+
28+ def apply (self , link : Link ) -> None :
29+ link .source .address = self ._addr
30+ link .snd_settle_mode = Link .SND_UNSETTLED
31+ link .rcv_settle_mode = Link .RCV_FIRST
32+ link .properties = PropertyDict ({symbol ("paired" ): True })
33+ link .source .dynamic = False
34+
35+ def test (self , link : Link ) -> bool :
36+ return bool (link .is_sender )
37+
38+
2439class ReceiverOption (LinkOption ): # type: ignore
2540 def __init__ (self , addr : str ):
2641 self ._addr = addr
Original file line number Diff line number Diff line change 11import logging
22from typing import Optional
33
4- from .options import SenderOption
4+ from .options import SenderOptionUnseattle
5+ from .qpid .proton ._delivery import Delivery
56from .qpid .proton ._message import Message
67from .qpid .proton .utils import (
78 BlockingConnection ,
@@ -23,14 +24,14 @@ def _open(self) -> None:
2324 logger .debug ("Creating Sender" )
2425 self ._sender = self ._create_sender (self ._addr )
2526
26- def publish (self , message : Message ) -> None :
27+ def publish (self , message : Message ) -> Delivery :
2728 if self ._sender is not None :
28- self ._sender .send (message )
29+ return self ._sender .send (message )
2930
3031 def close (self ) -> None :
3132 logger .debug ("Closing Sender" )
3233 if self ._sender is not None :
3334 self ._sender .close ()
3435
3536 def _create_sender (self , addr : str ) -> BlockingSender :
36- return self ._conn .create_sender (addr , options = SenderOption (addr ))
37+ return self ._conn .create_sender (addr , options = SenderOptionUnseattle (addr ))
Original file line number Diff line number Diff line change @@ -18,17 +18,24 @@ def test_publish_queue(connection: Connection) -> None:
1818
1919 raised = False
2020
21+ publisher = None
22+ accepted = False
23+
2124 try :
2225 publisher = connection .publisher ("/queues/" + queue_name )
23- publisher .publish (Message (body = "test" ))
26+ status = publisher .publish (Message (body = "test" ))
27+ if status .ACCEPTED :
28+ accepted = True
2429 except Exception :
2530 raised = True
2631
27- publisher .close ()
32+ if publisher is not None :
33+ publisher .close ()
2834
2935 management .delete_queue (queue_name )
3036 management .close ()
3137
38+ assert accepted is True
3239 assert raised is False
3340
3441
@@ -98,10 +105,13 @@ def test_publish_exchange(connection: Connection) -> None:
98105 addr = AddressHelper .exchange_address (exchange_name , routing_key )
99106
100107 raised = False
108+ accepted = False
101109
102110 try :
103111 publisher = connection .publisher (addr )
104- publisher .publish (Message (body = "test" ))
112+ status = publisher .publish (Message (body = "test" ))
113+ if status .ACCEPTED :
114+ accepted = True
105115 except Exception :
106116 raised = True
107117
@@ -111,6 +121,7 @@ def test_publish_exchange(connection: Connection) -> None:
111121 management .delete_queue (queue_name )
112122 management .close ()
113123
124+ assert accepted is True
114125 assert raised is False
115126
116127
You can’t perform that action at this time.
0 commit comments