Skip to content
This repository was archived by the owner on Oct 27, 2025. It is now read-only.

Commit 0089f8f

Browse files
committed
ci: improve test speed
- use slow and long timeout - add consume trait helper
1 parent b427850 commit 0089f8f

15 files changed

+233
-149
lines changed

resources/phpunit/config.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@
55
const KAFKA_TEST_TOPIC = "test";
66
const KAFKA_TEST_TOPIC_ADMIN = "test_admin";
77
const KAFKA_TEST_TOPIC_PARTITIONS = "test_partitions";
8-
const KAFKA_TEST_TIMEOUT_MS = 20000;
8+
const KAFKA_TEST_LONG_TIMEOUT_MS = 10000;
9+
const KAFKA_TEST_SHORT_TIMEOUT_MS = 100;
910
const KAFKA_BROKER_ID = 111;

tests/ConsumeTrait.php

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
use RdKafka\ConsumerTopic;
6+
use RdKafka\KafkaConsumer;
7+
use RdKafka\Queue;
8+
9+
trait ConsumeTrait
10+
{
11+
public static $MAX_CONSUME_TIMEOUT_MS = 30_000;
12+
13+
protected function consumeMessages(
14+
Closure $consumeCallback,
15+
int $numberOfMessages,
16+
bool $continueOnNull = false
17+
): array {
18+
$messages = [];
19+
$time = time();
20+
do {
21+
if (time() - $time > self::$MAX_CONSUME_TIMEOUT_MS) {
22+
$this->fail(sprintf('Consume Kafka Message timeout %s ms exceeded', self::$MAX_CONSUME_TIMEOUT_MS));
23+
}
24+
$message = $consumeCallback();
25+
if ($message->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
26+
break;
27+
}
28+
if ($message === null) {
29+
if ($continueOnNull) {
30+
continue;
31+
}
32+
break;
33+
}
34+
$messages[] = $message;
35+
} while (count($messages) < $numberOfMessages);
36+
37+
return $messages;
38+
}
39+
40+
protected function consumeMessagesWitQueue(
41+
Queue $queue,
42+
int $numberOfMessages,
43+
int $timeoutMs = KAFKA_TEST_SHORT_TIMEOUT_MS
44+
): array {
45+
return $this->consumeMessages(
46+
function () use ($queue, $timeoutMs) {
47+
return $queue->consume($timeoutMs);
48+
},
49+
$numberOfMessages
50+
);
51+
}
52+
53+
protected function consumeMessagesWithConsumerTopic(
54+
ConsumerTopic $topic,
55+
int $partition,
56+
int $numberOfMessages,
57+
int $timeoutMs = KAFKA_TEST_SHORT_TIMEOUT_MS
58+
): array {
59+
return $this->consumeMessages(
60+
function () use ($topic, $partition, $timeoutMs) {
61+
return $topic->consume($partition, $timeoutMs);
62+
},
63+
$numberOfMessages,
64+
true
65+
);
66+
}
67+
68+
protected function consumeMessagesWithKafkaConsumer(
69+
KafkaConsumer $consumer,
70+
int $numberOfMessages,
71+
int $timeoutMs = KAFKA_TEST_SHORT_TIMEOUT_MS
72+
): array {
73+
return $this->consumeMessages(
74+
function () use ($consumer, $timeoutMs) {
75+
return $consumer->consume($timeoutMs);
76+
},
77+
$numberOfMessages
78+
);
79+
}
80+
}

tests/RdKafka/Admin/ClientTest.php

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,16 @@ public function testCreateTopics(): void
6161
$conf = new Conf();
6262
$conf->set('bootstrap.servers', KAFKA_BROKERS);
6363
$client = Client::fromConf($conf);
64-
$client->setWaitForResultEventTimeout(KAFKA_TEST_TIMEOUT_MS);
64+
$client->setWaitForResultEventTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
6565

6666
$topics = [
6767
new NewTopic('test_admin_1', 1, 1),
6868
new NewTopic('test_admin_2', 2, 1),
6969
];
7070

