Skip to content

Commit f32c375

Browse files
committed
Refactor to RdbmsEventStoreDoctrineDbal
Generic Rdbms logic is moved to the main event-sourcing library, leaving the Doctrine Dbal implementation details in this repository.
1 parent 5f36153 commit f32c375

17 files changed

+265
-600
lines changed

composer.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"name": "gember/event-store-doctrine-dbal",
2+
"name": "gember/rdbms-event-store-doctrine-dbal",
33
"description": "Gember Event store (gember/event-sourcing) implementation based on doctrine/dbal",
44
"license": "MIT",
55
"type": "library",
@@ -36,12 +36,12 @@
3636
},
3737
"autoload": {
3838
"psr-4": {
39-
"Gember\\EventStoreDoctrineDbal\\": "src/"
39+
"Gember\\RdbmsEventStoreDoctrineDbal\\": "src/"
4040
}
4141
},
4242
"autoload-dev": {
4343
"psr-4": {
44-
"Gember\\EventStoreDoctrineDbal\\Test\\": "tests/"
44+
"Gember\\RdbmsEventStoreDoctrineDbal\\Test\\": "tests/"
4545
}
4646
},
4747
"config": {

src/DoctrineDbalEventStore.php

Lines changed: 0 additions & 92 deletions
This file was deleted.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Gember\RdbmsEventStoreDoctrineDbal;
6+
7+
use DateTimeImmutable;
8+
use Gember\EventSourcing\EventStore\Rdbms\RdbmsEvent;
9+
use JsonException;
10+
use DateMalformedStringException;
11+
12+
/**
13+
* @phpstan-import-type Row from DoctrineDbalRdbmsEventStoreRepository
14+
*/
15+
final readonly class DoctrineDbalRdbmsEventFactory
16+
{
17+
/**
18+
* @param Row $row
19+
*
20+
* @throws JsonException
21+
* @throws DateMalformedStringException
22+
*/
23+
public function createFromRow(array $row): RdbmsEvent
24+
{
25+
return new RdbmsEvent(
26+
$row['eventId'],
27+
[],
28+
$row['eventName'],
29+
$row['payload'],
30+
(array) json_decode($row['metadata'], true, flags: JSON_THROW_ON_ERROR),
31+
new DateTimeImmutable($row['appliedAt']),
32+
);
33+
}
34+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Gember\RdbmsEventStoreDoctrineDbal;
6+
7+
use Doctrine\DBAL\ArrayParameterType;
8+
use Doctrine\DBAL\Connection;
9+
use Gember\EventSourcing\EventStore\Rdbms\RdbmsEvent;
10+
use Gember\EventSourcing\EventStore\Rdbms\RdbmsEventStoreRepository;
11+
use Gember\RdbmsEventStoreDoctrineDbal\TableSchema\EventStoreRelationTableSchema;
12+
use Gember\RdbmsEventStoreDoctrineDbal\TableSchema\EventStoreTableSchema;
13+
use Override;
14+
use Throwable;
15+
16+
/**
17+
* @phpstan-type Row array{
18+
* eventId: string,
19+
* eventName: string,
20+
* payload: string,
21+
* metadata: string,
22+
* appliedAt: string,
23+
* domainId: string
24+
* }
25+
*/
26+
final readonly class DoctrineDbalRdbmsEventStoreRepository implements RdbmsEventStoreRepository
27+
{
28+
public function __construct(
29+
private Connection $connection,
30+
private EventStoreTableSchema $eventStoreTableSchema,
31+
private EventStoreRelationTableSchema $eventStoreRelationTableSchema,
32+
private DoctrineDbalRdbmsEventFactory $rdbmsEventFactory,
33+
) {}
34+
35+
#[Override]
36+
public function getEvents(array $domainIds, array $eventNames): array
37+
{
38+
$eventStoreSchema = $this->eventStoreTableSchema;
39+
$eventStoreRelationSchema = $this->eventStoreRelationTableSchema;
40+
41+
$rows = $this->connection->createQueryBuilder()
42+
->select(
43+
<<<DQL
44+
es.{$eventStoreSchema->eventIdFieldName} as eventId,
45+
es.{$eventStoreSchema->payloadFieldName} as payload,
46+
es.{$eventStoreSchema->eventNameFieldName} as eventName,
47+
es.{$eventStoreSchema->appliedAtFieldName} as appliedAt,
48+
es.{$eventStoreSchema->metadataFieldName} as metadata,
49+
esr.{$eventStoreRelationSchema->domainIdFieldName} as domainId
50+
DQL
51+
)
52+
->from($eventStoreSchema->tableName, 'es')
53+
->join('es', $eventStoreRelationSchema->tableName, 'esr', sprintf(
54+
'es.%s = esr.%s',
55+
$eventStoreSchema->eventIdFieldName,
56+
$eventStoreRelationSchema->eventIdFieldName,
57+
))
58+
->andWhere(sprintf('esr.%s IN(:domainIds)', $eventStoreRelationSchema->domainIdFieldName))
59+
->setParameter('domainIds', $domainIds, ArrayParameterType::STRING)
60+
->orderBy(sprintf('es.%s', $eventStoreSchema->appliedAtFieldName), 'asc')
61+
->executeQuery()
62+
->fetchAllAssociative();
63+
64+
/** @var array<string, RdbmsEvent> $events */
65+
$events = [];
66+
67+
/** @var Row $row */
68+
foreach ($rows as $row) {
69+
$eventId = $row['eventId'];
70+
71+
$event = $events[$eventId] ?? $this->rdbmsEventFactory->createFromRow($row);
72+
73+
$events[$eventId] = $event->withDomainId($row['domainId']);
74+
}
75+
76+
return array_values($events);
77+
}
78+
79+
#[Override]
80+
public function getLastEventIdPersisted(array $domainIds, array $eventNames): ?string
81+
{
82+
$eventStoreSchema = $this->eventStoreTableSchema;
83+
$eventStoreRelationSchema = $this->eventStoreRelationTableSchema;
84+
85+
/** @var array{id?: string} $row */
86+
$row = $this->connection->createQueryBuilder()
87+
->select(sprintf('es.%s as id', $eventStoreSchema->eventIdFieldName))
88+
->from($eventStoreSchema->tableName, 'es')
89+
->join('es', $eventStoreRelationSchema->tableName, 'esr', sprintf(
90+
'es.%s = esr.%s',
91+
$eventStoreSchema->eventIdFieldName,
92+
$eventStoreRelationSchema->eventIdFieldName,
93+
))
94+
->andWhere(sprintf('esr.%s IN(:domainIds)', $eventStoreRelationSchema->domainIdFieldName))
95+
->setParameter('domainIds', $domainIds, ArrayParameterType::STRING)
96+
->orderBy(sprintf('es.%s', $eventStoreSchema->appliedAtFieldName), 'desc')
97+
->executeQuery()
98+
->fetchFirstColumn();
99+
100+
return $row['id'] ?? null;
101+
}
102+
103+
#[Override]
104+
public function saveEvents(array $events): void
105+
{
106+
$eventStoreSchema = $this->eventStoreTableSchema;
107+
$eventStoreRelationSchema = $this->eventStoreRelationTableSchema;
108+
109+
$this->connection->beginTransaction();
110+
111+
try {
112+
foreach ($events as $event) {
113+
$this->connection->createQueryBuilder()
114+
->insert($eventStoreSchema->tableName)
115+
->setValue($eventStoreSchema->eventIdFieldName, ':id')
116+
->setValue($eventStoreSchema->eventNameFieldName, ':eventName')
117+
->setValue($eventStoreSchema->payloadFieldName, ':payload')
118+
->setValue($eventStoreSchema->metadataFieldName, ':metadata')
119+
->setValue($eventStoreSchema->appliedAtFieldName, ':appliedAt')
120+
->setParameters([
121+
'id' => $event->eventId,
122+
'eventName' => $event->eventName,
123+
'payload' => $event->payload,
124+
'metadata' => json_encode($event->metadata, JSON_THROW_ON_ERROR),
125+
'appliedAt' => $event->appliedAt->format($eventStoreSchema->appliedAtFieldFormat),
126+
])
127+
->executeStatement();
128+
129+
foreach ($event->domainIds as $domainId) {
130+
$this->connection->createQueryBuilder()
131+
->insert($eventStoreRelationSchema->tableName)
132+
->setValue($eventStoreRelationSchema->eventIdFieldName, ':eventId')
133+
->setValue($eventStoreRelationSchema->domainIdFieldName, ':domainId')
134+
->setParameters([
135+
'eventId' => $event->eventId,
136+
'domainId' => $domainId,
137+
])
138+
->executeStatement();
139+
}
140+
}
141+
142+
$this->connection->commit();
143+
} catch (Throwable $exception) {
144+
$this->connection->rollBack();
145+
146+
throw $exception;
147+
}
148+
}
149+
}

0 commit comments

Comments
 (0)