Skip to content

Commit e490a39

Browse files
committed
use clear start and close async support
1 parent a6b1933 commit e490a39

File tree

3 files changed

+179
-123
lines changed

3 files changed

+179
-123
lines changed

eventsourcingdb/client.py

Lines changed: 113 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,39 @@
1-
import json
21
from collections.abc import AsyncGenerator
3-
from http import HTTPStatus
2+
43
from types import TracebackType
54
from typing import Any, TypeVar
65

7-
from .errors import CustomError, InternalError, ServerError, ValidationError
8-
from .event import Event, EventCandidate
9-
from .http_client import HttpClient, Response
10-
from .is_event import is_event
6+
from http import HTTPStatus
7+
import json
8+
119
from .is_heartbeat import is_heartbeat
1210
from .is_stream_error import is_stream_error
13-
from .observe_events import ObserveEventsOptions
11+
from .is_event import is_event
1412
from .parse_raw_message import parse_raw_message
15-
from .read_event_types import EventType, is_event_type
1613
from .read_events import ReadEventsOptions
14+
15+
from .errors import CustomError, InternalError, ServerError, ValidationError
16+
from .event import Event, EventCandidate
17+
from .observe_events import ObserveEventsOptions
18+
from .read_event_types import EventType, is_event_type
1719
from .read_subjects import is_subject
20+
1821
from .write_events import Precondition
22+
from .http_client import HttpClient, Response
1923

20-
T = TypeVar("T")
2124

25+
T = TypeVar('T')
2226

23-
class Client:
27+
28+
class Client():
2429
def __init__(
2530
self,
2631
base_url: str,
2732
api_token: str,
2833
) -> None:
2934
self.__http_client = HttpClient(base_url=base_url, api_token=api_token)
3035

31-
async def __aenter__(self) -> "Client":
36+
async def __aenter__(self) -> 'Client':
3237
await self.__http_client.__aenter__()
3338
return self
3439

@@ -40,12 +45,6 @@ async def __aexit__(
4045
) -> None:
4146
await self.__http_client.__aexit__(exc_type, exc_val, exc_tb)
4247

43-
async def initialize(self) -> None:
44-
await self.__http_client.initialize()
45-
46-
async def close(self) -> None:
47-
await self.__http_client.close()
48-
4948
@property
5049
def http_client(self) -> HttpClient:
5150
return self.__http_client
@@ -76,197 +75,228 @@ async def verify_api_token(self) -> None:
7675
request_body = json.dumps({})
7776

7877
response: Response = await self.http_client.post(
79-
path="/api/v1/verify-api-token",
78+
path='/api/v1/verify-api-token',
8079
request_body=request_body,
8180
)
8281
async with response:
8382
if response.status_code != HTTPStatus.OK:
84-
raise ServerError(f"Failed to verify API token: {response}")
83+
raise ServerError(
84+
f'Failed to verify API token: {response}'
85+
)
8586

8687
response_data = await response.body.read()
87-
response_data = bytes.decode(response_data, encoding="utf-8")
88+
response_data = bytes.decode(response_data, encoding='utf-8')
8889
response_json = json.loads(response_data)
8990

9091
# pylint: disable=R2004
91-
if not isinstance(response_json, dict) or "type" not in response_json:
92-
raise ServerError("Failed to parse response: {response}")
92+
if not isinstance(response_json, dict) or 'type' not in response_json:
93+
raise ServerError('Failed to parse response: {response}')
9394

94-
expected_event_type = "io.eventsourcingdb.api.api-token-verified"
95-
if response_json.get("type") != expected_event_type:
96-
raise ServerError(f"Failed to verify API token: {response}")
95+
expected_event_type = 'io.eventsourcingdb.api.api-token-verified'
96+
if response_json.get('type') != expected_event_type:
97+
raise ServerError(f'Failed to verify API token: {response}')
9798

9899
async def write_events(
99100
self,
100101
event_candidates: list[EventCandidate],
101-
preconditions: list[Precondition] = None, # type: ignore
102+
preconditions: list[Precondition] = None # type: ignore
102103
) -> list[Event]:
103104
if preconditions is None:
104105
preconditions = []
105106

106107
request_body = json.dumps(
107108
{
108-
"events": [event_candidate.to_json() for event_candidate in event_candidates],
109-
"preconditions": [precondition.to_json() for precondition in preconditions],
109+
'events': [event_candidate.to_json() for event_candidate in event_candidates],
110+
'preconditions': [precondition.to_json() for precondition in preconditions]
110111
}
111112
)
112113

