Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Monorepo/Benchmark/ProjectingBenchmark.php
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public function bench_ecotone_projection_backfill(): void
Assert::assertEquals([],
self::$ecotone->getQueryBus()->sendWithRouting('product.getPriceChange', self::$expectedProductIds[0])
);
$projectionManager->backfill();
$projectionManager->prepareBackfill();
Assert::assertEquals([
new PriceChange(100, 0),
new PriceChange(120, 20),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
*/
namespace Monorepo\ExampleAppEventSourcing\EcotoneProjection;

use Ecotone\EventSourcing\Attribute\FromAggregateStream;
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Projecting\Attribute\ProjectionV2;
use Monorepo\ExampleAppEventSourcing\Common\Event\PriceWasChanged;
use Monorepo\ExampleAppEventSourcing\Common\Event\ProductWasRegistered;
use Monorepo\ExampleAppEventSourcing\Common\PriceChange;
use Monorepo\ExampleAppEventSourcing\Common\Product;

#[\Ecotone\Projecting\Attribute\ProjectionV2("price_change_over_time")]
#[ProjectionV2("price_change_over_time")]
#[FromAggregateStream(Product::class)]
class PriceChangeOverTimeProjectionWithEcotoneProjection
{
public const NAME = "price_change_over_time";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,37 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\EventSourcing\Attribute;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me, those attributes are related to the pdo event sourcing module. If you want to project from another event source (let's say another event store implementation), you would create another attribute that would register its event store as an event source

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They do not hold any database (pdo) specific, they just stand what stream should be used (maybe the aggregate-type is a bit specific, but it could be delivered as separate attribute).

So I think it's fine to allow it to be part of generic Projecton API?


use Attribute;
use Ecotone\EventSourcing\EventStore;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;

/*
/**
* Configures a projection to read from an aggregate's event stream.
* Automatically reads Stream and AggregateType attributes from the aggregate class.
*
* This simplifies projection configuration by avoiding duplication of stream
* and aggregate type configuration that is already defined on the aggregate.
* Automatically resolves Stream and AggregateType from the aggregate class.
*
* Example usage:
* ```php
* #[ProjectionV2('order_list')]
* #[AggregateStream(Order::class)]
* #[FromAggregateStream(Order::class)]
* class OrderListProjection { ... }
* ```
*
* licence Enterprise
*/
#[Attribute(Attribute::TARGET_CLASS | Attribute::IS_REPEATABLE)]
class FromAggregateStream
readonly class FromAggregateStream
{
/**
* @param class-string $aggregateClass The aggregate class to read Stream and AggregateType from.
* @param class-string $aggregateClass The aggregate class to read stream info from.
* Must be an EventSourcingAggregate.
* @param string $eventStoreReferenceName Reference name for the event store
*/
public function __construct(
public readonly string $aggregateClass,
public readonly string $eventStoreReferenceName = EventStore::class
public string $aggregateClass,
public string $eventStoreReferenceName = EventStore::class
) {
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
use Attribute;
use Ecotone\EventSourcing\EventStore;

/**
* Configures a projection to read from a specific event stream.
*/
#[Attribute(Attribute::TARGET_CLASS | Attribute::IS_REPEATABLE)]
readonly class FromStream
{
Expand All @@ -20,3 +23,4 @@ public function __construct(
) {
}
}

2 changes: 1 addition & 1 deletion packages/Ecotone/src/Lite/Test/FlowTestSupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public function triggerProjection(string|array $projectionName): self

foreach ($projectionName as $name) {
if ($this->getGateway(ProjectionRegistry::class)->has($name)) {
$this->getGateway(ProjectionRegistry::class)->get($name)->backfill();
$this->getGateway(ProjectionRegistry::class)->get($name)->prepareBackfill();
} else {
$this->getGateway(ProjectionManager::class)->triggerProjection($name);
}
Expand Down
2 changes: 2 additions & 0 deletions packages/Ecotone/src/Messaging/Config/ModuleClassList.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
use Ecotone\Projecting\Config\ProjectingAttributeModule;
use Ecotone\Projecting\Config\ProjectingConsoleCommands;
use Ecotone\Projecting\Config\ProjectingModule;
use Ecotone\Projecting\Config\StreamFilterRegistryModule;
use Ecotone\Projecting\EventStoreAdapter\EventStoreAdapterModule;
use Ecotone\Redis\Configuration\RedisMessageConsumerModule;
use Ecotone\Redis\Configuration\RedisMessagePublisherModule;
Expand Down Expand Up @@ -115,6 +116,7 @@ class ModuleClassList
EventSourcedRepositoryModule::class,
ProjectingModule::class,
ProjectingAttributeModule::class,
StreamFilterRegistryModule::class,
EventStoreAdapterModule::class,

/** Attribute based configurations */
Expand Down
38 changes: 38 additions & 0 deletions packages/Ecotone/src/Projecting/Attribute/ProjectionBackfill.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\Projecting\Attribute;

use Attribute;
use InvalidArgumentException;

/**
* Configure projection backfill settings.
* This attribute controls how partitions are batched during backfill operations.
*/
#[Attribute(Attribute::TARGET_CLASS)]
class ProjectionBackfill
{
public function __construct(
/**
* Number of partitions to process in a single batch during backfill.
* Must be at least 1.
*/
public readonly int $backfillPartitionBatchSize = 100,
/**
* Async channel name for backfill operations.
* When set, backfill batches are sent to this channel first, then routed to the backfill handler.
* When null, backfill executes synchronously.
*/
public readonly ?string $asyncChannelName = null,
) {
if ($this->backfillPartitionBatchSize < 1) {
throw new InvalidArgumentException('Backfill partition batch size must be at least 1');
}
}
}

18 changes: 0 additions & 18 deletions packages/Ecotone/src/Projecting/Attribute/ProjectionBatchSize.php

This file was deleted.

25 changes: 25 additions & 0 deletions packages/Ecotone/src/Projecting/Attribute/ProjectionExecution.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\Projecting\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_CLASS)]
class ProjectionExecution
{
public function __construct(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed from ProjectionBatchSize attribute

/**
* Configure the batch size for loading events during projection execution.
* * This controls how many events are loaded from the stream in a single batch.
*/
public readonly int $eventLoadingBatchSize
)
{
}
}
56 changes: 56 additions & 0 deletions packages/Ecotone/src/Projecting/BackfillExecutorHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\Projecting;

use Ecotone\Messaging\Endpoint\Interceptor\TerminationListener;

/**
* Handles execution of projection backfill batches.
* This handler is invoked via MessagingEntrypoint to execute backfill operations
* for a given projection with specified limit and offset parameters.
*/
class BackfillExecutorHandler
{
public const BACKFILL_EXECUTOR_CHANNEL = 'ecotone.projection.backfill.executor';

public function __construct(
private ProjectionRegistry $projectionRegistry,
private TerminationListener $terminationListener,
) {
}

/**
* Execute backfill for a specific partition batch.
*
* @param string $projectionName The name of the projection to backfill
* @param int|null $limit The maximum number of partitions to process in this batch (null for unlimited)
* @param int $offset The offset to start from
* @param string $streamName The stream name to filter partitions
* @param string|null $aggregateType The aggregate type to filter partitions (optional)
* @param string $eventStoreReferenceName The event store reference name
*/
public function executeBackfillBatch(
string $projectionName,
?int $limit = null,
int $offset = 0,
string $streamName = '',
?string $aggregateType = null,
string $eventStoreReferenceName = '',
): void {
$projectingManager = $this->projectionRegistry->get($projectionName);
$streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName);

foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) {
$projectingManager->execute($partition, true);
if ($this->terminationListener->shouldTerminate()) {
break;
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

class EcotoneProjectionExecutorBuilder implements ProjectionExecutorBuilder
{
private const DEFAULT_BATCH_SIZE = 1_000;
private const DEFAULT_EVENT_LOADING_BATCH_SIZE = 1_000;
private const DEFAULT_BACKFILL_PARTITION_BATCH_SIZE = 100;

/**
* @param AnnotatedDefinition[] $projectionEventHandlers
Expand All @@ -41,7 +42,9 @@ public function __construct(
private ?string $flushChannel = null,
private array $projectionEventHandlers = [],
private ?string $asyncChannelName = null,
private ?int $batchSize = null,
private ?int $eventLoadingBatchSize = null,
private ?int $backfillPartitionBatchSize = null,
private ?string $backfillAsyncChannelName = null,
) {
if ($this->partitionHeader && ! $this->automaticInitialization) {
throw new ConfigurationException("Cannot set partition header for projection {$this->projectionName} with automatic initialization disabled");
Expand Down Expand Up @@ -93,9 +96,19 @@ public function automaticInitialization(): bool
return $this->automaticInitialization;
}

public function batchSize(): int
public function eventLoadingBatchSize(): int
{
return $this->batchSize ?? self::DEFAULT_BATCH_SIZE;
return $this->eventLoadingBatchSize ?? self::DEFAULT_EVENT_LOADING_BATCH_SIZE;
}

public function backfillPartitionBatchSize(): int
{
return $this->backfillPartitionBatchSize ?? self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE;
}

public function backfillAsyncChannelName(): ?string
{
return $this->backfillAsyncChannelName;
}

public function compile(MessagingContainerBuilder $builder): Definition|Reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
use Ecotone\Modelling\Attribute\NamedEvent;
use Ecotone\Projecting\Attribute\Partitioned;
use Ecotone\Projecting\Attribute\Polling;
use Ecotone\Projecting\Attribute\ProjectionBatchSize;
use Ecotone\Projecting\Attribute\ProjectionBackfill;
use Ecotone\Projecting\Attribute\ProjectionExecution;
use Ecotone\Projecting\Attribute\ProjectionDeployment;
use Ecotone\Projecting\Attribute\ProjectionFlush;
use Ecotone\Projecting\Attribute\ProjectionV2;
Expand Down Expand Up @@ -77,7 +78,8 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
$eventStreamingProjections = [];
foreach ($annotationRegistrationService->findAnnotatedClasses(ProjectionV2::class) as $projectionClassName) {
$projectionAttribute = $annotationRegistrationService->getAttributeForClass($projectionClassName, ProjectionV2::class);
$batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBatchSize::class);
$batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionExecution::class);
$backfillAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBackfill::class);
$pollingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Polling::class);
$streamingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Streaming::class);
$projectionDeployment = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionDeployment::class);
Expand All @@ -88,7 +90,16 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
$automaticInitialization = self::resolveAutomaticInitialization($partitionAttribute, $projectionDeployment);
$isLive = $projectionDeployment?->live ?? true;

$projectionBuilder = new EcotoneProjectionExecutorBuilder($projectionAttribute->name, $partitionHeaderName, $automaticInitialization, $isLive, $namedEvents, batchSize: $batchSizeAttribute?->batchSize);
$projectionBuilder = new EcotoneProjectionExecutorBuilder(
$projectionAttribute->name,
$partitionHeaderName,
$automaticInitialization,
$isLive,
$namedEvents,
eventLoadingBatchSize: $batchSizeAttribute?->eventLoadingBatchSize,
backfillPartitionBatchSize: $backfillAttribute?->backfillPartitionBatchSize,
backfillAsyncChannelName: $backfillAttribute?->asyncChannelName,
);

$asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName);
$isPolling = $pollingAttribute !== null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function backfillProjection(string $name): void
if (! $this->registry->has($name)) {
throw new InvalidArgumentException("There is no projection with name {$name}");
}
$this->registry->get($name)->backfill();
$this->registry->get($name)->prepareBackfill();
}

#[ConsoleCommand('ecotone:projection:delete')]
Expand Down
Loading