Skip to content

Commit 8b38252

Browse files
authored
Feat/projection event emitter (#588)
* feat: projection v2 event emitter support * emitter * feed ci
1 parent 4ee32cd commit 8b38252

File tree

16 files changed

+356
-71
lines changed

16 files changed

+356
-71
lines changed

packages/Ecotone/src/Messaging/Handler/Filter/MessageFilterBuilder.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,21 @@ public static function createBoolHeaderFilter(string $headerName, ?bool $default
5454
);
5555
}
5656

57+
/**
58+
* Creates a filter that returns the NEGATED value of a boolean header.
59+
* When header is true, the message PASSES through.
60+
* When header is false, the message is DISCARDED.
61+
*
62+
* @param bool|null $defaultResultWhenHeaderIsMissing When no presented exception will be thrown on missing header
63+
*/
64+
public static function createNotBoolHeaderFilter(string $headerName, ?bool $defaultResultWhenHeaderIsMissing = null): self
65+
{
66+
return new self(
67+
new NotBoolHeaderBasedFilter($headerName, $defaultResultWhenHeaderIsMissing),
68+
InterfaceToCallReference::create(NotBoolHeaderBasedFilter::class, 'filter')
69+
);
70+
}
71+
5772
/**
5873
* @inheritDoc
5974
*/
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\Messaging\Handler\Filter;
6+
7+
use Ecotone\Messaging\Config\Container\DefinedObject;
8+
use Ecotone\Messaging\Config\Container\Definition;
9+
use Ecotone\Messaging\Message;
10+
11+
/**
12+
* Filter that returns the NEGATED value of a boolean header.
13+
* When header is true, filter returns false (pass through).
14+
* When header is false, filter returns true (discard).
15+
*
16+
* licence Apache-2.0
17+
*/
18+
final class NotBoolHeaderBasedFilter implements DefinedObject
19+
{
20+
public function __construct(private string $headerName, private ?bool $defaultResultWhenHeaderIsMissing)
21+
{
22+
}
23+
24+
public function filter(Message $message): bool
25+
{
26+
if (! is_null($this->defaultResultWhenHeaderIsMissing) && ! $message->getHeaders()->containsKey($this->headerName)) {
27+
return $this->defaultResultWhenHeaderIsMissing;
28+
}
29+
30+
return ! (bool)$message->getHeaders()->get($this->headerName);
31+
}
32+
33+
public function getDefinition(): Definition
34+
{
35+
return new Definition(self::class, [
36+
$this->headerName,
37+
$this->defaultResultWhenHeaderIsMissing,
38+
]);
39+
}
40+
}
41+

packages/Ecotone/src/Modelling/MessageHandling/MetadataPropagator/MessageHeadersPropagatorInterceptor.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class MessageHeadersPropagatorInterceptor
2121
private array $currentlyPropagatedHeaders = [];
2222
private bool $isPollingConsumer = false;
2323

