Skip to content

Commit 5008c5c

Browse files
committed
fix httpclient
1 parent 04d6466 commit 5008c5c

File tree

7 files changed

+105
-171
lines changed

7 files changed

+105
-171
lines changed

eventsourcingdb/client.py

Lines changed: 57 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from collections.abc import AsyncGenerator
2+
from types import TracebackType
3+
from typing import Optional, Type, TypeVar, AsyncIterator
24

35
from .client_configuration import ClientConfiguration
46
from .event.event_candidate import EventCandidate
@@ -16,34 +18,36 @@
1618
from .handlers.write_events import Precondition, write_events
1719

1820

19-
# pylint: disable=R6007
20-
# Reason: This class explicitly specifies the return type as None
21-
# for better readability. Even though it is not necessary,
22-
# it makes the return type clear without needing to read any
23-
# documentation or code.
24-
class Client():
21+
T = TypeVar('T')
22+
23+
class Client:
2524
def __init__(
2625
self,
2726
base_url: str,
2827
api_token: str,
2928
):
30-
configuration = ClientConfiguration(
31-
base_url=base_url,
32-
api_token=api_token,
33-
)
29+
self.__http_client = HttpClient(base_url=base_url, api_token=api_token)
3430

35-
self.__http_client = HttpClient(configuration)
31+
async def __aenter__(self):
32+
await self.__http_client.initialize()
33+
return self
34+
35+
async def __aexit__(
36+
self,
37+
exc_type: Optional[Type[BaseException]],
38+
exc_val: Optional[BaseException],
39+
exc_tb: Optional[TracebackType]
40+
) -> None:
41+
await self.__http_client.close()
3642

37-
#TODO: is this necessary? __enter__, __exit__
38-
async def initialize(self) -> None:
43+
# Keeping these for backward compatibility and explicit resource management
44+
"""async def initialize(self) -> None:
3945
await self.__http_client.initialize()
4046
41-
#TODO: is this necessary? magic method __enter__, __exit__
4247
async def close(self) -> None:
43-
await self.__http_client.close()
44-
45-
@property
46-
def http_client(self) -> HttpClient:
48+
await self.__http_client.close() # TODO: should we mix object orientation and functional programming?
49+
"""
50+
def http_client(self) -> # TODO: should we mix object orientation and functional programming?tpClient:
4751
return self.__http_client
4852