7171
$options = $client->newCreateTopicsOptions();
72-
$options->setOperationTimeout(KAFKA_TEST_TIMEOUT_MS);
73-
$options->setRequestTimeout(KAFKA_TEST_TIMEOUT_MS);
72+
$options->setOperationTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
73+
$options->setRequestTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
7474
$options->setBrokerId(KAFKA_BROKER_ID);
7575

7676
$result = $client->createTopics($topics, $options);
@@ -123,16 +123,16 @@ public function testCreatePartitions(): void
123123
$conf = new Conf();
124124
$conf->set('bootstrap.servers', KAFKA_BROKERS);
125125
$client = Client::fromConf($conf);
126-
$client->setWaitForResultEventTimeout(KAFKA_TEST_TIMEOUT_MS);
126+
$client->setWaitForResultEventTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
127127

128128
$partitions = [
129129
new NewPartitions('test_admin_1', 4),
130130
new NewPartitions('test_admin_2', 6),
131131
];
132132

133133
$options = $client->newCreatePartitionsOptions();
134-
$options->setOperationTimeout(KAFKA_TEST_TIMEOUT_MS);
135-
$options->setRequestTimeout(KAFKA_TEST_TIMEOUT_MS);
134+
$options->setOperationTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
135+
$options->setRequestTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
136136
$options->setBrokerId(KAFKA_BROKER_ID);
137137

138138
$result = $client->createPartitions($partitions, $options);
@@ -185,16 +185,16 @@ public function testDeleteTopics(): void
185185
$conf = new Conf();
186186
$conf->set('bootstrap.servers', KAFKA_BROKERS);
187187
$client = Client::fromConf($conf);
188-
$client->setWaitForResultEventTimeout(KAFKA_TEST_TIMEOUT_MS);
188+
$client->setWaitForResultEventTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
189189

190190
$topics = [
191191
new DeleteTopic('test_admin_1'),
192192
new DeleteTopic('test_admin_2'),
193193
];
194194

195195
$options = $client->newDeleteTopicsOptions();
196-
$options->setOperationTimeout(KAFKA_TEST_TIMEOUT_MS);
197-
$options->setRequestTimeout(KAFKA_TEST_TIMEOUT_MS);
196+
$options->setOperationTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
197+
$options->setRequestTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
198198
$options->setBrokerId(KAFKA_BROKER_ID);
199199

200200
$result = $client->deleteTopics($topics, $options);
@@ -247,15 +247,15 @@ public function testCreateTopicsWithReplicaAssignment(): void
247247
$conf = new Conf();
248248
$conf->set('bootstrap.servers', KAFKA_BROKERS);
249249
$client = Client::fromConf($conf);
250-
$client->setWaitForResultEventTimeout(KAFKA_TEST_TIMEOUT_MS);
250+
$client->setWaitForResultEventTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
251251

252252
$topic = new NewTopic('test_admin_3', 2, -1);
253253
$topic->setReplicaAssignment(0, [KAFKA_BROKER_ID]);
254254
$topic->setReplicaAssignment(1, [KAFKA_BROKER_ID]);
255255

256256
$options = $client->newCreateTopicsOptions();
257-
$options->setOperationTimeout(KAFKA_TEST_TIMEOUT_MS);
258-
$options->setRequestTimeout(KAFKA_TEST_TIMEOUT_MS);
257+
$options->setOperationTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
258+
$options->setRequestTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
259259
$options->setBrokerId(KAFKA_BROKER_ID);
260260

261261
$result = $client->createTopics([$topic], $options);
@@ -284,15 +284,15 @@ public function testCreatePartitionsWithReplicaAssignment(): void
284284
$conf = new Conf();
285285
$conf->set('bootstrap.servers', KAFKA_BROKERS);
286286
$client = Client::fromConf($conf);
287-
$client->setWaitForResultEventTimeout(KAFKA_TEST_TIMEOUT_MS);
287+
$client->setWaitForResultEventTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
288288

