Skip to content

Commit 9575a89

Browse files
committed
add feature run_eventql_query
1 parent bb59668 commit 9575a89

File tree

5 files changed

+143
-51
lines changed

5 files changed

+143
-51
lines changed

eventsourcingdb/client.py

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from collections.abc import AsyncGenerator
22
import contextlib
33
from types import TracebackType
4-
from typing import TypeVar
4+
from typing import Any, TypeVar
55

66
from http import HTTPStatus
77
import json
@@ -110,9 +110,8 @@ async def write_events(
110110

111111
if response.status_code != HTTPStatus.OK:
112112
raise ServerError(
113-
f'Unexpected response status: '
114-
f'{response.status_code} {HTTPStatus(response.status_code).phrase}.'
115-
)
113+
f'Unexpected response status: '
114+
)
116115

117116
response_data = await response.body.read()
118117
response_data = bytes.decode(response_data, encoding='utf-8')
@@ -144,9 +143,8 @@ async def read_events(
144143
async with response:
145144
if response.status_code != HTTPStatus.OK:
146145
raise ServerError(
147-
f'Unexpected response status: '
148-
f'{response.status_code} {HTTPStatus(response.status_code).phrase}'
149-
)
146+
f'Unexpected response status: {response}'
147+
)
150148
async for raw_message in response.body:
151149
message = parse_raw_message(raw_message)
152150

@@ -189,11 +187,36 @@ async def read_events(
189187
f'{message}.'
190188
)
191189

192-
async def run_eventql_query(self, query: str) -> AsyncGenerator[Event]:
193-
# TODO: read events nehmen. Das Responsehandling ist gleich wie
194-
# read_events. ein object was eine query property hat, Return ist ein any.
195-
# da kann alles mögliche sein.
196-
raise NotImplementedError("run_eventql_query is not implemented yet.")
190+
async def run_eventql_query(self, query: str) -> AsyncGenerator[Any, None]:
191+
request_body = json.dumps({
192+
'query': query,
193+
})
194+
response: Response = await self.__http_client.post(
195+
path='/api/v1/run-eventql-query',
196+
request_body=request_body,
197+
)
198+
199+
async with response:
200+
if response.status_code != HTTPStatus.OK:
201+
raise ServerError(
202+
f'Unexpected response status: {response}'
203+
)
204+
async for raw_message in response.body:
205+
message = parse_raw_message(raw_message)
206+
207+
if is_stream_error(message):
208+
raise ServerError(f'{message["payload"]["error"]}.')
209+
210+
if message.get('type') == 'row':
211+
payload = message['payload']
212+
213+
yield payload
214+
continue
215+
216+
raise ServerError(
217+
f'Failed to execute EventQL query, an unexpected stream item was received: '
218+
f'{message}.'
219+
)
197220

198221
async def observe_events(
199222
self,
@@ -205,17 +228,15 @@ async def observe_events(
205228
'options': options.to_json()
206229
})
207230

208-
response: Response
209-
response = await self.http_client.post(
231+
response : Response = await self.http_client.post(
210232
path='/api/v1/observe-events',
211233
request_body=request_body,
212234
)
213235

214-
with response:
236+
async with response:
215237
if response.status_code != HTTPStatus.OK:
216238
raise ServerError(
217-
f'Unexpected response status: '
218-
f'{response.status_code} {HTTPStatus(response.status_code).phrase}'
239+
f'Unexpected response status: {response}'
219240
)
220241
async for raw_message in response.body:
221242
message = parse_raw_message(raw_message)
@@ -256,23 +277,17 @@ async def register_event_schema(self, event_type: str, json_schema: dict) -> Non
256277
'schema': json_schema,
257278
})
258279

