diff --git a/packages/Amqp/src/AmqpQueue.php b/packages/Amqp/src/AmqpQueue.php index 30de560a0..6d4714a2a 100644 --- a/packages/Amqp/src/AmqpQueue.php +++ b/packages/Amqp/src/AmqpQueue.php @@ -150,4 +150,57 @@ public function withArgument(string $name, $value): self return $this; } + + /** + * Sets the maximum age of messages in the stream. + * Messages older than this will be removed by retention policy. + * Only applicable for stream queues. + * + * @param string $maxAge Duration string (e.g., '7D' for 7 days, '24h' for 24 hours, '30m' for 30 minutes, '60s' for 60 seconds) + * @return self + */ + public function withMaxAge(string $maxAge): self + { + Assert::isTrue($this->isStream, 'withMaxAge is only applicable for stream queues. Use createStreamQueue() to create a stream queue.'); + $this->enqueueQueue->setArgument('x-max-age', $maxAge); + + return $this; + } + + /** + * Sets the maximum size of the stream in bytes. + * When exceeded, oldest segments will be removed. + * Only applicable for stream queues. + * + * @param int $maxBytes Maximum size in bytes + * @return self + */ + public function withMaxLengthBytes(int $maxBytes): self + { + Assert::isTrue($this->isStream, 'withMaxLengthBytes is only applicable for stream queues. Use createStreamQueue() to create a stream queue.'); + $this->enqueueQueue->setArgument('x-max-length-bytes', $maxBytes); + + return $this; + } + + /** + * Sets the maximum size of stream segments in bytes. + * Smaller segments allow more granular retention but may impact performance. + * Only applicable for stream queues. + * + * @param int $segmentSize Segment size in bytes (default is 500MB in RabbitMQ) + * @return self + */ + public function withStreamMaxSegmentSizeBytes(int $segmentSize): self + { + Assert::isTrue($this->isStream, 'withStreamMaxSegmentSizeBytes is only applicable for stream queues. Use createStreamQueue() to create a stream queue.'); + $this->enqueueQueue->setArgument('x-stream-max-segment-size-bytes', $segmentSize); + + return $this; + } + + public function isStream(): bool + { + return $this->isStream; + } } diff --git a/packages/Amqp/src/AmqpStreamChannelBuilder.php b/packages/Amqp/src/AmqpStreamChannelBuilder.php index c79aad188..cbe4b22bc 100644 --- a/packages/Amqp/src/AmqpStreamChannelBuilder.php +++ b/packages/Amqp/src/AmqpStreamChannelBuilder.php @@ -14,6 +14,9 @@ class AmqpStreamChannelBuilder extends EnqueueMessageChannelBuilder { private string $channelName; private string $messageGroupId; + private ?string $maxAge = null; + private ?int $maxLengthBytes = null; + private ?int $streamMaxSegmentSizeBytes = null; private function __construct( string $channelName, @@ -98,6 +101,68 @@ public function withCommitInterval(int $commitInterval): self return $this; } + /** + * Sets the maximum age of messages in the stream. + * Messages older than this will be removed by retention policy. + * + * @param string $maxAge Duration string (e.g., '7D' for 7 days, '24h' for 24 hours, '30m' for 30 minutes, '60s' for 60 seconds) + * @return self + */ + public function withMaxAge(string $maxAge): self + { + $this->maxAge = $maxAge; + + return $this; + } + + /** + * Sets the maximum size of the stream in bytes. + * When exceeded, oldest segments will be removed. + * + * @param int $maxBytes Maximum size in bytes + * @return self + */ + public function withMaxLengthBytes(int $maxBytes): self + { + $this->maxLengthBytes = $maxBytes; + + return $this; + } + + /** + * Sets the maximum size of stream segments in bytes. + * Smaller segments allow more granular retention but may impact performance. + * + * @param int $segmentSize Segment size in bytes (default is 500MB in RabbitMQ) + * @return self + */ + public function withStreamMaxSegmentSizeBytes(int $segmentSize): self + { + $this->streamMaxSegmentSizeBytes = $segmentSize; + + return $this; + } + + /** + * Returns an AmqpQueue configured with the stream settings from this builder. + */ + public function getAmqpQueue(): AmqpQueue + { + $queue = AmqpQueue::createStreamQueue($this->queueName); + + if ($this->maxAge !== null) { + $queue->withMaxAge($this->maxAge); + } + if ($this->maxLengthBytes !== null) { + $queue->withMaxLengthBytes($this->maxLengthBytes); + } + if ($this->streamMaxSegmentSizeBytes !== null) { + $queue->withStreamMaxSegmentSizeBytes($this->streamMaxSegmentSizeBytes); + } + + return $queue; + } + public function getMessageChannelName(): string { return $this->channelName; @@ -120,6 +185,6 @@ public function isStreamingChannel(): bool public function __toString() { - return sprintf('AMQP Stream Channel - %s', $this->channelName); + return \sprintf('AMQP Stream Channel - %s', $this->channelName); } } diff --git a/packages/Amqp/src/Configuration/AmqpModule.php b/packages/Amqp/src/Configuration/AmqpModule.php index e2582c9bb..1eb0fd4c3 100644 --- a/packages/Amqp/src/Configuration/AmqpModule.php +++ b/packages/Amqp/src/Configuration/AmqpModule.php @@ -90,21 +90,33 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO } } + // First pass: collect explicitly defined AmqpQueue instances + $explicitQueueNames = []; foreach ($extensionObjects as $extensionObject) { - if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) { - $amqpQueues[] = AmqpQueue::createWith($extensionObject->getQueueName()); - } elseif ($extensionObject instanceof AmqpStreamChannelBuilder) { - $hasAmqpStreamChannelBuilder = true; - $amqpQueues[] = AmqpQueue::createStreamQueue($extensionObject->queueName); + if ($extensionObject instanceof AmqpQueue) { + $amqpQueues[] = $extensionObject; + $explicitQueueNames[$extensionObject->getQueueName()] = true; } elseif ($extensionObject instanceof AmqpExchange) { $amqpExchanges[] = $extensionObject; - } elseif ($extensionObject instanceof AmqpQueue) { - $amqpQueues[] = $extensionObject; } elseif ($extensionObject instanceof AmqpBinding) { $amqpBindings[] = $extensionObject; } } + // Second pass: create queues from channel builders if not explicitly defined + foreach ($extensionObjects as $extensionObject) { + if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) { + if (! isset($explicitQueueNames[$extensionObject->getQueueName()])) { + $amqpQueues[] = AmqpQueue::createWith($extensionObject->getQueueName()); + } + } elseif ($extensionObject instanceof AmqpStreamChannelBuilder) { + $hasAmqpStreamChannelBuilder = true; + if (! isset($explicitQueueNames[$extensionObject->queueName])) { + $amqpQueues[] = $extensionObject->getAmqpQueue(); + } + } + } + if ($hasAmqpStreamChannelBuilder && ! $messagingConfiguration->isRunningForEnterpriseLicence()) { throw LicensingException::create('AmqpStreamChannelBuilder is available only with Ecotone Enterprise licence.'); } diff --git a/packages/Amqp/tests/Integration/AmqpStreamChannelTest.php b/packages/Amqp/tests/Integration/AmqpStreamChannelTest.php index fdb37019d..1920d0396 100644 --- a/packages/Amqp/tests/Integration/AmqpStreamChannelTest.php +++ b/packages/Amqp/tests/Integration/AmqpStreamChannelTest.php @@ -1370,4 +1370,124 @@ public function getConsumed(): array $this->assertEquals(['event1', 'event2', 'event3'], $consumerService2->getQueryBus()->sendWithRouting('getConsumed2')); } + public function test_stream_queue_retention_config_is_applied_using_amqp_queue(): void + { + $channelName = 'orders'; + $queueName = 'stream_retention_amqp_' . Uuid::uuid4()->toString(); + + $ecotoneLite = $this->bootstrapForTesting( + [OrderService::class], + [ + new OrderService(), + ...$this->getConnectionFactoryReferences(), + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->withExtensionObjects([ + AmqpQueue::createStreamQueue($queueName) + ->withMaxAge('7D') + ->withMaxLengthBytes(1073741824) + ->withStreamMaxSegmentSizeBytes(52428800), + AmqpStreamChannelBuilder::create( + channelName: $channelName, + startPosition: 'first', + amqpConnectionReferenceName: AmqpLibConnection::class, + queueName: $queueName, + ), + ]) + ); + + // Send a message to trigger queue creation + $ecotoneLite->getCommandBus()->sendWithRouting('order.register', 'milk'); + + // Verify queue arguments via RabbitMQ Management API + $queueInfo = $this->getQueueInfoFromManagementApi($queueName); + $this->assertNotNull($queueInfo, 'Queue should exist in RabbitMQ'); + $this->assertEquals('stream', $queueInfo['type'] ?? $queueInfo['arguments']['x-queue-type'] ?? null); + $this->assertEquals('7D', $queueInfo['arguments']['x-max-age'] ?? null); + $this->assertEquals(1073741824, $queueInfo['arguments']['x-max-length-bytes'] ?? null); + $this->assertEquals(52428800, $queueInfo['arguments']['x-stream-max-segment-size-bytes'] ?? null); + + // Verify the stream still works + $ecotoneLite->run($channelName, ExecutionPollingMetadata::createWithFinishWhenNoMessages()); + $orders = $ecotoneLite->getQueryBus()->sendWithRouting('order.getOrders'); + $this->assertCount(1, $orders); + $this->assertContains('milk', $orders); + } + + public function test_stream_queue_retention_config_is_applied_using_channel_builder(): void + { + $channelName = 'orders'; + $queueName = 'stream_retention_builder_' . Uuid::uuid4()->toString(); + + $ecotoneLite = $this->bootstrapForTesting( + [OrderService::class], + [ + new OrderService(), + ...$this->getConnectionFactoryReferences(), + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->withExtensionObjects([ + AmqpStreamChannelBuilder::create( + channelName: $channelName, + startPosition: 'first', + amqpConnectionReferenceName: AmqpLibConnection::class, + queueName: $queueName, + ) + ->withMaxAge('14D') + ->withMaxLengthBytes(2147483648) + ->withStreamMaxSegmentSizeBytes(104857600), + ]) + ); + + // Send a message to trigger queue creation + $ecotoneLite->getCommandBus()->sendWithRouting('order.register', 'milk'); + + // Verify queue arguments via RabbitMQ Management API + $queueInfo = $this->getQueueInfoFromManagementApi($queueName); + $this->assertNotNull($queueInfo, 'Queue should exist in RabbitMQ'); + $this->assertEquals('stream', $queueInfo['type'] ?? $queueInfo['arguments']['x-queue-type'] ?? null); + $this->assertEquals('14D', $queueInfo['arguments']['x-max-age'] ?? null); + $this->assertEquals(2147483648, $queueInfo['arguments']['x-max-length-bytes'] ?? null); + $this->assertEquals(104857600, $queueInfo['arguments']['x-stream-max-segment-size-bytes'] ?? null); + + // Verify the stream still works + $ecotoneLite->run($channelName, ExecutionPollingMetadata::createWithFinishWhenNoMessages()); + $orders = $ecotoneLite->getQueryBus()->sendWithRouting('order.getOrders'); + $this->assertCount(1, $orders); + $this->assertContains('milk', $orders); + } + + /** + * Helper method to query RabbitMQ Management API for queue information. + * @return array|null + */ + private function getQueueInfoFromManagementApi(string $queueName): ?array + { + $rabbitHost = \getenv('RABBIT_HOST') ?: 'amqp://guest:guest@localhost:5672/%2f'; + $parsed = \parse_url($rabbitHost); + $host = $parsed['host'] ?? 'localhost'; + $user = $parsed['user'] ?? 'guest'; + $pass = $parsed['pass'] ?? 'guest'; + + $managementUrl = "http://{$host}:15672/api/queues/%2F/" . \urlencode($queueName); + + $context = \stream_context_create([ + 'http' => [ + 'header' => 'Authorization: Basic ' . \base64_encode("{$user}:{$pass}"), + 'timeout' => 5, + ], + ]); + + $response = @\file_get_contents($managementUrl, false, $context); + if ($response === false) { + return null; + } + + return \json_decode($response, true); + } + }