Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions packages/PdoEventSourcing/src/Attribute/FromAggregateStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace Ecotone\EventSourcing\Attribute;

use Attribute;
use Ecotone\EventSourcing\EventStore;

/*
* Configures a projection to read from an aggregate's event stream.
* Automatically reads Stream and AggregateType attributes from the aggregate class.
*
* This simplifies projection configuration by avoiding duplication of stream
* and aggregate type configuration that is already defined on the aggregate.
*
* Example usage:
* ```php
* #[ProjectionV2('order_list')]
* #[AggregateStream(Order::class)]
* class OrderListProjection { ... }
* ```
*
* licence Enterprise
*/
#[Attribute(Attribute::TARGET_CLASS)]
class FromAggregateStream
{
/**
* @param class-string $aggregateClass The aggregate class to read Stream and AggregateType from.
* Must be an EventSourcingAggregate.
* @param string $eventStoreReferenceName Reference name for the event store
*/
public function __construct(
public readonly string $aggregateClass,
public readonly string $eventStoreReferenceName = EventStore::class
) {
}
}

45 changes: 45 additions & 0 deletions packages/PdoEventSourcing/src/Config/ProophProjectingModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
namespace Ecotone\EventSourcing\Config;

use Ecotone\AnnotationFinder\AnnotationFinder;
use Ecotone\EventSourcing\Attribute\FromAggregateStream;
use Ecotone\EventSourcing\Attribute\AggregateType;
use Ecotone\EventSourcing\Attribute\FromStream;
use Ecotone\EventSourcing\Attribute\Stream;
use Ecotone\EventSourcing\Projecting\AggregateIdPartitionProviderBuilder;
use Ecotone\EventSourcing\Projecting\PartitionState\DbalProjectionStateStorageBuilder;
use Ecotone\EventSourcing\Projecting\StreamSource\EventStoreAggregateStreamSourceBuilder;
Expand All @@ -21,6 +24,7 @@
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Projecting\Attribute\Partitioned;
use Ecotone\Projecting\Attribute\ProjectionV2;
use Ecotone\Projecting\EventStoreAdapter\EventStoreChannelAdapter;
Expand Down Expand Up @@ -69,6 +73,47 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
}
}

// Handle AggregateStream attribute
foreach ($annotationRegistrationService->findAnnotatedClasses(FromAggregateStream::class) as $classname) {
$projectionAttribute = $annotationRegistrationService->findAttributeForClass($classname, ProjectionV2::class);
$aggregateStreamAttribute = $annotationRegistrationService->findAttributeForClass($classname, FromAggregateStream::class);
$customScopeStrategyAttribute = $annotationRegistrationService->findAttributeForClass($classname, Partitioned::class);

if (! $projectionAttribute || ! $aggregateStreamAttribute) {
continue;
}

$aggregateClass = $aggregateStreamAttribute->aggregateClass;
$projectionName = $projectionAttribute->name;

$eventSourcingAggregateAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, EventSourcingAggregate::class);
if ($eventSourcingAggregateAttribute === null) {
throw ConfigurationException::create("Class {$aggregateClass} referenced in #[AggregateStream] for projection {$projectionName} must be an EventSourcingAggregate. Add #[EventSourcingAggregate] attribute to the class.");
}

$streamAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, Stream::class);
$streamName = $streamAttribute?->getName() ?? $aggregateClass;

$aggregateTypeAttribute = $annotationRegistrationService->findAttributeForClass($aggregateClass, AggregateType::class);
$aggregateType = $aggregateTypeAttribute?->getName() ?? $aggregateClass;

$handledProjections[] = $projectionName;

if ($customScopeStrategyAttribute !== null) {
$extensions[] = new EventStoreAggregateStreamSourceBuilder(
$projectionName,
$aggregateType,
$streamName,
);
$extensions[] = new AggregateIdPartitionProviderBuilder($projectionName, $aggregateType, $streamName);
} else {
$extensions[] = new EventStoreGlobalStreamSourceBuilder(
$streamName,
[$projectionName],
);
}
}

