diff --git a/packages/Amqp/src/Configuration/AmqpModule.php b/packages/Amqp/src/Configuration/AmqpModule.php index e2582c9bb..e8e69e2fc 100644 --- a/packages/Amqp/src/Configuration/AmqpModule.php +++ b/packages/Amqp/src/Configuration/AmqpModule.php @@ -90,21 +90,33 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO } } + // First, collect all explicit AmqpQueue configurations + $explicitQueueNames = []; foreach ($extensionObjects as $extensionObject) { - if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) { - $amqpQueues[] = AmqpQueue::createWith($extensionObject->getQueueName()); - } elseif ($extensionObject instanceof AmqpStreamChannelBuilder) { - $hasAmqpStreamChannelBuilder = true; - $amqpQueues[] = AmqpQueue::createStreamQueue($extensionObject->queueName); + if ($extensionObject instanceof AmqpQueue) { + $amqpQueues[] = $extensionObject; + $explicitQueueNames[$extensionObject->getQueueName()] = true; } elseif ($extensionObject instanceof AmqpExchange) { $amqpExchanges[] = $extensionObject; - } elseif ($extensionObject instanceof AmqpQueue) { - $amqpQueues[] = $extensionObject; } elseif ($extensionObject instanceof AmqpBinding) { $amqpBindings[] = $extensionObject; } } + // Then, process channel builders and only auto-create queues if not explicitly configured + foreach ($extensionObjects as $extensionObject) { + if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) { + if (! isset($explicitQueueNames[$extensionObject->getQueueName()])) { + $amqpQueues[] = AmqpQueue::createWith($extensionObject->getQueueName()); + } + } elseif ($extensionObject instanceof AmqpStreamChannelBuilder) { + $hasAmqpStreamChannelBuilder = true; + if (! isset($explicitQueueNames[$extensionObject->queueName])) { + $amqpQueues[] = AmqpQueue::createStreamQueue($extensionObject->queueName); + } + } + } + if ($hasAmqpStreamChannelBuilder && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { throw LicensingException::create('AmqpStreamChannelBuilder is available only with Ecotone Enterprise licence.'); } diff --git a/packages/Amqp/tests/Integration/AmqpStreamChannelTest.php b/packages/Amqp/tests/Integration/AmqpStreamChannelTest.php index fdb37019d..e664558cf 100644 --- a/packages/Amqp/tests/Integration/AmqpStreamChannelTest.php +++ b/packages/Amqp/tests/Integration/AmqpStreamChannelTest.php @@ -1370,4 +1370,108 @@ public function getConsumed(): array $this->assertEquals(['event1', 'event2', 'event3'], $consumerService2->getQueryBus()->sendWithRouting('getConsumed2')); } + public function test_stream_queue_retention_config_is_applied_using_amqp_queue(): void + { + $channelName = 'orders'; + $queueName = 'stream_retention_amqp_' . Uuid::uuid4()->toString(); + + $ecotoneLite = $this->bootstrapForTesting( + [OrderService::class], + [ + new OrderService(), + ...$this->getConnectionFactoryReferences(), + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->withExtensionObjects([ + AmqpQueue::createStreamQueue($queueName) + ->withArgument('x-max-age', '7D') + ->withArgument('x-max-length-bytes', 1073741824) + ->withArgument('x-stream-max-segment-size-bytes', 52428800), + AmqpStreamChannelBuilder::create( + channelName: $channelName, + startPosition: 'first', + amqpConnectionReferenceName: AmqpLibConnection::class, + queueName: $queueName, + ), + ]) + ); + + // Send a message to trigger queue creation + $ecotoneLite->getCommandBus()->sendWithRouting('order.register', 'milk'); + + // Verify the stream works correctly with retention config + $ecotoneLite->run($channelName, ExecutionPollingMetadata::createWithFinishWhenNoMessages()); + $orders = $ecotoneLite->getQueryBus()->sendWithRouting('order.getOrders'); + $this->assertCount(1, $orders); + $this->assertContains('milk', $orders); + + // Verify queue was created with correct arguments by re-declaring with same args (should not fail) + /** @var \Interop\Amqp\AmqpContext $context */ + $context = self::getRabbitConnectionFactory()->createContext(); + /** @var \Interop\Amqp\AmqpQueue $queue */ + $queue = $context->createQueue($queueName); + $queue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE); + $queue->setArgument('x-queue-type', 'stream'); + $queue->setArgument('x-max-age', '7D'); + $queue->setArgument('x-max-length-bytes', 1073741824); + $queue->setArgument('x-stream-max-segment-size-bytes', 52428800); + + // This would throw an exception if the queue exists with different arguments + $context->declareQueue($queue); + } + + public function test_stream_queue_retention_config_is_applied_using_queue_argument(): void + { + $channelName = 'orders'; + $queueName = 'stream_retention_arg_' . Uuid::uuid4()->toString(); + + $ecotoneLite = $this->bootstrapForTesting( + [OrderService::class], + [ + new OrderService(), + ...$this->getConnectionFactoryReferences(), + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->withExtensionObjects([ + AmqpQueue::createStreamQueue($queueName) + ->withArgument('x-max-age', '14D') + ->withArgument('x-max-length-bytes', 2147483648) + ->withArgument('x-stream-max-segment-size-bytes', 104857600), + AmqpStreamChannelBuilder::create( + channelName: $channelName, + startPosition: 'first', + amqpConnectionReferenceName: AmqpLibConnection::class, + queueName: $queueName, + ), + ]) + ); + + // Send a message to trigger queue creation + $ecotoneLite->getCommandBus()->sendWithRouting('order.register', 'milk'); + + // Verify the stream works correctly with retention config + $ecotoneLite->run($channelName, ExecutionPollingMetadata::createWithFinishWhenNoMessages()); + $orders = $ecotoneLite->getQueryBus()->sendWithRouting('order.getOrders'); + $this->assertCount(1, $orders); + $this->assertContains('milk', $orders); + + // Verify queue was created with correct arguments by re-declaring with same args (should not fail) + /** @var \Interop\Amqp\AmqpContext $context */ + $context = self::getRabbitConnectionFactory()->createContext(); + /** @var \Interop\Amqp\AmqpQueue $queue */ + $queue = $context->createQueue($queueName); + $queue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE); + $queue->setArgument('x-queue-type', 'stream'); + $queue->setArgument('x-max-age', '14D'); + $queue->setArgument('x-max-length-bytes', 2147483648); + $queue->setArgument('x-stream-max-segment-size-bytes', 104857600); + + // This would throw an exception if the queue exists with different arguments + $context->declareQueue($queue); + } + }