Skip to content

Commit 92a3e6c

Browse files
committed
Setup Dbal implementation of EventStore
1 parent 15d37f9 commit 92a3e6c

12 files changed

+685
-0
lines changed

src/DoctrineDbalEventStore.php

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Gember\EventStoreDoctrineDbal;
6+
7+
use Gember\EventSourcing\AggregateRoot\AggregateRootId;
8+
use Gember\EventSourcing\EventStore\EventEnvelope;
9+
use Gember\EventSourcing\EventStore\EventStore;
10+
use Gember\EventSourcing\EventStore\EventStoreFailedException;
11+
use Gember\EventSourcing\EventStore\Metadata;
12+
use Gember\EventSourcing\EventStore\NoEventsForAggregateRootException;
13+
use Gember\EventSourcing\EventStore\OptimisticLockException;
14+
use Gember\EventSourcing\Registry\Event\EventRegistry;
15+
use Gember\EventSourcing\Util\Serialization\Serializer\Serializer;
16+
use Gember\EventSourcing\Util\Time\Clock\Clock;
17+
use Gember\EventStoreDoctrineDbal\Repository\EventStoreRepository;
18+
use Throwable;
19+
20+
final readonly class DoctrineDbalEventStore implements EventStore
21+
{
22+
public function __construct(
23+
private EventStoreRepository $repository,
24+
private Serializer $serializer,
25+
private EventRegistry $eventRegistry,
26+
private Clock $clock,
27+
) {}
28+
29+
public function load(AggregateRootId $aggregateRootId, int $playhead = 0): array
30+
{
31+
try {
32+
$rows = $this->repository->getRows($aggregateRootId, $playhead);
33+
} catch (Throwable $exception) {
34+
throw EventStoreFailedException::withException($exception);
35+
}
36+
37+
if ($rows === []) {
38+
throw NoEventsForAggregateRootException::withAggregateRootId($aggregateRootId);
39+
}
40+
41+
try {
42+
return array_map(
43+
fn($row) => new EventEnvelope(
44+
$row['eventId'],
45+
$this->serializer->deserialize($row['payload'], $this->eventRegistry->retrieve($row['eventName'])),
46+
(int) $row['playhead'],
47+
$this->clock->now($row['appliedAt']),
48+
new Metadata((array) json_decode($row['metadata'], true, flags: JSON_THROW_ON_ERROR)),
49+
),
50+
$rows,
51+
);
52+
} catch (Throwable $exception) {
53+
throw EventStoreFailedException::withException($exception);
54+
}
55+
}
56+
57+
public function append(AggregateRootId $aggregateRootId, EventEnvelope ...$envelopes): void
58+
{
59+
$this->guardOptimisticLock($aggregateRootId, ...$envelopes);
60+
61+
try {
62+
$this->repository->saveRows($aggregateRootId, ...$envelopes);
63+
} catch (Throwable $exception) {
64+
throw EventStoreFailedException::withException($exception);
65+
}
66+
}
67+
68+
/**
69+
* @throws OptimisticLockException
70+
*/
71+
private function guardOptimisticLock(AggregateRootId $aggregateRootId, EventEnvelope ...$envelopes): void
72+
{
73+
foreach ($envelopes as $envelope) {
74+
$firstPlayhead = $envelope->playhead;
75+
break;
76+
}
77+
78+
if (!isset($firstPlayhead)) {
79+
return;
80+
}
81+
82+
$lastPlayheadInStore = $this->repository->getLastPlayhead($aggregateRootId);
83+
84+
if ($lastPlayheadInStore === null) {
85+
return;
86+
}
87+
88+
if ($lastPlayheadInStore >= $firstPlayhead) {
89+
throw OptimisticLockException::create();
90+
}
91+
}
92+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Gember\EventStoreDoctrineDbal\Repository;
6+
7+
use Doctrine\DBAL\Connection;
8+
use Doctrine\DBAL\Exception;
9+
use Gember\EventSourcing\AggregateRoot\AggregateRootId;
10+
use Gember\EventSourcing\EventNameResolver\EventNameResolver;
11+
use Gember\EventSourcing\EventNameResolver\UnresolvableEventNameException;
12+
use Gember\EventSourcing\EventStore\EventEnvelope;
13+
use Gember\EventSourcing\Util\Serialization\Serializer\SerializationFailedException;
14+
use Gember\EventSourcing\Util\Serialization\Serializer\Serializer;
15+
use Gember\EventStoreDoctrineDbal\TableSchema\TableSchema;
16+
use JsonException;
17+
use Throwable;
18+
19+
/**
20+
* @phpstan-import-type RowPayload from EventStoreRepository
21+
*/
22+
final readonly class DoctrineDbalEventStoreRepository implements EventStoreRepository
23+
{
24+
public function __construct(
25+
private Connection $connection,
26+
private TableSchema $tableSchema,
27+
private EventNameResolver $eventNameResolver,
28+
private Serializer $serializer,
29+
) {}
30+
31+
/**
32+
* @throws Exception
33+
*/
34+
public function getRows(AggregateRootId $aggregateRootId, int $playhead): array
35+
{
36+
$schema = $this->tableSchema;
37+
38+
/** @var list<RowPayload> */
39+
return $this->connection->createQueryBuilder()
40+
->select(
41+
<<<DQL
42+
{$schema->eventIdFieldName} as eventId,
43+
{$schema->payloadFieldName} as payload,
44+
{$schema->eventNameFieldName} as eventName,
45+
{$schema->playheadFieldName} as playhead,
46+
{$schema->appliedAtFieldName} as appliedAt,
47+
{$schema->metadataFieldName} as metadata
48+
DQL
49+
)
50+
->from($schema->tableName)
51+
->where(sprintf('%s >= :playhead', $schema->payloadFieldName))
52+
->andWhere(sprintf('%s = :aggregateRootId', $schema->aggregateRootIdFieldName))
53+
->setParameters([
54+
'playhead' => $playhead,
55+
'aggregateRootId' => $aggregateRootId,
56+
])
57+
->orderBy($schema->appliedAtFieldName, 'desc')
58+
->executeQuery()
59+
->fetchAllAssociative();
60+
}
61+
62+
/**
63+
* @throws Exception
64+
* @throws Throwable
65+
* @throws UnresolvableEventNameException
66+
* @throws SerializationFailedException
67+
* @throws JsonException
68+
*/
69+
public function saveRows(AggregateRootId $aggregateRootId, EventEnvelope ...$envelopes): void
70+
{
71+
$schema = $this->tableSchema;
72+
73+
try {
74+
$this->connection->beginTransaction();
75+
76+
foreach ($envelopes as $envelope) {
77+
$this->connection->createQueryBuilder()
78+
->insert($schema->tableName)
79+
->setValue($schema->eventIdFieldName, ':id')
80+
->setValue($schema->aggregateRootIdFieldName, ':aggregateRootId')
81+
->setValue($schema->eventNameFieldName, ':eventName')
82+
->setValue($schema->payloadFieldName, ':payload')
83+
->setValue($schema->playheadFieldName, ':playhead')
84+
->setValue($schema->metadataFieldName, ':metadata')
85+
->setValue($schema->appliedAtFieldName, ':appliedAt')
86+
->setParameters([
87+
'id' => $envelope->eventId,
88+
'aggregateRootId' => (string) $aggregateRootId,
89+
'eventName' => $this->eventNameResolver->resolve($envelope->event::class),
90+
'payload' => $this->serializer->serialize($envelope->event),
91+
'playhead' => $envelope->playhead,
92+
'metadata' => json_encode($envelope->metadata->metadata, JSON_THROW_ON_ERROR),
93+
'appliedAt' => $envelope->appliedAt->format($schema->appliedAtFieldFormat),
94+
])
95+
->executeStatement();
96+
}
97+
98+
$this->connection->commit();
99+
} catch (Throwable $exception) {
100+
$this->connection->rollBack();
101+
102+
throw $exception;
103+
}
104+
}
105+
106+
/**
107+
* @throws Exception
108+
*/
109+
public function getLastPlayhead(AggregateRootId $aggregateRootId): ?int
110+
{
111+
$schema = $this->tableSchema;
112+
113+
$row = $this->connection->createQueryBuilder()
114+
->select($schema->playheadFieldName)
115+
->andWhere(sprintf('%s = :aggregateRootId', $schema->aggregateRootIdFieldName))
116+
->setParameters([
117+
'aggregateRootId' => $aggregateRootId,
118+
])
119+
->orderBy($schema->appliedAtFieldName, 'desc')
120+
->setMaxResults(1)
121+
->executeQuery()
122+
->fetchAssociative();
123+
124+
if ($row === false) {
125+
return null;
126+
}
127+
128+
return (int) $row[$schema->playheadFieldName]; // @phpstan-ignore cast.int
129+
}
130+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Gember\EventStoreDoctrineDbal\Repository;
6+
7+
use Gember\EventSourcing\AggregateRoot\AggregateRootId;
8+
use Gember\EventSourcing\EventStore\EventEnvelope;
9+
10+
/**
11+
* @phpstan-type RowPayload array{
12+
* eventId: string,
13+
* payload: string,
14+
* eventName: string,
15+
* playhead: string,
16+
* appliedAt: string,
17+
* metadata: string
18+
* }
19+
*/
20+
interface EventStoreRepository
21+
{
22+
/**
23+
* @return list<RowPayload>
24+
*/
25+
public function getRows(AggregateRootId $aggregateRootId, int $playhead): array;
26+
27+
public function saveRows(AggregateRootId $aggregateRootId, EventEnvelope ...$envelopes): void;
28+
29+
public function getLastPlayhead(AggregateRootId $aggregateRootId): ?int;
30+
}

src/TableSchema/TableSchema.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Gember\EventStoreDoctrineDbal\TableSchema;
6+
7+
final readonly class TableSchema
8+
{
9+
public function __construct(
10+
public string $tableName,
11+
public string $eventIdFieldName,
12+
public string $aggregateRootIdFieldName,
13+
public string $eventNameFieldName,
14+
public string $payloadFieldName,
15+
public string $playheadFieldName,
16+
public string $metadataFieldName,
17+
public string $appliedAtFieldName,
18+
public string $appliedAtFieldFormat,
19+
) {}
20+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Gember\EventStoreDoctrineDbal\TableSchema;
6+
7+
final readonly class TableSchemaFactory
8+
{
9+
public static function createDefault(
10+
string $tableName = 'event_store',
11+
string $eventIdFieldName = 'id',
12+
string $aggregateRootIdFieldName = 'aggregate_root_id',
13+
string $eventNameFieldName = 'event_name',
14+
string $payloadFieldName = 'payload',
15+
string $playheadFieldName = 'playhead',
16+
string $metadataFieldName = 'metadata',
17+
string $appliedAtFieldName = 'applied_at',
18+
string $appliedAtFieldFormat = 'Y-m-d H:i:s.u',
19+
): TableSchema {
20+
return new TableSchema(
21+
$tableName,
22+
$eventIdFieldName,
23+
$aggregateRootIdFieldName,
24+
$eventNameFieldName,
25+
$payloadFieldName,
26+
$playheadFieldName,
27+
$metadataFieldName,
28+
$appliedAtFieldName,
29+
$appliedAtFieldFormat,
30+
);
31+
}
32+
}

0 commit comments

Comments
 (0)