24-
public function storeHeaders(MethodInvocation $methodInvocation, Message $message, ?PropagateHeaders $propagateHeaders = null)
24+
public function storeHeaders(MethodInvocation|\Closure $methodInvocation, Message $message, ?PropagateHeaders $propagateHeaders = null)
2525
{
2626
if ($propagateHeaders !== null && ! $propagateHeaders->doPropagation()) {
2727
$userlandHeaders = [];
@@ -40,7 +40,11 @@ public function storeHeaders(MethodInvocation $methodInvocation, Message $messag
4040

4141
$this->currentlyPropagatedHeaders[] = $userlandHeaders;
4242
try {
43-
$reply = $methodInvocation->proceed();
43+
if ($methodInvocation instanceof MethodInvocation) {
44+
$reply = $methodInvocation->proceed();
45+
} else {
46+
$reply = $methodInvocation();
47+
}
4448
} finally {
4549
array_pop($this->currentlyPropagatedHeaders);
4650
}

packages/Ecotone/src/Projecting/Attribute/ProjectionConfiguration.php

Lines changed: 0 additions & 22 deletions
This file was deleted.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
/*
4+
* licence Enterprise
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Ecotone\Projecting\Attribute;
9+
10+
use Attribute;
11+
12+
/**
13+
* Configure projection deployment settings for blue/green deployment scenarios.
14+
*/
15+
#[Attribute(Attribute::TARGET_CLASS)]
16+
class ProjectionDeployment
17+
{
18+
public function __construct(
19+
/**
20+
* When true, projection will not be automatically initialized.
21+
* It will require manual trigger
22+
*
23+
* Default: false (automatic initialization)
24+
*/
25+
public readonly bool $manualKickOff = false,
26+
/**
27+
* When false, emitted events via EventStreamEmitter will not be published.
28+
* Use this for blue/green deployment to rebuild projection without
29+
* re-emitting events to downstream consumers.
30+
*
31+
* Default: true (events are emitted)
32+
*/
33+
public readonly bool $live = true,
34+
) {
35+
}
36+
}
37+

packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Ecotone\Modelling\Config\Routing\BusRouteSelector;
2020
use Ecotone\Modelling\Config\Routing\BusRoutingKeyResolver;
2121
use Ecotone\Modelling\Config\Routing\BusRoutingMapBuilder;
22+
use Ecotone\Modelling\MessageHandling\MetadataPropagator\MessageHeadersPropagatorInterceptor;
2223
use Ecotone\Projecting\EcotoneProjectorExecutor;
2324
use Ecotone\Projecting\ProjectingHeaders;
2425

@@ -33,6 +34,7 @@ public function __construct(
3334
private string $projectionName,
3435
private ?string $partitionHeader = null,
3536
private bool $automaticInitialization = true,
37+
private bool $isLive = true,
3638
private array $namedEvents = [],
3739
private ?string $initChannel = null,
3840
private ?string $deleteChannel = null,
@@ -100,12 +102,14 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
100102
{
101103
$routerProcessor = $this->buildExecutionRouter($builder);
102104
return new Definition(EcotoneProjectorExecutor::class, [
103-
new Reference(MessagingEntrypoint::class), // Headers propagation is required for EventStreamEmitter
105+
new Reference(MessagingEntrypoint::class),
106+
new Reference(MessageHeadersPropagatorInterceptor::class),
104107
$this->projectionName,
105108
$routerProcessor,
106109
$this->initChannel,
107110
$this->deleteChannel,
108111
$this->flushChannel,
112+
$this->isLive,
109113
]);
110114
}
111115

packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
use Ecotone\Projecting\Attribute\Partitioned;
3636
use Ecotone\Projecting\Attribute\Polling;
3737
use Ecotone\Projecting\Attribute\ProjectionBatchSize;
38-
use Ecotone\Projecting\Attribute\ProjectionConfiguration;
38+
use Ecotone\Projecting\Attribute\ProjectionDeployment;
3939
use Ecotone\Projecting\Attribute\ProjectionFlush;
4040
use Ecotone\Projecting\Attribute\ProjectionV2;
4141
use Ecotone\Projecting\Attribute\Streaming;
@@ -80,13 +80,15 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
8080
$batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBatchSize::class);
8181
$pollingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Polling::class);
8282
$streamingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Streaming::class);
83-
$projectionConfiguration = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionConfiguration::class) ?? new ProjectionConfiguration();
83+
$projectionDeployment = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionDeployment::class);
8484
$partitionAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Partitioned::class);
8585

8686
$partitionHeaderName = $partitionAttribute?->partitionHeaderName;
87-
$automaticInitialization = $partitionAttribute ? true : $projectionConfiguration->automaticInitialization;
87+
// Resolve automatic initialization: manualKickOff: true means automaticInitialization: false
88+
$automaticInitialization = self::resolveAutomaticInitialization($partitionAttribute, $projectionDeployment);
89+
$isLive = $projectionDeployment?->live ?? true;
8890

