Skip to content

Commit 6669bba

Browse files
atstaeffgoloroden
andauthored
feat: Upgrade to latest version of eventsourcingdb (#56)
* test: upgrade test to latest version of eventsourcingdb * style: some linting and remove comments * fix: QA linting issues * fix: linting issues * fix: ping issue. reponse has changed * fix: linting issue in start_local_http_server.py * fix: default port expose in containerized_testing_database.py * fix: linting in container.py * fix: linting in container.py * fix: linting container.py * fix: linting in container.py * feat: modify the schema by using item. Remove the function is_item. Not needed anymore. Payload has changed. * feat: change lower_bound_id and upper_bound_id to lower_bound and upper_bound * feat: test changed for lowerbound and upperbound * feat: change valdiation issues with LowerBound and UpperBound changes to object * feat: update " and ' in lower_bound.py and upper_bound.py * feat: modify lower_bound and upper_bound validation * style: * merge-conflict * fix: merge conflict * fix: library name to eventsourcingdb * chore: fix PR comments * chore: modify tests for uppervound and lowerbound by using the objec tnot primary data type * style: modify linting --------- Co-authored-by: Golo Roden <[email protected]>
1 parent 9334215 commit 6669bba

28 files changed

+300
-147
lines changed

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ qa: analyze test
33
analyze:
44
@poetry run pylint eventsourcingdb tests
55

6+
format:
7+
@poetry run autopep8 --in-place --aggressive --max-line-length=100 --recursive eventsourcingdb tests
8+
69
test:
710
@poetry run pytest --maxfail=1
811

912
clean:
1013

1114
build: qa clean
1215

13-
.PHONY: analyze build clean qa test
16+
.PHONY: analyze build clean format qa test

eventsourcingdb/event/event.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
# due to its business context. Splitting it into smaller
1212
# methods would increase cognitive load and make the
1313
# code less readable.
14+
15+
1416
class Event(EventContext):
1517
def __init__(
1618
self,
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from typing import Any
2+
3+
EVENT_TYPE = 'event'
4+
5+
6+
def is_event(message: Any) -> bool:
7+
# Check if this is the new format message
8+
if isinstance(message, dict) and message.get('type') == EVENT_TYPE:
9+
payload = message.get('payload')
10+
return isinstance(payload, dict) and isinstance(payload.get('hash'), str)
11+
12+
# Otherwise check for old format (just for backward compatibility)
13+
if not isinstance(message, dict):
14+
return False
15+
16+
payload = message.get('payload')
17+
return (
18+
isinstance(payload, dict)
19+
and isinstance(payload.get('event'), dict)
20+
and isinstance(payload.get('hash'), str)
21+
)

eventsourcingdb/handlers/is_item.py

Lines changed: 0 additions & 16 deletions
This file was deleted.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from dataclasses import dataclass
2+
3+
from eventsourcingdb.errors.invalid_parameter_error import InvalidParameterError
4+
5+
6+
@dataclass
7+
class LowerBound:
8+
id: str
9+
type: str
10+
11+
def __post_init__(self):
12+
if self.type not in {'inclusive', 'exclusive'}:
13+
raise InvalidParameterError(
14+
parameter_name="LowerBound",
15+
reason='type must be either "inclusive" or "exclusive"'
16+
)
17+
if int(self.id) < 0:
18+
raise InvalidParameterError(
19+
parameter_name='LowerBound',
20+
reason='id must be non-negative'
21+
)

eventsourcingdb/handlers/observe_events/observe_events.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from http import HTTPStatus
44

55
from ..is_heartbeat import is_heartbeat
6-
from ..is_item import is_item
6+
from ..is_event import is_event
77
from ..is_stream_error import is_stream_error
88
from ..parse_raw_message import parse_raw_message
99
from ...abstract_base_client import AbstractBaseClient
@@ -23,6 +23,8 @@
2323
# for better readability. Even though it is not necessary,
2424
# it makes the return type clear without needing to read any
2525
# documentation or code.
26+
27+
2628
async def observe_events(
2729
client: AbstractBaseClient,
2830
subject: str,
@@ -50,7 +52,7 @@ async def observe_events(
5052
response: Response
5153
try:
5254
response = await client.http_client.post(
53-
path='/api/observe-events',
55+
path='/api/v1/observe-events',
5456
request_body=request_body,
5557
)
5658
except CustomError as custom_error:
@@ -73,8 +75,24 @@ async def observe_events(
7375
if is_stream_error(message):
7476
raise ServerError(f'{message["payload"]["error"]}.')
7577

76-
if is_item(message):
77-
event = Event.parse(message['payload']['event'])
78+
if is_event(message):
79+
event = Event.parse(message['payload'])
80+
# Add client-side filtering by event ID
81+
event_id = int(message['payload']['id'])
82+
83+
if options.lower_bound is not None:
84+
# For inclusive, include events with ID >= lower bound
85+
if (
86+
options.lower_bound.type == 'inclusive' and # pylint: disable=R2004
87+
int(event_id) < int(options.lower_bound.id)
88+
):
89+
continue
90+
# For exclusive, include events with ID > lower bound
91+
if (
92+
options.lower_bound.type == 'exclusive' and # pylint: disable=R2004
93+
int(event_id) <= int(options.lower_bound.id)
94+
):
95+
continue
7896

7997
yield StoreItem(event, message['payload']['hash'])
8098
continue

eventsourcingdb/handlers/observe_events/observe_events_options.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
11
from dataclasses import dataclass
22

3+
from ..lower_bound import LowerBound
34
from ...errors.validation_error import ValidationError
45
from ...event.validate_subject import validate_subject
56
from ...event.validate_type import validate_type
6-
from ...util.is_non_negativ_integer import is_non_negative_integer
77
from .observe_from_latest_event import ObserveFromLatestEvent
88

99

1010
@dataclass
1111
class ObserveEventsOptions:
1212
recursive: bool
13-
lower_bound_id: str | None = None
13+
lower_bound: LowerBound | None = None
1414
from_latest_event: ObserveFromLatestEvent | None = None
1515

1616
def validate(self) -> None:
17-
if self.lower_bound_id is not None and not is_non_negative_integer(self.lower_bound_id):
17+
if self.lower_bound is not None and not isinstance(self.lower_bound, LowerBound):
1818
raise ValidationError(
19-
'ReadEventOptions are invalid: lower_bound_id must be 0 or greater.'
19+
'ObserveEventsOptions are invalid: lower_bound must be a LowerBound object.'
2020
)
2121

2222
if self.from_latest_event is not None:
23-
if self.lower_bound_id is not None:
23+
if self.lower_bound is not None:
2424
raise ValidationError(
2525
'ReadEventsOptions are invalid: '
2626
'lowerBoundId and fromLatestEvent are mutually exclusive'
@@ -47,8 +47,13 @@ def to_json(self):
4747
'recursive': self.recursive
4848
}
4949

50-
if self.lower_bound_id is not None:
51-
json['lowerBoundId'] = self.lower_bound_id
50+
# Directly use the object
51+
if self.lower_bound is not None:
52+
json['lowerBound'] = {
53+
'id': str(self.lower_bound.id), # Ensure ID is a string
54+
'type': self.lower_bound.type
55+
}
56+
5257
if self.from_latest_event is not None:
5358
json['fromLatestEvent'] = self.from_latest_event.to_json()
5459

eventsourcingdb/handlers/ping.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,45 @@
11
from http import HTTPStatus
2-
2+
import json
33
from ..abstract_base_client import AbstractBaseClient
44
from ..errors.server_error import ServerError
55

6+
# Define constants for response values
7+
OK_RESPONSE = "OK"
8+
STATUS_OK = "ok"
9+
10+
# CloudEvent field names
11+
SPECVERSION_FIELD = "specversion"
12+
TYPE_FIELD = "type"
13+
PING_RECEIVED_TYPE = "io.eventsourcingdb.ping-received"
14+
615

716
async def ping(client: AbstractBaseClient) -> None:
8-
response = await client.http_client.get('/api/ping')
9-
response_body = bytes.decode(await response.body.read(), encoding='utf-8')
17+
response = await client.http_client.get("/api/v1/ping")
18+
response_body = bytes.decode(await response.body.read(), encoding="utf-8")
19+
20+
if response.status_code != HTTPStatus.OK:
21+
raise ServerError(f"Received unexpected response: {response_body}")
22+
23+
# Check old format (plain "OK")
24+
if response_body == OK_RESPONSE:
25+
return
26+
27+
try:
28+
response_json = json.loads(response_body)
29+
except json.JSONDecodeError as exc:
30+
raise ServerError(f"Received unexpected response: {response_body}") from exc
31+
32+
# Check if it's a JSON with status field
33+
if isinstance(response_json, dict) and response_json.get("status") == STATUS_OK:
34+
return
35+
36+
# Check if it's a CloudEvent format (has specversion, type fields)
37+
if (
38+
isinstance(response_json, dict)
39+
and SPECVERSION_FIELD in response_json
40+
and TYPE_FIELD in response_json
41+
):
42+
if response_json.get(TYPE_FIELD) == PING_RECEIVED_TYPE:
43+
return
1044

11-
if response.status_code != HTTPStatus.OK or response_body != HTTPStatus.OK.phrase:
12-
raise ServerError(f'Received unexpected response: {response_body}')
45+
raise ServerError(f"Received unexpected response: {response_body}")

eventsourcingdb/handlers/read_event_types/read_event_types.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
# for better readability. Even though it is not necessary,
1818
# it makes the return type clear without needing to read any
1919
# documentation or code.
20+
21+
2022
async def read_event_types(
2123
client: AbstractBaseClient,
2224
) -> AsyncGenerator[EventType, None]:
2325
response: Response
2426
try:
2527
response = await client.http_client.post(
26-
path='/api/read-event-types',
28+
path='/api/v1/read-event-types',
2729
request_body='',
2830
)
2931
except CustomError as custom_error:

eventsourcingdb/handlers/read_events/read_events.py

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from ...event.event import Event
1212
from ...event.validate_subject import validate_subject
1313
from ...http_client.response import Response
14-
from ..is_item import is_item
14+
from ..is_event import is_event
1515
from ..is_stream_error import is_stream_error
1616
from ..parse_raw_message import parse_raw_message
1717
from ..store_item import StoreItem
@@ -22,6 +22,8 @@
2222
# for better readability. Even though it is not necessary,
2323
# it makes the return type clear without needing to read any
2424
# documentation or code.
25+
26+
2527
async def read_events(
2628
client: AbstractBaseClient,
2729
subject: str,
@@ -49,7 +51,7 @@ async def read_events(
4951
response: Response
5052
try:
5153
response = await client.http_client.post(
52-
path='/api/read-events',
54+
path='/api/v1/read-events',
5355
request_body=request_body,
5456
)
5557
except CustomError as custom_error:
@@ -69,8 +71,38 @@ async def read_events(
6971
if is_stream_error(message):
7072
raise ServerError(f'{message["payload"]["error"]}.')
7173

72-
if is_item(message):
73-
event = Event.parse(message['payload']['event'])
74+
if is_event(message):
75+
event = Event.parse(message['payload'])
76+
77+
event_id = int(message['payload']['id']) # Access ID from raw payload
78+
79+
if options.lower_bound is not None:
80+
# For inclusive, include events with ID >= lower bound
81+
if (
82+
options.lower_bound.type == 'inclusive' and # pylint: disable=R2004
83+
int(event_id) < int(options.lower_bound.id)
84+
):
85+
continue
86+
# For exclusive, include events with ID > lower bound
87+
if (
88+
options.lower_bound.type == 'exclusive' and # pylint: disable=R2004
89+
int(event_id) <= int(options.lower_bound.id)
90+
):
91+
continue
92+
93+
if options.upper_bound is not None:
94+
# For inclusive, include events with ID <= upper bound
95+
if (
96+
options.upper_bound.type == 'inclusive' and # pylint: disable=R2004
97+
int(event_id) > int(options.upper_bound.id)
98+
):
99+
continue
100+
# For exclusive, include events with ID < upper bound
101+
if (
102+
options.upper_bound.type == 'exclusive' and # pylint: disable=R2004
103+
int(event_id) >= int(options.upper_bound.id)
104+
):
105+
continue
74106

75107
yield StoreItem(event, message['payload']['hash'])
76108
continue

0 commit comments

Comments
 (0)