259-
response: Response
260-
response = await self.http_client.post(
280+
response: Response = await self.http_client.post(
261281
path='/api/v1/register-event-schema',
262282
request_body=request_body,
263283
)
264284

265-
with response:
285+
async with response:
266286
if response.status_code != HTTPStatus.OK:
267287
raise ServerError(
268-
'Unexpected response status: '
269-
f'{response.status_code} '
270-
f'{HTTPStatus(response.status_code).phrase} '
271-
f'{str(response)} '
288+
f'Unexpected response status: {response} '
272289
)
273290

274-
return
275-
276291
async def read_subjects(
277292
self,
278293
base_subject: str
@@ -281,19 +296,15 @@ async def read_subjects(
281296
'baseSubject': base_subject
282297
})
283298

284-
response: Response
285-
response = await self.http_client.post(
299+
response : Response = await self.http_client.post(
286300
path='/api/v1/read-subjects',
287301
request_body=request_body,
288302
)
289303

290-
with response:
304+
async with response:
291305
if response.status_code != HTTPStatus.OK:
292306
raise ServerError(
293-
'Unexpected response status: '
294-
f'{response.status_code} '
295-
f'{HTTPStatus(response.status_code).phrase} '
296-
f'{str(response)} '
307+
f'Unexpected response status: {response}'
297308
)
298309
async for raw_message in response.body:
299310
message = parse_raw_message(raw_message)
@@ -322,11 +333,10 @@ async def read_event_types(self) -> AsyncGenerator[EventType]:
322333
except Exception as other_error:
323334
raise InternalError(str(other_error)) from other_error
324335

325-
with response:
336+
async with response:
326337
if response.status_code != HTTPStatus.OK:
327338
raise ServerError(
328-
'Unexpected response status: '
329-
f'{response.status_code} {HTTPStatus(response.status_code).phrase}'
339+
f'Unexpected response status: {response}'
330340
)
331341
async for raw_message in response.body:
332342
message = parse_raw_message(raw_message)

eventsourcingdb/http_client/http_client.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,21 +45,19 @@ async def post(self, path: str, request_body: str) -> Response:
4545
if self.__session is None:
4646
raise CustomError()
4747

48-
async def __request_executor() -> Response:
49-
url_path = url.join_segments(self.__base_url, path)
50-
headers = get_post_headers(self.__api_token)
48+
url_path = url.join_segments(self.__base_url, path)
49+
headers = get_post_headers(self.__api_token)
5150

52-
async_response = await self.__session.post( # type: ignore
53-
url_path,
54-
data=request_body,
55-
headers=headers,
56-
)
51+
async_response = await self.__session.post( # type: ignore
52+
url_path,
53+
data=request_body,
54+
headers=headers,
55+
)
5756

58-
response = Response(async_response)
57+
response = Response(async_response)
5958

60-
return response
59+
return response
6160

62-
return await __request_executor()
6361

6462
async def get(
6563
self,

eventsourcingdb/http_client/response.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,8 @@ def body(self) -> StreamReader:
3939
return self.__response.content
4040

4141
def __str__(self) -> str:
42-
return f'Response(status_code={self.status_code}, headers={dict(self.headers)})'
42+
status_code_text = f'{self.status_code} {HTTPStatus(self.status_code).phrase}'
43+
header_text = dict(self.headers)
44+
body_text = self.body.read_nowait().decode("utf-8")
45+
status_code_text = f'{self.status_code} {HTTPStatus(self.status_code).phrase}'
46+
return f'status_code={status_code_text}, headers={header_text}, body={body_text}'

tests/test_register_event_schema.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ async def test_throws_error_if_schema_conflicts_with_existing_events(
4444
]
4545
)
4646

47-
with pytest.raises(ServerError, match="Bad Request"):
47+
with pytest.raises(ServerError, match='missing properties'):
4848
await client.register_event_schema(
4949
"com.gornisht.ekht",
5050
{
@@ -69,7 +69,7 @@ async def test_throws_error_if_schema_already_exists(
6969
}
7070
)
7171

72-
with pytest.raises(ServerError):
72+
with pytest.raises(ServerError, match='schema already exists'):
7373
await client.register_event_schema(
7474
"com.gornisht.ekht",
7575
{
@@ -86,7 +86,7 @@ async def test_throws_error_if_schema_is_invalid(
8686
):
8787
client = database.get_client()
8888

89-
with pytest.raises(ServerError):
89+
with pytest.raises(ServerError, match='value must be "object"'):
9090
await client.register_event_schema(
9191
"com.gornisht.ekht",
9292
{

tests/test_run_eventql_query.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import asyncio
2+
3+
from aiohttp import ClientConnectorDNSError
4+
import pytest
5+
6+
from eventsourcingdb.errors.server_error import ServerError
7+
from eventsourcingdb.event.event_candidate import EventCandidate
8+
9+
from .conftest import TestData
10+
from .shared.database import Database
11+
from .shared.event.assert_event import assert_event_equals
12+
13+
14+
class TestRunEventQLQuery:
15+
@staticmethod
16+
@pytest.mark.asyncio
17+
async def test_throws_error_if_server_is_not_reachable(
18+
database: Database
19+
):
20+
client = database.get_client("with_invalid_url")
21+
22+
with pytest.raises(ClientConnectorDNSError):
23+
async for _ in client.run_eventql_query('FROM e IN events PROJECT INTO e'):
24+
pass
25+
26+
@staticmethod
27+
@pytest.mark.asyncio
28+
async def test_reads_no_rows_if_query_does_not_return_any_rows(
29+
database: Database
30+
):
31+
client = database.get_client()
32+
33+
did_read_rows = False
34+
async for _ in client.run_eventql_query('FROM e IN events PROJECT INTO e'):
35+
did_read_rows = True
36+
37+
assert did_read_rows is False
38+
39+
@staticmethod
40+
@pytest.mark.asyncio
41+
async def test_reads_all_rows_the_query_returns(
42+
database: Database
43+
):
44+
client = database.get_client()
45+
46+
first_event = EventCandidate(
47+
source='https://www.eventsourcingdb.io',
48+
subject='/test',
49+
type='io.eventsourcingdb.test',
50+
data={
51+
'value': 23,
52+
},
53+
)
54+
55+
second_event = EventCandidate(
56+
source='https://www.eventsourcingdb.io',
57+
subject='/test',
58+
type='io.eventsourcingdb.test',
59+
data={
60+
'value': 42,
61+
},
62+
)
63+
64+
await client.write_events([first_event, second_event])
65+
66+
rows_read = []
67+
async for row in client.run_eventql_query('FROM e IN events PROJECT INTO e'):
68+
rows_read.append(row)
69+
70+
assert len(rows_read) == 2
71+
72+
first_row = rows_read[0]
73+
# Use dictionary access instead of attribute access
74+
assert first_row['id'] == '0'
75+
assert first_row['data']['value'] == 23
76+
77+
second_row = rows_read[1]
78+
assert second_row['id'] == '1'
79+
assert second_row['data']['value'] == 42
80+

0 commit comments

Comments
 (0)