Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/Bound.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace Thenativeweb\Eventsourcingdb;

use JsonSerializable;

enum BoundType: string
{
case INCLUSIVE = 'inclusive';
case EXCLUSIVE = 'exclusive';
}

class Bound implements JsonSerializable
{
public function __construct(
public string $id,
public BoundType $type,
) {
}

public function jsonSerialize(): array
{
return [
'id' => $this->id,
'type' => $this->type->value,
];
}
}
123 changes: 103 additions & 20 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
namespace Thenativeweb\Eventsourcingdb;

use DateTimeImmutable;
use Exception;
use GuzzleHttp\Client as HttpClient;
use GuzzleHttp\Exception\GuzzleException;
use RuntimeException;
use Thenativeweb\Eventsourcingdb\Stream\Utils;

final readonly class Client
{
Expand All @@ -22,6 +25,9 @@ public function __construct(string $url, string $apiToken)
]);
}

/**
* @throws GuzzleException
*/
public function ping(): void
{
$response = $this->httpClient->get('/api/v1/ping');
Expand All @@ -34,14 +40,17 @@ public function ping(): void
));
}

$body = (string) $response->getBody();
$body = $response->getBody()->getContents();
$data = json_decode($body, true);

if (!isset($data['type']) || $data['type'] !== 'io.eventsourcingdb.api.ping-received') {
throw new RuntimeException('Failed to ping');
}
}

/**
* @throws GuzzleException
*/
public function verifyApiToken(): void
{
$response = $this->httpClient->post(
Expand All @@ -61,15 +70,19 @@ public function verifyApiToken(): void
));
}

$body = (string) $response->getBody();
$body = $response->getBody()->getContents();
$data = json_decode($body, true);

if (!isset($data['type']) || $data['type'] !== 'io.eventsourcingdb.api.api-token-verified') {
throw new RuntimeException('Failed to verify API token');
}
}

public function writeEvents(array $events, array $preconditions = []): array
/**
* @return iterable<CloudEvent>
* @throws Exception|GuzzleException
*/
public function writeEvents(array $events, array $preconditions = []): iterable
{
$requestBody = [
'events' => $events,
Expand Down Expand Up @@ -97,24 +110,94 @@ public function writeEvents(array $events, array $preconditions = []): array
));
}

$body = (string) $response->getBody();
$body = $response->getBody()->getContents();
if ($body === '') {
return;
}

if (!json_validate($body)) {
throw new RuntimeException('Failed to read events.');
}

$data = json_decode($body, true);
if (!is_array($data)) {
throw new RuntimeException('Failed to read events, expected an array.');
}

foreach ($data as $item) {
$cloudEvent = new CloudEvent(
$item['specversion'],
$item['id'],
new DateTimeImmutable($item['time']),
$item['source'],
$item['subject'],
$item['type'],
$item['datacontenttype'],
$item['data'],
$item['hash'],
$item['predecessorhash'],
$item['traceparent'] ?? null,
$item['tracestate'] ?? null,
);
yield $cloudEvent;
}
}

/**
* @return iterable<CloudEvent>
* @throws Exception|GuzzleException
*/
public function readEvents(string $subject, ReadEventsOptions $readEventsOptions): iterable
{
$requestBody = [
'subject' => $subject,
'options' => $readEventsOptions,
];

$response = $this->httpClient->post(
'/api/v1/read-events',
[
'headers' => [
'Authorization' => 'Bearer ' . $this->apiToken,
'Content-Type' => 'application/json',
],
'json' => $requestBody,
],
);
$status = $response->getStatusCode();

$writtenEvents = array_map(fn ($item): CloudEvent => new CloudEvent(
$item['specversion'],
$item['id'],
new DateTimeImmutable($item['time']),
$item['source'],
$item['subject'],
$item['type'],
$item['datacontenttype'],
$item['data'],
$item['hash'],
$item['predecessorhash'],
$item['traceparent'] ?? null,
$item['tracestate'] ?? null,
), $data);

return $writtenEvents;
if ($status !== 200) {
throw new RuntimeException(sprintf(
"Failed to read events, got HTTP status code '%d', expected '200'",
$status
));
}

foreach (Utils::readNdJson($response->getBody()) as $eventLine) {
switch ($eventLine->type) {
case 'event':
$cloudEvent = new CloudEvent(
$eventLine->payload['specversion'],
$eventLine->payload['id'],
new DateTimeImmutable($eventLine->payload['time']),
$eventLine->payload['source'],
$eventLine->payload['subject'],
$eventLine->payload['type'],
$eventLine->payload['datacontenttype'],
$eventLine->payload['data'],
$eventLine->payload['hash'],
$eventLine->payload['predecessorhash'],
$eventLine->payload['traceparent'] ?? null,
$eventLine->payload['tracestate'] ?? null,
);
yield $cloudEvent;

break;
case 'error':
throw new RuntimeException($eventLine->payload['error'] ?? 'unknown error');
default:
throw new RuntimeException("Failed to handle unsupported line type {$eventLine->type}");
}
}
}
}
75 changes: 75 additions & 0 deletions src/ReadEventsOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

