Skip to content

Commit 81e72b1

Browse files
committed
feat: distributed bus with mapping
1 parent 639601c commit 81e72b1

File tree

54 files changed

+1841
-119
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1841
-119
lines changed

packages/Amqp/src/Distribution/AmqpDistributionModule.php

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
use Ecotone\Messaging\Handler\Transformer\TransformerBuilder;
2424
use Ecotone\Messaging\MessageHeaders;
2525
use Ecotone\Messaging\Support\Assert;
26-
use Ecotone\Modelling\Config\DistributedGatewayModule;
26+
use Ecotone\Modelling\Api\Distribution\DistributedBusHeader;
2727
use Ecotone\Modelling\DistributedBus;
28-
use Ecotone\Modelling\DistributionEntrypoint;
28+
use Ecotone\Modelling\MessageHandling\Distribution\Module\DistributedHandlerModule;
2929

3030
/**
3131
* licence Apache-2.0
@@ -48,8 +48,8 @@ public function __construct(array $distributedEventHandlers, array $distributedC
4848
public static function create(AnnotationFinder $annotationFinder, InterfaceToCallRegistry $interfaceToCallRegistry): self
4949
{
5050
return new self(
51-
DistributedGatewayModule::getDistributedEventHandlerRoutingKeys($annotationFinder, $interfaceToCallRegistry),
52-
DistributedGatewayModule::getDistributedCommandHandlerRoutingKeys($annotationFinder, $interfaceToCallRegistry)
51+
DistributedHandlerModule::getDistributedEventHandlerRoutingKeys($annotationFinder, $interfaceToCallRegistry),
52+
DistributedHandlerModule::getDistributedCommandHandlerRoutingKeys($annotationFinder, $interfaceToCallRegistry)
5353
);
5454
}
5555

@@ -113,7 +113,7 @@ public function prepare(Configuration $configuration, array $extensionObjects):
113113
$configuration->registerMessageChannel(AmqpBackedMessageChannelBuilder::create($channelName, $distributedBusConfiguration->getConnectionReference()));
114114
$configuration->registerMessageHandler(
115115
TransformerBuilder::createHeaderEnricher([
116-
MessageHeaders::ROUTING_SLIP => DistributionEntrypoint::DISTRIBUTED_CHANNEL,
116+
MessageHeaders::ROUTING_SLIP => DistributedBusHeader::DISTRIBUTED_ROUTING_SLIP_VALUE,
117117
])
118118
->withEndpointId($applicationConfiguration->getServiceName())
119119
->withInputChannelName($channelName)
@@ -142,9 +142,11 @@ private function registerPublisher(AmqpDistributedBusConfiguration|AmqpMessagePu
142142
GatewayPayloadBuilder::create('command'),
143143
GatewayHeadersBuilder::create('metadata'),
144144
GatewayHeaderBuilder::create('sourceMediaType', MessageHeaders::CONTENT_TYPE),
145-
GatewayHeaderBuilder::create('routingKey', DistributionEntrypoint::DISTRIBUTED_ROUTING_KEY),
146-
GatewayHeaderBuilder::create('destination', self::AMQP_ROUTING_KEY),
147-
GatewayHeaderValueBuilder::create(DistributionEntrypoint::DISTRIBUTED_PAYLOAD_TYPE, 'command'),
145+
GatewayHeaderBuilder::create('routingKey', DistributedBusHeader::DISTRIBUTED_ROUTING_KEY),
146+
GatewayHeaderBuilder::create('targetServiceName', DistributedBusHeader::DISTRIBUTED_TARGET_SERVICE_NAME),
147+
GatewayHeaderBuilder::create('targetServiceName', self::AMQP_ROUTING_KEY),
148+
GatewayHeaderValueBuilder::create(DistributedBusHeader::DISTRIBUTED_SOURCE_SERVICE_NAME, $applicationConfiguration->getServiceName()),
149+
GatewayHeaderValueBuilder::create(DistributedBusHeader::DISTRIBUTED_PAYLOAD_TYPE, 'command'),
148150
]
149151
)
150152
)
@@ -154,9 +156,11 @@ private function registerPublisher(AmqpDistributedBusConfiguration|AmqpMessagePu
154156
[
155157
GatewayPayloadBuilder::create('command'),
156158
GatewayHeadersBuilder::create('metadata'),
157-
GatewayHeaderBuilder::create('routingKey', DistributionEntrypoint::DISTRIBUTED_ROUTING_KEY),
158-
GatewayHeaderBuilder::create('destination', self::AMQP_ROUTING_KEY),
159-
GatewayHeaderValueBuilder::create(DistributionEntrypoint::DISTRIBUTED_PAYLOAD_TYPE, 'command'),
159+
GatewayHeaderBuilder::create('routingKey', DistributedBusHeader::DISTRIBUTED_ROUTING_KEY),
160+
GatewayHeaderBuilder::create('targetServiceName', DistributedBusHeader::DISTRIBUTED_TARGET_SERVICE_NAME),
161+
GatewayHeaderBuilder::create('targetServiceName', self::AMQP_ROUTING_KEY),
162+
GatewayHeaderValueBuilder::create(DistributedBusHeader::DISTRIBUTED_SOURCE_SERVICE_NAME, $applicationConfiguration->getServiceName()),
163+
GatewayHeaderValueBuilder::create(DistributedBusHeader::DISTRIBUTED_PAYLOAD_TYPE, 'command'),
160164
]
161165
)
162166
)
@@ -167,9 +171,10 @@ private function registerPublisher(AmqpDistributedBusConfiguration|AmqpMessagePu
167171
GatewayPayloadBuilder::create('event'),
168172
GatewayHeadersBuilder::create('metadata'),
169173
GatewayHeaderBuilder::create('sourceMediaType', MessageHeaders::CONTENT_TYPE),
170-
GatewayHeaderBuilder::create('routingKey', DistributionEntrypoint::DISTRIBUTED_ROUTING_KEY),
174+
GatewayHeaderBuilder::create('routingKey', DistributedBusHeader::DISTRIBUTED_ROUTING_KEY),
171175
GatewayHeaderBuilder::create('routingKey', self::AMQP_ROUTING_KEY),
172-
GatewayHeaderValueBuilder::create(DistributionEntrypoint::DISTRIBUTED_PAYLOAD_TYPE, 'event'),
176+
GatewayHeaderValueBuilder::create(DistributedBusHeader::DISTRIBUTED_SOURCE_SERVICE_NAME, $applicationConfiguration->getServiceName()),
177+
GatewayHeaderValueBuilder::create(DistributedBusHeader::DISTRIBUTED_PAYLOAD_TYPE, 'event'),
173178
]
174179
)
175180
)
@@ -179,9 +184,24 @@ private function registerPublisher(AmqpDistributedBusConfiguration|AmqpMessagePu
179184
[
180185
GatewayPayloadBuilder::create('event'),
181186
GatewayHeadersBuilder::create('metadata'),
182-
GatewayHeaderBuilder::create('routingKey', DistributionEntrypoint::DISTRIBUTED_ROUTING_KEY),
187+
GatewayHeaderBuilder::create('routingKey', DistributedBusHeader::DISTRIBUTED_ROUTING_KEY),
183188
GatewayHeaderBuilder::create('routingKey', self::AMQP_ROUTING_KEY),
184-
GatewayHeaderValueBuilder::create(DistributionEntrypoint::DISTRIBUTED_PAYLOAD_TYPE, 'event'),
189+
GatewayHeaderValueBuilder::create(DistributedBusHeader::DISTRIBUTED_SOURCE_SERVICE_NAME, $applicationConfiguration->getServiceName()),
190+
GatewayHeaderValueBuilder::create(DistributedBusHeader::DISTRIBUTED_PAYLOAD_TYPE, 'event'),
191+
]
192+
)
193+
)
194+
->registerGatewayBuilder(
195+
GatewayProxyBuilder::create($amqpPublisher->getReferenceName(), DistributedBus::class, 'sendMessage', $amqpPublisher->getReferenceName())
196+
->withParameterConverters(
197+
[
198+
GatewayPayloadBuilder::create('payload'),
199+
GatewayHeadersBuilder::create('metadata'),
200+
GatewayHeaderBuilder::create('sourceMediaType', MessageHeaders::CONTENT_TYPE),
201+
GatewayHeaderBuilder::create('targetChannelName', DistributedBusHeader::DISTRIBUTED_ROUTING_KEY),
202+
GatewayHeaderBuilder::create('targetServiceName', self::AMQP_ROUTING_KEY),
203+
GatewayHeaderValueBuilder::create(DistributedBusHeader::DISTRIBUTED_SOURCE_SERVICE_NAME, $applicationConfiguration->getServiceName()),
204+
GatewayHeaderValueBuilder::create(DistributedBusHeader::DISTRIBUTED_PAYLOAD_TYPE, 'message'),
185205
]
186206
)
187207
)
@@ -196,19 +216,6 @@ private function registerPublisher(AmqpDistributedBusConfiguration|AmqpMessagePu
196216
->withDefaultConversionMediaType($mediaType)
197217
->withStaticHeadersToEnrich([MessageHeaders::POLLED_CHANNEL_NAME => $channelName])
198218
)
199-
->registerGatewayBuilder(
200-
GatewayProxyBuilder::create($amqpPublisher->getReferenceName(), DistributedBus::class, 'sendMessage', $amqpPublisher->getReferenceName())
201-
->withParameterConverters(
202-
[
203-
GatewayPayloadBuilder::create('payload'),
204-
GatewayHeadersBuilder::create('metadata'),
205-
GatewayHeaderBuilder::create('sourceMediaType', MessageHeaders::CONTENT_TYPE),
206-
GatewayHeaderBuilder::create('targetChannelName', DistributionEntrypoint::DISTRIBUTED_ROUTING_KEY),
207-
GatewayHeaderBuilder::create('destination', self::AMQP_ROUTING_KEY),
208-
GatewayHeaderValueBuilder::create(DistributionEntrypoint::DISTRIBUTED_PAYLOAD_TYPE, 'message'),
209-
]
210-
)
211-
)
212219
->registerMessageChannel(SimpleMessageChannelBuilder::createDirectMessageChannel($amqpPublisher->getReferenceName()));
213220
}
214221
}

packages/Amqp/tests/Fixture/DistributedCommandBus/Receiver/TicketServiceReceiver.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class TicketServiceReceiver
1616
public const CREATE_TICKET_ENDPOINT = 'createTicket';
1717
public const CREATE_TICKET_WITH_EVENT_ENDPOINT = 'createTicketWithEvent';
1818
public const GET_TICKETS_COUNT = 'getTicketsCount';
19+
public const GET_TICKETS = 'getTickets';
1920

2021
private array $tickets = [];
2122

@@ -46,8 +47,14 @@ public function registerTicketWithEvent(string $ticket, EventBus $eventBus): voi
4647
}
4748

4849
#[QueryHandler(self::GET_TICKETS_COUNT)]
49-
public function getTickets(): int
50+
public function getTicketsCount(): int
5051
{
5152
return count($this->tickets);
5253
}
54+
55+
#[QueryHandler(self::GET_TICKETS)]
56+
public function getTickets(): array
57+
{
58+
return $this->tickets;
59+
}
5360
}

packages/Amqp/tests/Fixture/DistributedEventBus/Receiver/TicketServiceReceiver.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
class TicketServiceReceiver
1616
{
1717
public const GET_TICKETS_COUNT = 'getTicketsCount';
18+
public const GET_TICKETS = 'getTickets';
1819

1920
private array $tickets = [];
2021

@@ -34,8 +35,14 @@ public function registerTicket(
3435
}
3536

3637
#[QueryHandler(self::GET_TICKETS_COUNT)]
37-
public function getTickets(): int
38+
public function getTicketsCount(): int
3839
{
3940
return count($this->tickets);
4041
}
42+
43+
#[QueryHandler(self::GET_TICKETS)]
44+
public function getTickets(): array
45+
{
46+
return $this->tickets;
47+
}
4148
}

packages/Amqp/tests/Integration/DistributedCommandBusTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public function test_distributing_command_to_another_service(): void
4141

4242
$ticketService->run('ticket_service', ExecutionPollingMetadata::createWithTestingSetup(maxExecutionTimeInMilliseconds: 500));
4343
self::assertEquals(1, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT));
44+
self::assertEquals(
45+
['User changed billing address'],
46+
$ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS)
47+
);
4448
}
4549

4650
public function test_distributing_command_misses_heartbeat_and_reconnects(): void

packages/Amqp/tests/Integration/DistributedEventBusTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public function test_distributing_event_to_another_service(): void
3737

3838
$ticketService->run('ticket_service');
3939
self::assertEquals(1, $ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS_COUNT));
40+
self::assertEquals(
41+
['ticket was created'],
42+
$ticketService->sendQueryWithRouting(TicketServiceReceiver::GET_TICKETS)
43+
);
4044
}
4145

4246
public function test_distributing_event_and_publish_async_private_event(): void

packages/Ecotone/src/Lite/Test/Configuration/EcotoneTestSupportModule.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Ecotone\Lite\Test\Configuration;
66

77
use Ecotone\AnnotationFinder\AnnotationFinder;
8+
use Ecotone\Lite\Test\DistributedConversionEndpoint;
89
use Ecotone\Lite\Test\MessagingTestSupport;
910
use Ecotone\Lite\Test\TestConfiguration;
1011
use Ecotone\Messaging\Attribute\InternalHandler;
@@ -19,9 +20,14 @@
1920
use Ecotone\Messaging\Config\Container\Reference;
2021
use Ecotone\Messaging\Config\ModulePackageList;
2122
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
23+
use Ecotone\Messaging\Config\ServiceConfiguration;
24+
use Ecotone\Messaging\Conversion\MediaType;
25+
use Ecotone\Messaging\Handler\Bridge\BridgeBuilder;
2226
use Ecotone\Messaging\Handler\ChannelResolver;
2327
use Ecotone\Messaging\Handler\Gateway\GatewayProxyBuilder;
2428
use Ecotone\Messaging\Handler\Gateway\ParameterToMessageConverter\GatewayHeaderBuilder;
29+
use Ecotone\Messaging\Handler\Gateway\ParameterToMessageConverter\GatewayHeadersBuilder;
30+
use Ecotone\Messaging\Handler\Gateway\ParameterToMessageConverter\GatewayHeaderValueBuilder;
2531
use Ecotone\Messaging\Handler\Gateway\ParameterToMessageConverter\GatewayPayloadBuilder;
2632
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
2733
use Ecotone\Messaging\Handler\Processor\MethodInvoker\AroundInterceptorBuilder;
@@ -30,11 +36,14 @@
3036
use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\ReferenceBuilder;
3137
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInterceptorBuilder;
3238
use Ecotone\Messaging\Handler\ServiceActivator\ServiceActivatorBuilder;
39+
use Ecotone\Messaging\MessageHeaders;
3340
use Ecotone\Messaging\Precedence;
3441
use Ecotone\Modelling\Attribute\CommandHandler;
3542
use Ecotone\Modelling\Attribute\EventHandler;
3643
use Ecotone\Modelling\CommandBus;
44+
use Ecotone\Modelling\DistributedBus;
3745
use Ecotone\Modelling\EventBus;
46+
use Ecotone\Modelling\MessageHandling\Distribution\DistributionEntrypoint;
3847
use Ecotone\Modelling\QueryBus;
3948

4049
#[ModuleAnnotation]
@@ -177,7 +186,8 @@ public function canHandle($extensionObject): bool
177186
{
178187
return
179188
$extensionObject instanceof TestConfiguration
180-
|| ($extensionObject instanceof MessageChannelBuilder && $extensionObject->isPollable());
189+
|| ($extensionObject instanceof MessageChannelBuilder && $extensionObject->isPollable())
190+
|| $extensionObject instanceof ServiceConfiguration;
181191
}
182192

183193
public function getModulePackageName(): string

packages/Ecotone/src/Lite/Test/FlowTestSupport.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
use Ecotone\Modelling\CommandBus;
2626
use Ecotone\Modelling\Config\AggregrateModule;
2727
use Ecotone\Modelling\Config\MessageBusChannel;
28+
use Ecotone\Modelling\DistributedBus;
2829
use Ecotone\Modelling\Event;
2930
use Ecotone\Modelling\EventBus;
3031
use Ecotone\Modelling\QueryBus;
@@ -414,6 +415,11 @@ public function getGateway(string $referenceName): object
414415
return $this->configuredMessagingSystem->getGatewayByName($referenceName);
415416
}
416417

418+
public function getDistributedBus(string $referenceName = DistributedBus::class): DistributedBus
419+
{
420+
return $this->getGateway($referenceName);
421+
}
422+
417423
/**
418424
* @param array<string, mixed> $parameters
419425
*/

