@@ -30,24 +30,24 @@ protected function setUp(): void
3030 public function testCreateWithProducingAndConsuming (): void
3131 {
3232 $ clusterConfig = new Conf ();
33- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
33+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
3434 $ cluster = MockCluster::create (1 , $ clusterConfig );
3535
3636 $ producerConfig = new Conf ();
37- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
37+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
3838 $ producerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
3939 $ producer = new Producer ($ producerConfig );
4040 $ producerTopic = $ producer ->newTopic (KAFKA_TEST_TOPIC );
4141 $ producerTopic ->produce (0 , 0 , __METHOD__ , __METHOD__ );
4242 $ producer ->flush (KAFKA_TEST_LONG_TIMEOUT_MS );
4343
4444 $ consumerConfig = new Conf ();
45- $ consumerConfig ->set ('log_level ' , (string ) LOG_EMERG );
45+ $ consumerConfig ->set ('log_level ' , (string )LOG_EMERG );
4646 $ consumerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
4747 $ consumerConfig ->set ('group.id ' , __METHOD__ );
4848 $ consumer = new KafkaConsumer ($ consumerConfig );
4949 $ consumer ->assign ([new TopicPartition (KAFKA_TEST_TOPIC , 0 )]);
50- $ message = $ consumer ->consume (KAFKA_TEST_SHORT_TIMEOUT_MS );
50+ $ message = $ consumer ->consume (KAFKA_TEST_LONG_TIMEOUT_MS );
5151
5252 $ this ->assertSame (__METHOD__ , $ message ->payload );
5353 $ this ->assertSame (__METHOD__ , $ message ->key );
@@ -65,8 +65,8 @@ public function testFromProducer(): void
6565 $ this ->requiresLibrdkafkaVersion ('>= ' , '1.4.0 ' );
6666
6767 $ producerConfig = new Conf ();
68- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
69- $ producerConfig ->set ('test.mock.num.brokers ' , (string ) 3 );
68+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
69+ $ producerConfig ->set ('test.mock.num.brokers ' , (string )3 );
7070 $ producer = new Producer ($ producerConfig );
7171
7272 $ cluster = MockCluster::fromProducer ($ producer );
@@ -79,7 +79,7 @@ public function testFromProducerWithMissingConfigShouldFail(): void
7979 $ this ->requiresLibrdkafkaVersion ('>= ' , '1.4.0 ' );
8080
8181 $ producerConfig = new Conf ();
82- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
82+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
8383 $ producer = new Producer ($ producerConfig );
8484
8585 $ this ->expectException (Exception::class);
@@ -90,7 +90,7 @@ public function testFromProducerWithMissingConfigShouldFail(): void
9090 public function testGetBootstraps (): void
9191 {
9292 $ clusterConfig = new Conf ();
93- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
93+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
9494 $ cluster = MockCluster::create (3 , $ clusterConfig );
9595
9696 $ bootstrap = $ cluster ->getBootstraps ();
@@ -105,7 +105,7 @@ public function testSetApiVersion(): void
105105 $ logStack = [];
106106
107107 $ clusterConfig = new Conf ();
108- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
108+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
109109 $ cluster = MockCluster::create (1 , $ clusterConfig );
110110
111111 $ cluster ->setApiVersion (ApiKey::Produce, 4 , 6 );
@@ -138,13 +138,13 @@ function (Producer $producer, int $level, string $fac, string $buf) use (&$logSt
138138 public function testSetPartitionFollowerAndLeader (): void
139139 {
140140 $ clusterConfig = new Conf ();
141- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
141+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
142142 $ cluster = MockCluster::create (2 , $ clusterConfig );
143143
144144 $ producerConfig = new Conf ();
145- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
145+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
146146 $ producerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
147- $ producerConfig ->set ('topic.metadata.refresh.interval.ms ' , (string ) 50 );
147+ $ producerConfig ->set ('topic.metadata.refresh.interval.ms ' , (string )50 );
148148 $ producer = new Producer ($ producerConfig );
149149 $ producerTopic = $ producer ->newTopic (KAFKA_TEST_TOPIC );
150150
@@ -168,17 +168,17 @@ public function testSetPartitionFollowerAndLeader(): void
168168 public function testSetPartitionFollowerWatermarks (): void
169169 {
170170 $ clusterConfig = new Conf ();
171- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
171+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
172172 $ cluster = MockCluster::create (3 , $ clusterConfig );
173173
174174 $ cluster ->setPartitionLeader (KAFKA_TEST_TOPIC , 0 , 1 );
175175 $ cluster ->setPartitionFollower (KAFKA_TEST_TOPIC , 0 , 2 );
176176
177177 // produce 10 msgs
178178 $ producerConfig = new Conf ();
179- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
179+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
180180 $ producerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
181- $ producerConfig ->set ('batch.num.messages ' , (string ) 1 );
181+ $ producerConfig ->set ('batch.num.messages ' , (string )1 );
182182 $ producer = new Producer ($ producerConfig );
183183 $ producerTopic = $ producer ->newTopic (KAFKA_TEST_TOPIC );
184184 for ($ i = 0 ; $ i < 10 ; $ i ++) {
@@ -188,25 +188,23 @@ public function testSetPartitionFollowerWatermarks(): void
188188
189189 // prepare consumer
190190 $ consumerConfig = new Conf ();
191- $ consumerConfig ->set ('log_level ' , (string ) LOG_EMERG );
191+ $ consumerConfig ->set ('log_level ' , (string )LOG_EMERG );
192192 $ consumerConfig ->set ('group.id ' , __METHOD__ );
193193 $ consumerConfig ->set ('auto.offset.reset ' , 'earliest ' );
194- $ consumerConfig ->set ('fetch.min.bytes ' , (string ) 100 );
195- $ consumerConfig ->set ('fetch.message.max.bytes ' , (string ) 1000 );
194+ $ consumerConfig ->set ('fetch.min.bytes ' , (string )100 );
195+ $ consumerConfig ->set ('fetch.message.max.bytes ' , (string )1000 );
196196 $ consumerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
197197 $ consumer = new KafkaConsumer ($ consumerConfig );
198198 $ consumer ->assign ([new TopicPartition (KAFKA_TEST_TOPIC , 0 , RD_KAFKA_OFFSET_INVALID )]);
199199
200200 // set high watermark to 6
201201 $ cluster ->setPartitionFollowerWatermarks (KAFKA_TEST_TOPIC , 0 , -1 , 6 );
202202
203- sleep (1 );
204-
205203 // consume until high watermark
206204 $ consumedStack = [];
207205 do {
208206 $ message = $ consumer ->consume (KAFKA_TEST_SHORT_TIMEOUT_MS );
209- if ($ message === null ) {
207+ if ($ message-> err === RD_KAFKA_RESP_ERR__TIMED_OUT && count ( $ consumedStack ) === 0 ) {
210208 continue ;
211209 }
212210 $ consumedStack [] = $ message ->err ?: $ message ->payload ;
@@ -233,13 +231,11 @@ public function testSetPartitionFollowerWatermarks(): void
233231 // reset high watermark
234232 $ cluster ->setPartitionFollowerWatermarks (KAFKA_TEST_TOPIC , 0 , -1 , -1 );
235233
236- sleep (1 );
237-
238234 // consume rest
239235 $ consumedStack = [];
240236 do {
241237 $ message = $ consumer ->consume (KAFKA_TEST_SHORT_TIMEOUT_MS );
242- if ($ message === null ) {
238+ if ($ message-> err === RD_KAFKA_RESP_ERR__TIMED_OUT && count ( $ consumedStack ) === 0 ) {
243239 continue ;
244240 }
245241 $ consumedStack [] = $ message ->err ?: $ message ->payload ;
@@ -265,13 +261,13 @@ public function testSetBrokerDownAndUp(): void
265261 $ this ->requiresLibrdkafkaVersion ('>= ' , '1.4.0 ' );
266262
267263 $ clusterConfig = new Conf ();
268- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
264+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
269265 $ cluster = MockCluster::create (1 , $ clusterConfig );
270266
271267 $ producerConfig = new Conf ();
272- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
268+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
273269 $ producerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
274- $ producerConfig ->set ('reconnect.backoff.max.ms ' , (string ) 1000 );
270+ $ producerConfig ->set ('reconnect.backoff.max.ms ' , (string )1000 );
275271 $ producer = new Producer ($ producerConfig );
276272
277273 $ cluster ->setBrokerDown (1 );
@@ -295,12 +291,12 @@ public function testSetBrokerDownAndUp(): void
295291 public function testPushRequestErrors (): void
296292 {
297293 $ clusterConfig = new Conf ();
298- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
294+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
299295 $ cluster = MockCluster::create (1 , $ clusterConfig );
300296
301297 // produce msg
302298 $ producerConfig = new Conf ();
303- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
299+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
304300 $ producerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
305301 $ producer = new Producer ($ producerConfig );
306302 $ producerTopic = $ producer ->newTopic (KAFKA_TEST_TOPIC );
@@ -317,7 +313,7 @@ public function testPushRequestErrors(): void
317313
318314 // try to consume msg
319315 $ consumerConfig = new Conf ();
320- $ consumerConfig ->set ('log_level ' , (string ) LOG_EMERG );
316+ $ consumerConfig ->set ('log_level ' , (string )LOG_EMERG );
321317 $ consumerConfig ->set ('group.id ' , __METHOD__ );
322318 // $consumerConfig->set('debug', 'fetch');
323319 $ consumerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
@@ -337,12 +333,12 @@ public function testPushRequestErrorsArray(): void
337333 $ this ->requiresLibrdkafkaVersion ('>= ' , '1.7.0 ' );
338334
339335 $ clusterConfig = new Conf ();
340- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
336+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
341337 $ cluster = MockCluster::create (1 , $ clusterConfig );
342338
343339 // produce msg
344340 $ producerConfig = new Conf ();
345- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
341+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
346342 $ producerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
347343 $ producer = new Producer ($ producerConfig );
348344 $ producerTopic = $ producer ->newTopic (KAFKA_TEST_TOPIC );
@@ -361,7 +357,7 @@ public function testPushRequestErrorsArray(): void
361357
362358 // try to consume msg
363359 $ consumerConfig = new Conf ();
364- $ consumerConfig ->set ('log_level ' , (string ) LOG_EMERG );
360+ $ consumerConfig ->set ('log_level ' , (string )LOG_EMERG );
365361 $ consumerConfig ->set ('group.id ' , __METHOD__ );
366362 // $consumerConfig->set('debug', 'fetch');
367363 $ consumerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
@@ -381,12 +377,12 @@ public function testCreateTopic(): void
381377 $ this ->requiresLibrdkafkaVersion ('>= ' , '1.4.0 ' );
382378
383379 $ clusterConfig = new Conf ();
384- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
380+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
385381 $ cluster = MockCluster::create (1 , $ clusterConfig );
386382 $ cluster ->createTopic (KAFKA_TEST_TOPIC , 12 , 1 );
387383
388384 $ producerConfig = new Conf ();
389- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
385+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
390386 $ producerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
391387 $ producer = new Producer ($ producerConfig );
392388 $ producerTopic = $ producer ->newTopic (KAFKA_TEST_TOPIC );
@@ -405,17 +401,17 @@ public function testPushBrokerRequestErrors(): void
405401 $ this ->requiresLibrdkafkaVersion ('< ' , '1.7.0 ' );
406402
407403 $ clusterConfig = new Conf ();
408- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
404+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
409405 $ cluster = MockCluster::create (1 , $ clusterConfig );
410406
411407 // produce msg
412408 $ producerConfig = new Conf ();
413- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
409+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
414410 $ producerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
415411 $ producer = new Producer ($ producerConfig );
416412 $ producerTopic = $ producer ->newTopic (KAFKA_TEST_TOPIC );
417413 $ producerTopic ->produce (0 , 0 , __METHOD__ );
418- $ producer ->flush (KAFKA_TEST_LONG_TIMEOUT_MS );
414+ $ producer ->flush (KAFKA_TEST_SHORT_TIMEOUT_MS );
419415
420416 // first error is retriable, second fatal
421417 $ cluster ->pushBrokerRequestErrors (
@@ -428,16 +424,16 @@ public function testPushBrokerRequestErrors(): void
428424
429425 // try to consume msg
430426 $ consumerConfig = new Conf ();
431- $ consumerConfig ->set ('log_level ' , (string ) LOG_EMERG );
427+ $ consumerConfig ->set ('log_level ' , (string )LOG_EMERG );
432428 $ consumerConfig ->set ('group.id ' , __METHOD__ );
433429 // $consumerConfig->set('debug', 'fetch');
434430 $ consumerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
435431 $ consumer = new KafkaConsumer ($ consumerConfig );
436432 $ consumer ->assign ([new TopicPartition (KAFKA_TEST_TOPIC , 0 , rd_kafka_offset_tail (1 ))]);
437433
438434 // try to consume msg
439- $ message1 = $ consumer ->consume (KAFKA_TEST_SHORT_TIMEOUT_MS );
440- $ message2 = $ consumer ->consume (KAFKA_TEST_SHORT_TIMEOUT_MS );
435+ $ message1 = $ consumer ->consume (KAFKA_TEST_LONG_TIMEOUT_MS );
436+ $ message2 = $ consumer ->consume (KAFKA_TEST_LONG_TIMEOUT_MS );
441437
442438 $ this ->assertSame (RD_KAFKA_RESP_ERR__AUTHENTICATION , $ message1 ->err );
443439 $ this ->assertSame (__METHOD__ , $ message2 ->payload );
@@ -448,12 +444,12 @@ public function testPushBrokerRequestErrorRtts(): void
448444 $ this ->requiresLibrdkafkaVersion ('>= ' , '1.7.0 ' );
449445
450446 $ clusterConfig = new Conf ();
451- $ clusterConfig ->set ('log_level ' , (string ) LOG_EMERG );
447+ $ clusterConfig ->set ('log_level ' , (string )LOG_EMERG );
452448 $ cluster = MockCluster::create (1 , $ clusterConfig );
453449
454450 // produce msg
455451 $ producerConfig = new Conf ();
456- $ producerConfig ->set ('log_level ' , (string ) LOG_EMERG );
452+ $ producerConfig ->set ('log_level ' , (string )LOG_EMERG );
457453 $ producerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
458454 $ producer = new Producer ($ producerConfig );
459455 $ producerTopic = $ producer ->newTopic (KAFKA_TEST_TOPIC );
@@ -473,7 +469,7 @@ public function testPushBrokerRequestErrorRtts(): void
473469
474470 // try to consume msg
475471 $ consumerConfig = new Conf ();
476- $ consumerConfig ->set ('log_level ' , (string ) LOG_EMERG );
472+ $ consumerConfig ->set ('log_level ' , (string )LOG_EMERG );
477473 $ consumerConfig ->set ('group.id ' , __METHOD__ );
478474 // $consumerConfig->set('debug', 'fetch');
479475 $ consumerConfig ->set ('bootstrap.servers ' , $ cluster ->getBootstraps ());
0 commit comments