declare(strict_types=1);

namespace Thenativeweb\Eventsourcingdb;

use JsonSerializable;

enum Order: string
{
case CHRONOLOGICAL = 'chronological';
case ANTICHRONOLOGICAL = 'antichronological';
}

enum ReadIfEventIsMissing: string
{
case READ_NOTHING = 'read-nothing';
case READ_EVERYTHING = 'read-everything';
}

class ReadFromLatestEvent implements JsonSerializable
{
public function __construct(
public string $subject,
public string $type,
public ReadIfEventIsMissing $ifEventIsMissing,
) {
}

public function jsonSerialize(): array
{
return [
'subject' => $this->subject,
'type' => $this->type,
'ifEventIsMissing' => $this->ifEventIsMissing->value,
];
}
}

class ReadEventsOptions implements JsonSerializable
{
public function __construct(
public bool $recursive = false,
public ?Order $order = null,
public ?Bound $lowerBound = null,
public ?Bound $upperBound = null,
public ?ReadFromLatestEvent $fromLatestEvent = null
) {
}

public function jsonSerialize(): mixed
{
$result = [
'recursive' => $this->recursive,
];

if ($this->order instanceof Order) {
$result['order'] = $this->order->value;
}

if ($this->lowerBound instanceof Bound) {
$result['lowerBound'] = $this->lowerBound->jsonSerialize();
}

if ($this->upperBound instanceof Bound) {
$result['upperBound'] = $this->upperBound->jsonSerialize();
}

if ($this->fromLatestEvent instanceof ReadFromLatestEvent) {
$result['fromLatestEvent'] = $this->fromLatestEvent->jsonSerialize();
}

return $result;
}
}
14 changes: 14 additions & 0 deletions src/Stream/ReadEventLine.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Thenativeweb\Eventsourcingdb\Stream;

final readonly class ReadEventLine
{
public function __construct(
public string $type,
public array $payload,
) {
}
}
57 changes: 57 additions & 0 deletions src/Stream/Utils.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

declare(strict_types=1);

namespace Thenativeweb\Eventsourcingdb\Stream;

use Psr\Http\Message\StreamInterface;
use RuntimeException;

final readonly class Utils
{
public static function readLine(StreamInterface $stream): string
{
$buffer = '';

while (!$stream->eof()) {
if ('' === ($byte = $stream->read(1))) {
return $buffer;
}

$buffer .= $byte;
if ($byte === "\n") {
break;
}
}

return $buffer;
}

/**
* @return iterable<ReadEventLine>
*/
public static function readNdJson(StreamInterface $stream): iterable
{
while (!$stream->eof()) {
$line = self::readLine($stream);
if ($line === '') {
continue;
}

if (!json_validate($line)) {
throw new RuntimeException('Failed to read events.');
}

$item = json_decode($line, true);
if (!is_array($item)) {
throw new RuntimeException('Failed to read events, expected an array.');
}

$eventLine = new ReadEventLine(
$item['type'] ?? 'unknown',
$item['payload'] ?? [],
);
yield $eventLine;
}
}
}
Loading