Skip to content

Commit df95e21

Browse files
authored
Feat: from aggregate stream (#593)
* feat: aggregate stream * fixes * from aggregate stream
1 parent 71b1162 commit df95e21

File tree

4 files changed

+338
-0
lines changed

4 files changed

+338
-0
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\EventSourcing\Attribute;
6+
7+
use Attribute;
8+
use Ecotone\EventSourcing\EventStore;
9+
10+
/*
11+
* Configures a projection to read from an aggregate's event stream.
12+
* Automatically reads Stream and AggregateType attributes from the aggregate class.
13+
*
14+
* This simplifies projection configuration by avoiding duplication of stream
15+
* and aggregate type configuration that is already defined on the aggregate.
16+
*
17+
* Example usage:
18+
* ```php
19+
* #[ProjectionV2('order_list')]
20+
* #[AggregateStream(Order::class)]
21+
* class OrderListProjection { ... }
22+
* ```
23+
*
24+
* licence Enterprise
25+
*/
26+
#[Attribute(Attribute::TARGET_CLASS)]
27+
class FromAggregateStream
28+
{
29+
/**
30+
* @param class-string $aggregateClass The aggregate class to read Stream and AggregateType from.
31+
* Must be an EventSourcingAggregate.
32+
* @param string $eventStoreReferenceName Reference name for the event store
33+
*/
34+
public function __construct(
35+
public readonly string $aggregateClass,
36+
public readonly string $eventStoreReferenceName = EventStore::class
37+
) {
38+
}
39+
}
40+

packages/PdoEventSourcing/src/Config/ProophProjectingModule.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
namespace Ecotone\EventSourcing\Config;
99

1010
use Ecotone\AnnotationFinder\AnnotationFinder;
11+
use Ecotone\EventSourcing\Attribute\FromAggregateStream;
12+
use Ecotone\EventSourcing\Attribute\AggregateType;
1113
use Ecotone\EventSourcing\Attribute\FromStream;
14+
use Ecotone\EventSourcing\Attribute\Stream;
1215
use Ecotone\EventSourcing\Projecting\AggregateIdPartitionProviderBuilder;
1316
use Ecotone\EventSourcing\Projecting\PartitionState\DbalProjectionStateStorageBuilder;
1417
use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreAggregateStreamSourceBuilder;
@@ -21,6 +24,7 @@
2124
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
2225
use Ecotone\Messaging\Config\ServiceConfiguration;
2326
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
27+
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
2428
use Ecotone\Projecting\Attribute\Partitioned;
2529
use Ecotone\Projecting\Attribute\ProjectionV2;
2630
use Ecotone\Projecting\EventStoreAdapter\EventStoreChannelAdapter;
@@ -69,6 +73,47 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
6973
}
7074
}
7175

76+
// Handle AggregateStream attribute
77+
foreach ($annotationRegistrationService->findAnnotatedClasses(FromAggregateStream::class) as $classname) {
78+
$projectionAttribute = $annotationRegistrationService->findAttributeForClass($classname, ProjectionV2::class);
79+
$aggregateStreamAttribute = $annotationRegistrationService->findAttributeForClass($classname, FromAggregateStream::class);
80+
$customScopeStrategyAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class);
81+
82+
if (! $projectionAttribute || ! $aggregateStreamAttribute) {
83+
continue;
84+
}
85+
86+
$aggregateClass = $aggregateStreamAttribute->aggregateClass;
87+
$projectionName = $projectionAttribute->name;
88+
89+
$eventSourcingAggregateAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, EventSourcingAggregate::class);
90+
if ($eventSourcingAggregateAttribute === null) {
91+
throw ConfigurationException::create("Class {$aggregateClass} referenced in #[AggregateStream] for projection {$projectionName} must be an EventSourcingAggregate. Add #[EventSourcingAggregate] attribute to the class.");
92+
}
93+
94+
$streamAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, Stream::class);
95+
$streamName = $streamAttribute?->getName() ?? $aggregateClass;
96+
97+
$aggregateTypeAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, AggregateType::class);
98+
$aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass;
99+
100+
$handledProjections[] = $projectionName;
101+
102+
if ($customScopeStrategyAttribute !== null) {
103+
$extensions[] = new EventStoreAggregateStreamSourceBuilder(
104+
$projectionName,
105+
$aggregateType,
106+
$streamName,
107+
);
108+
$extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $streamName);
109+
} else {
110+
$extensions[] = new EventStoreGlobalStreamSourceBuilder(
111+
$streamName,
112+
[$projectionName],
113+
);
114+
}
115+
}
116+
72117
if (! empty($handledProjections)) {
73118
$extensions[] = new DbalProjectionStateStorageBuilder($handledProjections);
74119
}

