Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions packages/Amqp/src/AmqpQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,57 @@ public function withArgument(string $name, $value): self

return $this;
}

/**
* Sets the maximum age of messages in the stream.
* Messages older than this will be removed by retention policy.
* Only applicable for stream queues.
*
* @param string $maxAge Duration string (e.g., '7D' for 7 days, '24h' for 24 hours, '30m' for 30 minutes, '60s' for 60 seconds)
* @return self
*/
public function withMaxAge(string $maxAge): self
{
Assert::isTrue($this->isStream, 'withMaxAge is only applicable for stream queues. Use createStreamQueue() to create a stream queue.');
$this->enqueueQueue->setArgument('x-max-age', $maxAge);

return $this;
}

/**
* Sets the maximum size of the stream in bytes.
* When exceeded, oldest segments will be removed.
* Only applicable for stream queues.
*
* @param int $maxBytes Maximum size in bytes
* @return self
*/
public function withMaxLengthBytes(int $maxBytes): self
{
Assert::isTrue($this->isStream, 'withMaxLengthBytes is only applicable for stream queues. Use createStreamQueue() to create a stream queue.');
$this->enqueueQueue->setArgument('x-max-length-bytes', $maxBytes);

return $this;
}

/**
* Sets the maximum size of stream segments in bytes.
* Smaller segments allow more granular retention but may impact performance.
* Only applicable for stream queues.
*
* @param int $segmentSize Segment size in bytes (default is 500MB in RabbitMQ)
* @return self
*/
public function withStreamMaxSegmentSizeBytes(int $segmentSize): self
{
Assert::isTrue($this->isStream, 'withStreamMaxSegmentSizeBytes is only applicable for stream queues. Use createStreamQueue() to create a stream queue.');
$this->enqueueQueue->setArgument('x-stream-max-segment-size-bytes', $segmentSize);

return $this;
}

public function isStream(): bool
{
return $this->isStream;
}
}
67 changes: 66 additions & 1 deletion packages/Amqp/src/AmqpStreamChannelBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class AmqpStreamChannelBuilder extends EnqueueMessageChannelBuilder
{
private string $channelName;
private string $messageGroupId;
private ?string $maxAge = null;
private ?int $maxLengthBytes = null;
private ?int $streamMaxSegmentSizeBytes = null;

private function __construct(
string $channelName,
Expand Down Expand Up @@ -98,6 +101,68 @@ public function withCommitInterval(int $commitInterval): self
return $this;
}

/**
* Sets the maximum age of messages in the stream.
* Messages older than this will be removed by retention policy.
*
* @param string $maxAge Duration string (e.g., '7D' for 7 days, '24h' for 24 hours, '30m' for 30 minutes, '60s' for 60 seconds)
* @return self
*/
public function withMaxAge(string $maxAge): self
{
$this->maxAge = $maxAge;

return $this;
}

/**
* Sets the maximum size of the stream in bytes.
* When exceeded, oldest segments will be removed.
*
* @param int $maxBytes Maximum size in bytes
* @return self
*/
public function withMaxLengthBytes(int $maxBytes): self
{
$this->maxLengthBytes = $maxBytes;

return $this;
}

/**
* Sets the maximum size of stream segments in bytes.
* Smaller segments allow more granular retention but may impact performance.
*
* @param int $segmentSize Segment size in bytes (default is 500MB in RabbitMQ)
* @return self
*/
public function withStreamMaxSegmentSizeBytes(int $segmentSize): self
{
$this->streamMaxSegmentSizeBytes = $segmentSize;

return $this;
}

/**
* Returns an AmqpQueue configured with the stream settings from this builder.
*/
public function getAmqpQueue(): AmqpQueue
{
$queue = AmqpQueue::createStreamQueue($this->queueName);

if ($this->maxAge !== null) {
$queue->withMaxAge($this->maxAge);
}
if ($this->maxLengthBytes !== null) {
$queue->withMaxLengthBytes($this->maxLengthBytes);
}
if ($this->streamMaxSegmentSizeBytes !== null) {
$queue->withStreamMaxSegmentSizeBytes($this->streamMaxSegmentSizeBytes);
}

return $queue;
}

public function getMessageChannelName(): string
{
return $this->channelName;
Expand All @@ -120,6 +185,6 @@ public function isStreamingChannel(): bool

public function __toString()
{
return sprintf('AMQP Stream Channel - %s', $this->channelName);
return \sprintf('AMQP Stream Channel - %s', $this->channelName);
}
}
26 changes: 19 additions & 7 deletions packages/Amqp/src/Configuration/AmqpModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,33 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
}
}

// First pass: collect explicitly defined AmqpQueue instances
$explicitQueueNames = [];
foreach ($extensionObjects as $extensionObject) {
if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) {
$amqpQueues[] = AmqpQueue::createWith($extensionObject->getQueueName());
} elseif ($extensionObject instanceof AmqpStreamChannelBuilder) {
$hasAmqpStreamChannelBuilder = true;
$amqpQueues[] = AmqpQueue::createStreamQueue($extensionObject->queueName);
if ($extensionObject instanceof AmqpQueue) {
$amqpQueues[] = $extensionObject;
$explicitQueueNames[$extensionObject->getQueueName()] = true;
} elseif ($extensionObject instanceof AmqpExchange) {
$amqpExchanges[] = $extensionObject;
} elseif ($extensionObject instanceof AmqpQueue) {
$amqpQueues[] = $extensionObject;
} elseif ($extensionObject instanceof AmqpBinding) {
$amqpBindings[] = $extensionObject;
}
}

