@@ -361,28 +361,28 @@ public void testUpdateLastEpoch() {
361
361
// Metadata with newer epoch is handled
362
362
metadataResponse = RequestTestUtils .metadataUpdateWith ("dummy" , 1 , Collections .emptyMap (), Collections .singletonMap ("topic-1" , 1 ), _tp -> 10 );
363
363
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 1L );
364
- assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (leaderAndEpoch .intValue (), 10 ));
364
+ assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (10 , leaderAndEpoch .intValue ()));
365
365
366
366
// Don't update to an older one
367
367
assertFalse (metadata .updateLastSeenEpochIfNewer (tp , 1 ));
368
- assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (leaderAndEpoch .intValue (), 10 ));
368
+ assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (10 , leaderAndEpoch .intValue ()));
369
369
370
370
// Don't cause update if it's the same one
371
371
assertFalse (metadata .updateLastSeenEpochIfNewer (tp , 10 ));
372
- assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (leaderAndEpoch .intValue (), 10 ));
372
+ assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (10 , leaderAndEpoch .intValue ()));
373
373
374
374
// Update if we see newer epoch
375
375
assertTrue (metadata .updateLastSeenEpochIfNewer (tp , 12 ));
376
- assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (leaderAndEpoch .intValue (), 12 ));
376
+ assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (12 , leaderAndEpoch .intValue ()));
377
377
378
378
metadataResponse = RequestTestUtils .metadataUpdateWith ("dummy" , 1 , Collections .emptyMap (), Collections .singletonMap ("topic-1" , 1 ), _tp -> 12 );
379
379
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 2L );
380
- assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (leaderAndEpoch .intValue (), 12 ));
380
+ assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (12 , leaderAndEpoch .intValue ()));
381
381
382
382
// Don't overwrite metadata with older epoch
383
383
metadataResponse = RequestTestUtils .metadataUpdateWith ("dummy" , 1 , Collections .emptyMap (), Collections .singletonMap ("topic-1" , 1 ), _tp -> 11 );
384
384
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 3L );
385
- assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (leaderAndEpoch .intValue (), 12 ));
385
+ assertOptional (metadata .lastSeenLeaderEpoch (tp ), leaderAndEpoch -> assertEquals (12 , leaderAndEpoch .intValue ()));
386
386
}
387
387
388
388
@ Test
@@ -465,7 +465,7 @@ public void testRejectOldMetadata() {
465
465
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 10L );
466
466
assertNotNull (metadata .fetch ().partition (tp ));
467
467
assertTrue (metadata .lastSeenLeaderEpoch (tp ).isPresent ());
468
- assertEquals (metadata .lastSeenLeaderEpoch (tp ).get ().longValue (), 100 );
468
+ assertEquals (100 , metadata .lastSeenLeaderEpoch (tp ).get ().longValue ());
469
469
}
470
470
471
471
// Fake an empty ISR, but with an older epoch, should reject it
@@ -475,8 +475,8 @@ public void testRejectOldMetadata() {
475
475
new MetadataResponse .PartitionMetadata (error , partition , leader ,
476
476
leaderEpoch , replicas , Collections .emptyList (), offlineReplicas ), ApiKeys .METADATA .latestVersion (), Collections .emptyMap ());
477
477
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 20L );
478
- assertEquals (metadata .fetch ().partition (tp ).inSyncReplicas ().length , 1 );
479
- assertEquals (metadata .lastSeenLeaderEpoch (tp ).get ().longValue (), 100 );
478
+ assertEquals (1 , metadata .fetch ().partition (tp ).inSyncReplicas ().length );
479
+ assertEquals (100 , metadata .lastSeenLeaderEpoch (tp ).get ().longValue ());
480
480
}
481
481
482
482
// Fake an empty ISR, with same epoch, accept it
@@ -486,24 +486,24 @@ public void testRejectOldMetadata() {
486
486
new MetadataResponse .PartitionMetadata (error , partition , leader ,
487
487
leaderEpoch , replicas , Collections .emptyList (), offlineReplicas ), ApiKeys .METADATA .latestVersion (), Collections .emptyMap ());
488
488
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 20L );
489
- assertEquals (metadata .fetch ().partition (tp ).inSyncReplicas ().length , 0 );
490
- assertEquals (metadata .lastSeenLeaderEpoch (tp ).get ().longValue (), 100 );
489
+ assertEquals (0 , metadata .fetch ().partition (tp ).inSyncReplicas ().length );
490
+ assertEquals (100 , metadata .lastSeenLeaderEpoch (tp ).get ().longValue ());
491
491
}
492
492
493
493
// Empty metadata response, should not keep old partition but should keep the last-seen epoch
494
494
{
495
495
MetadataResponse metadataResponse = RequestTestUtils .metadataUpdateWith ("dummy" , 1 , Collections .emptyMap (), Collections .emptyMap ());
496
496
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 20L );
497
497
assertNull (metadata .fetch ().partition (tp ));
498
- assertEquals (metadata .lastSeenLeaderEpoch (tp ).get ().longValue (), 100 );
498
+ assertEquals (100 , metadata .lastSeenLeaderEpoch (tp ).get ().longValue ());
499
499
}
500
500
501
501
// Back in the metadata, with old epoch, should not get added
502
502
{
503
503
MetadataResponse metadataResponse = RequestTestUtils .metadataUpdateWith ("dummy" , 1 , Collections .emptyMap (), partitionCounts , _tp -> 99 );
504
504
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 10L );
505
505
assertNull (metadata .fetch ().partition (tp ));
506
- assertEquals (metadata .lastSeenLeaderEpoch (tp ).get ().longValue (), 100 );
506
+ assertEquals (100 , metadata .lastSeenLeaderEpoch (tp ).get ().longValue ());
507
507
}
508
508
}
509
509
@@ -522,31 +522,31 @@ public void testOutOfBandEpochUpdate() {
522
522
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 10L );
523
523
assertNotNull (metadata .fetch ().partition (tp ));
524
524
assertTrue (metadata .lastSeenLeaderEpoch (tp ).isPresent ());
525
- assertEquals (metadata .lastSeenLeaderEpoch (tp ).get ().longValue (), 100 );
525
+ assertEquals (100 , metadata .lastSeenLeaderEpoch (tp ).get ().longValue ());
526
526
527
527
// Simulate a leader epoch from another response, like a fetch response or list offsets
528
528
assertTrue (metadata .updateLastSeenEpochIfNewer (tp , 101 ));
529
529
530
530
// Cache of partition stays, but current partition info is not available since it's stale
531
531
assertNotNull (metadata .fetch ().partition (tp ));
532
- assertEquals (Objects .requireNonNull (metadata .fetch ().partitionCountForTopic ("topic-1" )).longValue (), 5 );
532
+ assertEquals (5 , Objects .requireNonNull (metadata .fetch ().partitionCountForTopic ("topic-1" )).longValue ());
533
533
assertFalse (metadata .partitionMetadataIfCurrent (tp ).isPresent ());
534
- assertEquals (metadata .lastSeenLeaderEpoch (tp ).get ().longValue (), 101 );
534
+ assertEquals (101 , metadata .lastSeenLeaderEpoch (tp ).get ().longValue ());
535
535
536
536
// Metadata with older epoch is rejected, metadata state is unchanged
537
537
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 20L );
538
538
assertNotNull (metadata .fetch ().partition (tp ));
539
- assertEquals (Objects .requireNonNull (metadata .fetch ().partitionCountForTopic ("topic-1" )).longValue (), 5 );
539
+ assertEquals (5 , Objects .requireNonNull (metadata .fetch ().partitionCountForTopic ("topic-1" )).longValue ());
540
540
assertFalse (metadata .partitionMetadataIfCurrent (tp ).isPresent ());
541
- assertEquals (metadata .lastSeenLeaderEpoch (tp ).get ().longValue (), 101 );
541
+ assertEquals (101 , metadata .lastSeenLeaderEpoch (tp ).get ().longValue ());
542
542
543
543
// Metadata with equal or newer epoch is accepted
544
544
metadataResponse = RequestTestUtils .metadataUpdateWith ("dummy" , 1 , Collections .emptyMap (), partitionCounts , _tp -> 101 );
545
545
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 30L );
546
546
assertNotNull (metadata .fetch ().partition (tp ));
547
- assertEquals (Objects .requireNonNull (metadata .fetch ().partitionCountForTopic ("topic-1" )).longValue (), 5 );
547
+ assertEquals (5 , Objects .requireNonNull (metadata .fetch ().partitionCountForTopic ("topic-1" )).longValue ());
548
548
assertTrue (metadata .partitionMetadataIfCurrent (tp ).isPresent ());
549
- assertEquals (metadata .lastSeenLeaderEpoch (tp ).get ().longValue (), 101 );
549
+ assertEquals (101 , metadata .lastSeenLeaderEpoch (tp ).get ().longValue ());
550
550
}
551
551
552
552
@ Test
@@ -585,18 +585,18 @@ public void testClusterCopy() {
585
585
metadata .updateWithCurrentRequestVersion (metadataResponse , false , 0L );
586
586
587
587
Cluster cluster = metadata .fetch ();
588
- assertEquals (cluster .clusterResource ().clusterId (), "dummy" );
589
- assertEquals (cluster .nodes ().size (), 4 );
588
+ assertEquals ("dummy" , cluster .clusterResource ().clusterId ());
589
+ assertEquals (4 , cluster .nodes ().size ());
590
590
591
591
// topic counts
592
592
assertEquals (cluster .invalidTopics (), Collections .singleton ("topic3" ));
593
593
assertEquals (cluster .unauthorizedTopics (), Collections .singleton ("topic4" ));
594
- assertEquals (cluster .topics ().size (), 3 );
594
+ assertEquals (3 , cluster .topics ().size ());
595
595
assertEquals (cluster .internalTopics (), Collections .singleton (Topic .GROUP_METADATA_TOPIC_NAME ));
596
596
597
597
// partition counts
598
- assertEquals (cluster .partitionsForTopic ("topic1" ).size (), 2 );
599
- assertEquals (cluster .partitionsForTopic ("topic2" ).size (), 3 );
598
+ assertEquals (2 , cluster .partitionsForTopic ("topic1" ).size ());
599
+ assertEquals (3 , cluster .partitionsForTopic ("topic2" ).size ());
600
600
601
601
// Sentinel instances
602
602
InetSocketAddress address = InetSocketAddress .createUnresolved ("localhost" , 0 );
@@ -798,10 +798,10 @@ public void testNodeIfOffline() {
798
798
799
799
TopicPartition tp = new TopicPartition ("topic-1" , 0 );
800
800
801
- assertOptional (metadata .fetch ().nodeIfOnline (tp , 0 ), node -> assertEquals (node .id (), 0 ));
801
+ assertOptional (metadata .fetch ().nodeIfOnline (tp , 0 ), node -> assertEquals (0 , node .id ()));
802
802
assertFalse (metadata .fetch ().nodeIfOnline (tp , 1 ).isPresent ());
803
- assertEquals (metadata .fetch ().nodeById (0 ).id (), 0 );
804
- assertEquals (metadata .fetch ().nodeById (1 ).id (), 1 );
803
+ assertEquals (0 , metadata .fetch ().nodeById (0 ).id ());
804
+ assertEquals (1 , metadata .fetch ().nodeById (1 ).id ());
805
805
}
806
806
807
807
@ Test
@@ -831,7 +831,7 @@ public void testNodeIfOnlineNonExistentTopicPartition() {
831
831
832
832
TopicPartition tp = new TopicPartition ("topic-1" , 0 );
833
833
834
- assertEquals (metadata .fetch ().nodeById (0 ).id (), 0 );
834
+ assertEquals (0 , metadata .fetch ().nodeById (0 ).id ());
835
835
assertNull (metadata .fetch ().partition (tp ));
836
836
assertEquals (metadata .fetch ().nodeIfOnline (tp , 0 ), Optional .empty ());
837
837
}
@@ -955,13 +955,13 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
955
955
// Update the metadata to add a new topic variant, "new", which will be retained with "keep". Note this
956
956
// means that all of the "old" topics should be dropped.
957
957
Cluster cluster = metadata .fetch ();
958
- assertEquals (cluster .clusterResource ().clusterId (), oldClusterId );
959
- assertEquals (cluster .nodes ().size (), oldNodes );
958
+ assertEquals (oldClusterId , cluster .clusterResource ().clusterId ());
959
+ assertEquals (oldNodes , cluster .nodes ().size ());
960
960
assertEquals (cluster .invalidTopics (), Set .of ("oldInvalidTopic" , "keepInvalidTopic" ));
961
961
assertEquals (cluster .unauthorizedTopics (), Set .of ("oldUnauthorizedTopic" , "keepUnauthorizedTopic" ));
962
962
assertEquals (cluster .topics (), Set .of ("oldValidTopic" , "keepValidTopic" ));
963
- assertEquals (cluster .partitionsForTopic ("oldValidTopic" ).size (), 2 );
964
- assertEquals (cluster .partitionsForTopic ("keepValidTopic" ).size (), 3 );
963
+ assertEquals (2 , cluster .partitionsForTopic ("oldValidTopic" ).size ());
964
+ assertEquals (3 , cluster .partitionsForTopic ("keepValidTopic" ).size ());
965
965
assertEquals (new HashSet <>(cluster .topicIds ()), new HashSet <>(topicIds .values ()));
966
966
967
967
String newClusterId = "newClusterId" ;
@@ -990,13 +990,13 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
990
990
assertNull (metadataTopicIds2 .get ("oldValidTopic" ));
991
991
992
992
cluster = metadata .fetch ();
993
- assertEquals (cluster .clusterResource ().clusterId (), newClusterId );
993
+ assertEquals (newClusterId , cluster .clusterResource ().clusterId ());
994
994
assertEquals (cluster .nodes ().size (), newNodes );
995
995
assertEquals (cluster .invalidTopics (), Set .of ("keepInvalidTopic" , "newInvalidTopic" ));
996
996
assertEquals (cluster .unauthorizedTopics (), Set .of ("keepUnauthorizedTopic" , "newUnauthorizedTopic" ));
997
997
assertEquals (cluster .topics (), Set .of ("keepValidTopic" , "newValidTopic" ));
998
- assertEquals (cluster .partitionsForTopic ("keepValidTopic" ).size (), 2 );
999
- assertEquals (cluster .partitionsForTopic ("newValidTopic" ).size (), 4 );
998
+ assertEquals (2 , cluster .partitionsForTopic ("keepValidTopic" ).size ());
999
+ assertEquals (4 , cluster .partitionsForTopic ("newValidTopic" ).size ());
1000
1000
assertEquals (new HashSet <>(cluster .topicIds ()), new HashSet <>(topicIds .values ()));
1001
1001
1002
1002
// Perform another metadata update, but this time all topic metadata should be cleared.
@@ -1008,7 +1008,7 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
1008
1008
topicIds .forEach ((topicName , topicId ) -> assertNull (metadataTopicIds3 .get (topicName )));
1009
1009
1010
1010
cluster = metadata .fetch ();
1011
- assertEquals (cluster .clusterResource ().clusterId (), newClusterId );
1011
+ assertEquals (newClusterId , cluster .clusterResource ().clusterId ());
1012
1012
assertEquals (cluster .nodes ().size (), newNodes );
1013
1013
assertEquals (cluster .invalidTopics (), Collections .emptySet ());
1014
1014
assertEquals (cluster .unauthorizedTopics (), Collections .emptySet ());
0 commit comments