Skip to content

Commit 1539689

Browse files
committed
add container
1 parent d77908d commit 1539689

File tree

5 files changed

+161
-22
lines changed

5 files changed

+161
-22
lines changed

eventsourcingdb/client.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
from collections.abc import AsyncGenerator
22
from types import TracebackType
3-
from typing import Optional, Type, TypeVar, AsyncIterator
4-
from eventsourcingdb.json_encoder import EventSourcingDBJSONEncoder
3+
from typing import Optional, Type, TypeVar
54

65
from eventsourcingdb.abstract_base_client import AbstractBaseClient
76

8-
from .client_configuration import ClientConfiguration
97
from .event.event import Event
108
from .event.event_candidate import EventCandidate
119
from .event.event_context import EventContext

eventsourcingdb/container.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import time
2+
import docker
3+
import requests
4+
from typing import Optional
5+
from docker import errors
6+
7+
from eventsourcingdb.client import Client
8+
9+
10+
class Container:
11+
def __init__(
12+
self,
13+
image_name: str = "thenativeweb/eventsourcingdb",
14+
image_tag: str = "0.113.2",
15+
api_token: str = "secret",
16+
internal_port: int = 3000
17+
):
18+
self._image_name = image_name
19+
self._image_tag = image_tag
20+
self._internal_port = internal_port
21+
self._api_token = api_token
22+
self._container = None
23+
self._docker_client = docker.from_env()
24+
self._mapped_port: Optional[int] = None
25+
self._host = "localhost"
26+
27+
def with_image_tag(self, tag: str) -> "Container":
28+
self._image_tag = tag
29+
return self
30+
31+
def with_api_token(self, token: str) -> "Container":
32+
self._api_token = token
33+
return self
34+
35+
def with_port(self, port: int) -> "Container":
36+
self._internal_port = port
37+
return self
38+
39+
def start(self) -> "Container":
40+
if self._container is not None:
41+
return self
42+
43+
try:
44+
self._docker_client.images.pull(self._image_name, self._image_tag)
45+
except errors.APIError as e:
46+
print(f"Warning: Could not pull image: {e}")
47+
48+
self._cleanup_existing_containers()
49+
self._create_container()
50+
self._fetch_mapped_port()
51+
self._wait_for_http("/api/v1/ping", timeout=20)
52+
53+
return self
54+
55+
def _cleanup_existing_containers(self) -> None:
56+
try:
57+
for container in self._docker_client.containers.list(
58+
filters={"ancestor": f"{self._image_name}:{self._image_tag}"}):
59+
container.stop()
60+
container.remove()
61+
except Exception as e:
62+
print(f"Warning: Error stopping existing containers: {e}")
63+
64+
def _create_container(self) -> None:
65+
port_bindings = {f"{self._internal_port}/tcp": None}
66+
self._container = self._docker_client.containers.run(
67+
f"{self._image_name}:{self._image_tag}",
68+
command=[
69+
"run",
70+
"--api-token",
71+
self._api_token,
72+
"--data-directory-temporary",
73+
"--http-enabled",
74+
"--https-enabled=false",
75+
],
76+
ports=port_bindings, # type: ignore
77+
detach=True,
78+
) # type: ignore
79+
80+
def _fetch_mapped_port(self) -> None:
81+
if self._container is None:
82+
raise RuntimeError("Container failed to start")
83+
84+
max_retries, retry_delay = 5, 1
85+
86+
for attempt in range(max_retries):
87+
try:
88+
container_info = self._docker_client.api.inspect_container(
89+
self._container.id)
90+
port_mappings = container_info["NetworkSettings"]["Ports"].get(f"{self._internal_port}/tcp")
91+
92+
if port_mappings and "HostPort" in port_mappings[0]:
93+
self._mapped_port = int(port_mappings[0]["HostPort"])
94+
return
95+
except (KeyError, IndexError, AttributeError):
96+
pass
97+
98+
if attempt < max_retries - 1:
99+
time.sleep(retry_delay)
100+
101+
# Failed to get port, clean up
102+
self._stop_and_remove_container()
103+
raise RuntimeError("Failed to determine mapped port")
104+
105+
def _wait_for_http(self, path: str, timeout: int) -> None:
106+
base_url = self.get_base_url()
107+
url = f"{base_url}{path}"
108+
start_time = time.time()
109+
110+
while time.time() - start_time < timeout:
111+
try:
112+
response = requests.get(url, timeout=2)
113+
if response.status_code == 200:
114+
return
115+
except requests.RequestException:
116+
pass
117+
time.sleep(0.5)
118+
119+
self._stop_and_remove_container()
120+
raise TimeoutError(f"Service failed to become ready within {timeout} seconds")
121+
122+
def _stop_and_remove_container(self) -> None:
123+
if self._container is None:
124+
return
125+
126+
try:
127+
self._container.stop()
128+
self._container.remove()
129+
except Exception as e:
130+
print(f"Warning: Error stopping container: {e}")
131+
finally:
132+
self._container = None
133+
self._mapped_port = None
134+
135+
def get_host(self) -> str:
136+
if self._container is None:
137+
raise RuntimeError("Container must be running.")
138+
return self._host
139+
140+
def get_mapped_port(self) -> int:
141+
if self._container is None or self._mapped_port is None:
142+
raise RuntimeError("Container must be running.")
143+
return self._mapped_port
144+
145+
def get_base_url(self) -> str:
146+
if self._container is None:
147+
raise RuntimeError("Container must be running.")
148+
return f"http://{self.get_host()}:{self.get_mapped_port()}"
149+
150+
def get_api_token(self) -> str:
151+
return self._api_token
152+
153+
def is_running(self) -> bool:
154+
return self._container is not None
155+
156+
def stop(self) -> None:
157+
self._stop_and_remove_container()
158+
159+
def get_client(self) -> Client:
160+
return Client(self.get_base_url(), self.get_api_token())

