Skip to content

Commit f301a70

Browse files
authored
Feat/streaming channels for distributed services (#590)
* streaming test * feat: streaming channels for distributed services * fixes
1 parent 795dd41 commit f301a70

File tree

5 files changed

+1025
-31
lines changed

5 files changed

+1025
-31
lines changed

packages/Ecotone/src/Modelling/Api/Distribution/DistributedServiceMap.php

Lines changed: 137 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Ecotone\Modelling\Api\Distribution;
66

77
use Ecotone\Messaging\Attribute\Asynchronous;
8+
use Ecotone\Messaging\Config\ConfigurationException;
89
use Ecotone\Messaging\Config\Container\AttributeDefinition;
910
use Ecotone\Messaging\Config\Container\DefinedObject;
1011
use Ecotone\Messaging\Config\Container\Definition;
@@ -19,15 +20,17 @@
1920
final class DistributedServiceMap implements DefinedObject
2021
{
2122
/**
22-
* @param array<string, string> $serviceMapping
23-
* @param array<string, array<string>> $subscriptionRoutingKeys
23+
* @param array<string, string> $commandMapping - service name -> channel name (for command routing)
24+
* @param array<string, array{keys: ?array<string>, exclude: array<string>}> $eventSubscriptions - channel name -> ['keys' => [...] or null, 'exclude' => [...]]
2425
* @param array<object> $distributedBusAnnotations
26+
* @param bool|null $legacyMode - null = not set, true = legacy (withServiceMapping), false = new API (withCommandMapping/withEventMapping)
2527
*/
2628
public function __construct(
2729
private string $referenceName,
28-
private array $serviceMapping = [],
29-
private ?array $subscriptionRoutingKeys = null,
30+
private array $commandMapping = [],
31+
private array $eventSubscriptions = [],
3032
private array $distributedBusAnnotations = [],
33+
private ?bool $legacyMode = null,
3134
) {
3235
Assert::allObjects($this->distributedBusAnnotations, 'Annotations passed to DistributedServiceMap, must all be objects');
3336
}
@@ -38,13 +41,56 @@ public static function initialize(string $referenceName = DistributedBus::class)
3841
}
3942

4043
/**
44+
* @deprecated Use withCommandMapping() and withEventMapping() instead
4145
* @param array|null $subscriptionRoutingKeys If null subscribing to all events, if empty array to none, if non empty array then keys will be used to match the name
4246
*/
4347
public function withServiceMapping(string $serviceName, string $channelName, ?array $subscriptionRoutingKeys = null): self
4448
{
4549
$self = clone $this;
46-
$self->serviceMapping[$serviceName] = $channelName;
47-
$self->subscriptionRoutingKeys[$serviceName] = $subscriptionRoutingKeys;
50+
$self->assertNotInNewMode('withServiceMapping');
51+
$self->legacyMode = true;
52+
53+
$self->commandMapping[$serviceName] = $channelName;
54+
$self->eventSubscriptions[$channelName] = [
55+
'keys' => $subscriptionRoutingKeys,
56+
'exclude' => [],
57+
];
58+
59+
return $self;
60+
}
61+
62+
/**
63+
* Maps a service to a channel for command routing only.
64+
* Does NOT create any event subscription.
65+
*/
66+
public function withCommandMapping(string $targetServiceName, string $channelName): self
67+
{
68+
$self = clone $this;
69+
$self->assertNotInLegacyMode('withCommandMapping');
70+
$self->legacyMode = false;
71+
72+
$self->commandMapping[$targetServiceName] = $channelName;
73+
74+
return $self;
75+
}
76+
77+
/**
78+
* Creates an event subscription for a channel with explicit subscription keys.
79+
*
80+
* @param string $channelName Target channel to send events to
81+
* @param array<string> $subscriptionKeys Routing key patterns to match
82+
* @param array<string> $excludeEventsFromServices Service names whose events should NOT be sent to this channel
83+
*/
84+
public function withEventMapping(string $channelName, array $subscriptionKeys, array $excludeEventsFromServices = []): self
85+
{
86+
$self = clone $this;
87+
$self->assertNotInLegacyMode('withEventMapping');
88+
$self->legacyMode = false;
89+
90+
$self->eventSubscriptions[$channelName] = [
91+
'keys' => $subscriptionKeys,
92+
'exclude' => $excludeEventsFromServices,
93+
];
4894

4995
return $self;
5096
}
@@ -60,43 +106,86 @@ public function withAsynchronousChannel(string $channelName): self
60106
/**
61107
* @return array<string, string>
62108
*/
63-
public function getServiceMapping(): array
109+
public function getCommandMapping(): array
64110
{
65-
return $this->serviceMapping;
111+
return $this->commandMapping;
66112
}
67113

114+
/**
115+
* LEGACY MODE ONLY - Get all channels except the one belonging to the given service.
116+
* Uses service name to channel mapping for exclusion.
117+
*
118+
* @deprecated For new mode, use getAllSubscriptionChannels() instead
119+
*/
68120
public function getAllChannelNamesBesides(string $serviceName, string $routingKey): array
69121
{
70122
$filteredChannels = [];
123+
$excludeChannel = $this->commandMapping[$serviceName] ?? null;
124+
125+
foreach ($this->eventSubscriptions as $channel => $config) {
126+
if ($channel === $excludeChannel) {
127+
continue;
128+
}
129+
130+
$keys = $config['keys'];
131+
132+
if ($keys === null) {
133+
$filteredChannels[] = $channel;
134+
135+
continue;
136+
}
71137

72-
foreach ($this->serviceMapping as $service => $channel) {
73-
if ($service !== $serviceName) {
74-
if ($this->subscriptionRoutingKeys[$service] === null) {
138+
foreach ($keys as $subscriptionEventFilter) {
139+
if (BusRoutingMap::globMatch($subscriptionEventFilter, $routingKey)) {
75140
$filteredChannels[] = $channel;
76141

77-
continue;
142+
break;
78143
}
144+
}
145+
}
146+
147+
return $filteredChannels;
148+
}
149+
150+
/**
151+
* NEW MODE ONLY - Get all subscription channels for an event.
152+
* Uses explicit exclude list from eventSubscriptions config.
153+
*
154+
* @param string $sourceServiceName The service publishing the event
155+
* @param string $routingKey The event routing key
156+
* @return array<string>
157+
*/
158+
public function getAllSubscriptionChannels(string $sourceServiceName, string $routingKey): array
159+
{
160+
$filteredChannels = [];
161+
162+
foreach ($this->eventSubscriptions as $channel => $config) {
163+
$keys = $config['keys'];
164+
$exclude = $config['exclude'];
165+
166+
if (\in_array($sourceServiceName, $exclude, true)) {
167+
continue;
168+
}
79169

80-
foreach ($this->subscriptionRoutingKeys[$service] as $subscriptionEventFilter) {
81-
if (BusRoutingMap::globMatch($subscriptionEventFilter, $routingKey)) {
82-
$filteredChannels[] = $channel;
170+
foreach ($keys as $subscriptionEventFilter) {
171+
if (BusRoutingMap::globMatch($subscriptionEventFilter, $routingKey)) {
172+
$filteredChannels[] = $channel;
83173

84-
break;
85-
}
174+
break;
86175
}
87176
}
88177
}
89178

90-
return $filteredChannels;
179+
return array_unique($filteredChannels);
91180
}
92181

93182
public function getChannelNameFor(string $serviceName): string
94183
{
95-
if (! array_key_exists($serviceName, $this->serviceMapping)) {
184+
if (! \array_key_exists($serviceName, $this->commandMapping)) {
96185
throw new UnknownDistributedDestination("Service {$serviceName} is not registered in distributed service map");
97186
}
98187

99-
return $this->serviceMapping[$serviceName];
188+
return $this->commandMapping[$serviceName];
100189
}
101190

102191
public function getReferenceName(): string
@@ -112,16 +201,42 @@ public function getDistributedBusAnnotations(): array
112201
return $this->distributedBusAnnotations;
113202
}
114203

204+
public function isLegacyMode(): bool
205+
{
206+
return $this->legacyMode === true;
207+
}
208+
115209
public function getDefinition(): Definition
116210
{
117211
return Definition::createFor(
118212
self::class,
119213
[
120214
$this->referenceName,
121-
$this->serviceMapping,
122-
$this->subscriptionRoutingKeys,
215+
$this->commandMapping,
216+
$this->eventSubscriptions,
123217
$this->distributedBusAnnotations,
218+
$this->legacyMode,
124219
]
125220
);
126221
}
222+
223+
private function assertNotInLegacyMode(string $methodName): void
224+
{
225+
if ($this->legacyMode === true) {
226+
throw ConfigurationException::create(
227+
"Cannot use {$methodName}() after withServiceMapping(). " .
228+
'Use either legacy API (withServiceMapping) or new API (withCommandMapping/withEventMapping), not both.'
229+
);
230+
}
231+
}
232+
233+
private function assertNotInNewMode(string $methodName): void
234+
{
235+
if ($this->legacyMode === false) {
236+
throw ConfigurationException::create(
237+
"Cannot use {$methodName}() after withCommandMapping() or withEventMapping(). " .
238+
'Use either legacy API (withServiceMapping) or new API (withCommandMapping/withEventMapping), not both.'
239+
);
240+
}
241+
}
127242
}

packages/Ecotone/src/Modelling/MessageHandling/Distribution/DistributedOutboundRouter.php

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,13 @@ public function route(
3131
$routingKey,
3232
): array {
3333
if ($payloadType === 'event') {
34-
return $this->distributedServiceMap->getAllChannelNamesBesides($this->thisServiceName, $routingKey);
35-
} elseif (in_array($payloadType, ['command', 'message'])) {
36-
Assert::isTrue($targetedServiceName !== null, sprintf('
34+
if ($this->distributedServiceMap->isLegacyMode()) {
35+
return $this->distributedServiceMap->getAllChannelNamesBesides($this->thisServiceName, $routingKey);
36+
} else {
37+
return $this->distributedServiceMap->getAllSubscriptionChannels($this->thisServiceName, $routingKey);
38+
}
39+
} elseif (\in_array($payloadType, ['command', 'message'])) {
40+
Assert::isTrue($targetedServiceName !== null, \sprintf('
3741
Cannot send commands to shared channel - `%s`. Commands follow point-to-point semantics, and shared channels are reserved for events only.
3842
Change your channel to standard pollable channel.
3943
', $targetedServiceName));

packages/Ecotone/src/Modelling/MessageHandling/Distribution/Module/DistributedBusWithServiceMapModule.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
5656
throw LicensingException::create('Distributed Bus with Service Map is available only as part of Ecotone Enterprise.');
5757
}
5858

59-
foreach ($distributedServiceMap->getServiceMapping() as $serviceName => $channelName) {
59+
foreach ($distributedServiceMap->getCommandMapping() as $serviceName => $channelName) {
6060
if (! $this->isMessageChannelAvailable($messageChannels, $channelName)) {
6161
throw ConfigurationException::create("Service Map has Service {$serviceName} mapped to channel {$channelName} but it is not available in message channels. Have you forgot to register it?");
6262
}

0 commit comments

Comments
 (0)