Skip to content

Commit eae7460

Browse files
committed
Add NATS demo server to examples
1 parent 64e8657 commit eae7460

File tree

3 files changed

+31
-8
lines changed

3 files changed

+31
-8
lines changed

examples/nats_receive.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,21 @@
11
from __future__ import annotations
22

3+
import sys
34
from pprint import pformat
45

56
from kombu import Connection, Consumer, Exchange, Queue, eventloop
67

8+
LOCAL_SERVER = "localhost"
9+
DEMO_SERVER = "demo.nats.io"
10+
11+
server = LOCAL_SERVER
12+
use_demo_server = len(sys.argv) > 1 and sys.argv[1] == "--demo"
13+
if use_demo_server:
14+
server = DEMO_SERVER
15+
16+
717
exchange = Exchange("exchange", "direct", durable=False)
8-
msg_queue = Queue("queue", exchange=exchange, routing_key="messages")
18+
msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages")
919

1020

1121
def pretty(obj):
@@ -19,7 +29,7 @@ def process_msg(body, message):
1929
message.ack()
2030

2131

22-
with Connection("nats://localhost:4222") as connection:
32+
with Connection(f"nats://{server}:4222") as connection:
2333
with Consumer(connection, msg_queue, callbacks=[process_msg]) as consumer:
2434
for msg in eventloop(connection):
2535
pass

examples/nats_send.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,29 @@
11
from __future__ import annotations
22

3+
import sys
4+
5+
from nats.js.api import StorageType
6+
37
from kombu import Connection, Exchange, Queue
48

9+
LOCAL_SERVER = "localhost"
10+
DEMO_SERVER = "demo.nats.io"
11+
12+
server = LOCAL_SERVER
13+
use_demo_server = len(sys.argv) > 1 and sys.argv[1] == "--demo"
14+
if use_demo_server:
15+
server = DEMO_SERVER
16+
17+
518
exchange = Exchange("exchange", "direct", durable=False)
6-
msg_queue = Queue("queue", exchange=exchange, routing_key="messages")
19+
msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages")
720

821

9-
with Connection("nats://localhost:4222") as conn:
22+
with Connection(f"nats://{server}:4222", transport_options={
23+
"stream_config": {
24+
"storage": StorageType.FILE,
25+
}
26+
}) as conn:
1027
producer = conn.Producer()
1128
producer.publish(
1229
"hello world", exchange=exchange, routing_key="messages", declare=[msg_queue]

kombu/transport/nats_jetstream.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ def _ensure_stream(self, queue):
191191

192192
stream_config = StreamConfig(
193193
name=stream_name,
194-
# subjects=[f"{queue}.>"],
195194
subjects=[queue],
196195
retention=RetentionPolicy.WORK_QUEUE,
197196
max_consumers=-1,
@@ -227,7 +226,6 @@ def _ensure_consumer(self, queue):
227226
durable_name=consumer_name,
228227
deliver_policy=DeliverPolicy.ALL,
229228
ack_policy=AckPolicy.EXPLICIT,
230-
# filter_subject=f"{queue}.>",
231229
filter_subject=queue,
232230
)
233231

@@ -242,7 +240,6 @@ def _ensure_consumer(self, queue):
242240
def _put(self, queue, message, **kwargs):
243241
"""Put a message on a queue."""
244242
self._ensure_stream(queue)
245-
# subject = f"{queue}.{message.get('id', '')}"
246243
subject = queue
247244
self._event_loop.run_until_complete(
248245
self._js.publish(subject, str_to_bytes(dumps(message)))
@@ -256,7 +253,6 @@ def _get(self, queue, **kwargs):
256253
try:
257254
pull_sub = self._event_loop.run_until_complete(
258255
self._js.pull_subscribe(
259-
# f"{queue}.>",
260256
queue,
261257
self._get_consumer_name(queue),
262258
stream=self._get_stream_name(queue),

0 commit comments

Comments
 (0)