Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions packages/Amqp/src/AmqpChannelManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
<?php

declare(strict_types=1);

namespace Ecotone\Amqp;

use Ecotone\Messaging\Channel\Manager\ChannelManager;
use Ecotone\Messaging\Config\Container\Definition;
use Interop\Amqp\AmqpQueue;
use Interop\Queue\ConnectionFactory;

/**
* Channel manager for AMQP message channels.
* Handles initialization and deletion of AMQP queues and exchanges.
*
* licence Apache-2.0
*/
final class AmqpChannelManager implements ChannelManager
{
public function __construct(
private string $channelName,
private string $queueName,
private ConnectionFactory $connectionFactory,
private AmqpAdmin $amqpAdmin,
private bool $shouldAutoInitialize,
private bool $isStreamChannel = false,
) {
}

public function getChannelName(): string
{
return $this->channelName;
}

public function getChannelType(): string
{
return $this->isStreamChannel ? 'amqp_stream' : 'amqp';
}

public function initialize(): void
{
if ($this->isInitialized()) {
return;
}

$context = $this->connectionFactory->createContext();
$this->amqpAdmin->declareQueueWithBindings($this->queueName, $context);
}

public function delete(): void
{
$context = $this->connectionFactory->createContext();
$queue = $this->amqpAdmin->getQueueByName($this->queueName);
$context->deleteQueue($queue);
}

public function isInitialized(): bool
{
try {
$context = $this->connectionFactory->createContext();
$queue = $this->amqpAdmin->getQueueByName($this->queueName);

// Use passive declaration to check if queue exists
$passiveQueue = clone $queue;
$passiveQueue->addFlag(AmqpQueue::FLAG_PASSIVE);
$context->declareQueue($passiveQueue);

return true;
} catch (\Exception $e) {
// Queue doesn't exist or other error
return false;
}
}

public function shouldBeInitializedAutomatically(): bool
{
return $this->shouldAutoInitialize;
}

public function getDefinition(): Definition
{
return new Definition(self::class, [
$this->channelName,
$this->queueName,
$this->connectionFactory,
$this->amqpAdmin,
$this->shouldAutoInitialize,
$this->isStreamChannel,
]);
}
}

130 changes: 130 additions & 0 deletions packages/Amqp/src/Configuration/AmqpChannelManagerModule.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
<?php

declare(strict_types=1);

namespace Ecotone\Amqp\Configuration;

use Ecotone\Amqp\AmqpAdmin;
use Ecotone\Amqp\AmqpBackedMessageChannelBuilder;
use Ecotone\Amqp\AmqpChannelManager;
use Ecotone\Amqp\AmqpStreamChannelBuilder;
use Ecotone\AnnotationFinder\AnnotationFinder;
use Ecotone\Messaging\Attribute\ModuleAnnotation;
use Ecotone\Messaging\Channel\Manager\ChannelInitializationConfiguration;
use Ecotone\Messaging\Channel\Manager\ChannelManagerReference;
use Ecotone\Messaging\Config\Annotation\AnnotationModule;
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ExtensionObjectResolver;
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\NoExternalConfigurationModule;
use Ecotone\Messaging\Config\Configuration;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;

#[ModuleAnnotation]
/**
* Module responsible for registering AMQP channel managers.
* Handles channel initialization configuration for AMQP message channels.
*
* licence Apache-2.0
*/
final class AmqpChannelManagerModule extends NoExternalConfigurationModule implements AnnotationModule
{
public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static
{
return new self();
}

public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void
{
// Get channel initialization configuration
$channelInitConfig = ExtensionObjectResolver::resolveUnique(
ChannelInitializationConfiguration::class,
$extensionObjects,
ChannelInitializationConfiguration::createWithDefaults()
);

$shouldAutoInitialize = $channelInitConfig->isAutomaticChannelInitializationEnabled();

// Configure channel builders and register channel managers
foreach ($extensionObjects as $extensionObject) {
if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) {
if (!$shouldAutoInitialize) {
$extensionObject->withAutoDeclare($shouldAutoInitialize);
}

$channelName = $extensionObject->getMessageChannelName();
$queueName = $extensionObject->getQueueName();
$connectionRef = $extensionObject->getInboundChannelAdapter()->getConnectionReferenceName();

$managerRef = "amqp_channel_manager.{$channelName}";
$messagingConfiguration->registerServiceDefinition(
$managerRef,
new Definition(AmqpChannelManager::class, [
$channelName,
$queueName,
new Reference($connectionRef),
new Reference(AmqpAdmin::REFERENCE_NAME),
$shouldAutoInitialize,
false, // isStreamChannel
])
);
} elseif ($extensionObject instanceof AmqpStreamChannelBuilder) {
if (!$shouldAutoInitialize) {
$extensionObject->withAutoDeclare($shouldAutoInitialize);
}

$channelName = $extensionObject->getMessageChannelName();
$queueName = $extensionObject->queueName;
$connectionRef = $extensionObject->getInboundChannelAdapter()->getConnectionReferenceName();

$managerRef = "amqp_channel_manager.{$channelName}";
$messagingConfiguration->registerServiceDefinition(
$managerRef,
new Definition(AmqpChannelManager::class, [
$channelName,
$queueName,
new Reference($connectionRef),
new Reference(AmqpAdmin::REFERENCE_NAME),
$shouldAutoInitialize,
true, // isStreamChannel
])
);
}
}
}

public function canHandle($extensionObject): bool
{
return
$extensionObject instanceof AmqpBackedMessageChannelBuilder
|| $extensionObject instanceof AmqpStreamChannelBuilder
|| $extensionObject instanceof ChannelInitializationConfiguration;
}

public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions): array
{
$channelManagerReferences = [];

// Create channel manager references for each AMQP channel
foreach ($serviceExtensions as $extensionObject) {
if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) {
$channelName = $extensionObject->getMessageChannelName();
$channelManagerReferences[] = new ChannelManagerReference("amqp_channel_manager.{$channelName}");
} elseif ($extensionObject instanceof AmqpStreamChannelBuilder) {
$channelName = $extensionObject->getMessageChannelName();
$channelManagerReferences[] = new ChannelManagerReference("amqp_channel_manager.{$channelName}");
}
}

return $channelManagerReferences;
}

public function getModulePackageName(): string
{
return ModulePackageList::AMQP_PACKAGE;
}
}

Loading