Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\EventBus;
use Ecotone\Modelling\QueryBus;
use Ecotone\Projecting\InMemory\InMemoryEventStoreStreamSource;
use Ecotone\Projecting\InMemory\InMemoryEventStoreStreamSourceBuilder;
use Ecotone\Projecting\InMemory\InMemoryProjectionStateStorageBuilder;
use Ecotone\Projecting\InMemory\InMemoryStreamSourceBuilder;
use Ecotone\Projecting\InMemory\InMemoryProjectionStateStorage;
use Ecotone\Projecting\ProjectionStateStorageReference;
use Ecotone\Projecting\StreamSourceReference;

#[ModuleAnnotation]
/**
Expand Down Expand Up @@ -197,6 +199,13 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
GatewayHeaderBuilder::create('channelName', 'ecotone.test_support_gateway.channel_name'),
]));
}

$messagingConfiguration->registerServiceDefinition(
InMemoryEventStoreStreamSource::class,
new Definition(InMemoryEventStoreStreamSource::class, [
new Reference(InMemoryEventStore::class),
])
);
}

public function canHandle($extensionObject): bool
Expand Down Expand Up @@ -229,8 +238,6 @@ public function getModuleExtensions(ServiceConfiguration $serviceConfiguration,
}
}

// If EVENT_SOURCING_PACKAGE is enabled but no EventSourcingConfiguration is provided,
// it means DBAL mode is being used, so don't register InMemoryEventStoreStreamSource
if (! $hasEventSourcingConfiguration) {
$shouldRegisterInMemoryStreamSource = false;
}
Expand All @@ -240,15 +247,7 @@ public function getModuleExtensions(ServiceConfiguration $serviceConfiguration,
return [];
}

// Check if user has registered a custom InMemoryStreamSourceBuilder
// If so, don't register InMemoryEventStoreStreamSourceBuilder to avoid conflicts
foreach ($serviceExtensions as $extensionObject) {
if ($extensionObject instanceof InMemoryStreamSourceBuilder) {
return [];
}
}

return [new InMemoryEventStoreStreamSourceBuilder(), new InMemoryProjectionStateStorageBuilder()];
return [new StreamSourceReference(InMemoryEventStoreStreamSource::class), new ProjectionStateStorageReference(InMemoryProjectionStateStorage::class)];
}

public function getModulePackageName(): string
Expand Down
6 changes: 6 additions & 0 deletions packages/Ecotone/src/Messaging/Config/ModuleClassList.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@
use Ecotone\Modelling\MessageHandling\MetadataPropagator\MessageHeadersPropagatorInterceptor;
use Ecotone\Modelling\QueryBus;
use Ecotone\OpenTelemetry\Configuration\OpenTelemetryModule;
use Ecotone\Projecting\Config\PartitionProviderRegistryModule;
use Ecotone\Projecting\Config\ProjectingAttributeModule;
use Ecotone\Projecting\Config\ProjectingConsoleCommands;
use Ecotone\Projecting\Config\ProjectingModule;
use Ecotone\Projecting\Config\ProjectionStateStorageRegistryModule;
use Ecotone\Projecting\Config\StreamFilterRegistryModule;
use Ecotone\Projecting\Config\StreamSourceRegistryModule;
use Ecotone\Projecting\EventStoreAdapter\EventStoreAdapterModule;
use Ecotone\Redis\Configuration\RedisMessageConsumerModule;
use Ecotone\Redis\Configuration\RedisMessagePublisherModule;
Expand Down Expand Up @@ -114,9 +117,12 @@ class ModuleClassList
InstantRetryAttributeModule::class,
DynamicMessageChannelModule::class,
EventSourcedRepositoryModule::class,
PartitionProviderRegistryModule::class,
ProjectingModule::class,
ProjectingAttributeModule::class,
ProjectionStateStorageRegistryModule::class,
StreamFilterRegistryModule::class,
StreamSourceRegistryModule::class,
EventStoreAdapterModule::class,

/** Attribute based configurations */
Expand Down
21 changes: 21 additions & 0 deletions packages/Ecotone/src/Projecting/Attribute/PartitionProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\Projecting\Attribute;

use Attribute;

/**
* Marks a class as a custom PartitionProvider.
* The class must implement \Ecotone\Projecting\PartitionProvider interface.
* Userland partition providers are prioritized over built-in ones.
*/
#[Attribute(Attribute::TARGET_CLASS)]
final class PartitionProvider
{
}