89-
$projectionBuilder = new EcotoneProjectionExecutorBuilder($projectionAttribute->name, $partitionHeaderName, $automaticInitialization, $namedEvents, batchSize: $batchSizeAttribute?->batchSize);
91+
$projectionBuilder = new EcotoneProjectionExecutorBuilder($projectionAttribute->name, $partitionHeaderName, $automaticInitialization, $isLive, $namedEvents, batchSize: $batchSizeAttribute?->batchSize);
9092

9193
$asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName);
9294
$isPolling = $pollingAttribute !== null;
@@ -276,4 +278,22 @@ private static function verifyCorrectApiUsage(bool $isPolling, ?string $asynchro
276278
);
277279
}
278280
}
281+
282+
private static function resolveAutomaticInitialization(
283+
?Partitioned $partitionAttribute,
284+
?ProjectionDeployment $projectionDeployment,
285+
): bool {
286+
// Partitioned projections always require automatic initialization
287+
if ($partitionAttribute !== null) {
288+
return true;
289+
}
290+
291+
// ProjectionDeployment: manualKickOff: true means automaticInitialization: false
292+
if ($projectionDeployment !== null) {
293+
return ! $projectionDeployment->manualKickOff;
294+
}
295+
296+
// Default: automatic initialization is enabled
297+
return true;
298+
}
279299
}

packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,20 @@
1414
use Ecotone\Messaging\Support\MessageBuilder;
1515
use Ecotone\Modelling\Event;
1616

17+
use Ecotone\Modelling\MessageHandling\MetadataPropagator\MessageHeadersPropagatorInterceptor;
1718
use function is_null;
1819

