Skip to content

Commit 6589249

Browse files
authored
Kafka improvements (#437)
* Kafka improvements * fixes * fixes
1 parent 03e9fac commit 6589249

File tree

11 files changed

+39
-10
lines changed

11 files changed

+39
-10
lines changed

packages/Amqp/tests/Integration/AmqpMessageChannelTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ public function test_failing_to_consume_due_to_connection_failure()
259259
$this->assertTrue($wasFinallyRethrown, 'Connection exception was not propagated');
260260
$this->assertEquals(
261261
[
262+
"Message Consumer starting to consume messages",
262263
ConnectionException::connectionRetryMessage(1, 1),
263264
ConnectionException::connectionRetryMessage(2, 3),
264265
ConnectionException::connectionRetryMessage(3, 9),

packages/Dbal/tests/Integration/DbalBackedMessageChannelTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ public function test_failing_to_consume_due_to_connection_failure()
268268
$this->assertTrue($wasFinallyRethrown, 'Connection exception was not propagated');
269269
$this->assertEquals(
270270
[
271+
"Message Consumer starting to consume messages",
271272
ConnectionException::connectionRetryMessage(1, 1),
272273
ConnectionException::connectionRetryMessage(2, 3),
273274
ConnectionException::connectionRetryMessage(3, 9),

packages/Ecotone/src/Messaging/Endpoint/PollingConsumer/InterceptedConsumerRunner.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public function runEndpointWithExecutionPollingMetadata(?ExecutionPollingMetadat
3737

3838
public function createConsumer(?ExecutionPollingMetadata $executionPollingMetadata): ConsumerLifecycle
3939
{
40+
$this->logger->info("Message Consumer starting to consume messages");
4041
$pollingMetadata = $this->defaultPollingMetadata->applyExecutionPollingMetadata($executionPollingMetadata);
4142
$interceptors = InterceptedConsumer::createInterceptorsForPollingMetadata($pollingMetadata, $this->logger);
4243
$interceptedGateway = new InterceptedGateway($this->gateway, $interceptors);

packages/Ecotone/src/Messaging/MessagingException.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
/**
1717
* licence Apache-2.0
1818
*/
19-
abstract class MessagingException extends Exception
19+
class MessagingException extends Exception
2020
{
2121
public const LICENSE_EXCEPTION = 1;
22+
public const RUNTIME_EXCEPTION = 2;
2223
public const INVALID_MESSAGE_HEADER_EXCEPTION = 100;
2324
public const MESSAGE_HEADER_NOT_AVAILABLE_EXCEPTION = 101;
2425
public const INVALID_ARGUMENT_EXCEPTION = 102;
@@ -45,10 +46,10 @@ abstract class MessagingException extends Exception
4546
* @param string $message
4647
* @return MessagingException|static
4748
*/
48-
public static function create(string $message): self
49+
public static function create(string $message, ?int $errorCode = null): self
4950
{
5051
/** @phpstan-ignore-next-line */
51-
return new static($message, static::errorCode());
52+
return new static($message, is_null($errorCode) ? static::errorCode() : $errorCode);
5253
}
5354

5455
/**
@@ -100,7 +101,10 @@ public function getFailedMessage(): ?Message
100101
/**
101102
* @return int
102103
*/
103-
abstract protected static function errorCode(): int;
104+
protected static function errorCode(): int
105+
{
106+
return self::RUNTIME_EXCEPTION;
107+
}
104108

105109
/**
106110
* @param Message $message

packages/Ecotone/tests/Messaging/Unit/Handler/TypeDescriptorTest.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ public function test_is_interface()
326326

327327
public function test_is_abstract_class()
328328
{
329-
$this->assertTrue(TypeDescriptor::create(MessagingException::class)->isAbstractClass());
330329
$this->assertFalse(TypeDescriptor::create(DumbMessageHandlerBuilder::class)->isAbstractClass());
331330
$this->assertFalse(TypeDescriptor::create(TypeDescriptor::OBJECT)->isAbstractClass());
332331
}

packages/Kafka/src/Configuration/KafkaAdmin.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,16 @@ public function getConsumer(string $endpointId): KafkaConsumer
8080
Assert::keyExists($this->kafkaConsumers, $endpointId, "Consumer with endpoint id {$endpointId} not found");
8181

8282
$configuration = $this->getConfigurationForConsumer($endpointId);
83+
$kafkaBrokerConfiguration = $this->kafkaBrokerConfigurations[$configuration->getBrokerConfigurationReference()];
8384
$conf = $configuration->getConfig();
8485
$conf->set('group.id', $this->kafkaConsumers[$endpointId]->getGroupId());
85-
$kafkaBrokerConfiguration = $this->kafkaBrokerConfigurations[$configuration->getBrokerConfigurationReference()];
86-
$conf->set('bootstrap.servers', implode(',', $kafkaBrokerConfiguration->getBootstrapServers()));
86+
$conf->set('metadata.broker.list', implode(',', $kafkaBrokerConfiguration->getBootstrapServers()));
8787
$this->setLoggerCallbacks($conf, $endpointId);
8888
$consumer = new KafkaConsumer($conf);
8989

9090
$topics = $this->getMappedTopicNames($this->kafkaConsumers[$endpointId]->getTopics());
9191
if ($this->isRunningForTests($kafkaBrokerConfiguration)) {
9292
// ensures there is no need for repartitioning
93-
$conf->set('group.instance.id', $endpointId);
9493
$consumer->assign([new TopicPartition($topics[0], 0)]);
9594
} else {
9695
$consumer->subscribe($topics);
@@ -122,9 +121,10 @@ public function getProducer(string $referenceName): Producer
122121
if (! array_key_exists($referenceName, $this->initializedProducers)) {
123122
$configuration = $this->getConfigurationForPublisher($referenceName);
124123
$conf = $configuration->getAsKafkaConfig();
125-
$conf->set('bootstrap.servers', implode(',', $this->kafkaBrokerConfigurations[$configuration->getBrokerConfigurationReference()]->getBootstrapServers()));
124+
$conf->set('metadata.broker.list', implode(',', $this->kafkaBrokerConfigurations[$configuration->getBrokerConfigurationReference()]->getBootstrapServers()));
126125
$this->setLoggerCallbacks($conf, $referenceName);
127126
$producer = new Producer($conf);
127+
$producer->addBrokers(implode(',', $this->kafkaBrokerConfigurations[$configuration->getBrokerConfigurationReference()]->getBootstrapServers()));
128128

129129
$this->initializedProducers[$referenceName] = $producer;
130130
}

packages/Kafka/src/Configuration/KafkaBrokerConfiguration.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ public function __construct(
1818

1919
/**
2020
* @param array<string> $bootstrapServers [broker-one:9092, broker-two:9092]
21+
* @param $setupForTesting bool|null If true it will assume single partition for topics, which speed up partitioning process for running tests
2122
*/
2223
public static function createWithDefaults(array $bootstrapServers = ['localhost:9092'], ?bool $setupForTesting = null): self
2324
{
24-
return new self($bootstrapServers);
25+
return new self($bootstrapServers, $setupForTesting);
2526
}
2627

2728
public function getBootstrapServers(): array

packages/Kafka/src/Inbound/KafkaInboundChannelAdapter.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use Ecotone\Kafka\Configuration\KafkaAdmin;
88
use Ecotone\Messaging\Conversion\ConversionService;
9+
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
910
use Ecotone\Messaging\Message;
1011
use Ecotone\Messaging\MessagePoller;
1112
use Ecotone\Messaging\MessagingException;
@@ -21,6 +22,7 @@ public function __construct(
2122
protected InboundMessageConverter $inboundMessageConverter,
2223
protected ConversionService $conversionService,
2324
protected int $receiveTimeoutInMilliseconds,
25+
private LoggingGateway $loggingGateway,
2426
) {
2527
}
2628

@@ -41,6 +43,23 @@ public function receiveWithTimeout(int $timeoutInMilliseconds): ?Message
4143
->build();
4244
}
4345

46+
if (in_array($message->err, [RD_KAFKA_MSG_PARTITIONER_RANDOM, RD_KAFKA_MSG_PARTITIONER_CONSISTENT, RD_KAFKA_MSG_PARTITIONER_CONSISTENT_RANDOM, RD_KAFKA_MSG_PARTITIONER_MURMUR2, RD_KAFKA_MSG_PARTITIONER_MURMUR2_RANDOM])) {
47+
$this->loggingGateway->info(sprintf(
48+
"%s hashing key is used for related topic",
49+
match ($message->err) {
50+
RD_KAFKA_MSG_PARTITIONER_RANDOM => "Random",
51+
RD_KAFKA_MSG_PARTITIONER_CONSISTENT => "Consistent",
52+
RD_KAFKA_MSG_PARTITIONER_CONSISTENT_RANDOM => "Consistent random",
53+
RD_KAFKA_MSG_PARTITIONER_MURMUR2 => "MurMur2",
54+
RD_KAFKA_MSG_PARTITIONER_MURMUR2_RANDOM => "MurMur2 random",
55+
default => "Unknown"
56+
}
57+
)
58+
);
59+
60+
return null;
61+
}
62+
4463
throw MessagingException::create("Unhandled error code: {$message->err}");
4564
}
4665
}

packages/Kafka/src/Inbound/KafkaInboundChannelAdapterBuilder.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
7171
]),
7272
Reference::to(ConversionService::REFERENCE_NAME),
7373
$this->receiveTimeoutInMilliseconds,
74+
Reference::to(LoggingGateway::class),
7475
]
7576
);
7677
}

packages/Redis/tests/Integration/RedisBackedMessageChannelTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public function test_failing_to_consume_due_to_connection_failure()
130130
$this->assertTrue($wasFinallyRethrown, 'Connection exception was not propagated');
131131
$this->assertEquals(
132132
[
133+
"Message Consumer starting to consume messages",
133134
ConnectionException::connectionRetryMessage(1, 1),
134135
ConnectionException::connectionRetryMessage(2, 3),
135136
ConnectionException::connectionRetryMessage(3, 9),

0 commit comments

Comments
 (0)