eventsourcingdb/handlers/observe_events/observe_events.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,8 @@
1010
from ..parse_raw_message import parse_raw_message
1111
from ...errors.custom_error import CustomError
1212
from ...errors.internal_error import InternalError
13-
from ...errors.invalid_parameter_error import InvalidParameterError
1413
from ...errors.server_error import ServerError
15-
from ...errors.validation_error import ValidationError
1614
from ...event.event import Event
17-
from ...event.validate_subject import validate_subject
1815
from .observe_events_options import ObserveEventsOptions
1916
from ...http_client.response import Response
2017

@@ -24,20 +21,6 @@ async def observe_events(
2421
subject: str,
2522
options: ObserveEventsOptions
2623
) -> AsyncGenerator[Event, None]:
27-
"""try:
28-
validate_subject(subject)
29-
except ValidationError as validation_error:
30-
raise InvalidParameterError('subject', str(validation_error)) from validation_error
31-
except Exception as other_error:
32-
raise InternalError(str(other_error)) from other_error
33-
34-
try:
35-
options.validate()
36-
except ValidationError as validation_error:
37-
raise InvalidParameterError('options', str(validation_error)) from validation_error
38-
except Exception as other_error:
39-
raise InternalError(str(other_error)) from other_error
40-
"""
4124
request_body = json.dumps({
4225
'subject': subject,
4326
'options': options.to_json()

eventsourcingdb/handlers/observe_events/observe_events_options.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ def to_json(self) -> dict[str, Any]:
4848
'recursive': self.recursive,
4949
}
5050

51-
# Directly use the object
5251
if self.lower_bound is not None:
5352
result['lowerBound'] = self.lower_bound.to_json()
5453

tests/shared/start_local_http_server.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import time
32
from collections.abc import Callable
43
from multiprocessing import get_context
54

0 commit comments

Comments
 (0)