Skip to content

Commit bb59668

Browse files
committed
fix PR comments and linting
1 parent 6791ddd commit bb59668

19 files changed

+387
-1371
lines changed

eventsourcingdb/client.py

Lines changed: 175 additions & 211 deletions
Large diffs are not rendered by default.

eventsourcingdb/container.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,18 @@ def start(self) -> 'Container':
129129
return self
130130

131131
try:
132+
# Try to pull the latest image
132133
self._docker_client.images.pull(self._image_name, self._image_tag)
133134
except errors.APIError as e:
134-
raise RuntimeError(f'Warning: Could not pull image: {e}') from e
135+
# Check if we already have the image locally
136+
try:
137+
image_name = f"{self._image_name}:{self._image_tag}"
138+
self._docker_client.images.get(image_name)
139+
# If we get here, the image exists locally, so we can continue
140+
logging.warning(f"Warning: Could not pull image: {e}. Using locally cached image.")
141+
except errors.ImageNotFound:
142+
# If the image isn't available locally either, we can't continue
143+
raise RuntimeError(f'Could not pull image and no local image available: {e}') from e
135144

136145
self._cleanup_existing_containers()
137146
self._create_container()

eventsourcingdb/http_client/get_error_message.py

Lines changed: 0 additions & 23 deletions
This file was deleted.

eventsourcingdb/http_client/http_client.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from .get_get_headers import get_get_headers
1010
from .get_post_headers import get_post_headers
1111
from .response import Response
12-
from .validate_response import validate_response
1312

1413

1514
class HttpClient:
@@ -58,14 +57,7 @@ async def __request_executor() -> Response:
5857

5958
response = Response(async_response)
6059

61-
validated_response = None
62-
try:
63-
validated_response = await validate_response(response)
64-
except Exception as error:
65-
response.close()
66-
raise error
67-
68-
return validated_response
60+
return response
6961

7062
return await __request_executor()
7163

@@ -88,13 +80,6 @@ async def __request_executor() -> Response:
8880

8981
response = Response(async_response)
9082

91-
validated_response = None
92-
try:
93-
validated_response = await validate_response(response)
94-
except Exception as error:
95-
response.close()
96-
raise error
97-
98-
return validated_response
83+
return response
9984

10085
return await __request_executor()

eventsourcingdb/http_client/response.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,20 @@ class Response:
1111
def __init__(self, response: aiohttp.ClientResponse):
1212
self.__response: aiohttp.ClientResponse = response
1313

14-
def __aenter__(self):
14+
async def __aenter__(self):
15+
# Properly await any async initialization if needed
1516
return self
1617

17-
def __aexit__(self, exc_type, exc_val, exc_tb):
18-
self.close()
18+
async def __aexit__(self, exc_type, exc_val, exc_tb):
19+
if not self.__response.closed:
20+
self.__response.close()
1921

2022
def __enter__(self):
2123
return self
2224

2325
def __exit__(self, exc_type, exc_val, exc_tb):
24-
self.close()
25-
26-
def close(self):
27-
self.__response.close()
26+
if not self.__response.closed:
27+
self.__response.close()
2828

2929
@property
3030
def status_code(self) -> HTTPStatus:
@@ -37,3 +37,6 @@ def headers(self) -> Headers:
3737
@property
3838
def body(self) -> StreamReader:
3939
return self.__response.content
40+
41+
def __str__(self) -> str:
42+
return f'Response(status_code={self.status_code}, headers={dict(self.headers)})'

eventsourcingdb/http_client/validate_response.py

Lines changed: 0 additions & 20 deletions
This file was deleted.
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import asyncio
2+
import contextlib
3+
import signal
4+
from eventsourcingdb.client import Client
5+
from eventsourcingdb.handlers.read_events import ReadEventsOptions
6+
7+
8+
async def read_events_with_timeout(client, subject, timeout=5.0):
9+
"""Read events with a timeout, demonstrating cancellation."""
10+
try:
11+
# Use asyncio.wait_for to implement a timeout
12+
await asyncio.wait_for(
13+
process_events(client, subject),
14+
timeout=timeout
15+
)
16+
except asyncio.TimeoutError:
17+
print(f"Reading events timed out after {timeout} seconds")
18+
19+
20+
async def process_events(client, subject):
21+
"""Process events from the subject."""
22+
print(f"Starting to read events from {subject}")
23+
24+
# Using contextlib.aclosing ensures the generator is closed properly
25+
# even if we exit the context due to an exception
26+
async with contextlib.aclosing(
27+
client.read_events(
28+
subject=subject,
29+
options=ReadEventsOptions(recursive=True)
30+
)
31+
) as events:
32+
# Process events until something stops the operation
33+
count = 0
34+
async for event in events:
35+
count += 1
36+
print(f"Processed event #{count}: {event.type}")
37+
38+
# Example condition to stop processing
39+
if count >= 10:
40+
print("Reached maximum event count, stopping")
41+
break
42+
43+
print("Finished processing events")
44+
45+
46+
async def cancel_after_delay(task, delay=2.0):
47+
"""Cancel a task after a delay."""
48+
await asyncio.sleep(delay)
49+
print(f"Cancelling task after {delay} seconds")
50+
task.cancel()
51+
52+
53+
async def main():
54+
# Create client
55+
client = Client(
56+
base_url="http://localhost:8080",
57+
api_token="your-api-token"
58+
)
59+
60+
try:
61+
await client.initialize()
62+
63+
# Example 1: Using asyncio.wait_for with a timeout
64+
print("\n--- Example 1: Reading with timeout ---")
65+
await read_events_with_timeout(client, "/users", timeout=3.0)
66+
67+
# Example 2: Create a task and cancel it explicitly
68+
print("\n--- Example 2: Manual cancellation ---")
69+
event_task = asyncio.create_task(process_events(client, "/orders"))
70+
cancel_task = asyncio.create_task(cancel_after_delay(event_task, 2.0))
71+
72+
try:
73+
await event_task
74+
except asyncio.CancelledError:
75+
print("Event reading was cancelled")
76+
77+
await cancel_task
78+
79+
finally:
80+
await client.close()
81+