21 changes: 21 additions & 0 deletions packages/Ecotone/src/Projecting/Attribute/StateStorage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\Projecting\Attribute;

use Attribute;

/**
* Marks a class as a custom ProjectionStateStorage.
* The class must implement \Ecotone\Projecting\ProjectionStateStorage interface.
* Userland state storages are prioritized over built-in ones.
*/
#[Attribute(Attribute::TARGET_CLASS)]
final class StateStorage
{
}

21 changes: 21 additions & 0 deletions packages/Ecotone/src/Projecting/Attribute/StreamSource.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\Projecting\Attribute;

use Attribute;

/**
* Marks a class as a custom StreamSource.
* The class must implement \Ecotone\Projecting\StreamSource interface.
* Userland stream sources are prioritized over built-in ones.
*/
#[Attribute(Attribute::TARGET_CLASS)]
final class StreamSource
{
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\Projecting\Config;

use Ecotone\AnnotationFinder\AnnotationFinder;
use Ecotone\Messaging\Attribute\ModuleAnnotation;
use Ecotone\Messaging\Config\Annotation\AnnotatedDefinitionReference;
use Ecotone\Messaging\Config\Annotation\AnnotationModule;
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ExtensionObjectResolver;
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\NoExternalConfigurationModule;
use Ecotone\Messaging\Config\Configuration;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Projecting\Attribute\PartitionProvider as PartitionProviderAttribute;
use Ecotone\Projecting\Attribute\ProjectionV2;
use Ecotone\Projecting\PartitionProviderReference;
use Ecotone\Projecting\PartitionProviderRegistry;
use Ecotone\Projecting\SinglePartitionProvider;

#[ModuleAnnotation]
class PartitionProviderRegistryModule extends NoExternalConfigurationModule implements AnnotationModule
{
/**
* @param string[] $allProjectionNames
* @param string[] $userlandPartitionProviderReferences
*/
public function __construct(
private array $allProjectionNames = [],
private array $userlandPartitionProviderReferences = [],
) {
}

public static function create(AnnotationFinder $annotationFinder, InterfaceToCallRegistry $interfaceToCallRegistry): static
{
$allProjectionNames = [];
foreach ($annotationFinder->findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) {
$projectionAttribute = $annotationFinder->getAttributeForClass($projectionClassName, ProjectionV2::class);
$allProjectionNames[] = $projectionAttribute->name;
}

$userlandPartitionProviderReferences = [];
foreach ($annotationFinder->findAnnotatedClasses(PartitionProviderAttribute::class) as $providerClassName) {
$userlandPartitionProviderReferences[] = AnnotatedDefinitionReference::getReferenceForClassName($annotationFinder, $providerClassName);
}

return new self($allProjectionNames, $userlandPartitionProviderReferences);
}

public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void
{
$partitionProviderReferences = ExtensionObjectResolver::resolve(
PartitionProviderReference::class,
$extensionObjects
);

$partitionedProjectionNames = [];
foreach ($partitionProviderReferences as $ref) {
$partitionedProjectionNames = array_merge($partitionedProjectionNames, $ref->getPartitionedProjectionNames());
}
$partitionedProjectionNames = array_unique($partitionedProjectionNames);

$nonPartitionedProjectionNames = array_values(array_diff($this->allProjectionNames, $partitionedProjectionNames));

$userlandProviders = array_map(
fn (string $reference) => new Reference($reference),
$this->userlandPartitionProviderReferences
);

$builtinProviders = array_map(
fn (PartitionProviderReference $ref) => new Reference($ref->getReferenceName()),
$partitionProviderReferences
);

$messagingConfiguration->registerServiceDefinition(
SinglePartitionProvider::class,
new Definition(SinglePartitionProvider::class, [$nonPartitionedProjectionNames])
);

$builtinProviders[] = new Reference(SinglePartitionProvider::class);

$allProviders = array_merge($userlandProviders, $builtinProviders);

$messagingConfiguration->registerServiceDefinition(
PartitionProviderRegistry::class,
new Definition(PartitionProviderRegistry::class, [$allProviders])
);
}

public function canHandle($extensionObject): bool
{
return $extensionObject instanceof PartitionProviderReference;
}

public function getModulePackageName(): string
{
return ModulePackageList::CORE_PACKAGE;
}
}

42 changes: 11 additions & 31 deletions packages/Ecotone/src/Projecting/Config/ProjectingModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@
use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder;
use Ecotone\Projecting\BackfillExecutorHandler;
use Ecotone\Projecting\InMemory\InMemoryProjectionRegistry;
use Ecotone\Projecting\PartitionProvider;
use Ecotone\Projecting\PartitionProviderRegistry;
use Ecotone\Projecting\ProjectingHeaders;
use Ecotone\Projecting\ProjectingManager;
use Ecotone\Projecting\ProjectionRegistry;
use Ecotone\Projecting\ProjectionStateStorage;
use Ecotone\Projecting\ProjectionStateStorageRegistry;
use Ecotone\Projecting\SinglePartitionProvider;
use Ecotone\Projecting\StreamFilterRegistry;
use Ecotone\Projecting\StreamSource;
use Ramsey\Uuid\Uuid;
use Ecotone\Projecting\StreamSourceRegistry;

/**
* This module allows to configure projections in a standard way
Expand All @@ -61,33 +60,15 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
{
$serviceConfiguration = ExtensionObjectResolver::resolveUnique(ServiceConfiguration::class, $extensionObjects, ServiceConfiguration::createWithDefaults());
$projectionBuilders = ExtensionObjectResolver::resolve(ProjectionExecutorBuilder::class, $extensionObjects);
$componentBuilders = ExtensionObjectResolver::resolve(ProjectionComponentBuilder::class, $extensionObjects);

if (! empty($projectionBuilders) && ! $messagingConfiguration->isRunningForEnterpriseLicence()) {
throw ConfigurationException::create('Projections are part of Ecotone Enterprise. To use projections, please acquire an enterprise licence.');
}

/** @var array<string, array<string, string>> $components [projection name][component name][reference] */
$components = [];
foreach ($componentBuilders as $componentBuilder) {
foreach ($projectionBuilders as $projectionBuilder) {
$projectionName = $projectionBuilder->projectionName();
foreach ([StreamSource::class, PartitionProvider::class, ProjectionStateStorage::class] as $component) {
if ($componentBuilder->canHandle($projectionName, $component)) {
if (isset($components[$projectionName][$component])) {
throw ConfigurationException::create(
"Projection with name {$projectionName} is already registered for component {$component} with reference {$components[$projectionName][$component]}."
. ' You can only register one component of each type per projection. Please check your configuration.'
);
}

$reference = Uuid::uuid4()->toString();
$moduleReferenceSearchService->store($reference, $componentBuilder);
$components[$projectionName][$component] = new Reference($reference);
}
}
}
}
$messagingConfiguration->registerServiceDefinition(
SinglePartitionProvider::class,
new Definition(SinglePartitionProvider::class)
);

$projectionRegistryMap = [];
foreach ($projectionBuilders as $projectionBuilder) {
Expand All @@ -98,10 +79,10 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
$messagingConfiguration->registerServiceDefinition(
$projectingManagerReference = ProjectingManager::class . ':' . $projectionName,
new Definition(ProjectingManager::class, [
$components[$projectionName][ProjectionStateStorage::class] ?? throw ConfigurationException::create("Projection with name {$projectionName} does not have projection state storage configured. Please check your configuration."),
new Reference(ProjectionStateStorageRegistry::class),
new Reference($reference),
$components[$projectionName][StreamSource::class] ?? throw ConfigurationException::create("Projection with name {$projectionName} does not have stream source configured. Please check your configuration."),
$components[$projectionName][PartitionProvider::class] ?? new Definition(SinglePartitionProvider::class),
new Reference(StreamSourceRegistry::class),
new Reference(PartitionProviderRegistry::class),
new Reference(StreamFilterRegistry::class),
$projectionName,
new Reference(TerminationListener::class),
Expand Down Expand Up @@ -186,8 +167,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
public function canHandle($extensionObject): bool
{
return $extensionObject instanceof ServiceConfiguration
|| $extensionObject instanceof ProjectionExecutorBuilder
|| $extensionObject instanceof ProjectionComponentBuilder;
|| $extensionObject instanceof ProjectionExecutorBuilder;
}

public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions, ?InterfaceToCallRegistry $interfaceToCallRegistry = null): array
Expand Down

This file was deleted.

Loading