113114
response: Response
114115
response = await self.http_client.post(
115-
path="/api/v1/write-events",
116+
path='/api/v1/write-events',
116117
request_body=request_body,
117118
)
118119

119120
if response.status_code != HTTPStatus.OK:
120-
raise ServerError(f"Unexpected response status: {response}")
121+
raise ServerError(
122+
f'Unexpected response status: {response}'
123+
)
121124

122125
response_data = await response.body.read()
123-
response_data = bytes.decode(response_data, encoding="utf-8")
126+
response_data = bytes.decode(response_data, encoding='utf-8')
124127
response_data = json.loads(response_data)
125128

126129
if not isinstance(response_data, list):
127-
raise ServerError(f"Failed to parse response '{response_data}' to list.")
130+
raise ServerError(
131+
f'Failed to parse response \'{response_data}\' to list.')
128132

129133
result = []
130134
for unparsed_event_context in response_data:
131135
result.append(Event.parse(unparsed_event_context))
132136
return result
133137

134-
async def read_events(self, subject: str, options: ReadEventsOptions) -> AsyncGenerator[Event]:
135-
request_body = json.dumps({"subject": subject, "options": options.to_json()})
138+
async def read_events(
139+
self,
140+
subject: str,
141+
options: ReadEventsOptions
142+
) -> AsyncGenerator[Event]:
143+
request_body = json.dumps({
144+
'subject': subject,
145+
'options': options.to_json()
146+
})
136147
response: Response = await self.__http_client.post(
137-
path="/api/v1/read-events",
148+
path='/api/v1/read-events',
138149
request_body=request_body,
139150
)
140151

141152
async with response:
142153
if response.status_code != HTTPStatus.OK:
143-
raise ServerError(f"Unexpected response status: {response}")
154+
raise ServerError(
155+
f'Unexpected response status: {response}'
156+
)
144157
async for raw_message in response.body:
145158
message = parse_raw_message(raw_message)
146159

147160
if is_stream_error(message):
148-
raise ServerError(f"{message['payload']['error']}.")
161+
raise ServerError(f'{message["payload"]["error"]}.')
149162

150163
if is_event(message):
151-
event = Event.parse(message["payload"])
164+
event = Event.parse(message['payload'])
152165
yield event
153166
continue
154167

155168
raise ServerError(
156-
f"Failed to read events, an unexpected stream item was received: {message}."
169+
f'Failed to read events, an unexpected stream item was received: '
170+
f'{message}.'
157171
)
158172

159173
async def run_eventql_query(self, query: str) -> AsyncGenerator[Any]:
160-
request_body = json.dumps(
161-
{
162-
"query": query,
163-
}
164-
)
174+
request_body = json.dumps({
175+
'query': query,
176+
})
165177
response: Response = await self.__http_client.post(
166-
path="/api/v1/run-eventql-query",
178+
path='/api/v1/run-eventql-query',
167179
request_body=request_body,
168180
)
169181

170182
async with response:
171183
if response.status_code != HTTPStatus.OK:
172-
raise ServerError(f"Unexpected response status: {response}")
184+
raise ServerError(
185+
f'Unexpected response status: {response}'
186+
)
173187
async for raw_message in response.body:
174188
message = parse_raw_message(raw_message)
175189

176190
if is_stream_error(message):
177-
error_message = message.get("payload", {}).get("error", "Unknown error")
191+
error_message = message.get('payload', {}).get('error', 'Unknown error')
178192
raise ServerError(f"{error_message}.")
179193
# pylint: disable=R2004
180-
if message.get("type") == "row":
181-
payload = message["payload"]
194+
if message.get('type') == 'row':
195+
payload = message['payload']
182196

183197
yield payload
184198
continue
185199

186200
raise ServerError(
187-
"Failed to execute EventQL query, an unexpected stream item was received: "
188-
f"{message}."
201+
'Failed to execute EventQL query, an unexpected stream item was received: '
202+
f'{message}.'
189203
)
190204

191205
async def observe_events(
192-
self, subject: str, options: ObserveEventsOptions
206+
self,
207+
subject: str,
208+
options: ObserveEventsOptions
193209
) -> AsyncGenerator[Event]:
194-
request_body = json.dumps({"subject": subject, "options": options.to_json()})
210+
request_body = json.dumps({
211+
'subject': subject,
212+
'options': options.to_json()
213+
})
195214

196215
response: Response = await self.http_client.post(
197-
path="/api/v1/observe-events",
216+
path='/api/v1/observe-events',
198217
request_body=request_body,
199218
)
200219

