Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/Bound.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace Thenativeweb\Eventsourcingdb;

use JsonSerializable;

enum BoundType: string
{
case INCLUSIVE = 'inclusive';
case EXCLUSIVE = 'exclusive';
}

class Bound implements JsonSerializable
{
public function __construct(
public string $id,
public BoundType $type,
) {
}

public function jsonSerialize(): array
{
return [
'id' => $this->id,
'type' => $this->type->value,
];
}
}
107 changes: 87 additions & 20 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use DateTimeImmutable;
use GuzzleHttp\Client as HttpClient;
use RuntimeException;
use Thenativeweb\Eventsourcingdb\Stream\NdJson;

final readonly class Client
{
Expand Down Expand Up @@ -34,7 +35,7 @@ public function ping(): void
));
}

$body = (string) $response->getBody();
$body = $response->getBody()->getContents();
$data = json_decode($body, true);

if (!isset($data['type']) || $data['type'] !== 'io.eventsourcingdb.api.ping-received') {
Expand All @@ -61,15 +62,15 @@ public function verifyApiToken(): void
));
}

$body = (string) $response->getBody();
$body = $response->getBody()->getContents();
$data = json_decode($body, true);

if (!isset($data['type']) || $data['type'] !== 'io.eventsourcingdb.api.api-token-verified') {
throw new RuntimeException('Failed to verify API token');
}
}

public function writeEvents(array $events, array $preconditions = []): array
public function writeEvents(array $events, array $preconditions = []): iterable
{
$requestBody = [
'events' => $events,
Expand Down Expand Up @@ -97,24 +98,90 @@ public function writeEvents(array $events, array $preconditions = []): array
));
}

$body = (string) $response->getBody();
$body = $response->getBody()->getContents();
if ($body === '') {
return;
}

if (!json_validate($body)) {
throw new RuntimeException('Failed to read events.');
}

$data = json_decode($body, true);
if (!is_array($data)) {
throw new RuntimeException('Failed to read events, expected an array.');
}

foreach ($data as $item) {
$cloudEvent = new CloudEvent(
$item['specversion'],
$item['id'],
new DateTimeImmutable($item['time']),
$item['source'],
$item['subject'],
$item['type'],
$item['datacontenttype'],
$item['data'],
$item['hash'],
$item['predecessorhash'],
$item['traceparent'] ?? null,
$item['tracestate'] ?? null,
);
yield $cloudEvent;
}
}

public function readEvents(string $subject, ReadEventsOptions $readEventsOptions): iterable
{
$requestBody = [
'subject' => $subject,
'options' => $readEventsOptions,
];

$response = $this->httpClient->post(
'/api/v1/read-events',
[
'headers' => [
'Authorization' => 'Bearer ' . $this->apiToken,
'Content-Type' => 'application/json',
],
'json' => $requestBody,
],
);
$status = $response->getStatusCode();

$writtenEvents = array_map(fn ($item): CloudEvent => new CloudEvent(
$item['specversion'],
$item['id'],
new DateTimeImmutable($item['time']),
$item['source'],
$item['subject'],
$item['type'],
$item['datacontenttype'],
$item['data'],
$item['hash'],
$item['predecessorhash'],
$item['traceparent'] ?? null,
$item['tracestate'] ?? null,
), $data);

return $writtenEvents;
if ($status !== 200) {
throw new RuntimeException(sprintf(
"Failed to read events, got HTTP status code '%d', expected '200'",
$status
));
}

foreach (NdJson::readStream($response->getBody()) as $eventLine) {
switch ($eventLine->type) {
case 'event':
$cloudEvent = new CloudEvent(
$eventLine->payload['specversion'],
$eventLine->payload['id'],
new DateTimeImmutable($eventLine->payload['time']),
$eventLine->payload['source'],
$eventLine->payload['subject'],
$eventLine->payload['type'],
$eventLine->payload['datacontenttype'],
$eventLine->payload['data'],
$eventLine->payload['hash'],
$eventLine->payload['predecessorhash'],
$eventLine->payload['traceparent'] ?? null,
$eventLine->payload['tracestate'] ?? null,
);
yield $cloudEvent;

break;
case 'error':
throw new RuntimeException($eventLine->payload['error'] ?? 'unknown error');
default:
throw new RuntimeException("Failed to handle unsupported line type {$eventLine->type}");
}
}
}
}
75 changes: 75 additions & 0 deletions src/ReadEventsOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

declare(strict_types=1);

namespace Thenativeweb\Eventsourcingdb;

use JsonSerializable;

enum Order: string
{
case CHRONOLOGICAL = 'chronological';
case ANTICHRONOLOGICAL = 'antichronological';
}

enum ReadIfEventIsMissing: string
{
case READ_NOTHING = 'read-nothing';
case READ_EVERYTHING = 'read-everything';
}

class ReadFromLatestEvent implements JsonSerializable
{
public function __construct(
public string $subject,
public string $type,
public ReadIfEventIsMissing $ifEventIsMissing,
) {
}

public function jsonSerialize(): array
{
return [
'subject' => $this->subject,
'type' => $this->type,
'ifEventIsMissing' => $this->ifEventIsMissing->value,
];
}
}

class ReadEventsOptions implements JsonSerializable
{
public function __construct(
public bool $recursive = false,
public ?Order $order = null,
public ?Bound $lowerBound = null,
public ?Bound $upperBound = null,
public ?ReadFromLatestEvent $fromLatestEvent = null
) {
}

public function jsonSerialize(): mixed
{
$result = [
'recursive' => $this->recursive,
];

if ($this->order instanceof Order) {
$result['order'] = $this->order->value;
}

if ($this->lowerBound instanceof Bound) {
$result['lowerBound'] = $this->lowerBound->jsonSerialize();
}

if ($this->upperBound instanceof Bound) {
$result['upperBound'] = $this->upperBound->jsonSerialize();
}

if ($this->fromLatestEvent instanceof ReadFromLatestEvent) {
$result['fromLatestEvent'] = $this->fromLatestEvent->jsonSerialize();
}

return $result;
}
}
54 changes: 54 additions & 0 deletions src/Stream/NdJson.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?php

declare(strict_types=1);

namespace Thenativeweb\Eventsourcingdb\Stream;

use Psr\Http\Message\StreamInterface;
use RuntimeException;

final readonly class NdJson
{
public static function readLine(StreamInterface $stream): string
{
$buffer = '';

while (!$stream->eof()) {
if ('' === ($byte = $stream->read(1))) {
return $buffer;
}

$buffer .= $byte;
if ($byte === "\n") {
break;
}
}

return $buffer;
}

public static function readStream(StreamInterface $stream): iterable
{
while (!$stream->eof()) {
$line = self::readLine($stream);
if ($line === '') {
continue;
}

if (!json_validate($line)) {
throw new RuntimeException('Failed to read events.');
}

$item = json_decode($line, true);
if (!is_array($item)) {
throw new RuntimeException('Failed to read events, expected an array.');
}

$eventLine = new ReadEventLine(
$item['type'] ?? 'unknown',
$item['payload'] ?? [],
);
yield $eventLine;
}
}
}
14 changes: 14 additions & 0 deletions src/Stream/ReadEventLine.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Thenativeweb\Eventsourcingdb\Stream;

final readonly class ReadEventLine
{
public function __construct(
public string $type,
public array $payload,
) {
}
}
Loading