Skip to content

Commit 43cc675

Browse files
author
Noah Hummel
authored
feat: add event schemas (#35)
* feat: add event schemas * chore: satisfy linter * fix: add missing case and update tests
1 parent 7a8808f commit 43cc675

File tree

12 files changed

+612
-12
lines changed

12 files changed

+612
-12
lines changed

eventsourcingdb_client_python/client.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
from .event.event_context import EventContext
88
from .handlers.observe_events.observe_events import observe_events
99
from .handlers.observe_events.observe_events_options import ObserveEventsOptions
10+
from .handlers.read_event_types.event_type import EventType
11+
from .handlers.read_event_types.read_event_types import read_event_types
12+
from .handlers.register_event_schema.register_event_schema import register_event_schema
1013
from .http_client.http_client import HttpClient
1114
from .handlers.ping import ping
1215
from .handlers.read_events import read_events, ReadEventsOptions
@@ -72,6 +75,13 @@ async def read_events(
7275
async for event in read_events(self, subject, options):
7376
yield event
7477

78+
async def read_event_types(self) -> AsyncGenerator[EventType, None]:
79+
async for event_type in read_event_types(self):
80+
yield event_type
81+
82+
async def register_event_schema(self, event_type: str, json_schema: str) -> None:
83+
await register_event_schema(self, event_type, json_schema)
84+
7585
async def observe_events(
7686
self,
7787
subject: str,

eventsourcingdb_client_python/handlers/read_event_types/__init__.py

Whitespace-only changes.
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from dataclasses import dataclass
2+
from typing import TypeVar
3+
4+
from eventsourcingdb_client_python.errors.validation_error import ValidationError
5+
6+
Self = TypeVar("Self", bound="EventType")
7+
8+
9+
@dataclass
10+
class EventType:
11+
event_type: str
12+
is_phantom: bool
13+
schema: str | None = None
14+
15+
@staticmethod
16+
def parse(unknown_object: dict) -> Self:
17+
event_type = unknown_object.get('eventType')
18+
if not isinstance(event_type, str):
19+
raise ValidationError(
20+
f"Failed to parse eventType '{event_type}' to str."
21+
)
22+
23+
is_phantom = unknown_object.get('isPhantom')
24+
if not isinstance(is_phantom, bool):
25+
raise ValidationError(
26+
f"Failed to parse isPhantom '{is_phantom}' to bool."
27+
)
28+
29+
schema = unknown_object.get('schema')
30+
if schema is not None and not isinstance(schema, str):
31+
raise ValidationError(
32+
f"Failed to parse schema '{schema}' to str."
33+
)
34+
35+
return EventType(
36+
event_type=event_type,
37+
is_phantom=is_phantom,
38+
schema=schema,
39+
)
40+
41+
def __hash__(self):
42+
return hash((self.event_type, self.is_phantom, self.schema))
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from typing import Any
2+
3+
EVENT_TYPE_TYPE = 'eventType'
4+
5+
6+
def is_event_type(message: Any) -> bool:
7+
if not isinstance(message, dict) or message.get('type') != EVENT_TYPE_TYPE:
8+
return False
9+
10+
payload = message.get('payload')
11+
12+
return isinstance(payload, dict)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from collections.abc import AsyncGenerator
2+
from http import HTTPStatus
3+
4+
from .event_type import EventType
5+
from .is_event_type import is_event_type
6+
from ..is_stream_error import is_stream_error
7+
from ..parse_raw_message import parse_raw_message
8+
from ...abstract_base_client import AbstractBaseClient
9+
from ...errors.custom_error import CustomError
10+
from ...errors.internal_error import InternalError
11+
from ...errors.server_error import ServerError
12+
from ...errors.validation_error import ValidationError
13+
from ...http_client.response import Response
14+
15+
16+
async def read_event_types(
17+
client: AbstractBaseClient,
18+
) -> AsyncGenerator[EventType, None]:
19+
response: Response
20+
try:
21+
response = await client.http_client.post(
22+
path='/api/read-event-types',
23+
request_body='',
24+
)
25+
except CustomError as custom_error:
26+
raise custom_error
27+
except Exception as other_error:
28+
raise InternalError(str(other_error)) from other_error
29+
30+
with response:
31+
if response.status_code != HTTPStatus.OK:
32+
raise ServerError(
33+
'Unexpected response status: '
34+
f'{response.status_code} {HTTPStatus(response.status_code).phrase}'
35+
)
36+
async for raw_message in response.body:
37+
message = parse_raw_message(raw_message)
38+
39+
if is_stream_error(message):
40+
raise ServerError(message['payload']['error'])
41+
42+
if is_event_type(message):
43+
event_type: EventType
44+
try:
45+
event_type = EventType.parse(message['payload'])
46+
except ValidationError as validation_error:
47+
raise ServerError(str(validation_error)) from validation_error
48+
except Exception as other_error:
49+
raise InternalError(str(other_error)) from other_error
50+
51+
yield event_type
52+
continue
53+
54+
raise ServerError(
55+
f'Failed to read event types, an unexpected stream item was received \'{message}\'.'
56+
)

eventsourcingdb_client_python/handlers/read_subjects/read_subjects.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from collections.abc import Generator
1+
from collections.abc import AsyncGenerator
22
import json
33
from http import HTTPStatus
44

@@ -18,7 +18,7 @@
1818
async def read_subjects(
1919
client: AbstractBaseClient,
2020
base_subject: str
21-
) -> Generator[str, None, None]:
21+
) -> AsyncGenerator[str, None, None]:
2222
try:
2323
validate_subject(base_subject)
2424
except ValidationError as validation_error:
@@ -44,7 +44,7 @@ async def read_subjects(
4444
with response:
4545
if response.status_code != HTTPStatus.OK:
4646
raise ServerError(
47-
f'Unexpected response status: '
47+
'Unexpected response status: '
4848
f'{response.status_code} {HTTPStatus(response.status_code).phrase}'
4949
)
5050
async for raw_message in response.body:

eventsourcingdb_client_python/handlers/register_event_schema/__init__.py

Whitespace-only changes.
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import json
2+
from http import HTTPStatus
3+
4+
from ...abstract_base_client import AbstractBaseClient
5+
from ...errors.custom_error import CustomError
6+
from ...errors.internal_error import InternalError
7+
from ...errors.invalid_parameter_error import InvalidParameterError
8+
from ...errors.server_error import ServerError
9+
from ...errors.validation_error import ValidationError
10+
from ...event.validate_type import validate_type
11+
from ...http_client.response import Response
12+
13+
14+
async def register_event_schema(
15+
client: AbstractBaseClient,
16+
event_type: str,
17+
json_schema: str,
18+
) -> None:
19+
try:
20+
validate_type(event_type)
21+
except ValidationError as validation_error:
22+
raise InvalidParameterError(
23+
'event_type', str(validation_error)
24+
) from validation_error
25+
except Exception as other_error:
26+
raise InternalError(str(other_error)) from other_error
27+
28+
request_body = json.dumps({
29+
'eventType': event_type,
30+
'schema': json_schema,
31+
})
32+
33+
response: Response
34+
try:
35+
response = await client.http_client.post(
36+
path='/api/register-event-schema',
37+
request_body=request_body,
38+
)
39+
except CustomError as custom_error:
40+
raise custom_error
41+
except Exception as other_error:
42+
raise InternalError(str(other_error)) from other_error
43+
44+
with response:
45+
if response.status_code != HTTPStatus.OK:
46+
raise ServerError(
47+
'Unexpected response status: '
48+
f'{response.status_code} {HTTPStatus(response.status_code).phrase}'
49+
)
50+
51+
return

eventsourcingdb_client_python/http_client/http_client.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,21 +87,36 @@ def __validate_protocol_version(self, http_status_code: int, headers: Headers) -
8787
f' client \'{self.__client_configuration.protocol_version}\'.'
8888
)
8989

90-
def __validate_response(
90+
@staticmethod
91+
async def __get_error_message(response: Response):
92+
error_message = f'Request failed with status code \'{response.status_code}\''
93+
94+
# We want to purposefully ignore all errors here, as we're already error handling,
95+
# and this function just tries to get more information on a best-effort basis.
96+
# pylint: disable=too-many-try-statements
97+
try:
98+
encoded_error_reason = await response.body.read()
99+
error_reason = encoded_error_reason.decode('utf-8')
100+
error_message += f" {error_reason}"
101+
finally:
102+
pass
103+
# pylint: enable=too-many-try-statements
104+
105+
error_message += '.'
106+
107+
return error_message
108+
109+
async def __validate_response(
91110
self,
92111
response: Response
93112
) -> RetryResult[Response]:
94113
server_failure_range = Range(500, 600)
95114
if server_failure_range.lower <= response.status_code < server_failure_range.upper:
96-
return Retry(
97-
ServerError(f'Request failed with status code \'{response.status_code}\'.')
98-
)
115+
return Retry(ServerError(await self.__get_error_message(response)))
99116

100117
client_failure_range = Range(400, 500)
101118
if client_failure_range.lower <= response.status_code < client_failure_range.upper:
102-
raise ClientError(
103-
f'Request failed with status code \'{response.status_code}\'.'
104-
)
119+
raise ClientError(await self.__get_error_message(response))
105120

106121
self.__validate_protocol_version(response.status_code, response.headers)
107122

@@ -132,7 +147,7 @@ async def execute_request() -> RetryResult[Response]:
132147

133148
response = Response(response)
134149
try:
135-
result = self.__validate_response(response)
150+
result = await self.__validate_response(response)
136151
except Exception as error:
137152
response.close()
138153
raise error
@@ -171,7 +186,7 @@ async def execute_request() -> RetryResult[Response]:
171186

172187
response = Response(response)
173188
try:
174-
result = self.__validate_response(response)
189+
result = await self.__validate_response(response)
175190
except Exception as error:
176191
response.close()
177192
raise error

0 commit comments

Comments
 (0)