4953
# TODO: should we mix object orientation and functional programming?
@@ -89,9 +93,13 @@ async def read_events(
8993
return http.status(200);
9094
}
9195
"""
92-
async for event in read_events(self, subject, options):
93-
yield event
94-
96+
"""Read events with proper cancellation support."""
97+
generator = read_events(self, subject, options)
98+
try:
99+
async for item in generator:
100+
yield item
101+
finally:
102+
await generator.aclose()
95103
# TODO: run eventql query
96104
async def run_eventql_query(self, query: str) -> AsyncGenerator[Event, None]:
97105
"""
@@ -106,26 +114,42 @@ async def observe_events(
106114
"""
107115
TODO: the same issue like read_events. contextmanager
108116
"""
109-
async for event in observe_events(self, subject, options):
110-
yield event
111-
112-
async def register_event_schema(self, event_type: str, json_schema: str) -> None: # TODO: no json_schema is dict no string anymore
113-
# no context manager liek read_events
117+
async def observe_events(
118+
self,
119+
subject: str,
120+
options: ObserveEventsOptions
121+
) -> AsyncGenerator[StoreItem, None]:
122+
generator = observe_events(self, subject, options)
123+
try:
124+
async for item in generator:
125+
yield item
126+
finally:
127+
await generator.aclose()
128+
129+
async def register_event_schema(self, event_type: str, json_schema: dict) -> None:
130+
# Updated type hint to reflect it should be a dict
114131
await register_event_schema(self, event_type, json_schema)
115132

116133
async def read_subjects(
117134
self,
118135
base_subject: str
119136
) -> AsyncGenerator[str, None]:
120-
# TODO: the same issue like read_events. contextmanager
121-
async for subject in read_subjects(self, base_subject):
122-
yield subject
123-
137+
"""Read subjects with proper cancellation support."""
138+
generator = read_subjects(self, base_subject)
139+
try:
140+
async for item in generator:
141+
yield item
142+
finally:
143+
await generator.aclose()
124144

125145
async def read_event_types(self) -> AsyncGenerator[EventType, None]:
126-
# TODO: the same issue like read_events. contextmanager
127-
async for event_type in read_event_types(self):
128-
yield event_type
146+
"""Read event types with proper cancellation support."""
147+
generator = read_event_types(self)
148+
try:
149+
async for item in generator:
150+
yield item
151+
finally:
152+
await generator.aclose()
129153

130154

131155

Lines changed: 44 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
from collections import namedtuple
22
from collections.abc import Callable, Coroutine
3-
from http import HTTPStatus
3+
from types import TracebackType
4+
from typing import Any, Optional, Type
45

56
import aiohttp
67
from aiohttp import ClientSession
78

8-
from .response import Response, Headers
9-
from ..client_configuration import ClientConfiguration
9+
from .response import Response
1010
from ..errors.client_error import ClientError
1111
from ..errors.custom_error import CustomError
1212
from ..errors.internal_error import InternalError
1313
from ..errors.server_error import ServerError
1414
from ..util import url
15-
from ..util.retry.retry_result import Retry, Return, RetryResult
16-
from ..util.retry.retry_with_backoff import retry_with_backoff
17-
from ..util.retry.retry_error import RetryError
1815

1916
Range = namedtuple('Range', 'lower, upper')
2017

@@ -26,71 +23,62 @@ class UninitializedError(CustomError):
2623
class HttpClient:
2724
def __init__(
2825
self,
29-
client_configuration: ClientConfiguration
26+
base_url: str,
27+
api_token: str,
3028
):
31-
self.__client_configuration: ClientConfiguration = client_configuration
29+
self.__base_url = base_url
30+
self.__api_token = api_token
3231
self.__session: ClientSession | None = None
32+
33+
async def __aenter__(self):
34+
await self.initialize()
35+
return self
36+
37+
async def __aexit__(
38+
self,
39+
exc_type: Optional[Type[BaseException]],
40+
exc_val: Optional[BaseException],
41+
exc_tb: Optional[TracebackType]
42+
) -> None:
43+
await self.close()
3344

45+
# Keep for backward compatibility
3446
async def initialize(self) -> None:
35-
self.__session = aiohttp.ClientSession(
36-
timeout=aiohttp.ClientTimeout(
37-
connect=self.__client_configuration.timeout_seconds,
38-
sock_read=self.__client_configuration.timeout_seconds
39-
)
40-
)
47+
self.__session = aiohttp.ClientSession()
4148

49+
# Keep for backward compatibility
4250
async def close(self):
4351
if self.__session is not None:
4452
await self.__session.close()
53+
self.__session = None
4554

4655
async def __execute_request(
4756
self,
48-
execute_request: Callable[[], Coroutine[None, None, RetryResult[Response]]]
57+
execute_request: Callable[[], Coroutine[Any, Any, Response]]
4958
) -> Response:
5059
try:
51-
return await retry_with_backoff(
52-
self.__client_configuration.max_tries,
53-
execute_request
54-
)
55-
except RetryError as retry_error:
56-
raise ServerError(str(retry_error)) from retry_error
60+
result = await execute_request()
61+
return result
5762
except CustomError as custom_error:
5863
raise custom_error
5964
except aiohttp.ClientError as request_error:
6065
raise ServerError(str(request_error)) from request_error
6166
except Exception as other_error:
6267
raise InternalError(str(other_error)) from other_error
6368

64-
def __validate_protocol_version(self, http_status_code: int, headers: Headers) -> None:
65-
if http_status_code != HTTPStatus.UNPROCESSABLE_ENTITY:
66-
return
67-
68-
server_protocol_version = headers.get(
69-
'x-eventsourcingdb-protocol-version'
70-
)
71-
72-
if server_protocol_version is None:
73-
server_protocol_version = 'unknown version'
74-
75-
raise ClientError(
76-
f'Protocol version mismatch, server \'{server_protocol_version}\','
77-
f' client \'{self.__client_configuration.protocol_version}\'.'
78-
)
7969

8070
@staticmethod
8171
async def __get_error_message(response: Response):
8272
error_message = f'Request failed with status code \'{response.status_code}\''
8373

8474
# We want to purposefully ignore all errors here, as we're already error handling,
8575
# and this function just tries to get more information on a best-effort basis.
86-
# pylint: disable=too-many-try-statements
8776
try:
8877
encoded_error_reason = await response.body.read()
8978
error_reason = encoded_error_reason.decode('utf-8')
9079
error_message += f" {error_reason}"
9180
finally:
9281
pass
93-
# pylint: enable=too-many-try-statements
9482

9583
error_message += '.'
9684

@@ -99,23 +87,20 @@ async def __get_error_message(response: Response):
9987
async def __validate_response(
10088
self,
10189
response: Response
102-
) -> RetryResult[Response]:
90+
) -> Response:
10391
server_failure_range = Range(500, 600)
10492
if server_failure_range.lower <= response.status_code < server_failure_range.upper:
105-
return Retry(ServerError(await self.__get_error_message(response)))
93+
raise ServerError(await self.__get_error_message(response))
10694