201220
async with response:
202221
if response.status_code != HTTPStatus.OK:
203-
raise ServerError(f"Unexpected response status: {response}")
222+
raise ServerError(
223+
f'Unexpected response status: {response}'
224+
)
204225
async for raw_message in response.body:
205226
message = parse_raw_message(raw_message)
206227

207228
if is_heartbeat(message):
208229
continue
209230

210231
if is_stream_error(message):
211-
raise ServerError(f"{message['payload']['error']}.")
232+
raise ServerError(f'{message["payload"]["error"]}.')
212233

213234
if is_event(message):
214-
event = Event.parse(message["payload"])
235+
event = Event.parse(message['payload'])
215236
yield event
216237
continue
217238

218239
raise ServerError(
219-
f"Failed to read events, an unexpected stream item was received: {message}."
240+
f'Failed to read events, an unexpected stream item was received: '
241+
f'{message}.'
220242
)
221243

222244
async def register_event_schema(self, event_type: str, json_schema: dict) -> None:
223-
request_body = json.dumps(
224-
{
225-
"eventType": event_type,
226-
"schema": json_schema,
227-
}
228-
)
245+
request_body = json.dumps({
246+
'eventType': event_type,
247+
'schema': json_schema,
248+
})
229249

230250
response: Response = await self.http_client.post(
231-
path="/api/v1/register-event-schema",
251+
path='/api/v1/register-event-schema',
232252
request_body=request_body,
233253
)
234254

235255
async with response:
236256
if response.status_code != HTTPStatus.OK:
237-
raise ServerError(f"Unexpected response status: {response} ")
257+
raise ServerError(
258+
f'Unexpected response status: {response} '
259+
)
238260

239-
async def read_subjects(self, base_subject: str) -> AsyncGenerator[str]:
240-
request_body = json.dumps({"baseSubject": base_subject})
261+
async def read_subjects(
262+
self,
263+
base_subject: str
264+
) -> AsyncGenerator[str]:
265+
request_body = json.dumps({
266+
'baseSubject': base_subject
267+
})
241268

242269
response: Response = await self.http_client.post(
243-
path="/api/v1/read-subjects",
270+
path='/api/v1/read-subjects',
244271
request_body=request_body,
245272
)
246273

247274
async with response:
248275
if response.status_code != HTTPStatus.OK:
249-
raise ServerError(f"Unexpected response status: {response}")
276+
raise ServerError(
277+
f'Unexpected response status: {response}'
278+
)
250279
async for raw_message in response.body:
251280
message = parse_raw_message(raw_message)
252281

253282
if is_stream_error(message):
254-
raise ServerError(message["payload"]["error"])
283+
raise ServerError(message['payload']['error'])
255284

256285
if is_subject(message):
257-
yield message["payload"]["subject"]
286+
yield message['payload']['subject']
258287
continue
259288

260289
raise ServerError(
261-
f"Failed to read subjects, an unexpected stream item was received '{message}'."
290+
f'Failed to read subjects, an unexpected stream item '
291+
f'was received \'{message}\'.'
262292
)
263293

264294
async def read_event_types(self) -> AsyncGenerator[EventType]:
265295
response: Response
266296
try:
267297
response = await self.http_client.post(
268-
path="/api/v1/read-event-types",
269-
request_body="",
298+
path='/api/v1/read-event-types',
299+
request_body='',
270300
)
271301
except CustomError as custom_error:
272302
raise custom_error
@@ -275,17 +305,19 @@ async def read_event_types(self) -> AsyncGenerator[EventType]:
275305

276306
async with response:
277307
if response.status_code != HTTPStatus.OK:
278-
raise ServerError(f"Unexpected response status: {response}")
308+
raise ServerError(
309+
f'Unexpected response status: {response}'
310+
)
279311
async for raw_message in response.body:
280312
message = parse_raw_message(raw_message)
281313

282314
if is_stream_error(message):
283-
raise ServerError(message["payload"]["error"])
315+
raise ServerError(message['payload']['error'])
284316

285317
if is_event_type(message):
286318
event_type: EventType
287319
try:
288-
event_type = EventType.parse(message["payload"])
320+
event_type = EventType.parse(message['payload'])
289321
except ValidationError as validation_error:
290322
raise ServerError(str(validation_error)) from validation_error
291323
except Exception as other_error:
@@ -295,6 +327,6 @@ async def read_event_types(self) -> AsyncGenerator[EventType]:
295327
continue
296328

297329
raise ServerError(
298-
f"Failed to read event types, an unexpected "
299-
f"stream item was received '{message}'."
300-
)
330+
f'Failed to read event types, an unexpected '
331+
f'stream item was received \'{message}\'.'
332+
)

0 commit comments

Comments
 (0)