diff --git a/packages/Amqp/src/Distribution/AmqpDistributedBusConfiguration.php b/packages/Amqp/src/Distribution/AmqpDistributedBusConfiguration.php index 6505fbabf..dc8c98f1c 100644 --- a/packages/Amqp/src/Distribution/AmqpDistributedBusConfiguration.php +++ b/packages/Amqp/src/Distribution/AmqpDistributedBusConfiguration.php @@ -25,6 +25,7 @@ class AmqpDistributedBusConfiguration private string $referenceName; private string $headerMapper = '*'; private bool $defaultPersistentDelivery = true; + private bool $autoDeclare = true; private string $distributionType; private function __construct(string $amqpConnectionReference, ?string $outputDefaultConversionMediaType, string $referenceName, string $distributionType) @@ -97,6 +98,22 @@ public function getDefaultPersistentDelivery(): bool return $this->defaultPersistentDelivery; } + /** + * Whether to automatically declare the exchange and queues on send. + * When set to false, the exchange and queues must be declared manually. + */ + public function withAutoDeclare(bool $autoDeclare): static + { + $this->autoDeclare = $autoDeclare; + + return $this; + } + + public function isAutoDeclare(): bool + { + return $this->autoDeclare; + } + /** * @return string */ diff --git a/packages/Amqp/src/Distribution/AmqpDistributionModule.php b/packages/Amqp/src/Distribution/AmqpDistributionModule.php index d070c713f..df2d3d4e8 100644 --- a/packages/Amqp/src/Distribution/AmqpDistributionModule.php +++ b/packages/Amqp/src/Distribution/AmqpDistributionModule.php @@ -104,7 +104,11 @@ public function prepare(Configuration $configuration, array $extensionObjects): Assert::isFalse($applicationConfiguration->getServiceName() === ServiceConfiguration::DEFAULT_SERVICE_NAME, "Service name can't be default when using distribution. Set up correct Service Name"); $channelName = self::CHANNEL_PREFIX . $applicationConfiguration->getServiceName(); - $configuration->registerMessageChannel(AmqpBackedMessageChannelBuilder::create($channelName, $distributedBusConfiguration->getConnectionReference())); + $amqpChannel = AmqpBackedMessageChannelBuilder::create($channelName, $distributedBusConfiguration->getConnectionReference()); + if (! $distributedBusConfiguration->isAutoDeclare()) { + $amqpChannel = $amqpChannel->withAutoDeclare(false); + } + $configuration->registerMessageChannel($amqpChannel); $configuration->registerMessageHandler( TransformerBuilder::createHeaderEnricher([ MessageHeaders::ROUTING_SLIP => DistributedBusHeader::DISTRIBUTED_ROUTING_SLIP_VALUE, @@ -205,7 +209,7 @@ private function registerPublisher(AmqpDistributedBusConfiguration|AmqpMessagePu ->withEndpointId($amqpPublisher->getReferenceName() . '.handler') ->withInputChannelName($amqpPublisher->getReferenceName()) ->withDefaultPersistentMode($amqpPublisher->getDefaultPersistentDelivery()) - ->withAutoDeclareOnSend(true) + ->withAutoDeclareOnSend($amqpPublisher->isAutoDeclare()) ->withHeaderMapper($amqpPublisher->getHeaderMapper()) ->withRoutingKeyFromHeader(self::AMQP_ROUTING_KEY) ->withDefaultConversionMediaType($mediaType) diff --git a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php index 3e4fe83e7..7cee438e1 100644 --- a/packages/Amqp/tests/Integration/DistributedCommandBusTest.php +++ b/packages/Amqp/tests/Integration/DistributedCommandBusTest.php @@ -4,7 +4,10 @@ namespace Test\Ecotone\Amqp\Integration; +use Ecotone\Amqp\Distribution\AmqpDistributedBusConfiguration; +use Ecotone\Amqp\Distribution\AmqpDistributionModule; use Ecotone\Lite\Test\FlowTestSupport; +use Ecotone\Messaging\Attribute\ServiceContext; use Ecotone\Messaging\Config\ModulePackageList; use Ecotone\Messaging\Config\ServiceConfiguration; use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata; @@ -93,6 +96,51 @@ public function test_distributing_command_misses_heartbeat_and_reconnects(): voi self::assertGreaterThanOrEqual(3, $ticketService->sendQueryWithRouting(TicketNotificationEventHandler::GET_TICKETS_NOTIFICATION_COUNT)); } + public function test_sending_fails_when_auto_declare_disabled_and_exchange_not_declared(): void + { + // Delete the distributed exchange to ensure it doesn't exist + $context = $this->getCachedConnectionFactory()->createContext(); + try { + $context->deleteTopic($context->createTopic(AmqpDistributionModule::AMQP_DISTRIBUTED_EXCHANGE)); + } catch (\Exception) { + // Exchange may not exist + } + + $publisherConfiguration = new class { + #[ServiceContext] + public function registerPublisher(): AmqpDistributedBusConfiguration + { + return AmqpDistributedBusConfiguration::createPublisher() + ->withAutoDeclare(false); + } + }; + + $userService = $this->bootstrapFlowTesting( + classesToResolve: [UserService::class, $publisherConfiguration::class], + containerOrAvailableServices: array_merge( + $this->getConnectionFactoryReferences(), + [new UserService(), $publisherConfiguration] + ), + configuration: ServiceConfiguration::createWithDefaults() + ->withServiceName('user_service') + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ModulePackageList::AMQP_PACKAGE, + ])), + pathToRootCatalog: __DIR__ . '/../../', + ); + + $this->expectException(\Exception::class); + + /** @var DistributedBus $distributedBus */ + $distributedBus = $userService->getGateway(DistributedBus::class); + $distributedBus->sendCommand( + TicketServiceMessagingConfiguration::SERVICE_NAME, + TicketServiceReceiver::CREATE_TICKET_ENDPOINT, + 'test payload', + ); + } + private function bootstrapEcotone(string $serviceName, array $namespaces, array $services, array $amqpConfig = []): FlowTestSupport { return $this->bootstrapFlowTesting(