289289
$partition = new NewPartitions('test_admin_3', 4);
290290
$partition->setReplicaAssignment(0, [KAFKA_BROKER_ID]);
291291
$partition->setReplicaAssignment(1, [KAFKA_BROKER_ID]);
292292

293293
$options = $client->newCreatePartitionsOptions();
294-
$options->setOperationTimeout(KAFKA_TEST_TIMEOUT_MS);
295-
$options->setRequestTimeout(KAFKA_TEST_TIMEOUT_MS);
294+
$options->setOperationTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
295+
$options->setRequestTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
296296
$options->setBrokerId(KAFKA_BROKER_ID);
297297

298298
$result = $client->createPartitions([$partition], $options);
@@ -319,7 +319,7 @@ private function getFilteredMetaTopics(array $topicNames): array
319319
$conf->set('bootstrap.servers', KAFKA_BROKERS);
320320
$producer = new Producer($conf);
321321
$metaTopics = [];
322-
$metadata = $producer->getMetadata(true, null, KAFKA_TEST_TIMEOUT_MS);
322+
$metadata = $producer->getMetadata(true, null, KAFKA_TEST_LONG_TIMEOUT_MS);
323323
foreach ($metadata->getTopics() as $topic) {
324324
if (in_array($topic->getTopic(), $topicNames, true)) {
325325
$metaTopics[$topic->getTopic()] = $topic;
@@ -338,12 +338,12 @@ public function testDescribeConfigs(): void
338338
$conf = new Conf();
339339
$conf->set('bootstrap.servers', KAFKA_BROKERS);
340340
$client = Client::fromConf($conf);
341-
$client->setWaitForResultEventTimeout(KAFKA_TEST_TIMEOUT_MS);
341+
$client->setWaitForResultEventTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
342342

343343
$configResource = new ConfigResource(RD_KAFKA_RESOURCE_BROKER, (string) KAFKA_BROKER_ID);
344344

345345
$options = $client->newDescribeConfigsOptions();
346-
$options->setRequestTimeout(KAFKA_TEST_TIMEOUT_MS);
346+
$options->setRequestTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
347347
$options->setBrokerId(KAFKA_BROKER_ID);
348348

349349
$result = $client->describeConfigs([$configResource], $options);
@@ -390,17 +390,17 @@ public function testAlterConfigs(): void
390390
$conf->set('bootstrap.servers', KAFKA_BROKERS);
391391
$conf->set('broker.version.fallback', '2.0.0');
392392
$client = Client::fromConf($conf);
393-
$client->setWaitForResultEventTimeout(KAFKA_TEST_TIMEOUT_MS);
393+
$client->setWaitForResultEventTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
394394

395395
$configResource = new ConfigResource(RD_KAFKA_RESOURCE_BROKER, (string) KAFKA_BROKER_ID);
396396
$configResource->setConfig('max.connections.per.ip', (string) 500000);
397397

398398
$alterConfigOptions = $client->newAlterConfigsOptions();
399-
$alterConfigOptions->setRequestTimeout(KAFKA_TEST_TIMEOUT_MS);
399+
$alterConfigOptions->setRequestTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
400400
$alterConfigOptions->setBrokerId(KAFKA_BROKER_ID);
401401

402402
$describeConfigsOptions = $client->newDescribeConfigsOptions();
403-
$describeConfigsOptions->setRequestTimeout(KAFKA_TEST_TIMEOUT_MS);
403+
$describeConfigsOptions->setRequestTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
404404
$describeConfigsOptions->setBrokerId(KAFKA_BROKER_ID);
405405

406406
// alter config
@@ -472,20 +472,20 @@ public function testDeleteRecords(): void
472472
$topic = $producer->newTopic(KAFKA_TEST_TOPIC_ADMIN);
473473
$topic->produce(0, 0, __METHOD__);
474474
$topic->produce(0, 0, __METHOD__);
475-
$producer->flush(KAFKA_TEST_TIMEOUT_MS);
475+
$producer->flush(KAFKA_TEST_LONG_TIMEOUT_MS);
476476

477477
$conf = new Conf();
478478
$conf->set('bootstrap.servers', KAFKA_BROKERS);
479479
$client = Client::fromConf($conf);
480-
$client->setWaitForResultEventTimeout(KAFKA_TEST_TIMEOUT_MS);
481-
$client->getMetadata(true, null, KAFKA_TEST_TIMEOUT_MS);
480+
$client->setWaitForResultEventTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
481+
$client->getMetadata(true, null, KAFKA_TEST_LONG_TIMEOUT_MS);
482482

483483
$deleteRecords = new DeleteRecords(
484484
new TopicPartition(KAFKA_TEST_TOPIC_ADMIN, 0, 1)
485485
);
486486

487487
$deleteRecordsOptions = $client->newDeleteRecordsOptions();
488-
$deleteRecordsOptions->setRequestTimeout(KAFKA_TEST_TIMEOUT_MS);
488+
$deleteRecordsOptions->setRequestTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
489489
$deleteRecordsOptions->setBrokerId(KAFKA_BROKER_ID);
490490

491491
$result = $client->deleteRecords([$deleteRecords], $deleteRecordsOptions);
@@ -508,7 +508,7 @@ public function testDeleteConsumerGroupOffset(): void
508508
$topic->produce(0, 0, __METHOD__);
509509
$topic->produce(0, 0, __METHOD__);
510510
$topic->produce(0, 0, __METHOD__);
511-
$producer->flush(KAFKA_TEST_TIMEOUT_MS);
511+
$producer->flush(KAFKA_TEST_LONG_TIMEOUT_MS);
512512

513513
$conf = new Conf();
514514
$conf->set('bootstrap.servers', KAFKA_BROKERS);
@@ -525,22 +525,22 @@ public function testDeleteConsumerGroupOffset(): void
525525
$consumer->commit();
526526
$topicPartitions = $consumer->getCommittedOffsets(
527527
[new TopicPartition(KAFKA_TEST_TOPIC_ADMIN, 0)],
528-
KAFKA_TEST_TIMEOUT_MS
528+
KAFKA_TEST_LONG_TIMEOUT_MS
529529
);
530530
$this->assertSame(2, $topicPartitions[0]->getOffset());
531531

532532
$conf = new Conf();
533533
$conf->set('bootstrap.servers', KAFKA_BROKERS);
534534
$client = Client::fromConf($conf);
535-
$client->setWaitForResultEventTimeout(KAFKA_TEST_TIMEOUT_MS);
535+
$client->setWaitForResultEventTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
536536

537537
$deleteGroupOffsets = new DeleteConsumerGroupOffsets(
538538
__METHOD__,
539539
new TopicPartition(KAFKA_TEST_TOPIC_ADMIN, 0, 1)
540540
);
541541

542542
$deleteGroupOffsetsOptions = $client->newDeleteConsumerGroupOffsetsOptions();
543-
$deleteGroupOffsetsOptions->setRequestTimeout(KAFKA_TEST_TIMEOUT_MS);
543+
$deleteGroupOffsetsOptions->setRequestTimeout(KAFKA_TEST_LONG_TIMEOUT_MS);
544544
$deleteGroupOffsetsOptions->setBrokerId(KAFKA_BROKER_ID);
545545

546546
$result = $client->deleteConsumerGroupOffsets($deleteGroupOffsets, $deleteGroupOffsetsOptions);

tests/RdKafka/ConfTest.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -291,17 +291,17 @@ function ($producer, $message) use ($drMsgCallbackStack): void {
291291
$producer = new Producer($conf);
292292
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);
293293
$producerTopic->produce(0, 0, __METHOD__ . '1');
294-
$producer->poll(KAFKA_TEST_TIMEOUT_MS);
294+
$producer->poll(KAFKA_TEST_LONG_TIMEOUT_MS);
295295
$producerTopic->produce(0, 0, __METHOD__ . '2');
296-
$producer->poll(KAFKA_TEST_TIMEOUT_MS);
296+
$producer->poll(KAFKA_TEST_LONG_TIMEOUT_MS);
297297

298298
$this->assertCount(2, $drMsgCallbackStack->stack);
299299
$this->assertSame($producer, $drMsgCallbackStack->stack[0]['producer']);
300300
$this->assertSame(__METHOD__ . '1', $drMsgCallbackStack->stack[0]['message']->payload);
301301
$this->assertSame($producer, $drMsgCallbackStack->stack[1]['producer']);
302302
$this->assertSame(__METHOD__ . '2', $drMsgCallbackStack->stack[1]['message']->payload);
303303

304-
$producer->flush(KAFKA_TEST_TIMEOUT_MS);
304+
$producer->flush(KAFKA_TEST_LONG_TIMEOUT_MS);
305305
}
306306

307307
/**
@@ -330,17 +330,17 @@ function (Producer $producer, Message $message, $opaque = null) use ($drMsgCallb
330330
$producer = new Producer($conf);
331331
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);
332332
$producerTopic->produce(0, 0, __METHOD__ . '1');
333-
$producer->poll(KAFKA_TEST_TIMEOUT_MS);
333+
$producer->poll(KAFKA_TEST_LONG_TIMEOUT_MS);
334334
$producerTopic->produce(0, 0, __METHOD__ . '2');
335-
$producer->poll(KAFKA_TEST_TIMEOUT_MS);
335+
$producer->poll(KAFKA_TEST_LONG_TIMEOUT_MS);
336336

337337
$this->assertCount(2, $drMsgCallbackStack->stack);
338338
$this->assertSame($producer, $drMsgCallbackStack->stack[0]['producer']);
339339
$this->assertSame($expectedOpaque, $drMsgCallbackStack->stack[0]['opaque']);
340340
$this->assertSame($producer, $drMsgCallbackStack->stack[1]['producer']);
341341
$this->assertSame($expectedOpaque, $drMsgCallbackStack->stack[1]['opaque']);
342342

343-
$producer->flush(KAFKA_TEST_TIMEOUT_MS);
343+
$producer->flush(KAFKA_TEST_LONG_TIMEOUT_MS);
344344
}
345345

346346
public function testSetStatsCb(): void

tests/RdKafka/ConsumerTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public function testGetMetadata(): void
4545
$conf->set('bootstrap.servers', KAFKA_BROKERS);
4646
$consumer = new Consumer($conf);
4747

48-
$metadata = $consumer->getMetadata(true, null, KAFKA_TEST_TIMEOUT_MS);
48+
$metadata = $consumer->getMetadata(true, null, KAFKA_TEST_LONG_TIMEOUT_MS);
4949

5050
$this->assertInstanceOf(Metadata::class, $metadata);
5151
}
@@ -181,7 +181,7 @@ public function testQueryWatermarkOffsets(): void
181181
0,
182182
$lowWatermarkOffset1,
183183
$highWatermarkOffset1,
184-
KAFKA_TEST_TIMEOUT_MS
184+
KAFKA_TEST_LONG_TIMEOUT_MS
185185
);
186186

187187
$this->assertSame(0, $lowWatermarkOffset1);
@@ -191,7 +191,7 @@ public function testQueryWatermarkOffsets(): void
191191
$producer = new Producer($producerConf);
192192
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);
193193
$producerTopic->produce(0, 0, __METHOD__);
194-
$producer->flush(KAFKA_TEST_TIMEOUT_MS);
194+
$producer->flush(KAFKA_TEST_LONG_TIMEOUT_MS);
195195

196196
$lowWatermarkOffset2 = 0;
197197
$highWatermarkOffset2 = 0;
@@ -201,7 +201,7 @@ public function testQueryWatermarkOffsets(): void
201201
0,
202202
$lowWatermarkOffset2,
203203
$highWatermarkOffset2,
204-
KAFKA_TEST_TIMEOUT_MS
204+
KAFKA_TEST_LONG_TIMEOUT_MS
205205
);
206206

207207
$this->assertSame(0, $lowWatermarkOffset2);

0 commit comments

Comments
 (0)