1920
class EcotoneProjectorExecutor implements ProjectorExecutor
2021
{
2122
public function __construct(
2223
private MessagingEntrypoint $messagingEntrypoint,
24+
private MessageHeadersPropagatorInterceptor $messageHeadersPropagatorInterceptor,
2325
private string $projectionName, // this is required for event stream emitter so it can create a stream with this name
2426
private MessageProcessor $routerProcessor,
2527
private ?string $initChannel = null,
2628
private ?string $deleteChannel = null,
2729
private ?string $flushChannel = null,
30+
private bool $isLive = true,
2831
) {
2932
}
3033

@@ -33,16 +36,20 @@ public function project(Event $event, mixed $userState = null): mixed
3336
$metadata = $event->getMetadata();
3437
$metadata[ProjectingHeaders::PROJECTION_STATE] = $userState ?? null;
3538
$metadata[ProjectingHeaders::PROJECTION_EVENT_NAME] = $event->getEventName();
36-
37-
// Those three headers are required by EventStreamEmitter
3839
$metadata[ProjectingHeaders::PROJECTION_NAME] = $this->projectionName;
39-
$metadata[ProjectingHeaders::PROJECTION_IS_REBUILDING] = false;
40+
$metadata[ProjectingHeaders::PROJECTION_LIVE] = $this->isLive;
4041
$metadata[MessageHeaders::STREAM_BASED_SOURCED] = true; // this one is required for correct header propagation in EventStreamEmitter...
4142
$metadata[MessageHeaders::REPLY_CHANNEL] = $responseQueue = new QueueChannel('response_channel');
42-
$this->routerProcessor->process(
43-
MessageBuilder::withPayload($event->getPayload())
44-
->setMultipleHeaders($metadata)
45-
->build()
43+
44+
$requestMessage = MessageBuilder::withPayload($event->getPayload())
45+
->setMultipleHeaders($metadata)
46+
->build();
47+
48+
$this->messageHeadersPropagatorInterceptor->storeHeaders(
49+
function () use ($requestMessage) {
50+
$this->routerProcessor->process($requestMessage);
51+
},
52+
$requestMessage
4653
);
4754
$response = $responseQueue->receive();
4855
$newUserState = $response?->getPayload();

packages/Ecotone/src/Projecting/ProjectingHeaders.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ class ProjectingHeaders
1212
public const PROJECTION_STATE = 'projection.state';
1313
public const PROJECTION_NAME = 'projection.name';
1414
public const PROJECTION_EVENT_NAME = 'projection.event_name';
15-
public const PROJECTION_IS_REBUILDING = 'projection.is_rebuilding';
15+
/**
16+
* Indicates whether the projection is live and should emit events.
17+
* When false, events emitted via EventStreamEmitter will be filtered out.
18+
*/
19+
public const PROJECTION_LIVE = 'projection.live';
1620
public const MANUAL_INITIALIZATION = 'projection.manual_initialization';
1721
}

packages/Ecotone/tests/Projecting/ProjectingTest.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use Ecotone\Modelling\Attribute\EventHandler;
2121
use Ecotone\Modelling\Event;
2222
use Ecotone\Projecting\Attribute\Partitioned;
23-
use Ecotone\Projecting\Attribute\ProjectionConfiguration;
23+
use Ecotone\Projecting\Attribute\ProjectionDeployment;
2424
use Ecotone\Projecting\Attribute\ProjectionFlush;
2525
use Ecotone\Projecting\Attribute\ProjectionV2;
2626
use Ecotone\Projecting\InMemory\InMemoryStreamSourceBuilder;
@@ -178,7 +178,7 @@ public function init(): void
178178

179179
public function test_it_skips_execution_when_automatic_initialization_is_off_and_not_initialized(): void
180180
{
181-
$projection = new #[ProjectionV2('projection_with_manual_initialization'), ProjectionConfiguration(automaticInitialization: false)] class {
181+
$projection = new #[ProjectionV2('projection_with_manual_initialization'), ProjectionDeployment(manualKickOff: true)] class {
182182
public const TICKET_CREATED = 'ticket.created';
183183
public array $projectedEvents = [];
184184

@@ -261,7 +261,7 @@ public function init(): void
261261

262262
public function test_auto_initialization_mode_processes_events(): void
263263
{
264-
$projection = new #[ProjectionV2('auto_projection'), ProjectionConfiguration(automaticInitialization: true)] class {
264+
$projection = new #[ProjectionV2('auto_projection'), ProjectionDeployment(manualKickOff: false)] class {
265265
public const TICKET_CREATED = 'ticket.created';
266266
public array $projectedEvents = [];
267267
public int $initCallCount = 0;
@@ -301,7 +301,7 @@ public function init(): void
301301

302302
public function test_skip_initialization_mode_skips_events_when_not_initialized(): void
303303
{
304-
$projection = new #[ProjectionV2('skip_projection'), ProjectionConfiguration(automaticInitialization: false)] class {
304+
$projection = new #[ProjectionV2('skip_projection'), ProjectionDeployment(manualKickOff: true)] class {
305305
public const TICKET_CREATED = 'ticket.created';
306306
public array $projectedEvents = [];
307307
public int $initCallCount = 0;
@@ -341,7 +341,7 @@ public function init(): void
341341

342342
public function test_skip_mode_with_multiple_events(): void
343343
{
344-
$projection = new #[ProjectionV2('skip_multiple_events'), ProjectionConfiguration(automaticInitialization: false)] class {
344+
$projection = new #[ProjectionV2('skip_multiple_events'), ProjectionDeployment(manualKickOff: true)] class {
345345
public const TICKET_CREATED = 'ticket.created';
346346
public array $projectedEvents = [];
347347
public int $initCallCount = 0;
@@ -384,7 +384,7 @@ public function init(): void
384384

385385
public function test_auto_mode_with_multiple_events(): void
386386
{
387-
$projection = new #[ProjectionV2('auto_multiple_events'), ProjectionConfiguration(automaticInitialization: true)] class {
387+
$projection = new #[ProjectionV2('auto_multiple_events'), ProjectionDeployment(manualKickOff: false)] class {
388388
public const TICKET_CREATED = 'ticket.created';
389389
public array $projectedEvents = [];
390390
public int $initCallCount = 0;

0 commit comments

Comments
 (0)