diff --git a/packages/Ecotone/src/Modelling/Api/Distribution/DistributedServiceMap.php b/packages/Ecotone/src/Modelling/Api/Distribution/DistributedServiceMap.php index e3aec7ce5..9fb070904 100644 --- a/packages/Ecotone/src/Modelling/Api/Distribution/DistributedServiceMap.php +++ b/packages/Ecotone/src/Modelling/Api/Distribution/DistributedServiceMap.php @@ -25,7 +25,7 @@ final class DistributedServiceMap implements DefinedObject { /** * @param array $commandMapping - service name -> channel name (for command routing) - * @param array, exclude: array}> $eventSubscriptions - channel name -> ['keys' => [...] or null, 'exclude' => [...]] + * @param array, exclude: array, include: array}> $eventSubscriptions - channel name -> ['keys' => [...] or null, 'exclude' => [...], 'include' => [...]] * @param array $distributedBusAnnotations * @param bool|null $legacyMode - null = not set, true = legacy (withServiceMapping), false = new API (withCommandMapping/withEventMapping) */ @@ -58,6 +58,7 @@ public function withServiceMapping(string $serviceName, string $channelName, ?ar $self->eventSubscriptions[$channelName] = [ 'keys' => $subscriptionRoutingKeys, 'exclude' => [], + 'include' => [], ]; return $self; @@ -83,17 +84,26 @@ public function withCommandMapping(string $targetServiceName, string $channelNam * * @param string $channelName Target channel to send events to * @param array $subscriptionKeys Routing key patterns to match - * @param array $excludeEventsFromServices Service names whose events should NOT be sent to this channel + * @param array $excludePublishingServices Service names whose events should NOT be sent to this channel + * @param array $includePublishingServices Service names whose events should ONLY be sent to this channel (whitelist) */ - public function withEventMapping(string $channelName, array $subscriptionKeys, array $excludeEventsFromServices = []): self + public function withEventMapping(string $channelName, array $subscriptionKeys, array $excludePublishingServices = [], array $includePublishingServices = []): self { + if ($excludePublishingServices !== [] && $includePublishingServices !== []) { + throw ConfigurationException::create( + "Cannot use both 'excludePublishingServices' and 'includePublishingServices' in the same event mapping for channel '{$channelName}'. " . + 'These parameters are mutually exclusive - use either exclude (blacklist) or include (whitelist), not both.' + ); + } + $self = clone $this; $self->assertNotInLegacyMode('withEventMapping'); $self->legacyMode = false; $self->eventSubscriptions[$channelName] = [ 'keys' => $subscriptionKeys, - 'exclude' => $excludeEventsFromServices, + 'exclude' => $excludePublishingServices, + 'include' => $includePublishingServices, ]; return $self; @@ -153,7 +163,7 @@ public function getAllChannelNamesBesides(string $serviceName, string $routingKe /** * NEW MODE ONLY - Get all subscription channels for an event. - * Uses explicit exclude list from eventSubscriptions config. + * Uses explicit exclude/include list from eventSubscriptions config. * * @param string $sourceServiceName The service publishing the event * @param string $routingKey The event routing key @@ -166,11 +176,16 @@ public function getAllSubscriptionChannels(string $sourceServiceName, string $ro foreach ($this->eventSubscriptions as $channel => $config) { $keys = $config['keys']; $exclude = $config['exclude']; + $include = $config['include']; if (in_array($sourceServiceName, $exclude, true)) { continue; } + if ($include !== [] && ! \in_array($sourceServiceName, $include, true)) { + continue; + } + foreach ($keys as $subscriptionEventFilter) { if (BusRoutingMap::globMatch($subscriptionEventFilter, $routingKey)) { $filteredChannels[] = $channel; diff --git a/packages/Ecotone/tests/Messaging/Unit/Distributed/DistributedBusWithExplicitServiceMapTest.php b/packages/Ecotone/tests/Messaging/Unit/Distributed/DistributedBusWithExplicitServiceMapTest.php index 3ac54eab9..6a48b4843 100644 --- a/packages/Ecotone/tests/Messaging/Unit/Distributed/DistributedBusWithExplicitServiceMapTest.php +++ b/packages/Ecotone/tests/Messaging/Unit/Distributed/DistributedBusWithExplicitServiceMapTest.php @@ -290,7 +290,7 @@ public function test_it_does_not_publish_event_to_publishing_service_when_exclud ->withCommandMapping(targetServiceName: TestServiceName::TICKET_SERVICE, channelName: $ticketChannelName) ->withCommandMapping(targetServiceName: TestServiceName::USER_SERVICE, channelName: $userChannelName) ->withEventMapping(channelName: $ticketChannelName, subscriptionKeys: ['*']) - ->withEventMapping(channelName: $userChannelName, subscriptionKeys: ['*'], excludeEventsFromServices: [TestServiceName::USER_SERVICE]) + ->withEventMapping(channelName: $userChannelName, subscriptionKeys: ['*'], excludePublishingServices: [TestServiceName::USER_SERVICE]) ); $ticketService = $this->bootstrapEcotone(TestServiceName::TICKET_SERVICE, ['Test\Ecotone\Messaging\Fixture\Distributed\DistributedEventBus\ReceiverTicket'], [new \Test\Ecotone\Messaging\Fixture\Distributed\DistributedEventBus\ReceiverTicket\TicketServiceReceiver()], $distributedTicketQueue); @@ -851,4 +851,40 @@ public function test_cannot_use_with_event_mapping_after_legacy_with_service_map ->withServiceMapping('service1', 'channel1') ->withEventMapping('channel2', ['*']); } + + public function test_it_publishes_event_only_to_channel_when_source_service_is_in_include_list(): void + { + $distributedTicketQueue = SimpleMessageChannelBuilder::createQueueChannel($ticketChannelName = 'distributed_ticket_channel'); + $distributedUserQueue = SimpleMessageChannelBuilder::createQueueChannel($userChannelName = 'distributed_user_channel'); + $userService = $this->bootstrapEcotone( + TestServiceName::USER_SERVICE, + [], + [], + [$distributedTicketQueue, $distributedUserQueue], + DistributedServiceMap::initialize() + ->withCommandMapping(targetServiceName: TestServiceName::TICKET_SERVICE, channelName: $ticketChannelName) + ->withCommandMapping(targetServiceName: TestServiceName::USER_SERVICE, channelName: $userChannelName) + ->withEventMapping(channelName: $ticketChannelName, subscriptionKeys: ['*'], includePublishingServices: [TestServiceName::TICKET_SERVICE]) + ->withEventMapping(channelName: $userChannelName, subscriptionKeys: ['*']) + ); + $ticketService = $this->bootstrapEcotone(TestServiceName::TICKET_SERVICE, ['Test\Ecotone\Messaging\Fixture\Distributed\DistributedEventBus\ReceiverTicket'], [new \Test\Ecotone\Messaging\Fixture\Distributed\DistributedEventBus\ReceiverTicket\TicketServiceReceiver()], $distributedTicketQueue); + + $userService->getDistributedBus()->publishEvent( + 'userService.billing.DetailsWereChanged', + 'User changed billing address', + metadata: ['token' => '123'] + ); + + self::assertNull($ticketService->getMessageChannel($ticketChannelName)->receive()); + self::assertNotNull($userService->getMessageChannel($userChannelName)->receive()); + } + + public function test_cannot_use_both_exclude_and_include_publishing_services(): void + { + $this->expectException(ConfigurationException::class); + $this->expectExceptionMessage("Cannot use both 'excludePublishingServices' and 'includePublishingServices' in the same event mapping for channel 'channel1'. These parameters are mutually exclusive - use either exclude (blacklist) or include (whitelist), not both."); + + DistributedServiceMap::initialize() + ->withEventMapping('channel1', ['*'], excludePublishingServices: ['service1'], includePublishingServices: ['service2']); + } }