packages/Ecotone/src/Lite/Test/TestConfiguration.php

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,19 @@ final class TestConfiguration
1515
{
1616
/**
1717
* @param string[] $spiedChannelNames
18-
* @param RegisterAggregateRepositoryChannels[] $relatedAggregates
1918
*/
2019
private function __construct(
2120
private bool $failOnCommandHandlerNotFound,
2221
private bool $failOnQueryHandlerNotFound,
2322
private ?MediaType $pollableChannelMediaTypeConversion,
2423
private string $channelToConvertOn,
2524
private array $spiedChannelNames,
26-
private array $relatedAggregates
2725
) {
2826
}
2927

3028
public static function createWithDefaults(): self
3129
{
32-
return new self(true, true, null, '', [], []);
30+
return new self(true, true, null, '', []);
3331
}
3432

3533
public function withFailOnCommandHandlerNotFound(bool $shouldFail): self
@@ -95,12 +93,4 @@ public function getSpiedChannels(): array
9593
{
9694
return $this->spiedChannelNames;
9795
}
98-
99-
/**
100-
* @return RegisterAggregateRepositoryChannels[]
101-
*/
102-
public function getAggregatesAndSagasUnderTest(): array
103-
{
104-
return $this->relatedAggregates;
105-
}
10696
}