pytest.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[pytest]
2-
timeout = 20
2+
timeout = 30
33
asyncio_default_fixture_loop_scope = function

tests/conftest.py

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,6 @@
11
import pytest_asyncio
2-
from eventsourcingdb.client import Client
32
from eventsourcingdb.event.event_candidate import EventCandidate
4-
from eventsourcingdb.http_client.http_client import HttpClient
53
from .shared.database import Database
6-
from .shared.start_local_http_server import \
7-
start_local_http_server, \
8-
StopServer
9-
10-
pytest_plugins = ('pytest_asyncio', )
11-
12-
13-
@pytest_asyncio.fixture
14-
async def get_http_client():
15-
stop_server: StopServer | None = None
16-
client: Client | None = None
17-
18-
async def getter(attach_handlers) -> HttpClient:
19-
nonlocal stop_server
20-
nonlocal client
21-
client, stop_server = await start_local_http_server(attach_handlers)
22-
return client.http_client
23-
24-
yield getter
25-
26-
if stop_server is not None:
27-
stop_server()
28-
29-
if client is not None:
30-
await client.close()
314

325

336
@pytest_asyncio.fixture
@@ -39,26 +12,6 @@ async def database():
3912
await testing_db.stop()
4013

4114

42-
@pytest_asyncio.fixture
43-
async def get_client():
44-
stop_server: StopServer | None = None
45-
client: Client | None = None
46-
47-
async def getter(attach_handlers) -> Client:
48-
nonlocal stop_server
49-
nonlocal client
50-
client, stop_server = await start_local_http_server(attach_handlers)
51-
return client
52-
53-
yield getter
54-
55-
if stop_server is not None:
56-
stop_server()
57-
58-
if client is not None:
59-
await client.close()
60-
61-
6215
class TestData:
6316
TEST_SOURCE_STRING = 'tag:thenativeweb.io,2023:eventsourcingdb:test'
6417
REGISTERED_SUBJECT = '/users/registered'

tests/shared/database.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@ class Database:
1111
__create_key = object()
1212
__container: Container
1313

14+
CLIENT_TYPE_WITH_AUTH = 'with_authorization'
15+
CLIENT_TYPE_INVALID_URL = 'with_invalid_url'
16+
1417
def __init__(
1518
self,
1619
create_key,
1720
with_authorization_client: Client,
18-
with_invalid_url_client: Client,
21+
with_invalid_url_client: Client ,
1922
):
2023
assert create_key == Database.__create_key, \
2124
'Database objects must be created using Database.create.'
@@ -28,6 +31,8 @@ def __init__(
2831
@classmethod
2932
def _create_container(cls, api_token, image_tag):
3033
cls.__container = Container()
34+
cls.__container.with_image_tag(image_tag)
35+
cls.__container.with_api_token(api_token)
3136
return cls.__container
3237

3338
@staticmethod
@@ -77,7 +82,13 @@ async def create(cls, max_retries=3, retry_delay=2.0) -> 'Database':
7782
continue
7883

7984
try:
80-
with_authorization_client, with_invalid_url_client = await cls._initialize_clients(container, api_token)
85+
(
86+
with_authorization_client,
87+
with_invalid_url_client
88+
) = await cls._initialize_clients(
89+
container,
90+
api_token
91+
)
8192
except Exception as client_error:
8293
container.stop()
8394
raise client_error
@@ -95,13 +106,15 @@ def _get_image_tag_from_dockerfile():
95106
content = dockerfile.read().strip()
96107
return content.split(':')[-1]
97108

98-
def get_client(self, client_type: str = "with_authorization") -> Client:
99-
if client_type == "with_authorization":
109+
def get_client(self, client_type: str = CLIENT_TYPE_WITH_AUTH) -> Client:
110+
if client_type == self.CLIENT_TYPE_WITH_AUTH:
100111
return self.__with_authorization_client
101-
elif client_type == "with_invalid_url":
112+
if client_type == self.CLIENT_TYPE_INVALID_URL:
102113
return self.__with_invalid_url_client
103-
else:
104-
raise ValueError(f"Unknown client type: {client_type}")
114+
115+
raise ValueError(f"Unknown client type: {client_type}")
105116

106117
async def stop(self) -> None:
107-
self.__class__.__container.stop()
118+
# Use walrus operator for concise access and check
119+
if (container := getattr(self.__class__, '_Database__container', None)):
120+
container.stop()

0 commit comments

Comments
 (0)