if (! empty($handledProjections)) {
$extensions[] = new DbalProjectionStateStorageBuilder($handledProjections);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
namespace Test\Ecotone\EventSourcing\Projecting\Global;

use Doctrine\DBAL\Connection;
use Ecotone\EventSourcing\Attribute\FromAggregateStream;
use Ecotone\EventSourcing\Attribute\FromStream;
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
use Ecotone\EventSourcing\Attribute\ProjectionReset;
use Ecotone\EventSourcing\EventSourcingConfiguration;
use Ecotone\Lite\EcotoneLite;
use Ecotone\Lite\Test\FlowTestSupport;
use Ecotone\Messaging\Config\ConfigurationException;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Modelling\Attribute\EventHandler;
Expand All @@ -32,6 +34,10 @@
use Test\Ecotone\EventSourcing\Fixture\Ticket\Event\TicketWasRegistered;
use Test\Ecotone\EventSourcing\Fixture\Ticket\Ticket;
use Test\Ecotone\EventSourcing\Fixture\Ticket\TicketEventConverter;
use Test\Ecotone\EventSourcing\Projecting\App\Ordering\Command\PlaceOrder;
use Test\Ecotone\EventSourcing\Projecting\App\Ordering\Event\OrderWasPlaced;
use Test\Ecotone\EventSourcing\Projecting\App\Ordering\EventsConverter;
use Test\Ecotone\EventSourcing\Projecting\App\Ordering\Order;
use Test\Ecotone\EventSourcing\Projecting\ProjectingTestCase;

/**
Expand Down Expand Up @@ -188,6 +194,132 @@ classesToResolve: [$projection::class, Ticket::class, TicketEventConverter::clas
self::assertEquals([['ticket_id' => '124', 'ticket_type' => 'info']], $ecotone->sendQueryWithRouting('getInProgressTickets'));
}

public function test_building_global_projection_with_aggregate_stream_attribute(): void
{
$projection = $this->createOrderListProjectionWithAggregateStream();

$ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore(
classesToResolve: [$projection::class, Order::class, EventsConverter::class],
containerOrAvailableServices: [$projection, new EventsConverter(), self::getConnectionFactory()],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([
ModulePackageList::DBAL_PACKAGE,
ModulePackageList::EVENT_SOURCING_PACKAGE,
ModulePackageList::ASYNCHRONOUS_PACKAGE,
])),
runForProductionEventStore: true,
licenceKey: LicenceTesting::VALID_LICENCE,
);

$ecotone->deleteProjection($projection::NAME)
->initializeProjection($projection::NAME);
self::assertEquals([], $ecotone->sendQueryWithRouting('getOrders'));

$ecotone->sendCommand(new PlaceOrder('order-1', 'laptop', 2));
self::assertEquals([
['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2'],
], $ecotone->sendQueryWithRouting('getOrders'));

$ecotone->sendCommand(new PlaceOrder('order-2', 'phone', 1));
self::assertEquals([
['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2'],
['order_id' => 'order-2', 'product' => 'phone', 'quantity' => '1'],
], $ecotone->sendQueryWithRouting('getOrders'));

// Test reset and catchup
$ecotone->resetProjection($projection::NAME)
->triggerProjection($projection::NAME);

self::assertEquals([
['order_id' => 'order-1', 'product' => 'laptop', 'quantity' => '2'],
['order_id' => 'order-2', 'product' => 'phone', 'quantity' => '1'],
], $ecotone->sendQueryWithRouting('getOrders'));
}

public function test_aggregate_stream_throws_exception_for_non_event_sourcing_aggregate(): void
{
// Create a projection that references a non-EventSourcingAggregate class
$projection = new #[ProjectionV2('invalid_projection'), FromAggregateStream(\stdClass::class)] class {
#[EventHandler('*')]
public function handle(array $event): void
{
}
};

$this->expectException(ConfigurationException::class);
$this->expectExceptionMessage('must be an EventSourcingAggregate');

EcotoneLite::bootstrapFlowTestingWithEventStore(
classesToResolve: [$projection::class],
containerOrAvailableServices: [$projection, self::getConnectionFactory()],
configuration: ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([
ModulePackageList::DBAL_PACKAGE,
ModulePackageList::EVENT_SOURCING_PACKAGE,
ModulePackageList::ASYNCHRONOUS_PACKAGE,
])),
runForProductionEventStore: true,
licenceKey: LicenceTesting::VALID_LICENCE,
);
}

private function createOrderListProjectionWithAggregateStream(): object
{
$connection = $this->getConnection();

return new #[ProjectionV2(self::NAME), FromAggregateStream(Order::class)] class ($connection) {
public const NAME = 'order_list_aggregate_stream';

public function __construct(private Connection $connection)
{
}

#[QueryHandler('getOrders')]
public function getOrders(): array
{
return $this->connection->executeQuery(<<<SQL
SELECT * FROM order_list_aggregate_stream ORDER BY order_id ASC
SQL)->fetchAllAssociative();
}

#[EventHandler]
public function addOrder(OrderWasPlaced $event): void
{
$this->connection->executeStatement(<<<SQL
INSERT INTO order_list_aggregate_stream VALUES (?,?,?)
SQL, [$event->orderId, $event->product, $event->quantity]);
}

#[ProjectionInitialization]
public function initialization(): void
{
$this->connection->executeStatement(<<<SQL
CREATE TABLE IF NOT EXISTS order_list_aggregate_stream (
order_id VARCHAR(36) PRIMARY KEY,
product VARCHAR(255),
quantity INT
)
SQL);
}

#[ProjectionDelete]
public function delete(): void
{
$this->connection->executeStatement(<<<SQL
DROP TABLE IF EXISTS order_list_aggregate_stream
SQL);
}

#[ProjectionReset]
public function reset(): void
{
$this->connection->executeStatement(<<<SQL
DELETE FROM order_list_aggregate_stream
SQL);
}
};
}

private function createNotificationEventHandler(): object
{
return new class () {
Expand Down
Loading