Skip to content

Commit 64d0efd

Browse files
author
bstrausser
committed
Client-Free(ish) NATS container
1 parent 2c4f171 commit 64d0efd

File tree

3 files changed

+167
-0
lines changed

3 files changed

+167
-0
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#
2+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
3+
# not use this file except in compliance with the License. You may obtain
4+
# a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11+
# License for the specific language governing permissions and limitations
12+
# under the License.
13+
14+
15+
from testcontainers.core.container import DockerContainer
16+
from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs
17+
18+
19+
class NatsContainer(DockerContainer):
20+
"""
21+
Nats container.
22+
23+
Example:
24+
25+
.. doctest::
26+
27+
>>> from testcontainers.nats import NatsContainer
28+
29+
>>> with NatsContainer() as nats_container:
30+
... nc = nats_container.get_client()
31+
"""
32+
33+
def __init__(
34+
self,
35+
image: str = "nats:latest",
36+
client_port: int = 4222,
37+
management_port: int = 8222,
38+
expected_ready_log: str = "Server is ready",
39+
ready_timeout_secs: int = 120,
40+
**kwargs,
41+
) -> None:
42+
super().__init__(image, **kwargs)
43+
self.client_port = client_port
44+
self.management_port = management_port
45+
self._expected_ready_log = expected_ready_log
46+
self._ready_timeout_secs = max(ready_timeout_secs, 0)
47+
self.with_exposed_ports(self.client_port, self.management_port)
48+
49+
@wait_container_is_ready()
50+
def _healthcheck(self) -> None:
51+
wait_for_logs(self, self._expected_ready_log, timeout=self._ready_timeout_secs)
52+
53+
def nats_uri(self) -> str:
54+
return f"nats://{self.get_container_host_ip()}:{self.get_exposed_port(self.client_port)}"
55+
56+
def nats_host_and_port(self) -> tuple[str, int]:
57+
return self.get_container_host_ip(), self.get_exposed_port(self.client_port)
58+
59+
def nats_management_uri(self) -> str:
60+
return f"nats://{self.get_container_host_ip()}:{self.get_exposed_port(self.management_port)}"
61+
62+
def start(self) -> "NatsContainer":
63+
super().start()
64+
self._healthcheck()
65+
return self

modules/nats/tests/test_nats.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from testcontainers.nats import NatsContainer
2+
from uuid import uuid4
3+
import pytest
4+
5+
6+
"""
7+
If you are developing this and you want to test more advanced scenarios using a client
8+
Activate your poetry shell.
9+
pip install nats-py
10+
This will get nats-py into your environment but keep it out of the project
11+
12+
13+
"""
14+
15+
16+
NO_NATS_CLIENT = True
17+
try:
18+
from nats import connect as nats_connect
19+
from nats.aio.client import Client as NATSClient
20+
21+
NO_NATS_CLIENT = False
22+
except ImportError:
23+
pass
24+
25+
26+
async def get_client(container: NatsContainer) -> "NATSClient":
27+
"""
28+
Get a nats client.
29+
30+
Returns:
31+
client: Nats client to connect to the container.
32+
"""
33+
conn_string = container.nats_uri()
34+
client = await nats_connect(conn_string)
35+
return client
36+
37+
38+
def test_basic_container_ops():
39+
with NatsContainer() as container:
40+
# Not sure how to get type information without doing this
41+
container: NatsContainer = container
42+
h, p = container.nats_host_and_port()
43+
assert h == "localhost"
44+
uri = container.nats_uri()
45+
management_uri = container.nats_management_uri()
46+
47+
assert uri != management_uri
48+
49+
50+
pytest.mark.usefixtures("anyio_backend")
51+
52+
53+
@pytest.mark.skipif(NO_NATS_CLIENT, reason="No NATS Client Available")
54+
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
55+
async def test_pubsub(anyio_backend):
56+
with NatsContainer() as container:
57+
nc: NATSClient = await get_client(container)
58+
59+
topic = str(uuid4())
60+
61+
sub = await nc.subscribe(topic)
62+
sent_message = b"Test-Containers"
63+
await nc.publish(topic, b"Test-Containers")
64+
received_msg = await sub.next_msg()
65+
print("Received:", received_msg)
66+
assert sent_message == received_msg.data
67+
await nc.flush()
68+
await nc.close()
69+
70+
71+
pytest.mark.usefixtures("anyio_backend")
72+
73+
74+
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
75+
@pytest.mark.skipif(NO_NATS_CLIENT, reason="No NATS Client Available")
76+
async def test_more_complex_example(anyio_backend):
77+
with NatsContainer() as container:
78+
nc: NATSClient = await get_client(container)
79+
80+
await nc.publish("greet.joe", b"hello")
81+
82+
sub = await nc.subscribe("greet.*")
83+
84+
try:
85+
await sub.next_msg(timeout=0.1)
86+
except TimeoutError:
87+
pass
88+
89+
await nc.publish("greet.joe", b"hello.joe")
90+
await nc.publish("greet.pam", b"hello.pam")
91+
92+
first = await sub.next_msg(timeout=0.1)
93+
assert b"hello.joe" == first.data
94+
95+
second = await sub.next_msg(timeout=0.1)
96+
assert b"hello.pam" == second.data
97+
98+
await nc.publish("greet.bob", b"hello")
99+
100+
await sub.unsubscribe()
101+
await nc.drain()

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ packages = [
4141
{ include = "testcontainers", from = "modules/mongodb" },
4242
{ include = "testcontainers", from = "modules/mssql" },
4343
{ include = "testcontainers", from = "modules/mysql" },
44+
{ include = "testcontainers", from = "modules/nats" },
4445
{ include = "testcontainers", from = "modules/neo4j" },
4546
{ include = "testcontainers", from = "modules/nginx" },
4647
{ include = "testcontainers", from = "modules/opensearch" },

0 commit comments

Comments
 (0)