packages/Ecotone/src/Messaging/Channel/DynamicChannel/DynamicMessageChannelBuilder.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
namespace Ecotone\Messaging\Channel\DynamicChannel;
66

77
use Ecotone\Messaging\Channel\DynamicChannel\ReceivingStrategy\CustomReceivingStrategy;
8+
use Ecotone\Messaging\Channel\DynamicChannel\ReceivingStrategy\NoReceivingStrategy;
89
use Ecotone\Messaging\Channel\DynamicChannel\ReceivingStrategy\RoundRobinReceivingStrategy;
910
use Ecotone\Messaging\Channel\DynamicChannel\ReceivingStrategy\SkippingReceivingStrategy;
1011
use Ecotone\Messaging\Channel\DynamicChannel\SendingStrategy\CustomSendingStrategy;
1112
use Ecotone\Messaging\Channel\DynamicChannel\SendingStrategy\HeaderSendingStrategy;
13+
use Ecotone\Messaging\Channel\DynamicChannel\SendingStrategy\NoSendingStrategy;
1214
use Ecotone\Messaging\Channel\DynamicChannel\SendingStrategy\RoundRobinSendingStrategy;
1315
use Ecotone\Messaging\Channel\MessageChannelBuilder;
1416
use Ecotone\Messaging\Config\Container\Definition;
@@ -17,6 +19,7 @@
1719
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
1820
use Ecotone\Messaging\Handler\ChannelResolver;
1921
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
22+
use Ecotone\Messaging\MessageChannel;
2023
use Ecotone\Messaging\Support\Assert;
2124

