Skip to content

Commit 64e8657

Browse files
committed
Implement basic NATS JetStream transport
1 parent 1001431 commit 64e8657

File tree

5 files changed

+475
-0
lines changed

5 files changed

+475
-0
lines changed

examples/nats_receive.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from __future__ import annotations
2+
3+
from pprint import pformat
4+
5+
from kombu import Connection, Consumer, Exchange, Queue, eventloop
6+
7+
exchange = Exchange("exchange", "direct", durable=False)
8+
msg_queue = Queue("queue", exchange=exchange, routing_key="messages")
9+
10+
11+
def pretty(obj):
12+
return pformat(obj, indent=4)
13+
14+
15+
def process_msg(body, message):
16+
print(f"Received message: {body!r}")
17+
print(f" properties:\n{pretty(message.properties)}")
18+
print(f" delivery_info:\n{pretty(message.delivery_info)}")
19+
message.ack()
20+
21+
22+
with Connection("nats://localhost:4222") as connection:
23+
with Consumer(connection, msg_queue, callbacks=[process_msg]) as consumer:
24+
for msg in eventloop(connection):
25+
pass

examples/nats_send.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from __future__ import annotations
2+
3+
from kombu import Connection, Exchange, Queue
4+
5+
exchange = Exchange("exchange", "direct", durable=False)
6+
msg_queue = Queue("queue", exchange=exchange, routing_key="messages")
7+
8+
9+
with Connection("nats://localhost:4222") as conn:
10+
producer = conn.Producer()
11+
producer.publish(
12+
"hello world", exchange=exchange, routing_key="messages", declare=[msg_queue]
13+
)

kombu/transport/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def supports_librabbitmq() -> bool | None:
4545
'azureservicebus': 'kombu.transport.azureservicebus:Transport',
4646
'pyro': 'kombu.transport.pyro:Transport',
4747
'gcpubsub': 'kombu.transport.gcpubsub:Transport',
48+
'nats': 'kombu.transport.nats_jetstream:Transport',
4849
}
4950

5051
_transport_cache = {}

0 commit comments

Comments
 (0)