diff --git a/packages/Amqp/src/AmqpChannelManager.php b/packages/Amqp/src/AmqpChannelManager.php new file mode 100644 index 000000000..45f718261 --- /dev/null +++ b/packages/Amqp/src/AmqpChannelManager.php @@ -0,0 +1,92 @@ +channelName; + } + + public function getChannelType(): string + { + return $this->isStreamChannel ? 'amqp_stream' : 'amqp'; + } + + public function initialize(): void + { + if ($this->isInitialized()) { + return; + } + + $context = $this->connectionFactory->createContext(); + $this->amqpAdmin->declareQueueWithBindings($this->queueName, $context); + } + + public function delete(): void + { + $context = $this->connectionFactory->createContext(); + $queue = $this->amqpAdmin->getQueueByName($this->queueName); + $context->deleteQueue($queue); + } + + public function isInitialized(): bool + { + try { + $context = $this->connectionFactory->createContext(); + $queue = $this->amqpAdmin->getQueueByName($this->queueName); + + // Use passive declaration to check if queue exists + $passiveQueue = clone $queue; + $passiveQueue->addFlag(AmqpQueue::FLAG_PASSIVE); + $context->declareQueue($passiveQueue); + + return true; + } catch (\Exception $e) { + // Queue doesn't exist or other error + return false; + } + } + + public function shouldBeInitializedAutomatically(): bool + { + return $this->shouldAutoInitialize; + } + + public function getDefinition(): Definition + { + return new Definition(self::class, [ + $this->channelName, + $this->queueName, + $this->connectionFactory, + $this->amqpAdmin, + $this->shouldAutoInitialize, + $this->isStreamChannel, + ]); + } +} + diff --git a/packages/Amqp/src/Configuration/AmqpChannelManagerModule.php b/packages/Amqp/src/Configuration/AmqpChannelManagerModule.php new file mode 100644 index 000000000..732c4f571 --- /dev/null +++ b/packages/Amqp/src/Configuration/AmqpChannelManagerModule.php @@ -0,0 +1,130 @@ +isAutomaticChannelInitializationEnabled(); + + // Configure channel builders and register channel managers + foreach ($extensionObjects as $extensionObject) { + if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) { + if (!$shouldAutoInitialize) { + $extensionObject->withAutoDeclare($shouldAutoInitialize); + } + + $channelName = $extensionObject->getMessageChannelName(); + $queueName = $extensionObject->getQueueName(); + $connectionRef = $extensionObject->getInboundChannelAdapter()->getConnectionReferenceName(); + + $managerRef = "amqp_channel_manager.{$channelName}"; + $messagingConfiguration->registerServiceDefinition( + $managerRef, + new Definition(AmqpChannelManager::class, [ + $channelName, + $queueName, + new Reference($connectionRef), + new Reference(AmqpAdmin::REFERENCE_NAME), + $shouldAutoInitialize, + false, // isStreamChannel + ]) + ); + } elseif ($extensionObject instanceof AmqpStreamChannelBuilder) { + if (!$shouldAutoInitialize) { + $extensionObject->withAutoDeclare($shouldAutoInitialize); + } + + $channelName = $extensionObject->getMessageChannelName(); + $queueName = $extensionObject->queueName; + $connectionRef = $extensionObject->getInboundChannelAdapter()->getConnectionReferenceName(); + + $managerRef = "amqp_channel_manager.{$channelName}"; + $messagingConfiguration->registerServiceDefinition( + $managerRef, + new Definition(AmqpChannelManager::class, [ + $channelName, + $queueName, + new Reference($connectionRef), + new Reference(AmqpAdmin::REFERENCE_NAME), + $shouldAutoInitialize, + true, // isStreamChannel + ]) + ); + } + } + } + + public function canHandle($extensionObject): bool + { + return + $extensionObject instanceof AmqpBackedMessageChannelBuilder + || $extensionObject instanceof AmqpStreamChannelBuilder + || $extensionObject instanceof ChannelInitializationConfiguration; + } + + public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions): array + { + $channelManagerReferences = []; + + // Create channel manager references for each AMQP channel + foreach ($serviceExtensions as $extensionObject) { + if ($extensionObject instanceof AmqpBackedMessageChannelBuilder) { + $channelName = $extensionObject->getMessageChannelName(); + $channelManagerReferences[] = new ChannelManagerReference("amqp_channel_manager.{$channelName}"); + } elseif ($extensionObject instanceof AmqpStreamChannelBuilder) { + $channelName = $extensionObject->getMessageChannelName(); + $channelManagerReferences[] = new ChannelManagerReference("amqp_channel_manager.{$channelName}"); + } + } + + return $channelManagerReferences; + } + + public function getModulePackageName(): string + { + return ModulePackageList::AMQP_PACKAGE; + } +} + diff --git a/packages/Amqp/tests/Integration/AmqpChannelInitializationTest.php b/packages/Amqp/tests/Integration/AmqpChannelInitializationTest.php new file mode 100644 index 000000000..310462efd --- /dev/null +++ b/packages/Amqp/tests/Integration/AmqpChannelInitializationTest.php @@ -0,0 +1,340 @@ +cleanUpRabbitMQ(); + + $ecotone = $this->bootstrapEcotone( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + // Try to receive message - should fail because queue doesn't exist and auto-init is disabled + $this->expectException(\Exception::class); + $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME)->receive(); + } + + public function test_sending_succeeds_when_auto_initialization_enabled_and_channel_auto_declare_disabled(): void + { + $this->cleanUpRabbitMQ(); + + $ecotone = $this->bootstrapEcotone( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(true), + autoDeclare: false, + ); + + // Try to receive message - should fail because queue doesn't exist and auto-init is disabled + $this->expectException(\Exception::class); + $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME)->receive(); + } + + public function test_manual_channel_initialization_via_console_command_then_send_succeeds(): void + { + $this->cleanUpRabbitMQ(); + + $ecotone = $this->bootstrapEcotone( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + // Initialize via console command + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + $result = $runner->execute('ecotone:migration:channel:setup', ['initialize' => true]); + + // Verify the command output shows initialization + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(1, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME, 'Initialized'], $rows[0]); + + // Now sending should work + $ecotone->sendDirectToChannel(self::TEST_CHANNEL_NAME, 'test message'); + + // Consume and verify + $message = $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME)->receive(); + self::assertNotNull($message); + self::assertEquals('test message', $message->getPayload()); + } + + public function test_setup_then_delete_then_send_fails(): void + { + $this->cleanUpRabbitMQ(); + + $ecotone = $this->bootstrapEcotone( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + // First, initialize the channel + $runner->execute('ecotone:migration:channel:setup', ['initialize' => true]); + + // Verify we can send and receive + $ecotone->sendDirectToChannel(self::TEST_CHANNEL_NAME, 'test message 1'); + $message = $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME)->receive(); + self::assertNotNull($message); + self::assertEquals('test message 1', $message->getPayload()); + + // Now delete the channel + $result = $runner->execute('ecotone:migration:channel:delete', ['force' => true]); + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(1, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME, 'Deleted'], $rows[0]); + + // Try to receive message - should fail because queue was deleted + $this->expectException(\Exception::class); + $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME)->receive(); + } + + public function test_channel_status_command_shows_initialization_state(): void + { + $this->cleanUpRabbitMQ(); + + $ecotone = $this->bootstrapEcotone( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + // Check status before initialization + $result = $runner->execute('ecotone:migration:channel:setup', []); + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(1, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME, 'No'], $rows[0]); + + // Initialize + $runner->execute('ecotone:migration:channel:setup', ['initialize' => true]); + + // Check status after initialization + $result = $runner->execute('ecotone:migration:channel:setup', []); + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(1, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME, 'Yes'], $rows[0]); + } + + public function test_initialize_multiple_channels_at_once(): void + { + $this->cleanUpRabbitMQ(); + + $ecotone = $this->bootstrapEcotoneWithMultipleChannels( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + // Initialize multiple channels at once + $result = $runner->execute('ecotone:migration:channel:setup', [ + 'channels' => [self::TEST_CHANNEL_NAME_2, self::TEST_CHANNEL_NAME_3], + 'initialize' => true + ]); + + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(2, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME_2, 'Initialized'], $rows[0]); + self::assertEquals([self::TEST_CHANNEL_NAME_3, 'Initialized'], $rows[1]); + + // Verify we can send and receive on both channels + $ecotone->sendDirectToChannel(self::TEST_CHANNEL_NAME_2, 'message 2'); + $ecotone->sendDirectToChannel(self::TEST_CHANNEL_NAME_3, 'message 3'); + + $message2 = $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME_2)->receive(); + $message3 = $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME_3)->receive(); + + self::assertNotNull($message2); + self::assertNotNull($message3); + self::assertEquals('message 2', $message2->getPayload()); + self::assertEquals('message 3', $message3->getPayload()); + } + + public function test_delete_multiple_channels_at_once(): void + { + $this->cleanUpRabbitMQ(); + + $ecotone = $this->bootstrapEcotoneWithMultipleChannels( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + // Initialize all channels first + $runner->execute('ecotone:migration:channel:setup', ['initialize' => true]); + + // Delete multiple channels at once + $result = $runner->execute('ecotone:migration:channel:delete', [ + 'channels' => [self::TEST_CHANNEL_NAME_2, self::TEST_CHANNEL_NAME_3], + 'force' => true + ]); + + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(2, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME_2, 'Deleted'], $rows[0]); + self::assertEquals([self::TEST_CHANNEL_NAME_3, 'Deleted'], $rows[1]); + + // Verify channels are deleted - receiving should fail + $this->expectException(\Exception::class); + $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME_2)->receive(); + } + + public function test_stream_channel_initialization(): void + { + if (getenv('AMQP_IMPLEMENTATION') !== 'lib') { + $this->markTestSkipped('Stream tests require AMQP lib'); + } + + $this->cleanUpRabbitMQ(); + + $ecotone = $this->bootstrapEcotoneWithStreamChannel( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + // Initialize stream channel + $result = $runner->execute('ecotone:migration:channel:setup', [ + 'channels' => [self::TEST_STREAM_CHANNEL_NAME], + 'initialize' => true + ]); + + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(1, $rows); + self::assertEquals([self::TEST_STREAM_CHANNEL_NAME, 'Initialized'], $rows[0]); + + // Verify we can send and receive on stream channel + $ecotone->sendDirectToChannel(self::TEST_STREAM_CHANNEL_NAME, 'stream message'); + + $message = $ecotone->getMessageChannel(self::TEST_STREAM_CHANNEL_NAME)->receive(); + self::assertNotNull($message); + self::assertEquals('stream message', $message->getPayload()); + + // Delete stream channel + $result = $runner->execute('ecotone:migration:channel:delete', [ + 'channels' => [self::TEST_STREAM_CHANNEL_NAME], + 'force' => true + ]); + + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(1, $rows); + self::assertEquals([self::TEST_STREAM_CHANNEL_NAME, 'Deleted'], $rows[0]); + } + + private function bootstrapEcotone(ChannelInitializationConfiguration $config, bool $autoDeclare = true): \Ecotone\Lite\Test\FlowTestSupport + { + return $this->bootstrapFlowTesting( + containerOrAvailableServices: $this->getConnectionFactoryReferences(), + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(\Ecotone\Messaging\Config\ModulePackageList::allPackagesExcept([ + \Ecotone\Messaging\Config\ModulePackageList::CORE_PACKAGE, + \Ecotone\Messaging\Config\ModulePackageList::AMQP_PACKAGE, + ])) + ->withExtensionObjects([ + $config, + AmqpBackedMessageChannelBuilder::create(self::TEST_CHANNEL_NAME) + ->withAutoDeclare($autoDeclare), + ]), + pathToRootCatalog: __DIR__ . '/../../', + ); + } + + private function bootstrapEcotoneWithMultipleChannels(ChannelInitializationConfiguration $config): \Ecotone\Lite\Test\FlowTestSupport + { + return $this->bootstrapFlowTesting( + containerOrAvailableServices: $this->getConnectionFactoryReferences(), + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(\Ecotone\Messaging\Config\ModulePackageList::allPackagesExcept([ + \Ecotone\Messaging\Config\ModulePackageList::CORE_PACKAGE, + \Ecotone\Messaging\Config\ModulePackageList::AMQP_PACKAGE, + ])) + ->withExtensionObjects([ + $config, + AmqpBackedMessageChannelBuilder::create(self::TEST_CHANNEL_NAME_2)->withAutoDeclare(false), + AmqpBackedMessageChannelBuilder::create(self::TEST_CHANNEL_NAME_3)->withAutoDeclare(false), + ]), + pathToRootCatalog: __DIR__ . '/../../', + ); + } + + private function bootstrapEcotoneWithStreamChannel(ChannelInitializationConfiguration $config): \Ecotone\Lite\Test\FlowTestSupport + { + return $this->bootstrapFlowTesting( + containerOrAvailableServices: $this->getConnectionFactoryReferences(), + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(\Ecotone\Messaging\Config\ModulePackageList::allPackagesExcept([ + \Ecotone\Messaging\Config\ModulePackageList::CORE_PACKAGE, + \Ecotone\Messaging\Config\ModulePackageList::AMQP_PACKAGE, + \Ecotone\Messaging\Config\ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->withExtensionObjects([ + $config, + AmqpQueue::createStreamQueue(self::TEST_STREAM_CHANNEL_NAME), + AmqpStreamChannelBuilder::create( + self::TEST_STREAM_CHANNEL_NAME, + 'first', + AmqpLibConnection::class, + self::TEST_STREAM_CHANNEL_NAME + ), + ]), + pathToRootCatalog: __DIR__ . '/../../', + ); + } + + private function cleanUpRabbitMQ(): void + { + try { + $context = $this->getCachedConnectionFactory()->createContext(); + + // Clean up regular channels + foreach ([self::TEST_CHANNEL_NAME, self::TEST_CHANNEL_NAME_2, self::TEST_CHANNEL_NAME_3, self::TEST_STREAM_CHANNEL_NAME] as $channelName) { + try { + $queue = $context->createQueue($channelName); + $context->deleteQueue($queue); + } catch (\Exception $e) { + // Queue might not exist, that's fine + } + } + } catch (\Exception $e) { + // Connection might fail, that's fine + } + } +} + diff --git a/packages/Dbal/src/Database/DatabaseDeleteCommand.php b/packages/Dbal/src/Database/DatabaseDeleteCommand.php new file mode 100644 index 000000000..288ac5354 --- /dev/null +++ b/packages/Dbal/src/Database/DatabaseDeleteCommand.php @@ -0,0 +1,71 @@ + 0) { + $rows = []; + + if (!$force) { + foreach ($features as $featureName) { + $rows[] = [$featureName, 'Would be deleted (use --force to confirm)']; + } + return ConsoleCommandResultSet::create(['Feature', 'Warning'], $rows); + } + + foreach ($features as $featureName) { + $this->databaseSetupManager->drop($featureName); + $rows[] = [$featureName, 'Deleted']; + } + return ConsoleCommandResultSet::create(['Feature', 'Status'], $rows); + } + + // Show all features + $featureNames = $this->databaseSetupManager->getFeatureNames($onlyUsed); + + if (count($featureNames) === 0) { + return ConsoleCommandResultSet::create( + ['Status'], + [['No database tables registered for deletion.']] + ); + } + + if (!$force) { + return ConsoleCommandResultSet::create( + ['Feature', 'Warning'], + array_map(fn (string $feature) => [$feature, 'Would be deleted (use --force to confirm)'], $featureNames) + ); + } + + $this->databaseSetupManager->dropAll($onlyUsed); + return ConsoleCommandResultSet::create( + ['Feature', 'Status'], + array_map(fn (string $feature) => [$feature, 'Deleted'], $featureNames) + ); + } +} + diff --git a/packages/Dbal/src/Database/DatabaseDropCommand.php b/packages/Dbal/src/Database/DatabaseDropCommand.php deleted file mode 100644 index d131b4234..000000000 --- a/packages/Dbal/src/Database/DatabaseDropCommand.php +++ /dev/null @@ -1,50 +0,0 @@ -databaseSetupManager->getFeatureNames($all); - - if (count($featureNames) === 0) { - return ConsoleCommandResultSet::create( - ['Status'], - [['No database tables registered for drop.']] - ); - } - - if ($force) { - $this->databaseSetupManager->dropAll($all); - return ConsoleCommandResultSet::create( - ['Feature', 'Status'], - array_map(fn (string $feature) => [$feature, 'Dropped'], $featureNames) - ); - } - - return ConsoleCommandResultSet::create( - ['Feature', 'Warning'], - array_map(fn (string $feature) => [$feature, 'Would be dropped (use --force to confirm)'], $featureNames) - ); - } -} diff --git a/packages/Dbal/src/Database/DatabaseSetupCommand.php b/packages/Dbal/src/Database/DatabaseSetupCommand.php index 3906e03ba..81e1c882d 100644 --- a/packages/Dbal/src/Database/DatabaseSetupCommand.php +++ b/packages/Dbal/src/Database/DatabaseSetupCommand.php @@ -22,11 +22,43 @@ public function __construct( #[ConsoleCommand('ecotone:migration:database:setup')] public function setup( + #[ConsoleParameterOption] array $features = [], #[ConsoleParameterOption] bool $initialize = false, #[ConsoleParameterOption] bool $sql = false, - #[ConsoleParameterOption] bool $all = false, + #[ConsoleParameterOption] bool $onlyUsed = true, ): ?ConsoleCommandResultSet { - $featureNames = $this->databaseSetupManager->getFeatureNames($all); + // If specific feature names provided + if (count($features) > 0) { + $rows = []; + + if ($sql) { + $statements = $this->databaseSetupManager->getCreateSqlStatementsForFeatures($features); + return ConsoleCommandResultSet::create( + ['SQL Statement'], + [[implode("\n", $statements)]] + ); + } + + if ($initialize) { + foreach ($features as $featureName) { + $this->databaseSetupManager->initialize($featureName); + $rows[] = [$featureName, 'Created']; + } + return ConsoleCommandResultSet::create(['Feature', 'Status'], $rows); + } + + $initStatus = $this->databaseSetupManager->getInitializationStatus(); + $usageStatus = $this->databaseSetupManager->getUsageStatus(); + foreach ($features as $featureName) { + $isInitialized = $initStatus[$featureName] ?? false; + $isUsed = $usageStatus[$featureName] ?? false; + $rows[] = [$featureName, $isUsed ? 'Yes' : 'No', $isInitialized ? 'Yes' : 'No']; + } + return ConsoleCommandResultSet::create(['Feature', 'Used', 'Initialized'], $rows); + } + + // Show all features + $featureNames = $this->databaseSetupManager->getFeatureNames($onlyUsed); if (count($featureNames) === 0) { return ConsoleCommandResultSet::create( @@ -36,30 +68,32 @@ public function setup( } if ($sql) { - $statements = $this->databaseSetupManager->getCreateSqlStatements($all); + $statements = $this->databaseSetupManager->getCreateSqlStatements($onlyUsed); return ConsoleCommandResultSet::create( ['SQL Statement'], - array_map(fn (string $statement) => [$statement], $statements) + [[implode("\n", $statements)]] ); } if ($initialize) { - $this->databaseSetupManager->initializeAll($all); + $this->databaseSetupManager->initializeAll($onlyUsed); return ConsoleCommandResultSet::create( ['Feature', 'Status'], array_map(fn (string $feature) => [$feature, 'Created'], $featureNames) ); } - $initializationStatus = $this->databaseSetupManager->getInitializationStatus($all); + $initializationStatus = $this->databaseSetupManager->getInitializationStatus($onlyUsed); + $usageStatus = $this->databaseSetupManager->getUsageStatus(); $rows = []; foreach ($featureNames as $featureName) { $isInitialized = $initializationStatus[$featureName] ?? false; - $rows[] = [$featureName, $isInitialized ? 'Yes' : 'No']; + $isUsed = $usageStatus[$featureName] ?? false; + $rows[] = [$featureName, $isUsed ? 'Yes' : 'No', $isInitialized ? 'Yes' : 'No']; } return ConsoleCommandResultSet::create( - ['Feature', 'Initialized'], + ['Feature', 'Used', 'Initialized'], $rows ); } diff --git a/packages/Dbal/src/Database/DatabaseSetupManager.php b/packages/Dbal/src/Database/DatabaseSetupManager.php index 7ab6c915d..9c34b2614 100644 --- a/packages/Dbal/src/Database/DatabaseSetupManager.php +++ b/packages/Dbal/src/Database/DatabaseSetupManager.php @@ -30,23 +30,23 @@ public function __construct( /** * @return string[] List of feature names that require database tables */ - public function getFeatureNames(bool $includeInactive = false): array + public function getFeatureNames(bool $onlyUsed = true): array { return array_map( fn (DbalTableManager $manager) => $manager->getFeatureName(), - $this->getManagers($includeInactive) + $this->getManagers($onlyUsed) ); } /** * @return string[] SQL statements to create all tables */ - public function getCreateSqlStatements(bool $includeInactive = false): array + public function getCreateSqlStatements(bool $onlyUsed = true): array { $connection = $this->getConnection(); $statements = []; - foreach ($this->getManagers($includeInactive) as $manager) { + foreach ($this->getManagers($onlyUsed) as $manager) { $sql = $manager->getCreateTableSql($connection); if (is_array($sql)) { $statements = array_merge($statements, $sql); @@ -61,12 +61,12 @@ public function getCreateSqlStatements(bool $includeInactive = false): array /** * @return string[] SQL statements to drop all tables */ - public function getDropSqlStatements(bool $includeInactive = false): array + public function getDropSqlStatements(bool $onlyUsed = true): array { $connection = $this->getConnection(); $statements = []; - foreach ($this->getManagers($includeInactive) as $manager) { + foreach ($this->getManagers($onlyUsed) as $manager) { $statements[] = $manager->getDropTableSql($connection); } @@ -76,11 +76,11 @@ public function getDropSqlStatements(bool $includeInactive = false): array /** * Creates all tables. */ - public function initializeAll(bool $includeInactive = false): void + public function initializeAll(bool $onlyUsed = true): void { $connection = $this->getConnection(); - foreach ($this->getManagers($includeInactive) as $manager) { + foreach ($this->getManagers($onlyUsed) as $manager) { if ($manager->isInitialized($connection)) { continue; } @@ -92,47 +92,144 @@ public function initializeAll(bool $includeInactive = false): void /** * Drops all tables. */ - public function dropAll(bool $includeInactive = false): void + public function dropAll(bool $onlyUsed = true): void { $connection = $this->getConnection(); - foreach ($this->getManagers($includeInactive) as $manager) { + foreach ($this->getManagers($onlyUsed) as $manager) { $manager->dropTable($connection); } } + /** + * Initialize specific feature by name. + */ + public function initialize(string $featureName): void + { + $manager = $this->findManager($featureName); + $connection = $this->getConnection(); + + if ($manager->isInitialized($connection)) { + return; + } + + $manager->createTable($connection); + } + + /** + * Drop specific feature by name. + */ + public function drop(string $featureName): void + { + $manager = $this->findManager($featureName); + $connection = $this->getConnection(); + $manager->dropTable($connection); + } + + /** + * Get SQL statements for specific features. + * + * @param string[] $featureNames + * @return string[] SQL statements to create tables for specified features + */ + public function getCreateSqlStatementsForFeatures(array $featureNames): array + { + $connection = $this->getConnection(); + $statements = []; + + foreach ($featureNames as $featureName) { + $manager = $this->findManager($featureName); + $sql = $manager->getCreateTableSql($connection); + if (is_array($sql)) { + $statements = array_merge($statements, $sql); + } else { + $statements[] = $sql; + } + } + + return $statements; + } + + /** + * Get SQL statements to drop specific features. + * + * @param string[] $featureNames + * @return string[] SQL statements to drop tables for specified features + */ + public function getDropSqlStatementsForFeatures(array $featureNames): array + { + $connection = $this->getConnection(); + $statements = []; + + foreach ($featureNames as $featureName) { + $manager = $this->findManager($featureName); + $statements[] = $manager->getDropTableSql($connection); + } + + return $statements; + } + /** * Returns initialization status for each table manager. * * @return array Map of feature name to initialization status */ - public function getInitializationStatus(bool $includeInactive = false): array + public function getInitializationStatus(bool $onlyUsed = true): array { $connection = $this->getConnection(); $status = []; - foreach ($this->getManagers($includeInactive) as $manager) { + foreach ($this->getManagers($onlyUsed) as $manager) { $status[$manager->getFeatureName()] = $manager->isInitialized($connection); } return $status; } + /** + * Returns usage status for each table manager. + * + * @return array Map of feature name to usage status + */ + public function getUsageStatus(): array + { + $status = []; + + foreach ($this->tableManagers as $manager) { + $status[$manager->getFeatureName()] = $manager->isUsed(); + } + + return $status; + } + /** * @return DbalTableManager[] */ - private function getManagers(bool $includeInactive): array + private function getManagers(bool $onlyUsed): array { - if ($includeInactive) { + if (!$onlyUsed) { + // Return all managers when onlyUsed is false return $this->tableManagers; } + // Return only used managers when onlyUsed is true (default) return array_filter( $this->tableManagers, - fn (DbalTableManager $manager) => $manager->isActive() + fn (DbalTableManager $manager) => $manager->isUsed() ); } + private function findManager(string $featureName): DbalTableManager + { + foreach ($this->tableManagers as $manager) { + if ($manager->getFeatureName() === $featureName) { + return $manager; + } + } + + throw new \InvalidArgumentException("Table manager not found for feature: {$featureName}"); + } + private function getConnection(): Connection { /** @var DbalContext $context */ diff --git a/packages/Dbal/src/Database/DatabaseSetupModule.php b/packages/Dbal/src/Database/DatabaseSetupModule.php index fede8071f..cbfbfdd3c 100644 --- a/packages/Dbal/src/Database/DatabaseSetupModule.php +++ b/packages/Dbal/src/Database/DatabaseSetupModule.php @@ -68,8 +68,8 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO ); $messagingConfiguration->registerServiceDefinition( - DatabaseDropCommand::class, - new Definition(DatabaseDropCommand::class, [ + DatabaseDeleteCommand::class, + new Definition(DatabaseDeleteCommand::class, [ new Reference(DatabaseSetupManager::class), ]) ); @@ -83,9 +83,9 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO ); $this->registerConsoleCommand( - 'drop', - 'ecotone:migration:database:drop', - DatabaseDropCommand::class, + 'delete', + 'ecotone:migration:database:delete', + DatabaseDeleteCommand::class, $messagingConfiguration, $interfaceToCallRegistry ); diff --git a/packages/Dbal/src/Database/DbalTableManager.php b/packages/Dbal/src/Database/DbalTableManager.php index 906b894a4..87acfcee6 100644 --- a/packages/Dbal/src/Database/DbalTableManager.php +++ b/packages/Dbal/src/Database/DbalTableManager.php @@ -23,10 +23,10 @@ interface DbalTableManager extends DefinedObject public function getFeatureName(): string; /** - * Returns whether this table manager is active based on configuration. - * Inactive managers are skipped during setup/drop operations by default. + * Returns whether this table manager is used based on configuration. + * Unused managers can be filtered during setup/drop operations. */ - public function isActive(): bool; + public function isUsed(): bool; /** * Returns the SQL statement(s) to create the table. diff --git a/packages/Dbal/src/Database/DeadLetterTableManager.php b/packages/Dbal/src/Database/DeadLetterTableManager.php index e1493ba9e..879fcb377 100644 --- a/packages/Dbal/src/Database/DeadLetterTableManager.php +++ b/packages/Dbal/src/Database/DeadLetterTableManager.php @@ -21,7 +21,7 @@ class DeadLetterTableManager implements DbalTableManager public function __construct( private string $tableName, - private bool $isActive, + private bool $isUsed, private bool $shouldAutoInitialize, ) { } @@ -31,9 +31,9 @@ public function getFeatureName(): string return self::FEATURE_NAME; } - public function isActive(): bool + public function isUsed(): bool { - return $this->isActive; + return $this->isUsed; } public function getTableName(): string @@ -87,7 +87,7 @@ public function getDefinition(): Definition { return new Definition( self::class, - [$this->tableName, $this->isActive, $this->shouldAutoInitialize] + [$this->tableName, $this->isUsed, $this->shouldAutoInitialize] ); } diff --git a/packages/Dbal/src/Database/DeduplicationTableManager.php b/packages/Dbal/src/Database/DeduplicationTableManager.php index 156f7ca32..2435dcfe8 100644 --- a/packages/Dbal/src/Database/DeduplicationTableManager.php +++ b/packages/Dbal/src/Database/DeduplicationTableManager.php @@ -19,7 +19,7 @@ class DeduplicationTableManager implements DbalTableManager public function __construct( private string $tableName, - private bool $isActive, + private bool $isUsed, private bool $shouldAutoInitialize, ) { } @@ -29,9 +29,9 @@ public function getFeatureName(): string return self::FEATURE_NAME; } - public function isActive(): bool + public function isUsed(): bool { - return $this->isActive; + return $this->isUsed; } public function getTableName(): string @@ -83,7 +83,7 @@ public function getDefinition(): Definition { return new Definition( self::class, - [$this->tableName, $this->isActive, $this->shouldAutoInitialize] + [$this->tableName, $this->isUsed, $this->shouldAutoInitialize] ); } diff --git a/packages/Dbal/src/Database/DocumentStoreTableManager.php b/packages/Dbal/src/Database/DocumentStoreTableManager.php index 65e1a7f21..de436c61f 100644 --- a/packages/Dbal/src/Database/DocumentStoreTableManager.php +++ b/packages/Dbal/src/Database/DocumentStoreTableManager.php @@ -18,7 +18,7 @@ final class DocumentStoreTableManager implements DbalTableManager public function __construct( private string $tableName, - private bool $isActive, + private bool $isUsed, private bool $shouldAutoInitialize, ) { } @@ -28,9 +28,9 @@ public function getFeatureName(): string return self::FEATURE_NAME; } - public function isActive(): bool + public function isUsed(): bool { - return $this->isActive; + return $this->isUsed; } public function getTableName(): string @@ -40,7 +40,7 @@ public function getTableName(): string public function getDefinition(): Definition { - return new Definition(self::class, [$this->tableName, $this->isActive, $this->shouldAutoInitialize]); + return new Definition(self::class, [$this->tableName, $this->isUsed, $this->shouldAutoInitialize]); } public function shouldBeInitializedAutomatically(): bool diff --git a/packages/Dbal/src/Database/EnqueueTableManager.php b/packages/Dbal/src/Database/EnqueueTableManager.php index e937b9e91..a81b09ae2 100644 --- a/packages/Dbal/src/Database/EnqueueTableManager.php +++ b/packages/Dbal/src/Database/EnqueueTableManager.php @@ -19,7 +19,7 @@ final class EnqueueTableManager implements DbalTableManager public function __construct( private string $tableName, - private bool $isActive, + private bool $isUsed, private bool $shouldAutoInitialize, ) { } @@ -29,9 +29,9 @@ public function getFeatureName(): string return self::FEATURE_NAME; } - public function isActive(): bool + public function isUsed(): bool { - return $this->isActive; + return $this->isUsed; } public function getTableName(): string @@ -41,7 +41,7 @@ public function getTableName(): string public function getDefinition(): Definition { - return new Definition(self::class, [$this->tableName, $this->isActive, $this->shouldAutoInitialize]); + return new Definition(self::class, [$this->tableName, $this->isUsed, $this->shouldAutoInitialize]); } public function shouldBeInitializedAutomatically(): bool diff --git a/packages/Dbal/tests/Integration/DatabaseInitializationTest.php b/packages/Dbal/tests/Integration/DatabaseInitializationTest.php index a0bae2f40..623acf07b 100644 --- a/packages/Dbal/tests/Integration/DatabaseInitializationTest.php +++ b/packages/Dbal/tests/Integration/DatabaseInitializationTest.php @@ -39,13 +39,14 @@ public function test_database_setup_lists_features_with_initialization_status(): $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:setup', []); - self::assertEquals(['Feature', 'Initialized'], $result->getColumnHeaders()); + self::assertEquals(['Feature', 'Used', 'Initialized'], $result->getColumnHeaders()); $featureNames = array_column($result->getRows(), 0); self::assertContains('dead_letter', $featureNames); - // Verify dead_letter shows as not initialized + // Verify dead_letter shows as used and not initialized $deadLetterRow = $this->findRowByFeature($result, 'dead_letter'); - self::assertEquals('No', $deadLetterRow[1]); + self::assertEquals('Yes', $deadLetterRow[1]); // Used + self::assertEquals('No', $deadLetterRow[2]); // Initialized } public function test_database_setup_shows_initialized_status_after_initialization(): void @@ -59,7 +60,8 @@ public function test_database_setup_shows_initialized_status_after_initializatio $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:setup', []); $deadLetterRow = $this->findRowByFeature($result, 'dead_letter'); - self::assertEquals('Yes', $deadLetterRow[1]); + self::assertEquals('Yes', $deadLetterRow[1]); // Used + self::assertEquals('Yes', $deadLetterRow[2]); // Initialized } public function test_database_setup_initializes_tables(): void @@ -90,7 +92,7 @@ public function test_database_setup_returns_sql_statements(): void self::assertStringContainsString(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE, $allSql); } - public function test_database_drop_drops_tables(): void + public function test_database_delete_deletes_tables(): void { $ecotone = $this->bootstrapEcotone(); @@ -98,14 +100,14 @@ public function test_database_drop_drops_tables(): void $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:setup', ['initialize' => true]); self::assertTrue($this->tableExists(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE)); - // Drop tables - $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:drop', ['force' => true]); + // Delete tables + $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:delete', ['force' => true]); self::assertEquals(['Feature', 'Status'], $result->getColumnHeaders()); self::assertFalse($this->tableExists(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE)); } - public function test_database_drop_shows_warning_without_force(): void + public function test_database_delete_shows_warning_without_force(): void { $ecotone = $this->bootstrapEcotone(); @@ -113,8 +115,8 @@ public function test_database_drop_shows_warning_without_force(): void $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:setup', ['initialize' => true]); self::assertTrue($this->tableExists(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE)); - // Try to drop without force - $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:drop', []); + // Try to delete without force + $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:delete', []); self::assertEquals(['Feature', 'Warning'], $result->getColumnHeaders()); // Tables should still exist @@ -156,6 +158,94 @@ public function test_tables_are_not_auto_created_when_auto_initialization_disabl self::assertTrue($this->tableExists(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE)); } + public function test_database_setup_with_specific_features(): void + { + $ecotone = $this->bootstrapEcotone(); + + // Initialize only specific feature + $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:setup', [ + 'features' => ['dead_letter'], + 'initialize' => true, + ]); + + self::assertEquals(['Feature', 'Status'], $result->getColumnHeaders()); + self::assertEquals([['dead_letter', 'Created']], $result->getRows()); + self::assertTrue($this->tableExists(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE)); + } + + public function test_database_setup_shows_status_for_specific_features(): void + { + $ecotone = $this->bootstrapEcotone(); + + // First initialize + $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:setup', [ + 'features' => ['dead_letter'], + 'initialize' => true, + ]); + + // Check status for specific feature + $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:setup', [ + 'features' => ['dead_letter'], + ]); + + self::assertEquals(['Feature', 'Used', 'Initialized'], $result->getColumnHeaders()); + self::assertEquals([['dead_letter', 'Yes', 'Yes']], $result->getRows()); + } + + public function test_database_setup_returns_sql_for_specific_features(): void + { + $ecotone = $this->bootstrapEcotone(); + + $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:setup', [ + 'features' => ['dead_letter'], + 'sql' => true, + ]); + + self::assertEquals(['SQL Statement'], $result->getColumnHeaders()); + self::assertCount(1, $result->getRows()); + $sql = $result->getRows()[0][0]; + self::assertStringContainsString('CREATE TABLE', $sql); + self::assertStringContainsString(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE, $sql); + } + + public function test_database_delete_with_specific_features(): void + { + $ecotone = $this->bootstrapEcotone(); + + // First create tables + $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:setup', ['initialize' => true]); + self::assertTrue($this->tableExists(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE)); + + // Delete specific feature + $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:delete', [ + 'features' => ['dead_letter'], + 'force' => true, + ]); + + self::assertEquals(['Feature', 'Status'], $result->getColumnHeaders()); + self::assertEquals([['dead_letter', 'Deleted']], $result->getRows()); + self::assertFalse($this->tableExists(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE)); + } + + public function test_database_delete_shows_warning_for_specific_features_without_force(): void + { + $ecotone = $this->bootstrapEcotone(); + + // First create tables + $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:setup', ['initialize' => true]); + self::assertTrue($this->tableExists(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE)); + + // Try to delete without force + $result = $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:delete', [ + 'features' => ['dead_letter'], + ]); + + self::assertEquals(['Feature', 'Warning'], $result->getColumnHeaders()); + self::assertEquals([['dead_letter', 'Would be deleted (use --force to confirm)']], $result->getRows()); + // Table should still exist + self::assertTrue($this->tableExists(DbalDeadLetterHandler::DEFAULT_DEAD_LETTER_TABLE)); + } + private function executeConsoleCommand(FlowTestSupport $ecotone, string $commandName, array $parameters): ConsoleCommandResultSet { /** @var ConsoleCommandRunner $runner */ diff --git a/packages/Ecotone/src/Messaging/Channel/Manager/ChannelDeleteCommand.php b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelDeleteCommand.php new file mode 100644 index 000000000..bf903626d --- /dev/null +++ b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelDeleteCommand.php @@ -0,0 +1,70 @@ + 0) { + $rows = []; + + if (!$force) { + foreach ($channels as $channelName) { + $rows[] = [$channelName, 'Would be deleted (use --force to confirm)']; + } + return ConsoleCommandResultSet::create(['Channel', 'Warning'], $rows); + } + + foreach ($channels as $channelName) { + $this->channelSetupManager->delete($channelName); + $rows[] = [$channelName, 'Deleted']; + } + return ConsoleCommandResultSet::create(['Channel', 'Status'], $rows); + } + + // Show all channels + $channelNames = $this->channelSetupManager->getChannelNames(); + + if (\count($channelNames) === 0) { + return ConsoleCommandResultSet::create( + ['Status'], + [['No message channels registered for deletion.']] + ); + } + + if (!$force) { + return ConsoleCommandResultSet::create( + ['Channel', 'Warning'], + array_map(fn (string $channel) => [$channel, 'Would be deleted (use --force to confirm)'], $channelNames) + ); + } + + $this->channelSetupManager->deleteAll(); + return ConsoleCommandResultSet::create( + ['Channel', 'Status'], + array_map(fn (string $channel) => [$channel, 'Deleted'], $channelNames) + ); + } +} + diff --git a/packages/Ecotone/src/Messaging/Channel/Manager/ChannelInitializationConfiguration.php b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelInitializationConfiguration.php new file mode 100644 index 000000000..a5391c595 --- /dev/null +++ b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelInitializationConfiguration.php @@ -0,0 +1,40 @@ +automaticChannelInitialization = $enabled; + return $self; + } + + public function isAutomaticChannelInitializationEnabled(): bool + { + return $this->automaticChannelInitialization; + } +} + diff --git a/packages/Ecotone/src/Messaging/Channel/Manager/ChannelManager.php b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelManager.php new file mode 100644 index 000000000..fa597b236 --- /dev/null +++ b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelManager.php @@ -0,0 +1,47 @@ +referenceName; + } +} + diff --git a/packages/Ecotone/src/Messaging/Channel/Manager/ChannelSetupCommand.php b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelSetupCommand.php new file mode 100644 index 000000000..3f7cc08fd --- /dev/null +++ b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelSetupCommand.php @@ -0,0 +1,88 @@ + 0) { + $rows = []; + + if ($initialize) { + foreach ($channels as $channelName) { + $this->channelSetupManager->initialize($channelName); + $rows[] = [$channelName, 'Initialized']; + } + return ConsoleCommandResultSet::create(['Channel', 'Status'], $rows); + } + + $status = $this->channelSetupManager->getInitializationStatus(); + foreach ($channels as $channelName) { + $channelStatus = $status[$channelName] ?? false; + $rows[] = [$channelName, $this->formatStatus($channelStatus)]; + } + return ConsoleCommandResultSet::create(['Channel', 'Initialized'], $rows); + } + + // Show all channels + $channelNames = $this->channelSetupManager->getChannelNames(); + + if (count($channelNames) === 0) { + return ConsoleCommandResultSet::create( + ['Status'], + [['No message channels registered for setup.']] + ); + } + + if ($initialize) { + $this->channelSetupManager->initializeAll(); + return ConsoleCommandResultSet::create( + ['Channel', 'Status'], + array_map(fn (string $channel) => [$channel, 'Initialized'], $channelNames) + ); + } + + // Show status + $initializationStatus = $this->channelSetupManager->getInitializationStatus(); + $rows = []; + foreach ($channelNames as $channelName) { + $channelStatus = $initializationStatus[$channelName] ?? false; + $rows[] = [$channelName, $this->formatStatus($channelStatus)]; + } + + return ConsoleCommandResultSet::create(['Channel', 'Initialized'], $rows); + } + + /** + * Format the initialization status for display + */ + private function formatStatus(bool|string $status): string + { + if (is_string($status)) { + return $status; + } + return $status ? 'Yes' : 'No'; + } +} + diff --git a/packages/Ecotone/src/Messaging/Channel/Manager/ChannelSetupManager.php b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelSetupManager.php new file mode 100644 index 000000000..ec9de720a --- /dev/null +++ b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelSetupManager.php @@ -0,0 +1,152 @@ + $manager->getChannelName(), + $this->channelManagers + ); + + // Combine managed and non-managed channels, removing duplicates + return array_values(array_unique(array_merge($managedChannelNames, $this->allPollableChannelNames))); + } + + /** + * Check if a channel is managed by migration system + */ + private function isManagedChannel(string $channelName): bool + { + foreach ($this->channelManagers as $manager) { + if ($manager->getChannelName() === $channelName) { + return true; + } + } + return false; + } + + /** + * Initialize all channels + */ + public function initializeAll(): void + { + foreach ($this->channelManagers as $manager) { + $manager->initialize(); + } + } + + /** + * Initialize specific channel by name + */ + public function initialize(string $channelName): void + { + if (!$this->isManagedChannel($channelName)) { + throw new \InvalidArgumentException( + "Channel '{$channelName}' is not managed by the migration system. " . + "Only channels registered via ChannelManagerReference can be initialized through this command." + ); + } + + $manager = $this->findManager($channelName); + $manager->initialize(); + } + + /** + * Delete all channels + */ + public function deleteAll(): void + { + foreach ($this->channelManagers as $manager) { + $manager->delete(); + } + } + + /** + * Delete specific channel by name + */ + public function delete(string $channelName): void + { + if (!$this->isManagedChannel($channelName)) { + throw new \InvalidArgumentException( + "Channel '{$channelName}' is not managed by the migration system. " . + "Only channels registered via ChannelManagerReference can be deleted through this command." + ); + } + + $manager = $this->findManager($channelName); + $manager->delete(); + } + + /** + * Returns initialization status for each channel + * @return array Map of channel name to initialization status (bool for managed, 'Not managed by migration' for non-managed) + */ + public function getInitializationStatus(): array + { + $status = []; + + foreach ($this->channelManagers as $manager) { + $status[$manager->getChannelName()] = $manager->isInitialized(); + } + + foreach ($this->allPollableChannelNames as $channelName) { + if (!isset($status[$channelName])) { + $status[$channelName] = 'Not managed by migration'; + } + } + + return $status; + } + + private function findManager(string $channelName): ChannelManager + { + foreach ($this->channelManagers as $manager) { + if ($manager->getChannelName() === $channelName) { + return $manager; + } + } + + throw new \InvalidArgumentException("Channel manager not found for channel: {$channelName}"); + } + + public function getDefinition(): Definition + { + $channelManagerDefinitions = array_map( + fn (ChannelManager $manager) => $manager->getDefinition(), + $this->channelManagers + ); + + return new Definition( + self::class, + [$channelManagerDefinitions, $this->allPollableChannelNames] + ); + } +} + diff --git a/packages/Ecotone/src/Messaging/Channel/Manager/ChannelSetupModule.php b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelSetupModule.php new file mode 100644 index 000000000..453609d0d --- /dev/null +++ b/packages/Ecotone/src/Messaging/Channel/Manager/ChannelSetupModule.php @@ -0,0 +1,105 @@ + new Reference($ref->getReferenceName()), + $channelManagerReferences + ); + + // Collect all pollable channels from extension objects + $messageChannelBuilders = ExtensionObjectResolver::resolve(\Ecotone\Messaging\Channel\MessageChannelBuilder::class, $extensionObjects); + $allPollableChannelNames = array_map( + fn (\Ecotone\Messaging\Channel\MessageChannelBuilder $builder) => $builder->getMessageChannelName(), + array_filter( + $messageChannelBuilders, + fn (\Ecotone\Messaging\Channel\MessageChannelBuilder $builder) => $builder->isPollable() + ) + ); + + $messagingConfiguration->registerServiceDefinition( + ChannelSetupManager::class, + new Definition(ChannelSetupManager::class, [$channelManagerRefs, $allPollableChannelNames]) + ); + + $messagingConfiguration->registerServiceDefinition( + ChannelSetupCommand::class, + new Definition(ChannelSetupCommand::class, [new Reference(ChannelSetupManager::class)]) + ); + + $messagingConfiguration->registerServiceDefinition( + ChannelDeleteCommand::class, + new Definition(ChannelDeleteCommand::class, [new Reference(ChannelSetupManager::class)]) + ); + + // Register console commands + $this->registerConsoleCommand('setup', 'ecotone:migration:channel:setup', ChannelSetupCommand::class, $messagingConfiguration, $interfaceToCallRegistry); + $this->registerConsoleCommand('delete', 'ecotone:migration:channel:delete', ChannelDeleteCommand::class, $messagingConfiguration, $interfaceToCallRegistry); + } + + public function canHandle($extensionObject): bool + { + return $extensionObject instanceof ChannelManagerReference + || $extensionObject instanceof ChannelInitializationConfiguration + || $extensionObject instanceof \Ecotone\Messaging\Channel\MessageChannelBuilder; + } + + public function getModulePackageName(): string + { + return ModulePackageList::CORE_PACKAGE; + } + + private function registerConsoleCommand( + string $methodName, + string $commandName, + string $className, + Configuration $configuration, + InterfaceToCallRegistry $interfaceToCallRegistry + ): void { + [$messageHandlerBuilder, $oneTimeCommandConfiguration] = ConsoleCommandModule::prepareConsoleCommandForReference( + new Reference($className), + new InterfaceToCallReference($className, $methodName), + $commandName, + true, + $interfaceToCallRegistry + ); + + $configuration + ->registerMessageHandler($messageHandlerBuilder) + ->registerConsoleCommand($oneTimeCommandConfiguration); + } +} + diff --git a/packages/Ecotone/src/Messaging/Config/ModuleClassList.php b/packages/Ecotone/src/Messaging/Config/ModuleClassList.php index bd02d3287..91f219a75 100644 --- a/packages/Ecotone/src/Messaging/Config/ModuleClassList.php +++ b/packages/Ecotone/src/Messaging/Config/ModuleClassList.php @@ -2,6 +2,7 @@ namespace Ecotone\Messaging\Config; +use Ecotone\Amqp\Configuration\AmqpChannelManagerModule; use Ecotone\Amqp\Configuration\AmqpMessageConsumerModule; use Ecotone\Amqp\Configuration\AmqpModule; use Ecotone\Amqp\Configuration\RabbitConsumerModule; @@ -26,6 +27,7 @@ use Ecotone\Lite\Test\Configuration\EcotoneTestSupportModule; use Ecotone\Messaging\Channel\Collector\Config\CollectorModule; use Ecotone\Messaging\Channel\DynamicChannel\Config\DynamicMessageChannelModule; +use Ecotone\Messaging\Channel\Manager\ChannelSetupModule; use Ecotone\Messaging\Channel\PollableChannel\InMemory\InMemoryQueueAcknowledgeModule; use Ecotone\Messaging\Channel\PollableChannel\SendRetries\PollableChannelSendRetriesModule; use Ecotone\Messaging\Channel\PollableChannel\Serialization\PollableChannelSerializationModule; @@ -69,8 +71,10 @@ use Ecotone\Projecting\EventStoreAdapter\EventStoreAdapterModule; use Ecotone\Redis\Configuration\RedisMessageConsumerModule; use Ecotone\Redis\Configuration\RedisMessagePublisherModule; +use Ecotone\Sqs\Configuration\SqsChannelManagerModule; use Ecotone\Sqs\Configuration\SqsMessageConsumerModule; use Ecotone\Sqs\Configuration\SqsMessagePublisherModule; +use Ecotone\Sqs\Configuration\SqsModule; use Ecotone\SymfonyBundle\Config\SymfonyConnectionModule; /** @@ -98,6 +102,7 @@ class ModuleClassList RouterModule::class, ScheduledModule::class, CollectorModule::class, + ChannelSetupModule::class, SerializerModule::class, ServiceActivatorModule::class, SplitterModule::class, @@ -132,6 +137,7 @@ class ModuleClassList AmqpTransactionModule::class, AmqpMessagePublisherModule::class, AmqpModule::class, + AmqpChannelManagerModule::class, AmqpMessageConsumerModule::class, RabbitConsumerModule::class, ]; @@ -155,6 +161,8 @@ class ModuleClassList ]; public const SQS_MODULES = [ + SqsModule::class, + SqsChannelManagerModule::class, SqsMessageConsumerModule::class, SqsMessagePublisherModule::class, ]; diff --git a/packages/Ecotone/tests/Messaging/Integration/ChannelSetupCommandTest.php b/packages/Ecotone/tests/Messaging/Integration/ChannelSetupCommandTest.php new file mode 100644 index 000000000..ef6b4d275 --- /dev/null +++ b/packages/Ecotone/tests/Messaging/Integration/ChannelSetupCommandTest.php @@ -0,0 +1,87 @@ +bootstrapEcotone(); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + $result = $runner->execute('ecotone:migration:channel:setup', []); + + self::assertNotNull($result); + $rows = $result->getRows(); + + // Find the in-memory channel in the results + $inMemoryChannelRow = null; + foreach ($rows as $row) { + if ($row[0] === self::IN_MEMORY_CHANNEL) { + $inMemoryChannelRow = $row; + break; + } + } + + self::assertNotNull($inMemoryChannelRow, 'In-memory channel should be listed'); + self::assertEquals([self::IN_MEMORY_CHANNEL, 'Not managed by migration'], $inMemoryChannelRow); + } + + public function test_throwing_exception_when_trying_to_setup_non_managed_channel(): void + { + $ecotone = $this->bootstrapEcotone(); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage("Channel 'in_memory_test_channel' is not managed by the migration system"); + + $runner->execute('ecotone:migration:channel:setup', [ + 'channels' => [self::IN_MEMORY_CHANNEL], + 'initialize' => true, + ]); + } + + public function test_throwing_exception_when_trying_to_delete_non_managed_channel(): void + { + $ecotone = $this->bootstrapEcotone(); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage("Channel 'in_memory_test_channel' is not managed by the migration system"); + + $runner->execute('ecotone:migration:channel:delete', [ + 'channels' => [self::IN_MEMORY_CHANNEL], + 'force' => true, + ]); + } + + private function bootstrapEcotone(): \Ecotone\Lite\Test\FlowTestSupport + { + return EcotoneLite::bootstrapFlowTesting( + containerOrAvailableServices: [], + configuration: ServiceConfiguration::createWithDefaults() + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(self::IN_MEMORY_CHANNEL), + ]), + pathToRootCatalog: __DIR__ . '/../../', + ); + } +} + diff --git a/packages/Enqueue/src/EnqueueInboundChannelAdapterBuilder.php b/packages/Enqueue/src/EnqueueInboundChannelAdapterBuilder.php index 65fc6ca52..75871b5f1 100644 --- a/packages/Enqueue/src/EnqueueInboundChannelAdapterBuilder.php +++ b/packages/Enqueue/src/EnqueueInboundChannelAdapterBuilder.php @@ -75,6 +75,11 @@ public function getMessageChannelName(): string return $this->messageChannelName; } + public function getConnectionReferenceName(): string + { + return $this->connectionReferenceName; + } + /** * @param string $headerMapper * @return static diff --git a/packages/PdoEventSourcing/src/Database/EventStreamTableManager.php b/packages/PdoEventSourcing/src/Database/EventStreamTableManager.php index ed08a0b8b..961b73439 100644 --- a/packages/PdoEventSourcing/src/Database/EventStreamTableManager.php +++ b/packages/PdoEventSourcing/src/Database/EventStreamTableManager.php @@ -22,8 +22,8 @@ final class EventStreamTableManager implements DbalTableManager public function __construct( private string $tableName, - private bool $isActive, - private bool $shouldAutoInitialize, + private bool $isUsed, + private bool $shouldAutoInitialize, ) { } @@ -32,9 +32,9 @@ public function getFeatureName(): string return self::FEATURE_NAME; } - public function isActive(): bool + public function isUsed(): bool { - return $this->isActive; + return $this->isUsed; } public function getTableName(): string @@ -94,7 +94,7 @@ public function isInitialized(Connection $connection): bool public function getDefinition(): Definition { - return new Definition(self::class, [$this->tableName, $this->isActive, $this->shouldAutoInitialize]); + return new Definition(self::class, [$this->tableName, $this->isUsed, $this->shouldAutoInitialize]); } public function shouldBeInitializedAutomatically(): bool diff --git a/packages/PdoEventSourcing/src/Database/LegacyProjectionsTableManager.php b/packages/PdoEventSourcing/src/Database/LegacyProjectionsTableManager.php index a7ac7b4a7..5d639a490 100644 --- a/packages/PdoEventSourcing/src/Database/LegacyProjectionsTableManager.php +++ b/packages/PdoEventSourcing/src/Database/LegacyProjectionsTableManager.php @@ -22,8 +22,8 @@ final class LegacyProjectionsTableManager implements DbalTableManager public function __construct( private string $tableName, - private bool $isActive, - private bool $shouldAutoInitialize, + private bool $isUsed, + private bool $shouldAutoInitialize, ) { } @@ -32,9 +32,9 @@ public function getFeatureName(): string return self::FEATURE_NAME; } - public function isActive(): bool + public function isUsed(): bool { - return $this->isActive; + return $this->isUsed; } public function getTableName(): string @@ -94,7 +94,7 @@ public function isInitialized(Connection $connection): bool public function getDefinition(): Definition { - return new Definition(self::class, [$this->tableName, $this->isActive, $this->shouldAutoInitialize]); + return new Definition(self::class, [$this->tableName, $this->isUsed, $this->shouldAutoInitialize]); } public function shouldBeInitializedAutomatically(): bool diff --git a/packages/PdoEventSourcing/src/Database/ProjectionStateTableManager.php b/packages/PdoEventSourcing/src/Database/ProjectionStateTableManager.php index e2280316f..98c7f440f 100644 --- a/packages/PdoEventSourcing/src/Database/ProjectionStateTableManager.php +++ b/packages/PdoEventSourcing/src/Database/ProjectionStateTableManager.php @@ -24,8 +24,8 @@ final class ProjectionStateTableManager implements DbalTableManager public function __construct( private string $tableName, - private bool $isActive, - private bool $shouldAutoInitialize, + private bool $isUsed, + private bool $shouldAutoInitialize, ) { } @@ -34,9 +34,9 @@ public function getFeatureName(): string return self::FEATURE_NAME; } - public function isActive(): bool + public function isUsed(): bool { - return $this->isActive; + return $this->isUsed; } public function getTableName(): string @@ -86,7 +86,7 @@ public function isInitialized(Connection $connection): bool public function getDefinition(): Definition { - return new Definition(self::class, [$this->tableName, $this->isActive, $this->shouldAutoInitialize]); + return new Definition(self::class, [$this->tableName, $this->isUsed, $this->shouldAutoInitialize]); } public function shouldBeInitializedAutomatically(): bool diff --git a/packages/PdoEventSourcing/tests/Projecting/ProjectionStateTableInitializationTest.php b/packages/PdoEventSourcing/tests/Projecting/ProjectionStateTableInitializationTest.php index 15430e368..a52e31fc8 100644 --- a/packages/PdoEventSourcing/tests/Projecting/ProjectionStateTableInitializationTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/ProjectionStateTableInitializationTest.php @@ -116,7 +116,7 @@ public function test_projection_works_with_auto_initialization_enabled(): void DbalConfiguration::createWithDefaults()->withAutomaticTableInitialization(true) ); - $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:drop', ['force' => true]); + $this->executeConsoleCommand($ecotone, 'ecotone:migration:database:delete', ['force' => true]); // Verify projection state table does not exist self::assertFalse($this->projectionStateTableExists()); diff --git a/packages/Sqs/src/Configuration/SqsChannelManagerModule.php b/packages/Sqs/src/Configuration/SqsChannelManagerModule.php new file mode 100644 index 000000000..290a306a5 --- /dev/null +++ b/packages/Sqs/src/Configuration/SqsChannelManagerModule.php @@ -0,0 +1,101 @@ +isAutomaticChannelInitializationEnabled(); + + // Configure channel builders and register channel managers + foreach ($extensionObjects as $extensionObject) { + if ($extensionObject instanceof SqsBackedMessageChannelBuilder) { + if (!$shouldAutoInitialize) { + $extensionObject->withAutoDeclare(false); + } + + $channelName = $extensionObject->getMessageChannelName(); + $queueName = $channelName; // For SQS, queue name is same as channel name + $connectionRef = $extensionObject->getInboundChannelAdapter()->getConnectionReferenceName(); + + $managerRef = "sqs_channel_manager.{$channelName}"; + $messagingConfiguration->registerServiceDefinition( + $managerRef, + new Definition(SqsChannelManager::class, [ + $channelName, + $queueName, + new Reference($connectionRef), + $shouldAutoInitialize, + ]) + ); + } + } + } + + public function canHandle($extensionObject): bool + { + return + $extensionObject instanceof SqsBackedMessageChannelBuilder + || $extensionObject instanceof ChannelInitializationConfiguration; + } + + public function getModuleExtensions(ServiceConfiguration $serviceConfiguration, array $serviceExtensions): array + { + $channelManagerReferences = []; + + // Create channel manager references for each SQS channel + foreach ($serviceExtensions as $extensionObject) { + if ($extensionObject instanceof SqsBackedMessageChannelBuilder) { + $channelName = $extensionObject->getMessageChannelName(); + $channelManagerReferences[] = new ChannelManagerReference("sqs_channel_manager.{$channelName}"); + } + } + + return $channelManagerReferences; + } + + public function getModulePackageName(): string + { + return ModulePackageList::SQS_PACKAGE; + } +} + diff --git a/packages/Sqs/src/Configuration/SqsModule.php b/packages/Sqs/src/Configuration/SqsModule.php new file mode 100644 index 000000000..82d350f03 --- /dev/null +++ b/packages/Sqs/src/Configuration/SqsModule.php @@ -0,0 +1,45 @@ +channelName; + } + + public function getChannelType(): string + { + return 'sqs'; + } + + public function initialize(): void + { + if ($this->isInitialized()) { + return; + } + + /** @var SqsContext $context */ + $context = $this->connectionFactory->createContext(); + $context->declareQueue($context->createQueue($this->queueName)); + } + + public function delete(): void + { + /** @var SqsContext $context */ + $context = $this->connectionFactory->createContext(); + $queue = $context->createQueue($this->queueName); + $context->deleteQueue($queue); + } + + public function isInitialized(): bool + { + try { + /** @var SqsContext $context */ + $context = $this->connectionFactory->createContext(); + $queue = $context->createQueue($this->queueName); + // Use GetQueueUrl to check existence + $context->getQueueUrl($queue); + return true; + } catch (\Exception $e) { + // QueueDoesNotExist exception or other error + return false; + } + } + + public function shouldBeInitializedAutomatically(): bool + { + return $this->shouldAutoInitialize; + } + + public function getDefinition(): Definition + { + return new Definition(self::class, [ + $this->channelName, + $this->queueName, + $this->connectionFactory, + $this->shouldAutoInitialize, + ]); + } +} + diff --git a/packages/Sqs/tests/Integration/SqsChannelInitializationTest.php b/packages/Sqs/tests/Integration/SqsChannelInitializationTest.php new file mode 100644 index 000000000..23c0c3fc9 --- /dev/null +++ b/packages/Sqs/tests/Integration/SqsChannelInitializationTest.php @@ -0,0 +1,271 @@ +cleanUpSqs(); + + $ecotone = $this->bootstrapEcotone( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + // Try to receive message - should fail because queue doesn't exist and auto-init is disabled + $this->expectException(\Exception::class); + $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME)->receive(); + } + + public function test_sending_fails_when_auto_initialization_disabled_and_channel_auto_declare_disabled(): void + { + $this->cleanUpSqs(); + + $ecotone = $this->bootstrapEcotone( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(true), + autoDeclare: false, + ); + + // Try to receive message - should fail because queue doesn't exist and auto-init is disabled + $this->expectException(\Exception::class); + $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME)->receive(); + } + + public function test_manual_channel_initialization_via_console_command_then_send_succeeds(): void + { + $this->cleanUpSqs(); + + $ecotone = $this->bootstrapEcotone( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + // Initialize via console command + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + $result = $runner->execute('ecotone:migration:channel:setup', ['initialize' => true]); + + // Verify the command output shows initialization + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(1, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME, 'Initialized'], $rows[0]); + + // Now sending should work + $ecotone->sendDirectToChannel(self::TEST_CHANNEL_NAME, 'test message'); + + // Consume and verify + $message = $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME)->receive(); + self::assertNotNull($message); + self::assertEquals('test message', $message->getPayload()); + } + + public function test_setup_then_delete_then_send_fails(): void + { + $this->cleanUpSqs(); + + $ecotone = $this->bootstrapEcotone( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + // First, initialize the channel + $runner->execute('ecotone:migration:channel:setup', ['initialize' => true]); + + // Verify we can send and receive + $ecotone->sendDirectToChannel(self::TEST_CHANNEL_NAME, 'test message 1'); + $message = $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME)->receive(); + self::assertNotNull($message); + self::assertEquals('test message 1', $message->getPayload()); + + // Now delete the channel + $result = $runner->execute('ecotone:migration:channel:delete', ['force' => true]); + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(1, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME, 'Deleted'], $rows[0]); + + // Try to receive message - should fail because queue was deleted + $this->expectException(\Exception::class); + $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME)->receive(); + } + + public function test_channel_status_command_shows_initialization_state(): void + { + $this->cleanUpSqs(); + + $ecotone = $this->bootstrapEcotone( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + // Check status before initialization + $result = $runner->execute('ecotone:migration:channel:setup', []); + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(1, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME, 'No'], $rows[0]); + + // Initialize + $runner->execute('ecotone:migration:channel:setup', ['initialize' => true]); + + // Check status after initialization + $result = $runner->execute('ecotone:migration:channel:setup', []); + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(1, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME, 'Yes'], $rows[0]); + } + + public function test_initialize_multiple_channels_at_once(): void + { + $this->cleanUpSqs(); + + $ecotone = $this->bootstrapEcotoneWithMultipleChannels( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + // Initialize multiple channels at once + $result = $runner->execute('ecotone:migration:channel:setup', [ + 'channels' => [self::TEST_CHANNEL_NAME_2, self::TEST_CHANNEL_NAME_3], + 'initialize' => true + ]); + + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(2, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME_2, 'Initialized'], $rows[0]); + self::assertEquals([self::TEST_CHANNEL_NAME_3, 'Initialized'], $rows[1]); + + // Verify we can send and receive on both channels + $ecotone->sendDirectToChannel(self::TEST_CHANNEL_NAME_2, 'message 2'); + $ecotone->sendDirectToChannel(self::TEST_CHANNEL_NAME_3, 'message 3'); + + $message2 = $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME_2)->receive(); + $message3 = $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME_3)->receive(); + + self::assertNotNull($message2); + self::assertNotNull($message3); + self::assertEquals('message 2', $message2->getPayload()); + self::assertEquals('message 3', $message3->getPayload()); + } + + public function test_delete_multiple_channels_at_once(): void + { + $this->cleanUpSqs(); + + $ecotone = $this->bootstrapEcotoneWithMultipleChannels( + ChannelInitializationConfiguration::createWithDefaults() + ->withAutomaticChannelInitialization(false) + ); + + $runner = $ecotone->getGateway(ConsoleCommandRunner::class); + + // Initialize all channels first + $runner->execute('ecotone:migration:channel:setup', ['initialize' => true]); + + // Delete multiple channels at once + $result = $runner->execute('ecotone:migration:channel:delete', [ + 'channels' => [self::TEST_CHANNEL_NAME_2, self::TEST_CHANNEL_NAME_3], + 'force' => true + ]); + + self::assertNotNull($result); + $rows = $result->getRows(); + self::assertCount(2, $rows); + self::assertEquals([self::TEST_CHANNEL_NAME_2, 'Deleted'], $rows[0]); + self::assertEquals([self::TEST_CHANNEL_NAME_3, 'Deleted'], $rows[1]); + + // Verify channels are deleted - receiving should fail + $this->expectException(\Exception::class); + $ecotone->getMessageChannel(self::TEST_CHANNEL_NAME_2)->receive(); + } + + private function bootstrapEcotone(ChannelInitializationConfiguration $config, bool $autoDeclare = true): \Ecotone\Lite\Test\FlowTestSupport + { + return EcotoneLite::bootstrapFlowTesting( + containerOrAvailableServices: [ + self::getConnection(), + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(\Ecotone\Messaging\Config\ModulePackageList::allPackagesExcept([ + \Ecotone\Messaging\Config\ModulePackageList::CORE_PACKAGE, + \Ecotone\Messaging\Config\ModulePackageList::SQS_PACKAGE, + ])) + ->withExtensionObjects([ + $config, + SqsBackedMessageChannelBuilder::create(self::TEST_CHANNEL_NAME) + ->withAutoDeclare($autoDeclare), + ]), + pathToRootCatalog: __DIR__ . '/../../', + ); + } + + private function bootstrapEcotoneWithMultipleChannels(ChannelInitializationConfiguration $config): \Ecotone\Lite\Test\FlowTestSupport + { + return EcotoneLite::bootstrapFlowTesting( + containerOrAvailableServices: [ + self::getConnection(), + ], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(\Ecotone\Messaging\Config\ModulePackageList::allPackagesExcept([ + \Ecotone\Messaging\Config\ModulePackageList::CORE_PACKAGE, + \Ecotone\Messaging\Config\ModulePackageList::SQS_PACKAGE, + ])) + ->withExtensionObjects([ + $config, + SqsBackedMessageChannelBuilder::create(self::TEST_CHANNEL_NAME_2)->withAutoDeclare(false), + SqsBackedMessageChannelBuilder::create(self::TEST_CHANNEL_NAME_3)->withAutoDeclare(false), + ]), + pathToRootCatalog: __DIR__ . '/../../', + ); + } + + public static function cleanUpSqs(): void + { + try { + /** @var \Enqueue\Sqs\SqsContext $context */ + $context = self::getConnection()->createContext(); + + // Clean up all test channels + foreach ([self::TEST_CHANNEL_NAME, self::TEST_CHANNEL_NAME_2, self::TEST_CHANNEL_NAME_3] as $channelName) { + try { + $queue = $context->createQueue($channelName); + $context->deleteQueue($queue); + } catch (\Exception $e) { + // Queue might not exist, that's fine + } + } + } catch (\Exception $e) { + // Connection might fail, that's fine + } + } +} +