packages/PdoEventSourcing/tests/Projecting/Global/SynchronousEventDrivenProjectionTest.php

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
namespace Test\Ecotone\EventSourcing\Projecting\Global;
66

77
use Doctrine\DBAL\Connection;
8+
use Ecotone\EventSourcing\Attribute\FromAggregateStream;
89
use Ecotone\EventSourcing\Attribute\FromStream;
910
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
1011
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
1112
use Ecotone\EventSourcing\Attribute\ProjectionReset;
1213
use Ecotone\EventSourcing\EventSourcingConfiguration;
1314
use Ecotone\Lite\EcotoneLite;
1415
use Ecotone\Lite\Test\FlowTestSupport;
16+
use Ecotone\Messaging\Config\ConfigurationException;
1517
use Ecotone\Messaging\Config\ModulePackageList;
1618
use Ecotone\Messaging\Config\ServiceConfiguration;
1719
use Ecotone\Modelling\Attribute\EventHandler;
@@ -32,6 +34,10 @@
3234
use Test\Ecotone\EventSourcing\Fixture\Ticket\Event\TicketWasRegistered;
3335
use Test\Ecotone\EventSourcing\Fixture\Ticket\Ticket;
3436
use Test\Ecotone\EventSourcing\Fixture\Ticket\TicketEventConverter;
37+
use Test\Ecotone\EventSourcing\Projecting\App\Ordering\Command\PlaceOrder;
38+
use Test\Ecotone\EventSourcing\Projecting\App\Ordering\Event\OrderWasPlaced;
39+
use Test\Ecotone\EventSourcing\Projecting\App\Ordering\EventsConverter;
40+
use Test\Ecotone\EventSourcing\Projecting\App\Ordering\Order;
3541
use Test\Ecotone\EventSourcing\Projecting\ProjectingTestCase;
3642

