Skip to content

Commit 9f93f9b

Browse files
wundiiawugoloroden
authored
feat: implemented registerEventSchema, readSubjects and readEventTypes plus IsEventQlTrue for writeEvents (#21)
Co-authored-by: awu <[email protected]> Co-authored-by: Golo Roden <[email protected]>
1 parent e937756 commit 9f93f9b

12 files changed

+541
-5
lines changed

README.md

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ If you only want to write events in case a subject (such as `/books/42`) does no
7373
use Thenativeweb\Eventsourcingdb\IsSubjectPristine;
7474

7575
$writtenEvents = $client->writeEvents([
76-
// ...
76+
// events
7777
], [
7878
new IsSubjectPristine('/books/42'),
7979
]);
@@ -87,14 +87,30 @@ If you only want to write events in case the last event of a subject (such as `/
8787
use Thenativeweb\Eventsourcingdb\IsSubjectOnEventId;
8888

8989
$writtenEvents = $client->writeEvents([
90-
// ...
90+
// events
9191
], [
9292
new IsSubjectOnEventId('/books/42', '0'),
9393
]);
9494
```
9595

9696
*Note that according to the CloudEvents standard, event IDs must be of type string.*
9797

98+
#### Using the `isEventQlTrue` precondition
99+
100+
If you want to write events depending on an EventQL query, use the `IsEventQlTrue` function to create a precondition:
101+
102+
```php
103+
use Thenativeweb\Eventsourcingdb\IsEventQlTrue;
104+
105+
$writtenEvents = $client->writeEvents([
106+
// events
107+
], [
108+
new IsEventQlTrue("FROM e IN events WHERE e.type == 'io.eventsourcingdb.library.book-borrowed' PROJECT INTO COUNT() < 10")
109+
]);
110+
```
111+
112+
*Note that the query must return a single row with a single value, which is interpreted as a boolean.*
113+
98114
### Reading Events
99115

100116
To read all events of a subject, call the `readEvents` function with the subject and an options object. Set the `recursive` option to `false`. This ensures that only events of the given subject are returned, not events of nested subjects.
@@ -348,6 +364,92 @@ foreach ($events as $event) {
348364
}
349365
```
350366

367+
### Registering an Event Schema
368+
369+
To register an event schema, call the `registerEventSchema` function and hand over an event type and the desired schema:
370+
371+
```php
372+
$eventType = 'io.eventsourcingdb.library.book-acquired';
373+
$schema = [
374+
'type' => 'object',
375+
'properties' => [
376+
'title' => ['type' => 'string'],
377+
'author' => ['type' => 'string'],
378+
'isbn' => ['type' => 'string'],
379+
],
380+
'required' => [
381+
'title',
382+
'author',
383+
'isbn',
384+
],
385+
'additionalProperties' => false,
386+
];
387+
388+
$client->registerEventSchema($eventType, $schema);
389+
```
390+
391+
### Listing Subjects
392+
393+
To list all subjects, call the `readSubjects` function with `/` as the base subject. The function returns an asynchronous iterator, which you can use e.g. inside a `foreach` loop:
394+
395+
```php
396+
$subjects = $client->readSubjects('/');
397+
398+
foreach($subjects as $subject) {
399+
// ...
400+
}
401+
```
402+
403+
If you only want to list subjects within a specific branch, provide the desired base subject instead:
404+
405+
```php
406+
$subjects = $client->readSubjects('/books');
407+
408+
foreach($subjects as $subject) {
409+
// ...
410+
}
411+
```
412+
413+
#### Aborting Listing
414+
415+
If you need to abort listing use `abortIn` before or within the `foreach` loop. The `abortIn` method expects the abort time in seconds. However, this only works if there is currently an iteration going on:
416+
417+
```php
418+
$subjects = $client->readSubjects('/');
419+
420+
$client->abortIn(0.1);
421+
foreach($subjects as $subject) {
422+
// ...
423+
$client->abortIn(0.1);
424+
}
425+
```
426+
427+
### Listing Event Types
428+
429+
To list all event types, call the `readEventTypes` function. The function returns an asynchronous iterator, which you can use e.g. inside a `foreach` loop:
430+
431+
```php
432+
$eventTypes = $client->readEventTypes();
433+
434+
foreach($eventTypes as $eventType) {
435+
// ...
436+
}
437+
```
438+
439+
#### Aborting Listing
440+
441+
If you need to abort listing use `abortIn` before or within the `foreach` loop. The `abortIn` method expects the abort time in seconds. However, this only works if there is currently an iteration going on:
442+
443+
```php
444+
$eventTypes = $client->readEventTypes();
445+
446+
$client->abortIn(0.1);
447+
foreach($eventTypes as $eventType) {
448+
// ...
449+
$client->abortIn(0.1);
450+
}
451+
```
452+
351453
### Using Testcontainers
352454

353455
Import the `Container` class, call the `start` function to run a test container, get a client, run your test code, and finally call the `stop` function to stop the test container:

docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
FROM thenativeweb/eventsourcingdb:1.0.3
1+
FROM thenativeweb/eventsourcingdb:preview

src/Client.php

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,4 +257,95 @@ public function observeEvents(string $subject, ObserveEventsOptions $observeEven
257257
}
258258
}
259259
}
260+
261+
public function registerEventSchema(string $eventType, array $schema): void
262+
{
263+
$response = $this->httpClient->post(
264+
'/api/v1/register-event-schema',
265+
$this->apiToken,
266+
[
267+
'eventType' => $eventType,
268+
'schema' => $schema,
269+
],
270+
);
271+
272+
$status = $response->getStatusCode();
273+
if ($status !== 200) {
274+
throw new RuntimeException(sprintf(
275+
"Failed to register event schema, got HTTP status code '%d', expected '200'",
276+
$status
277+
));
278+
}
279+
}
280+
281+
public function readSubjects(string $baseSubject): iterable
282+
{
283+
$response = $this->httpClient->post(
284+
'/api/v1/read-subjects',
285+
$this->apiToken,
286+
[
287+
'baseSubject' => $baseSubject,
288+
],
289+
);
290+
291+
$status = $response->getStatusCode();
292+
if ($status !== 200) {
293+
throw new RuntimeException(sprintf(
294+
"Failed to read subjects, got HTTP status code '%d', expected '200'",
295+
$status
296+
));
297+
}
298+
299+
foreach (NdJson::readStream($response->getStream()) as $eventLine) {
300+
switch ($eventLine->type) {
301+
case 'heartbeat':
302+
break;
303+
case 'subject':
304+
$subject = $eventLine->payload['subject'];
305+
yield $subject;
306+
307+
break;
308+
case 'error':
309+
throw new RuntimeException($eventLine->payload['error'] ?? 'unknown error');
310+
default:
311+
throw new RuntimeException("Failed to handle unsupported line type {$eventLine->type}");
312+
}
313+
}
314+
}
315+
316+
public function readEventTypes(): iterable
317+
{
318+
$response = $this->httpClient->post(
319+
'/api/v1/read-event-types',
320+
$this->apiToken,
321+
);
322+
323+
$status = $response->getStatusCode();
324+
if ($status !== 200) {
325+
throw new RuntimeException(sprintf(
326+
"Failed to read event types, got HTTP status code '%d', expected '200'",
327+
$status
328+
));
329+
}
330+
331+
foreach (NdJson::readStream($response->getStream()) as $eventLine) {
332+
switch ($eventLine->type) {
333+
case 'heartbeat':
334+
break;
335+
case 'eventType':
336+
$eventType = new EventType(
337+
$eventLine->payload['eventType'],
338+
$eventLine->payload['isPhantom'],
339+
$eventLine->payload['schema'] ?? [],
340+
);
341+
yield $eventType;
342+
343+
break;
344+
case 'error':
345+
throw new RuntimeException($eventLine->payload['error'] ?? 'unknown error');
346+
default:
347+
throw new RuntimeException("Failed to handle unsupported line type {$eventLine->type}");
348+
}
349+
}
350+
}
260351
}

src/EventType.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Thenativeweb\Eventsourcingdb;
6+
7+
use JsonSerializable;
8+
9+
class EventType implements JsonSerializable
10+
{
11+
public function __construct(
12+
public string $eventType,
13+
public bool $isPhantom,
14+
public array $schema = [],
15+
) {
16+
}
17+
18+
public function jsonSerialize(): array
19+
{
20+
return [
21+
'eventType' => $this->eventType,
22+
'isPhantom' => $this->isPhantom,
23+
'schema' => $this->schema,
24+
];
25+
}
26+
}

src/IsEventQlTrue.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Thenativeweb\Eventsourcingdb;
6+
7+
use JsonSerializable;
8+
9+
final readonly class IsEventQlTrue implements JsonSerializable
10+
{
11+
public function __construct(
12+
public string $query,
13+
) {
14+
}
15+
16+
public function jsonSerialize(): array
17+
{
18+
return [
19+
'type' => 'isEventQlTrue',
20+
'payload' => [
21+
'query' => $this->query,
22+
],
23+
];
24+
}
25+
}

tests/ObserveEventsTest.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use PHPUnit\Framework\TestCase;
66
use Thenativeweb\Eventsourcingdb\Bound;
77
use Thenativeweb\Eventsourcingdb\BoundType;
8+
use Thenativeweb\Eventsourcingdb\CloudEvent;
89
use Thenativeweb\Eventsourcingdb\EventCandidate;
910
use Thenativeweb\Eventsourcingdb\ObserveEventsOptions;
1011
use Thenativeweb\Eventsourcingdb\ObserveFromLatestEvent;
@@ -146,6 +147,7 @@ public function testObserversWithLowerBound(): void
146147
}
147148

148149
$this->assertCount(1, $eventsObserved);
150+
$this->assertInstanceOf(CloudEvent::class, $eventsObserved[0]);
149151
$this->assertSame('1', $eventsObserved[0]->id);
150152
$this->assertSame(42, $eventsObserved[0]->data['value']);
151153
}
@@ -191,6 +193,7 @@ public function testObserversFromLatestEvent(): void
191193
}
192194

193195
$this->assertCount(1, $eventsObserved);
196+
$this->assertInstanceOf(CloudEvent::class, $eventsObserved[0]);
194197
$this->assertSame('1', $eventsObserved[0]->id);
195198
$this->assertSame(42, $eventsObserved[0]->data['value']);
196199
}
@@ -259,9 +262,9 @@ public function testObserverAllEventsPerformanceBenchmark(): void
259262
);
260263

261264
$startTime = microtime(true);
262-
$maxExecutionTime = 3.0;
265+
$maxExecutionTime = 2.0;
263266

264-
$this->client->abortIn(1.5);
267+
$this->client->abortIn(0.5);
265268
foreach ($this->client->observeEvents('/test', $observeEventsOptions) as $event) {
266269
$eventsObserved[] = $event;
267270
}

0 commit comments

Comments
 (0)