Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions packages/Amqp/src/Configuration/AmqpModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,33 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
}
}

// First, collect all explicit AmqpQueue configurations
$explicitQueueNames = [];
foreach ($extensionObjects as $extensionObject) {
if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) {
$amqpQueues[] = AmqpQueue::createWith($extensionObject->getQueueName());
} elseif ($extensionObject instanceof AmqpStreamChannelBuilder) {
$hasAmqpStreamChannelBuilder = true;
$amqpQueues[] = AmqpQueue::createStreamQueue($extensionObject->queueName);
if ($extensionObject instanceof AmqpQueue) {
$amqpQueues[] = $extensionObject;
$explicitQueueNames[$extensionObject->getQueueName()] = true;
} elseif ($extensionObject instanceof AmqpExchange) {
$amqpExchanges[] = $extensionObject;
} elseif ($extensionObject instanceof AmqpQueue) {
$amqpQueues[] = $extensionObject;
} elseif ($extensionObject instanceof AmqpBinding) {
$amqpBindings[] = $extensionObject;
}
}

// Then, process channel builders and only auto-create queues if not explicitly configured
foreach ($extensionObjects as $extensionObject) {
if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) {
if (! isset($explicitQueueNames[$extensionObject->getQueueName()])) {
$amqpQueues[] = AmqpQueue::createWith($extensionObject->getQueueName());
}
} elseif ($extensionObject instanceof AmqpStreamChannelBuilder) {
$hasAmqpStreamChannelBuilder = true;
if (! isset($explicitQueueNames[$extensionObject->queueName])) {
$amqpQueues[] = AmqpQueue::createStreamQueue($extensionObject->queueName);
}
}
}

if ($hasAmqpStreamChannelBuilder && ! $messagingConfiguration->isRunningForEnterpriseLicence()) {
throw LicensingException::create('AmqpStreamChannelBuilder is available only with Ecotone Enterprise licence.');
}
Expand Down
104 changes: 104 additions & 0 deletions packages/Amqp/tests/Integration/AmqpStreamChannelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -1370,4 +1370,108 @@ public function getConsumed(): array
$this->assertEquals(['event1', 'event2', 'event3'], $consumerService2->getQueryBus()->sendWithRouting('getConsumed2'));
}

public function test_stream_queue_retention_config_is_applied_using_amqp_queue(): void
{
$channelName = 'orders';
$queueName = 'stream_retention_amqp_' . Uuid::uuid4()->toString();

$ecotoneLite = $this->bootstrapForTesting(
[OrderService::class],
[
new OrderService(),
...$this->getConnectionFactoryReferences(),
],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withLicenceKey(LicenceTesting::VALID_LICENCE)
->withExtensionObjects([
AmqpQueue::createStreamQueue($queueName)
->withArgument('x-max-age', '7D')
->withArgument('x-max-length-bytes', 1073741824)
->withArgument('x-stream-max-segment-size-bytes', 52428800),
AmqpStreamChannelBuilder::create(
channelName: $channelName,
startPosition: 'first',
amqpConnectionReferenceName: AmqpLibConnection::class,
queueName: $queueName,
),
])
);

// Send a message to trigger queue creation
$ecotoneLite->getCommandBus()->sendWithRouting('order.register', 'milk');

// Verify the stream works correctly with retention config
$ecotoneLite->run($channelName, ExecutionPollingMetadata::createWithFinishWhenNoMessages());
$orders = $ecotoneLite->getQueryBus()->sendWithRouting('order.getOrders');
$this->assertCount(1, $orders);
$this->assertContains('milk', $orders);

// Verify queue was created with correct arguments by re-declaring with same args (should not fail)
/** @var \Interop\Amqp\AmqpContext $context */
$context = self::getRabbitConnectionFactory()->createContext();
/** @var \Interop\Amqp\AmqpQueue $queue */
$queue = $context->createQueue($queueName);
$queue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE);
$queue->setArgument('x-queue-type', 'stream');
$queue->setArgument('x-max-age', '7D');
$queue->setArgument('x-max-length-bytes', 1073741824);
$queue->setArgument('x-stream-max-segment-size-bytes', 52428800);

// This would throw an exception if the queue exists with different arguments
$context->declareQueue($queue);
}

public function test_stream_queue_retention_config_is_applied_using_queue_argument(): void
{
$channelName = 'orders';
$queueName = 'stream_retention_arg_' . Uuid::uuid4()->toString();

$ecotoneLite = $this->bootstrapForTesting(
[OrderService::class],
[
new OrderService(),
...$this->getConnectionFactoryReferences(),
],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withLicenceKey(LicenceTesting::VALID_LICENCE)
->withExtensionObjects([
AmqpQueue::createStreamQueue($queueName)
->withArgument('x-max-age', '14D')
->withArgument('x-max-length-bytes', 2147483648)
->withArgument('x-stream-max-segment-size-bytes', 104857600),
AmqpStreamChannelBuilder::create(
channelName: $channelName,
startPosition: 'first',
amqpConnectionReferenceName: AmqpLibConnection::class,
queueName: $queueName,
),
])
);

// Send a message to trigger queue creation
$ecotoneLite->getCommandBus()->sendWithRouting('order.register', 'milk');

// Verify the stream works correctly with retention config
$ecotoneLite->run($channelName, ExecutionPollingMetadata::createWithFinishWhenNoMessages());
$orders = $ecotoneLite->getQueryBus()->sendWithRouting('order.getOrders');
$this->assertCount(1, $orders);
$this->assertContains('milk', $orders);

// Verify queue was created with correct arguments by re-declaring with same args (should not fail)
/** @var \Interop\Amqp\AmqpContext $context */
$context = self::getRabbitConnectionFactory()->createContext();
/** @var \Interop\Amqp\AmqpQueue $queue */
$queue = $context->createQueue($queueName);
$queue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE);
$queue->setArgument('x-queue-type', 'stream');
$queue->setArgument('x-max-age', '14D');
$queue->setArgument('x-max-length-bytes', 2147483648);
$queue->setArgument('x-stream-max-segment-size-bytes', 104857600);

// This would throw an exception if the queue exists with different arguments
$context->declareQueue($queue);
}

}