3743
/**
@@ -188,6 +194,132 @@ classesToResolve: [$projection::class, Ticket::class, TicketEventConverter::clas
188194
self::assertEquals([['ticket_id' => '124', 'ticket_type' => 'info']], $ecotone->sendQueryWithRouting('getInProgressTickets'));
189195
}
190196

197+
public function test_building_global_projection_with_aggregate_stream_attribute(): void
198+
{
199+
$projection = $this->createOrderListProjectionWithAggregateStream();
200+
201+
$ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore(
202+
classesToResolve: [$projection::class, Order::class, EventsConverter::class],
203+
containerOrAvailableServices: [$projection, new EventsConverter(), self::getConnectionFactory()],
204+
configuration: ServiceConfiguration::createWithDefaults()
205+
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([
206+
ModulePackageList::DBAL_PACKAGE,
207+
ModulePackageList::EVENT_SOURCING_PACKAGE,
208+
ModulePackageList::ASYNCHRONOUS_PACKAGE,
209+
])),
210+
runForProductionEventStore: true,
211+
licenceKey: LicenceTesting::VALID_LICENCE,
212+
);
213+
214+
$ecotone->deleteProjection($projection::NAME)
215+
->initializeProjection($projection::NAME);
216+
self::assertEquals([], $ecotone->sendQueryWithRouting('getOrders'));
217+
218+
$ecotone->sendCommand(new PlaceOrder('order-1', 'laptop', 2));
219+
self::assertEquals([
220+
['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2'],
221+
], $ecotone->sendQueryWithRouting('getOrders'));
222+
223+
$ecotone->sendCommand(new PlaceOrder('order-2', 'phone', 1));
224+
self::assertEquals([
225+
['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2'],
226+
['order_id' => 'order-2', 'product' => 'phone', 'quantity' => '1'],
227+
], $ecotone->sendQueryWithRouting('getOrders'));
228+
229+
// Test reset and catchup
230+
$ecotone->resetProjection($projection::NAME)
231+
->triggerProjection($projection::NAME);
232+
233+
self::assertEquals([
234+
['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2'],
235+
['order_id' => 'order-2', 'product' => 'phone', 'quantity' => '1'],
236+
], $ecotone->sendQueryWithRouting('getOrders'));
237+
}
238+
239+
public function test_aggregate_stream_throws_exception_for_non_event_sourcing_aggregate(): void
240+
{
241+
// Create a projection that references a non-EventSourcingAggregate class
242+
$projection = new #[ProjectionV2('invalid_projection'), FromAggregateStream(\stdClass::class)] class {
243+
#[EventHandler('*')]
244+
public function handle(array $event): void
245+
{
246+
}
247+
};
248+
249+
$this->expectException(ConfigurationException::class);
250+
$this->expectExceptionMessage('must be an EventSourcingAggregate');
251+
252+
EcotoneLite::bootstrapFlowTestingWithEventStore(
253+
classesToResolve: [$projection::class],
254+
containerOrAvailableServices: [$projection, self::getConnectionFactory()],
255+
configuration: ServiceConfiguration::createWithDefaults()
256+
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([
257+
ModulePackageList::DBAL_PACKAGE,
258+
ModulePackageList::EVENT_SOURCING_PACKAGE,
259+
ModulePackageList::ASYNCHRONOUS_PACKAGE,
260+
])),
261+
runForProductionEventStore: true,
262+
licenceKey: LicenceTesting::VALID_LICENCE,
263+
);
264+
}
265+
266+
private function createOrderListProjectionWithAggregateStream(): object
267+
{
268+
$connection = $this->getConnection();
269+
270+
return new #[ProjectionV2(self::NAME), FromAggregateStream(Order::class)] class ($connection) {
271+
public const NAME = 'order_list_aggregate_stream';
272+
273+
public function __construct(private Connection $connection)
274+
{
275+
}
276+
277+
#[QueryHandler('getOrders')]
278+
public function getOrders(): array
279+
{
280+
return $this->connection->executeQuery(<<<SQL
281+
SELECT * FROM order_list_aggregate_stream ORDER BY order_id ASC
282+
SQL)->fetchAllAssociative();
283+
}
284+
285+
#[EventHandler]
286+
public function addOrder(OrderWasPlaced $event): void
287+
{
288+
$this->connection->executeStatement(<<<SQL
289+
INSERT INTO order_list_aggregate_stream VALUES (?,?,?)
290+
SQL, [$event->orderId, $event->product, $event->quantity]);
291+
}
292+
293+
#[ProjectionInitialization]
294+
public function initialization(): void
295+
{
296+
$this->connection->executeStatement(<<<SQL
297+
CREATE TABLE IF NOT EXISTS order_list_aggregate_stream (
298+
order_id VARCHAR(36) PRIMARY KEY,
299+
product VARCHAR(255),
300+
quantity INT
301+
)
302+
SQL);
303+
}
304+
305+
#[ProjectionDelete]
306+
public function delete(): void
307+
{
308+
$this->connection->executeStatement(<<<SQL
309+
DROP TABLE IF EXISTS order_list_aggregate_stream
310+
SQL);
311+
}
312+
313+
#[ProjectionReset]
314+
public function reset(): void
315+
{
316+
$this->connection->executeStatement(<<<SQL
317+
DELETE FROM order_list_aggregate_stream
318+
SQL);
319+
}
320+
};
321+
}
322+
191323
private function createNotificationEventHandler(): object
192324
{
193325
return new class () {

0 commit comments

Comments
 (0)