2225
/**
@@ -55,6 +58,11 @@ public static function createRoundRobinWithDifferentChannels(
5558
);
5659
}
5760

61+
public function hasReceiveStrategy(): bool
62+
{
63+
return !($this->channelReceivingStrategy->getClassName() === NoReceivingStrategy::class);
64+
}
65+
5866
/**
5967
* Creates with default round robin strategy for sending and receiving
6068
*
@@ -72,6 +80,26 @@ public static function createRoundRobin(
7280
);
7381
}
7482

83+
public static function createNoStrategy(string $thisMessageChannelName): self
84+
{
85+
return new self(
86+
$thisMessageChannelName,
87+
new Definition(NoSendingStrategy::class, [$thisMessageChannelName]),
88+
new Definition(NoReceivingStrategy::class, [$thisMessageChannelName]),
89+
);
90+
}
91+
92+
public static function createWithSendOnlyStrategy(MessageChannelBuilder $targetMessageChannel): self
93+
{
94+
return (new self(
95+
$targetMessageChannel->getMessageChannelName(),
96+
new Definition(RoundRobinSendingStrategy::class, [[$targetMessageChannel->getMessageChannelName()]]),
97+
new Definition(NoReceivingStrategy::class, [$targetMessageChannel->getMessageChannelName()]),
98+
))->withInternalChannels([
99+
$targetMessageChannel->getMessageChannelName() => $targetMessageChannel,
100+
]);
101+
}
102+
75103
/**
76104
* @param string $headerName Name of the header that will be used to decide on channel name
77105
* @param string[] $headerMapping Mapping of header value to channel name. If null header value wil be taken as channel name

packages/Ecotone/src/Messaging/Channel/DynamicChannel/InternalChannelResolver.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Ecotone\Messaging\Handler\ChannelResolver;
88
use Ecotone\Messaging\MessageChannel;
99
use Ecotone\Messaging\PollableChannel;
10+
use Ecotone\Messaging\Support\Assert;
1011

1112
/**
1213
* licence Apache-2.0
@@ -26,11 +27,16 @@ public function resolve(MessageChannel|string $channelName): MessageChannel
2627
{
2728
foreach ($this->internalChannels as $internalChannel) {
2829
if ($internalChannel['name'] === $channelName) {
30+
Assert::isTrue($internalChannel['channel'] instanceof PollableChannel, "Dynamic Message Channels can only be used together with Pollable Channels. Internal channel {$channelName} is not pollable");
31+
2932
return $internalChannel['channel'];
3033
}
3134
}
3235

33-
return $this->channelResolver->resolve($channelName);
36+
$messageChannel = $this->channelResolver->resolve($channelName);
37+
Assert::isTrue($messageChannel instanceof PollableChannel, "Dynamic Message Channels can only be used together with Pollable Channels. Channel {$channelName} is not pollable");
38+
39+
return $messageChannel;
3440
}
3541

3642
public function hasChannelWithName(string $channelName): bool

0 commit comments

Comments
 (0)