Skip to content

Commit 50dbe3e

Browse files
committed
feat(nats): add support for jetstream
1 parent c785ecd commit 50dbe3e

File tree

3 files changed

+38
-3
lines changed

3 files changed

+38
-3
lines changed

modules/nats/example_basic.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ async def message_handler(msg: Msg):
1414

1515

1616
async def basic_example():
17-
with NatsContainer() as nats_container:
17+
with NatsContainer(jetstream=True) as nats_container:
1818
# Get connection parameters
1919
host = nats_container.get_container_host_ip()
20-
port = nats_container.get_exposed_port(nats_container.port)
20+
port = nats_container.get_exposed_port(nats_container.client_port)
2121

2222
# Create NATS client
2323
nc = NATS()
@@ -32,7 +32,7 @@ async def basic_example():
3232
print(f"\nCreated stream: {stream.config.name}")
3333

3434
# Create consumer
35-
consumer = await js.add_consumer(stream_name="test-stream", durable_name="test-consumer")
35+
consumer = await js.add_consumer(stream="test-stream", durable_name="test-consumer")
3636
print(f"Created consumer: {consumer.name}")
3737

3838
# Subscribe to subjects

modules/nats/testcontainers/nats/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def __init__(
4747
management_port: int = 8222,
4848
expected_ready_log: str = "Server is ready",
4949
ready_timeout_secs: int = 120,
50+
jetstream: bool = False,
5051
**kwargs,
5152
) -> None:
5253
super().__init__(image, **kwargs)
@@ -55,6 +56,8 @@ def __init__(
5556
self._expected_ready_log = expected_ready_log
5657
self._ready_timeout_secs = max(ready_timeout_secs, 0)
5758
self.with_exposed_ports(self.client_port, self.management_port)
59+
if jetstream:
60+
self.with_command("-js")
5861

5962
@wait_container_is_ready()
6063
def _healthcheck(self) -> None:
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from testcontainers.nats import NatsContainer
2+
from uuid import uuid4
3+
import pytest
4+
5+
from nats import connect as nats_connect
6+
from nats.aio.client import Client as NATSClient
7+
8+
9+
async def get_client(container: NatsContainer) -> "NATSClient":
10+
"""
11+
Get a nats client.
12+
13+
Returns:
14+
client: Nats client to connect to the container.
15+
"""
16+
conn_string = container.nats_uri()
17+
client = await nats_connect(conn_string)
18+
return client
19+
20+
21+
@pytest.mark.asyncio
22+
async def test_jetstream_add_stream(anyio_backend):
23+
with NatsContainer(jetstream=True) as container:
24+
nc: NATSClient = await get_client(container)
25+
26+
topic = str(uuid4())
27+
28+
js = nc.jetstream()
29+
30+
await js.add_stream(name="test-stream", subjects=[topic])
31+
32+
await nc.close()

0 commit comments

Comments
 (0)