diff --git a/packages/Kafka/src/Configuration/KafkaModule.php b/packages/Kafka/src/Configuration/KafkaModule.php index 5b049fb29..a24edb7bd 100644 --- a/packages/Kafka/src/Configuration/KafkaModule.php +++ b/packages/Kafka/src/Configuration/KafkaModule.php @@ -117,7 +117,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO foreach ($extensionObjects as $extensionObject) { if ($extensionObject instanceof KafkaConsumerConfiguration) { - $consumerConfigurations[$extensionObject->getEndpointId()] = $consumerConfigurations; + $consumerConfigurations[$extensionObject->getEndpointId()] = $extensionObject; } elseif ($extensionObject instanceof TopicConfiguration) { $topicConfigurations[$extensionObject->getTopicName()] = $extensionObject; $topicReferenceMapping[$extensionObject->referenceName] = $extensionObject->getTopicName(); diff --git a/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php b/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php index 82ca45a50..888987c4a 100644 --- a/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php +++ b/packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php @@ -6,6 +6,7 @@ use Ecotone\Kafka\Api\KafkaHeader; use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration; +use Ecotone\Kafka\Configuration\KafkaConsumerConfiguration; use Ecotone\Kafka\Configuration\KafkaPublisherConfiguration; use Ecotone\Kafka\Configuration\TopicConfiguration; use Ecotone\Kafka\Outbound\MessagePublishingException; @@ -325,4 +326,38 @@ public function test_kafka_consumer_with_delayed_retry(): void // 5. This continues until max delayed retries reached or success // 6. If all retries fail, message goes to dead letter channel or is resent to Kafka (based on finalFailureStrategy) } + + public function test_sending_and_receiving_with_kafka_consumer_configuration(): void + { + $topicName = Uuid::uuid4()->toString(); + $ecotoneLite = EcotoneLite::bootstrapFlowTesting( + [ExampleKafkaConsumer::class], + [ + KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(), + new ExampleKafkaConsumer(), + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE])) + ->withExtensionObjects([ + KafkaPublisherConfiguration::createWithDefaults($topicName), + TopicConfiguration::createWithReferenceName('exampleTopic', $topicName), + KafkaConsumerConfiguration::createWithDefaults('exampleConsumer'), + ]), + licenceKey: LicenceTesting::VALID_LICENCE, + ); + + /** @var MessagePublisher $kafkaPublisher */ + $kafkaPublisher = $ecotoneLite->getGateway(MessagePublisher::class); + + $kafkaPublisher->sendWithMetadata('exampleData', 'application/text', ['key' => 'value']); + + $ecotoneLite->run('exampleConsumer', ExecutionPollingMetadata::createWithTestingSetup( + maxExecutionTimeInMilliseconds: 30000 + )); + + $messages = $ecotoneLite->sendQueryWithRouting('getMessages'); + + self::assertCount(1, $messages); + self::assertEquals('exampleData', $messages[0]['payload']); + } }