File tree Expand file tree Collapse file tree 5 files changed +61
-17
lines changed
rabbitmq_amqp_python_client Expand file tree Collapse file tree 5 files changed +61
-17
lines changed Original file line number Diff line number Diff line change 1+ from proton import Message
2+
13from rabbitmq_amqp_python_client import (
24 BindingSpecification ,
35 Connection ,
46 ExchangeSpecification ,
57 QueueSpecification ,
68 QueueType ,
9+ exchange_address ,
710)
811
912
@@ -31,29 +34,17 @@ def main() -> None:
3134 )
3235 )
3336
34- """
3537 addr = exchange_address (exchange_name , routing_key )
36- """
3738
38- """
39- publisher = connection.publisher(addr, "getting-started-publisher")
40- """
39+ publisher = connection .publisher (addr )
4140
42- """
43- message = Message(
44- body='test',
45- address='/queues/getting-started-exchangemessage',
46- )
41+ publisher .publish (Message (body = "test" ))
4742
48- publisher.Publish(message)
4943 publisher .close ()
50- """
5144
5245 # management.unbind(binding_exchange_queue_path)
5346
54- """
55- management.purge_queue(queue_info.name)
56- """
47+ # management.purge_queue(queue_info.name)
5748
5849 # management.delete_queue(queue_name)
5950
Original file line number Diff line number Diff line change 11from importlib import metadata
22
3+ from .address_helper import exchange_address
34from .common import QueueType
45from .connection import Connection
56from .entities import (
67 BindingSpecification ,
78 ExchangeSpecification ,
89 QueueSpecification ,
910)
11+ from .publisher import Publisher
1012
1113try :
1214 __version__ = metadata .version (__package__ )
2325 "QueueSpecification" ,
2426 "BindingSpecification" ,
2527 "QueueType" ,
28+ "Publisher" ,
29+ "exchange_address" ,
2630]
Original file line number Diff line number Diff line change 11from .entities import BindingSpecification
22
33
4- def exchange_address (name : str ) -> str :
5- path = "/exchanges/" + name
4+ def exchange_address (exchange_name : str , routing_key : str = "" ) -> str :
5+ if routing_key == "" :
6+ path = "/exchanges/" + exchange_name
7+ else :
8+ path = "/exchanges/" + exchange_name + "/" + routing_key
69
710 return path
811
Original file line number Diff line number Diff line change 33from proton .utils import BlockingConnection
44
55from .management import Management
6+ from .publisher import Publisher
67
78logger = logging .getLogger (__name__ )
89
@@ -30,6 +31,10 @@ def management(self) -> Management:
3031 def close (self ) -> None :
3132 self ._conn .close ()
3233
34+ def publisher (self , destination : str ) -> Publisher :
35+ publisher = Publisher (self ._conn , destination )
36+ return publisher
37+
3338 # TODO: returns the current status of the connection.
3439 # def status(self) -> int:
3540 # pass
Original file line number Diff line number Diff line change 1+ import logging
2+ from typing import Optional
3+
4+ from proton import Message
5+ from proton .utils import (
6+ BlockingConnection ,
7+ BlockingReceiver ,
8+ BlockingSender ,
9+ )
10+
11+ from .options import SenderOption
12+
13+ logger = logging .getLogger (__name__ )
14+
15+
16+ class Publisher :
17+ def __init__ (self , conn : BlockingConnection , addr : str ):
18+ self ._sender : Optional [BlockingSender ] = None
19+ self ._receiver : Optional [BlockingReceiver ] = None
20+ self ._conn = conn
21+ self ._addr = addr
22+ self ._open ()
23+
24+ def _open (self ) -> None :
25+ print ("addr is " + str (self ._addr ))
26+ if self ._sender is None :
27+ logger .debug ("Creating Sender" )
28+ self ._sender = self ._create_sender (self ._addr )
29+
30+ def publish (self , message : Message ) -> None :
31+ if self ._sender is not None :
32+ self ._sender .send (message )
33+
34+ def close (self ) -> None :
35+ if self ._sender is not None :
36+ self ._sender .close ()
37+ # if self._receiver is not None:
38+ # self._receiver.close()
39+
40+ def _create_sender (self , addr : str ) -> BlockingSender :
41+ return self ._conn .create_sender (addr , options = SenderOption (addr ))
You can’t perform that action at this time.
0 commit comments