Skip to content

Commit e0be74f

Browse files
author
DanielePalaia
committed
publisher implementation
1 parent 79a73cd commit e0be74f

File tree

5 files changed

+61
-17
lines changed

5 files changed

+61
-17
lines changed

examples/getting_started/main.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
from proton import Message
2+
13
from rabbitmq_amqp_python_client import (
24
BindingSpecification,
35
Connection,
46
ExchangeSpecification,
57
QueueSpecification,
68
QueueType,
9+
exchange_address,
710
)
811

912

@@ -31,29 +34,17 @@ def main() -> None:
3134
)
3235
)
3336

34-
"""
3537
addr = exchange_address(exchange_name, routing_key)
36-
"""
3738

38-
"""
39-
publisher = connection.publisher(addr, "getting-started-publisher")
40-
"""
39+
publisher = connection.publisher(addr)
4140

42-
"""
43-
message = Message(
44-
body='test',
45-
address='/queues/getting-started-exchangemessage',
46-
)
41+
publisher.publish(Message(body="test"))
4742

48-
publisher.Publish(message)
4943
publisher.close()
50-
"""
5144

5245
# management.unbind(binding_exchange_queue_path)
5346

54-
"""
55-
management.purge_queue(queue_info.name)
56-
"""
47+
# management.purge_queue(queue_info.name)
5748

5849
# management.delete_queue(queue_name)
5950

rabbitmq_amqp_python_client/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
from importlib import metadata
22

3+
from .address_helper import exchange_address
34
from .common import QueueType
45
from .connection import Connection
56
from .entities import (
67
BindingSpecification,
78
ExchangeSpecification,
89
QueueSpecification,
910
)
11+
from .publisher import Publisher
1012

1113
try:
1214
__version__ = metadata.version(__package__)
@@ -23,4 +25,6 @@
2325
"QueueSpecification",
2426
"BindingSpecification",
2527
"QueueType",
28+
"Publisher",
29+
"exchange_address",
2630
]

rabbitmq_amqp_python_client/address_helper.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
from .entities import BindingSpecification
22

33

4-
def exchange_address(name: str) -> str:
5-
path = "/exchanges/" + name
4+
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
5+
if routing_key == "":
6+
path = "/exchanges/" + exchange_name
7+
else:
8+
path = "/exchanges/" + exchange_name + "/" + routing_key
69

710
return path
811

rabbitmq_amqp_python_client/connection.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from proton.utils import BlockingConnection
44

55
from .management import Management
6+
from .publisher import Publisher
67

78
logger = logging.getLogger(__name__)
89

@@ -30,6 +31,10 @@ def management(self) -> Management:
3031
def close(self) -> None:
3132
self._conn.close()
3233

34+
def publisher(self, destination: str) -> Publisher:
35+
publisher = Publisher(self._conn, destination)
36+
return publisher
37+
3338
# TODO: returns the current status of the connection.
3439
# def status(self) -> int:
3540
# pass
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import logging
2+
from typing import Optional
3+
4+
from proton import Message
5+
from proton.utils import (
6+
BlockingConnection,
7+
BlockingReceiver,
8+
BlockingSender,
9+
)
10+
11+
from .options import SenderOption
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
class Publisher:
17+
def __init__(self, conn: BlockingConnection, addr: str):
18+
self._sender: Optional[BlockingSender] = None
19+
self._receiver: Optional[BlockingReceiver] = None
20+
self._conn = conn
21+
self._addr = addr
22+
self._open()
23+
24+
def _open(self) -> None:
25+
print("addr is " + str(self._addr))
26+
if self._sender is None:
27+
logger.debug("Creating Sender")
28+
self._sender = self._create_sender(self._addr)
29+
30+
def publish(self, message: Message) -> None:
31+
if self._sender is not None:
32+
self._sender.send(message)
33+
34+
def close(self) -> None:
35+
if self._sender is not None:
36+
self._sender.close()
37+
# if self._receiver is not None:
38+
# self._receiver.close()
39+
40+
def _create_sender(self, addr: str) -> BlockingSender:
41+
return self._conn.create_sender(addr, options=SenderOption(addr))

0 commit comments

Comments
 (0)