diff --git a/src/Bound.php b/src/Bound.php new file mode 100644 index 0000000..2e9604d --- /dev/null +++ b/src/Bound.php @@ -0,0 +1,30 @@ + $this->id, + 'type' => $this->type->value, + ]; + } +} diff --git a/src/Client.php b/src/Client.php index 6e32bc0..ca07d77 100644 --- a/src/Client.php +++ b/src/Client.php @@ -7,6 +7,7 @@ use DateTimeImmutable; use GuzzleHttp\Client as HttpClient; use RuntimeException; +use Thenativeweb\Eventsourcingdb\Stream\NdJson; final readonly class Client { @@ -34,7 +35,7 @@ public function ping(): void )); } - $body = (string) $response->getBody(); + $body = $response->getBody()->getContents(); $data = json_decode($body, true); if (!isset($data['type']) || $data['type'] !== 'io.eventsourcingdb.api.ping-received') { @@ -61,7 +62,7 @@ public function verifyApiToken(): void )); } - $body = (string) $response->getBody(); + $body = $response->getBody()->getContents(); $data = json_decode($body, true); if (!isset($data['type']) || $data['type'] !== 'io.eventsourcingdb.api.api-token-verified') { @@ -69,7 +70,7 @@ public function verifyApiToken(): void } } - public function writeEvents(array $events, array $preconditions = []): array + public function writeEvents(array $events, array $preconditions = []): iterable { $requestBody = [ 'events' => $events, @@ -97,24 +98,90 @@ public function writeEvents(array $events, array $preconditions = []): array )); } - $body = (string) $response->getBody(); + $body = $response->getBody()->getContents(); + if ($body === '') { + return; + } + + if (!json_validate($body)) { + throw new RuntimeException('Failed to read events.'); + } + $data = json_decode($body, true); + if (!is_array($data)) { + throw new RuntimeException('Failed to read events, expected an array.'); + } + + foreach ($data as $item) { + $cloudEvent = new CloudEvent( + $item['specversion'], + $item['id'], + new DateTimeImmutable($item['time']), + $item['source'], + $item['subject'], + $item['type'], + $item['datacontenttype'], + $item['data'], + $item['hash'], + $item['predecessorhash'], + $item['traceparent'] ?? null, + $item['tracestate'] ?? null, + ); + yield $cloudEvent; + } + } + + public function readEvents(string $subject, ReadEventsOptions $readEventsOptions): iterable + { + $requestBody = [ + 'subject' => $subject, + 'options' => $readEventsOptions, + ]; + + $response = $this->httpClient->post( + '/api/v1/read-events', + [ + 'headers' => [ + 'Authorization' => 'Bearer ' . $this->apiToken, + 'Content-Type' => 'application/json', + ], + 'json' => $requestBody, + ], + ); + $status = $response->getStatusCode(); - $writtenEvents = array_map(fn ($item): CloudEvent => new CloudEvent( - $item['specversion'], - $item['id'], - new DateTimeImmutable($item['time']), - $item['source'], - $item['subject'], - $item['type'], - $item['datacontenttype'], - $item['data'], - $item['hash'], - $item['predecessorhash'], - $item['traceparent'] ?? null, - $item['tracestate'] ?? null, - ), $data); - - return $writtenEvents; + if ($status !== 200) { + throw new RuntimeException(sprintf( + "Failed to read events, got HTTP status code '%d', expected '200'", + $status + )); + } + + foreach (NdJson::readStream($response->getBody()) as $eventLine) { + switch ($eventLine->type) { + case 'event': + $cloudEvent = new CloudEvent( + $eventLine->payload['specversion'], + $eventLine->payload['id'], + new DateTimeImmutable($eventLine->payload['time']), + $eventLine->payload['source'], + $eventLine->payload['subject'], + $eventLine->payload['type'], + $eventLine->payload['datacontenttype'], + $eventLine->payload['data'], + $eventLine->payload['hash'], + $eventLine->payload['predecessorhash'], + $eventLine->payload['traceparent'] ?? null, + $eventLine->payload['tracestate'] ?? null, + ); + yield $cloudEvent; + + break; + case 'error': + throw new RuntimeException($eventLine->payload['error'] ?? 'unknown error'); + default: + throw new RuntimeException("Failed to handle unsupported line type {$eventLine->type}"); + } + } } } diff --git a/src/ReadEventsOptions.php b/src/ReadEventsOptions.php new file mode 100644 index 0000000..d0483a3 --- /dev/null +++ b/src/ReadEventsOptions.php @@ -0,0 +1,75 @@ + $this->subject, + 'type' => $this->type, + 'ifEventIsMissing' => $this->ifEventIsMissing->value, + ]; + } +} + +class ReadEventsOptions implements JsonSerializable +{ + public function __construct( + public bool $recursive = false, + public ?Order $order = null, + public ?Bound $lowerBound = null, + public ?Bound $upperBound = null, + public ?ReadFromLatestEvent $fromLatestEvent = null + ) { + } + + public function jsonSerialize(): mixed + { + $result = [ + 'recursive' => $this->recursive, + ]; + + if ($this->order instanceof Order) { + $result['order'] = $this->order->value; + } + + if ($this->lowerBound instanceof Bound) { + $result['lowerBound'] = $this->lowerBound->jsonSerialize(); + } + + if ($this->upperBound instanceof Bound) { + $result['upperBound'] = $this->upperBound->jsonSerialize(); + } + + if ($this->fromLatestEvent instanceof ReadFromLatestEvent) { + $result['fromLatestEvent'] = $this->fromLatestEvent->jsonSerialize(); + } + + return $result; + } +} diff --git a/src/Stream/NdJson.php b/src/Stream/NdJson.php new file mode 100644 index 0000000..09b55f7 --- /dev/null +++ b/src/Stream/NdJson.php @@ -0,0 +1,54 @@ +eof()) { + if ('' === ($byte = $stream->read(1))) { + return $buffer; + } + + $buffer .= $byte; + if ($byte === "\n") { + break; + } + } + + return $buffer; + } + + public static function readStream(StreamInterface $stream): iterable + { + while (!$stream->eof()) { + $line = self::readLine($stream); + if ($line === '') { + continue; + } + + if (!json_validate($line)) { + throw new RuntimeException('Failed to read events.'); + } + + $item = json_decode($line, true); + if (!is_array($item)) { + throw new RuntimeException('Failed to read events, expected an array.'); + } + + $eventLine = new ReadEventLine( + $item['type'] ?? 'unknown', + $item['payload'] ?? [], + ); + yield $eventLine; + } + } +} diff --git a/src/Stream/ReadEventLine.php b/src/Stream/ReadEventLine.php new file mode 100644 index 0000000..e1eb06c --- /dev/null +++ b/src/Stream/ReadEventLine.php @@ -0,0 +1,14 @@ +container = $this->startContainer(); - $this->client = $this->container->getClient(); - } - - protected function tearDown(): void - { - $this->container->stop(); - parent::tearDown(); - } - - public function testPingSucceedsWhenServerIsReachable(): void - { - $this->client->ping(); - $this->expectNotToPerformAssertions(); - } - - public function testPingFailsWhenServerIsUnreachable(): void - { - $port = $this->container->getMappedPort(); - $client = new Client("http://non-existent-host:{$port}", $this->container->getApiToken()); - - $this->expectException(\Throwable::class); - $client->ping(); - } - - public function testVerifyApiTokenDoesNotThrowAnErrorIfTheTokenIsValid(): void - { - $client = $this->container->getClient(); - $client->verifyApiToken(); - $this->expectNotToPerformAssertions(); - } - - public function testVerifyApiTokenThrowsAnErrorIfTheTokenIsInvalid(): void - { - $baseUrl = $this->container->getBaseUrl(); - $apiToken = $this->container->getApiToken() . '-invalid'; - $client = new Client($baseUrl, $apiToken); - $this->expectException(\Throwable::class); - $client->verifyApiToken(); - } - - public function testWriteEventsWritesASingleEvent(): void - { - $eventCandidate = new EventCandidate( - 'https://www.eventsourcingdb.io', - '/test', - 'io.eventsourcingdb.test', - [ - 'value' => 42, - ], - ); - - $writtenEvents = $this->client->writeEvents([ - $eventCandidate, - ]); - - $this->assertCount(1, $writtenEvents); - $this->assertSame('0', $writtenEvents[0]->id); - } - - public function testWriteEventsWritesMultipleEvents(): void - { - $firstEvent = new EventCandidate( - 'https://www.eventsourcingdb.io', - '/test', - 'io.eventsourcingdb.test', - [ - 'value' => 23, - ], - ); - - $secondEvent = new EventCandidate( - 'https://www.eventsourcingdb.io', - '/test', - 'io.eventsourcingdb.test', - [ - 'value' => 42, - ], - ); - - $writtenEvents = $this->client->writeEvents([ - $firstEvent, - $secondEvent, - ]); - - $this->assertCount(2, $writtenEvents); - $this->assertSame('0', $writtenEvents[0]->id); - $this->assertSame(23, $writtenEvents[0]->data['value']); - $this->assertSame('1', $writtenEvents[1]->id); - $this->assertSame(42, $writtenEvents[1]->data['value']); - } - - public function testWriteEventsSupportsTheIsSubjectPristinePrecondition(): void - { - $firstEvent = new EventCandidate( - 'https://www.eventsourcingdb.io', - '/test', - 'io.eventsourcingdb.test', - [ - 'value' => 23, - ], - ); - - $this->client->writeEvents([ - $firstEvent, - ]); - - $secondEvent = new EventCandidate( - 'https://www.eventsourcingdb.io', - '/test', - 'io.eventsourcingdb.test', - [ - 'value' => 42, - ], - ); - - $this->expectExceptionMessage("Failed to write events, got HTTP status code '409', expected '200'"); - - $this->client->writeEvents([ - $secondEvent, - ], [ - new IsSubjectPristine('/test'), - ]); - } - - public function testWriteEventsSupportsTheIsSubjectOnEventIdPrecondition(): void - { - $firstEvent = new EventCandidate( - 'https://www.eventsourcingdb.io', - '/test', - 'io.eventsourcingdb.test', - [ - 'value' => 23, - ], - ); - - $this->client->writeEvents([ - $firstEvent, - ]); - - $secondEvent = new EventCandidate( - 'https://www.eventsourcingdb.io', - '/test', - 'io.eventsourcingdb.test', - [ - 'value' => 42, - ], - ); - - $this->expectExceptionMessage("Failed to write events, got HTTP status code '409', expected '200'"); - $this->client->writeEvents([ - $secondEvent, - ], [ - new IsSubjectOnEventId('/test', '1'), - ]); - } -} diff --git a/tests/ClientTestTrait.php b/tests/ClientTestTrait.php index d19eb26..18df10e 100644 --- a/tests/ClientTestTrait.php +++ b/tests/ClientTestTrait.php @@ -4,10 +4,28 @@ namespace Thenativeweb\Eventsourcingdb\Tests; +use Thenativeweb\Eventsourcingdb\Client; use Thenativeweb\Eventsourcingdb\Container; trait ClientTestTrait { + private Container $container; + + private Client $client; + + protected function setUp(): void + { + parent::setUp(); + $this->container = $this->startContainer(); + $this->client = $this->container->getClient(); + } + + protected function tearDown(): void + { + $this->container->stop(); + parent::tearDown(); + } + protected function startContainer(): Container { $imageVersion = getImageVersionFromDockerfile(); diff --git a/tests/PingTest.php b/tests/PingTest.php new file mode 100644 index 0000000..3c0ca42 --- /dev/null +++ b/tests/PingTest.php @@ -0,0 +1,28 @@ +client->ping(); + $this->expectNotToPerformAssertions(); + } + + public function testFailsWhenServerIsUnreachable(): void + { + $port = $this->container->getMappedPort(); + $client = new Client("http://non-existent-host:{$port}", $this->container->getApiToken()); + + $this->expectException(\Throwable::class); + $client->ping(); + } +} diff --git a/tests/ReadEventsTest.php b/tests/ReadEventsTest.php new file mode 100644 index 0000000..06dd60a --- /dev/null +++ b/tests/ReadEventsTest.php @@ -0,0 +1,308 @@ +client->readEvents('/', $readEventsOptions) as $readEvent) { + $didReadEvents = true; + } + + $this->assertFalse($didReadEvents, 'Expected no events to be read, but some were found.'); + } + + public function testReadsAllEventsFromTheGivenSubject(): void + { + $firstEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 23, + ], + ); + + $secondEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 42, + ], + ); + + iterator_count($this->client->writeEvents([ + $firstEvent, + $secondEvent, + ])); + + $eventsRead = []; + $readEventsOptions = new ReadEventsOptions(false); + + foreach ($this->client->readEvents('/test', $readEventsOptions) as $event) { + $eventsRead[] = $event; + } + + $this->assertCount(2, $eventsRead); + } + + public function testReadsRecursively(): void + { + $firstEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 23, + ], + ); + + $secondEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 42, + ], + ); + + iterator_count($this->client->writeEvents([ + $firstEvent, + $secondEvent, + ])); + + $eventsRead = []; + $readEventsOptions = new ReadEventsOptions(true); + + foreach ($this->client->readEvents('/test', $readEventsOptions) as $event) { + $eventsRead[] = $event; + } + + $this->assertCount(2, $eventsRead); + } + + public function testReadsChronologically(): void + { + $firstEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 23, + ], + ); + + $secondEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 42, + ], + ); + + iterator_count($this->client->writeEvents([ + $firstEvent, + $secondEvent, + ])); + + $eventsRead = []; + $readEventsOptions = new ReadEventsOptions( + recursive: false, + order: Order::CHRONOLOGICAL, + ); + + foreach ($this->client->readEvents('/test', $readEventsOptions) as $event) { + $eventsRead[] = $event; + } + + $this->assertCount(2, $eventsRead); + $this->assertSame('0', $eventsRead[0]->id); + $this->assertSame(23, $eventsRead[0]->data['value']); + $this->assertSame('1', $eventsRead[1]->id); + $this->assertSame(42, $eventsRead[1]->data['value']); + } + + public function testReadsAntiChronologically(): void + { + $firstEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 23, + ], + ); + + $secondEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 42, + ], + ); + + iterator_count($this->client->writeEvents([ + $firstEvent, + $secondEvent, + ])); + + $eventsRead = []; + $readEventsOptions = new ReadEventsOptions( + recursive: false, + order: Order::ANTICHRONOLOGICAL, + ); + + foreach ($this->client->readEvents('/test', $readEventsOptions) as $event) { + $eventsRead[] = $event; + } + + $this->assertCount(2, $eventsRead); + $this->assertSame('1', $eventsRead[0]->id); + $this->assertSame(42, $eventsRead[0]->data['value']); + $this->assertSame('0', $eventsRead[1]->id); + $this->assertSame(23, $eventsRead[1]->data['value']); + } + + public function testReadsWithLowerBound(): void + { + $firstEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 23, + ], + ); + + $secondEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 42, + ], + ); + + iterator_count($this->client->writeEvents([ + $firstEvent, + $secondEvent, + ])); + + $eventsRead = []; + $readEventsOptions = new ReadEventsOptions( + recursive: false, + lowerBound: new Bound('1', BoundType::INCLUSIVE), + ); + + foreach ($this->client->readEvents('/test', $readEventsOptions) as $event) { + $eventsRead[] = $event; + } + + $this->assertCount(1, $eventsRead); + $this->assertSame('1', $eventsRead[0]->id); + $this->assertSame(42, $eventsRead[0]->data['value']); + } + + public function testReadsWithUpperBound(): void + { + $firstEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 23, + ], + ); + + $secondEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 42, + ], + ); + + iterator_count($this->client->writeEvents([ + $firstEvent, + $secondEvent, + ])); + + $eventsRead = []; + $readEventsOptions = new ReadEventsOptions( + recursive: false, + upperBound: new Bound('0', BoundType::INCLUSIVE), + ); + + foreach ($this->client->readEvents('/test', $readEventsOptions) as $event) { + $eventsRead[] = $event; + } + + $this->assertCount(1, $eventsRead); + $this->assertSame('0', $eventsRead[0]->id); + $this->assertSame(23, $eventsRead[0]->data['value']); + } + + public function testReadsFromLatestEvent(): void + { + $firstEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test.foo', + data: [ + 'value' => 23, + ], + ); + + $secondEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test.bar', + data: [ + 'value' => 42, + ], + ); + + iterator_count($this->client->writeEvents([ + $firstEvent, + $secondEvent, + ])); + + $eventsRead = []; + $readEventsOptions = new ReadEventsOptions( + recursive: false, + fromLatestEvent: new ReadFromLatestEvent( + subject: '/test', + type: 'io.eventsourcingdb.test.bar', + ifEventIsMissing: ReadIfEventIsMissing::READ_EVERYTHING, + ), + ); + + foreach ($this->client->readEvents('/test', $readEventsOptions) as $event) { + $eventsRead[] = $event; + } + + $this->assertCount(1, $eventsRead); + $this->assertSame('1', $eventsRead[0]->id); + $this->assertSame(42, $eventsRead[0]->data['value']); + } +} diff --git a/tests/Stream/NdJsonTest.php b/tests/Stream/NdJsonTest.php new file mode 100644 index 0000000..16b26f6 --- /dev/null +++ b/tests/Stream/NdJsonTest.php @@ -0,0 +1,143 @@ +getReadReturnValues($jsonLines); + + $eofReturnValues = array_fill(0, count($byteLine), false); + $eofReturnValues[] = true; + + return $eofReturnValues; + } + + public function getReadReturnValues(string $jsonLines): array + { + $eofFixChar = '-'; + $jsonLineCount = substr_count($jsonLines, "\n"); + return str_split($jsonLines . str_repeat($eofFixChar, $jsonLineCount)); + } + + public function testReadLineReadsUntilNewline(): void + { + $stream = $this->createMock(StreamInterface::class); + $stream->method('eof') + ->willReturnOnConsecutiveCalls(false, false, false, true); + + $stream->method('read') + ->willReturnOnConsecutiveCalls('f', 'o', "\n"); + + $line = NdJson::readLine($stream); + $this->assertSame("fo\n", $line); + } + + public function testReadLineReturnsEmptyStringIfNothingToRead(): void + { + $stream = $this->createMock(StreamInterface::class); + $stream->method('eof')->willReturn(false); + $stream->method('read')->willReturn(''); + + $line = NdJson::readLine($stream); + $this->assertSame('', $line); + } + + public function testReadStreamYieldsEventLines(): void + { + $json1 = json_encode([ + 'type' => 'event1', + 'payload' => [ + 'foo' => 'bar', + ], + ]) . "\n"; + $json2 = json_encode([ + 'type' => 'event2', + 'payload' => [ + 'baz' => 'qux', + ], + ]) . "\n"; + $jsonLines = $json1 . $json2; + + $stream = $this->createMock(StreamInterface::class); + $stream->method('eof') + ->willReturnOnConsecutiveCalls(...$this->getEofReturnValues($jsonLines)); + $stream->method('read') + ->willReturnOnConsecutiveCalls(...$this->getReadReturnValues($jsonLines)); + + $events = iterator_to_array(NdJson::readStream($stream)); + + $this->assertCount(2, $events); + $this->assertInstanceOf(ReadEventLine::class, $events[0]); + $this->assertSame('event1', $events[0]->type); + $this->assertSame([ + 'foo' => 'bar', + ], $events[0]->payload); + + $this->assertInstanceOf(ReadEventLine::class, $events[1]); + $this->assertSame('event2', $events[1]->type); + $this->assertSame([ + 'baz' => 'qux', + ], $events[1]->payload); + } + + public function testReadStreamSkipsEmptyLines(): void + { + $jsonLines = json_encode([ + 'type' => 'event', + 'payload' => [], + ]) . "\n"; + + $stream = $this->createMock(StreamInterface::class); + $stream->method('eof') + ->willReturnOnConsecutiveCalls(...$this->getEofReturnValues($jsonLines)); + $stream->method('read') + ->willReturnOnConsecutiveCalls(...$this->getReadReturnValues($jsonLines)); + + $events = iterator_to_array(NdJson::readStream($stream)); + + $this->assertCount(1, $events); + $this->assertSame('event', $events[0]->type); + } + + public function testReadStreamThrowsOnInvalidJson(): void + { + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Failed to read events.'); + + $jsonLines = "{ invalid json }\n"; + + $stream = $this->createMock(StreamInterface::class); + $stream->method('eof') + ->willReturnOnConsecutiveCalls(...$this->getEofReturnValues($jsonLines)); + $stream->method('read') + ->willReturnOnConsecutiveCalls(...$this->getReadReturnValues($jsonLines)); + + iterator_to_array(NdJson::readStream($stream)); + } + + public function testReadStreamThrowsIfDecodedJsonIsNotArray(): void + { + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Failed to read events, expected an array.'); + + $jsonLines = json_encode('just a string') . "\n"; + + $stream = $this->createMock(StreamInterface::class); + $stream->method('eof') + ->willReturnOnConsecutiveCalls(...$this->getEofReturnValues($jsonLines)); + $stream->method('read') + ->willReturnOnConsecutiveCalls(...$this->getReadReturnValues($jsonLines)); + + iterator_to_array(NdJson::readStream($stream)); + } +} diff --git a/tests/VerifyApiTokenTest.php b/tests/VerifyApiTokenTest.php new file mode 100644 index 0000000..e32f483 --- /dev/null +++ b/tests/VerifyApiTokenTest.php @@ -0,0 +1,28 @@ +container->getClient(); + $client->verifyApiToken(); + $this->expectNotToPerformAssertions(); + } + + public function testThrowsAnErrorIfTheTokenIsInvalid(): void + { + $baseUrl = $this->container->getBaseUrl(); + $apiToken = $this->container->getApiToken() . '-invalid'; + $client = new Client($baseUrl, $apiToken); + $this->expectException(\Throwable::class); + $client->verifyApiToken(); + } +} diff --git a/tests/WriteEventsTest.php b/tests/WriteEventsTest.php new file mode 100644 index 0000000..6b8fffe --- /dev/null +++ b/tests/WriteEventsTest.php @@ -0,0 +1,138 @@ + 42, + ], + ); + + $writtenEvents = $this->client->writeEvents([ + $eventCandidate, + ]); + + $writtenEvents = iterator_to_array($writtenEvents); + $this->assertCount(1, $writtenEvents); + $this->assertSame('0', $writtenEvents[0]->id); + } + + public function testWritesMultipleEvents(): void + { + $firstEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 23, + ], + ); + + $secondEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 42, + ], + ); + + $writtenEvents = $this->client->writeEvents([ + $firstEvent, + $secondEvent, + ]); + + $writtenEvents = iterator_to_array($writtenEvents); + $this->assertCount(2, $writtenEvents); + $this->assertSame('0', $writtenEvents[0]->id); + $this->assertSame(23, $writtenEvents[0]->data['value']); + $this->assertSame('1', $writtenEvents[1]->id); + $this->assertSame(42, $writtenEvents[1]->data['value']); + } + + public function testSupportsTheIsSubjectPristinePrecondition(): void + { + $firstEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 23, + ], + ); + + iterator_count($this->client->writeEvents([ + $firstEvent, + ])); + + $secondEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 42, + ], + ); + + $this->expectExceptionMessage("Failed to write events, got HTTP status code '409', expected '200'"); + + iterator_to_array($this->client->writeEvents( + [ + $secondEvent, + ], + [ + new IsSubjectPristine('/test'), + ], + )); + } + + public function testSupportsTheIsSubjectOnEventIdPrecondition(): void + { + $firstEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 23, + ], + ); + + iterator_count($this->client->writeEvents([ + $firstEvent, + ])); + + $secondEvent = new EventCandidate( + source: 'https://www.eventsourcingdb.io', + subject: '/test', + type: 'io.eventsourcingdb.test', + data: [ + 'value' => 42, + ], + ); + + $this->expectExceptionMessage("Failed to write events, got HTTP status code '409', expected '200'"); + iterator_to_array($this->client->writeEvents( + [ + $secondEvent, + ], + [ + new IsSubjectOnEventId('/test', '1'), + ], + )); + } +}