10795
client_failure_range = Range(400, 500)
10896
if client_failure_range.lower <= response.status_code < client_failure_range.upper:
10997
raise ClientError(await self.__get_error_message(response))
11098

111-
self.__validate_protocol_version(response.status_code, response.headers)
112-
113-
return Return(response)
99+
return response
114100

115101
def __get_post_request_headers(self) -> dict[str, str]:
116102
headers = {
117-
'X-EventSourcingDB-Protocol-Version': self.__client_configuration.protocol_version,
118-
'Authorization': f'Bearer {self.__client_configuration.api_token}',
103+
'Authorization': f'Bearer {self.__api_token}',
119104
'Content-Type': 'application/json'
120105
}
121106

@@ -125,37 +110,31 @@ async def post(self, path: str, request_body: str) -> Response:
125110
if self.__session is None:
126111
raise UninitializedError()
127112

128-
async def execute_request() -> RetryResult[Response]:
129-
response = await self.__session.post(
113+
async def execute_request() -> Response:
114+
async_response = await self.__session.post( # type: ignore
130115
url.join_segments(
131-
self.__client_configuration.base_url,
116+
self.__base_url,
132117
path
133118
),
134119
data=request_body,
135120
headers=self.__get_post_request_headers(),
136121
)
137122

138-
response = Response(response)
123+
response = Response(async_response)
139124
try:
140125
result = await self.__validate_response(response)
126+
return result
141127
except Exception as error:
142128
response.close()
143129
raise error
144130

145-
if isinstance(result, Retry):
146-
response.close()
147-
148-
return result
149-
150131
return await self.__execute_request(execute_request)
151132

152133
def __get_get_request_headers(self, with_authorization: bool) -> dict[str, str]:
153-
headers = {
154-
'X-EventSourcingDB-Protocol-Version': self.__client_configuration.protocol_version,
155-
}
134+
headers = {}
156135

157136
if with_authorization:
158-
headers['Authorization'] = f'Bearer {self.__client_configuration.api_token}'
137+
headers['Authorization'] = f'Bearer {self.__api_token}'
159138

160139
return headers
161140

@@ -167,23 +146,21 @@ async def get(
167146
if self.__session is None:
168147
raise UninitializedError()
169148

170-
async def execute_request() -> RetryResult[Response]:
171-
response = await self.__session.get(
149+
async def execute_request() -> Response:
150+
async_response = await self.__session.get( # type: ignore
172151
url.join_segments(
173-
self.__client_configuration.base_url, path),
152+
self.__base_url,
153+
path
154+
),
174155
headers=self.__get_get_request_headers(with_authorization),
175156
)
176157

177-
response = Response(response)
158+
response = Response(async_response)
178159
try:
179160
result = await self.__validate_response(response)
161+
return result
180162
except Exception as error:
181163
response.close()
182164
raise error
183165

184-
if isinstance(result, Retry):
185-
response.close()
186-
187-
return result
188-
189166
return await self.__execute_request(execute_request)

eventsourcingdb/http_client/response.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from http import HTTPStatus
44

55
import aiohttp
6+
from aiohttp import streams as aiohttp_streams
67

78
Headers = Mapping[str, str]
89

@@ -11,10 +12,10 @@ class Response:
1112
def __init__(self, response: aiohttp.ClientResponse):
1213
self.__response: aiohttp.ClientResponse = response
1314

14-
def __enter__(self):
15+
def __aenter__(self):
1516
return self
1617

17-
def __exit__(self, exc_type, exc_val, exc_tb):
18+
def __aexit__(self, exc_type, exc_val, exc_tb):
1819
self.close()
1920

2021
def close(self):
@@ -27,7 +28,6 @@ def status_code(self) -> HTTPStatus:
2728
@property
2829
def headers(self) -> Headers:
2930
return self.__response.headers
30-
3131
@property
32-
def body(self) -> asyncio.StreamReader:
32+
def body(self) -> aiohttp_streams.StreamReader:
3333
return self.__response.content

eventsourcingdb/util/retry/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)