// Second pass: create queues from channel builders if not explicitly defined
foreach ($extensionObjects as $extensionObject) {
if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) {
if (! isset($explicitQueueNames[$extensionObject->getQueueName()])) {
$amqpQueues[] = AmqpQueue::createWith($extensionObject->getQueueName());
}
} elseif ($extensionObject instanceof AmqpStreamChannelBuilder) {
$hasAmqpStreamChannelBuilder = true;
if (! isset($explicitQueueNames[$extensionObject->queueName])) {
$amqpQueues[] = $extensionObject->getAmqpQueue();
}
}
}

if ($hasAmqpStreamChannelBuilder && ! $messagingConfiguration->isRunningForEnterpriseLicence()) {
throw LicensingException::create('AmqpStreamChannelBuilder is available only with Ecotone Enterprise licence.');
}
Expand Down
120 changes: 120 additions & 0 deletions packages/Amqp/tests/Integration/AmqpStreamChannelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -1370,4 +1370,124 @@ public function getConsumed(): array
$this->assertEquals(['event1', 'event2', 'event3'], $consumerService2->getQueryBus()->sendWithRouting('getConsumed2'));
}

public function test_stream_queue_retention_config_is_applied_using_amqp_queue(): void
{
$channelName = 'orders';
$queueName = 'stream_retention_amqp_' . Uuid::uuid4()->toString();

$ecotoneLite = $this->bootstrapForTesting(
[OrderService::class],
[
new OrderService(),
...$this->getConnectionFactoryReferences(),
],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withLicenceKey(LicenceTesting::VALID_LICENCE)
->withExtensionObjects([
AmqpQueue::createStreamQueue($queueName)
->withMaxAge('7D')
->withMaxLengthBytes(1073741824)
->withStreamMaxSegmentSizeBytes(52428800),
AmqpStreamChannelBuilder::create(
channelName: $channelName,
startPosition: 'first',
amqpConnectionReferenceName: AmqpLibConnection::class,
queueName: $queueName,
),
])
);

// Send a message to trigger queue creation
$ecotoneLite->getCommandBus()->sendWithRouting('order.register', 'milk');

// Verify queue arguments via RabbitMQ Management API
$queueInfo = $this->getQueueInfoFromManagementApi($queueName);
$this->assertNotNull($queueInfo, 'Queue should exist in RabbitMQ');
$this->assertEquals('stream', $queueInfo['type'] ?? $queueInfo['arguments']['x-queue-type'] ?? null);
$this->assertEquals('7D', $queueInfo['arguments']['x-max-age'] ?? null);
$this->assertEquals(1073741824, $queueInfo['arguments']['x-max-length-bytes'] ?? null);
$this->assertEquals(52428800, $queueInfo['arguments']['x-stream-max-segment-size-bytes'] ?? null);

// Verify the stream still works
$ecotoneLite->run($channelName, ExecutionPollingMetadata::createWithFinishWhenNoMessages());
$orders = $ecotoneLite->getQueryBus()->sendWithRouting('order.getOrders');
$this->assertCount(1, $orders);
$this->assertContains('milk', $orders);
}

public function test_stream_queue_retention_config_is_applied_using_channel_builder(): void
{
$channelName = 'orders';
$queueName = 'stream_retention_builder_' . Uuid::uuid4()->toString();

$ecotoneLite = $this->bootstrapForTesting(
[OrderService::class],
[
new OrderService(),
...$this->getConnectionFactoryReferences(),
],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withLicenceKey(LicenceTesting::VALID_LICENCE)
->withExtensionObjects([
AmqpStreamChannelBuilder::create(
channelName: $channelName,
startPosition: 'first',
amqpConnectionReferenceName: AmqpLibConnection::class,
queueName: $queueName,
)
->withMaxAge('14D')
->withMaxLengthBytes(2147483648)
->withStreamMaxSegmentSizeBytes(104857600),
])
);

// Send a message to trigger queue creation
$ecotoneLite->getCommandBus()->sendWithRouting('order.register', 'milk');

// Verify queue arguments via RabbitMQ Management API
$queueInfo = $this->getQueueInfoFromManagementApi($queueName);
$this->assertNotNull($queueInfo, 'Queue should exist in RabbitMQ');
$this->assertEquals('stream', $queueInfo['type'] ?? $queueInfo['arguments']['x-queue-type'] ?? null);
$this->assertEquals('14D', $queueInfo['arguments']['x-max-age'] ?? null);
$this->assertEquals(2147483648, $queueInfo['arguments']['x-max-length-bytes'] ?? null);
$this->assertEquals(104857600, $queueInfo['arguments']['x-stream-max-segment-size-bytes'] ?? null);

// Verify the stream still works
$ecotoneLite->run($channelName, ExecutionPollingMetadata::createWithFinishWhenNoMessages());
$orders = $ecotoneLite->getQueryBus()->sendWithRouting('order.getOrders');
$this->assertCount(1, $orders);
$this->assertContains('milk', $orders);
}

/**
* Helper method to query RabbitMQ Management API for queue information.
* @return array<string, mixed>|null
*/
private function getQueueInfoFromManagementApi(string $queueName): ?array
{
$rabbitHost = \getenv('RABBIT_HOST') ?: 'amqp://guest:guest@localhost:5672/%2f';
$parsed = \parse_url($rabbitHost);
$host = $parsed['host'] ?? 'localhost';
$user = $parsed['user'] ?? 'guest';
$pass = $parsed['pass'] ?? 'guest';

$managementUrl = "http://{$host}:15672/api/queues/%2F/" . \urlencode($queueName);

$context = \stream_context_create([
'http' => [
'header' => 'Authorization: Basic ' . \base64_encode("{$user}:{$pass}"),
'timeout' => 5,
],
]);

$response = @\file_get_contents($managementUrl, false, $context);
if ($response === false) {
return null;
}

return \json_decode($response, true);
}

}
Loading