build.gradle (1 change: 1 addition & 0 deletions)

@@ -1420,6 +1420,7 @@ project(':group-coordinator') {
```groovy
    implementation libs.hdrHistogram
    implementation libs.re2j
    implementation libs.slf4jApi
    implementation libs.guava
```
🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

Scope Guava correctly; consider exposing metadata as api for the new public methods

  • Guava usage is internal to hashing and not exposed in signatures — keeping it as implementation is correct. LGTM.
  • New public static methods on Group use TopicImage and ClusterImage types. To avoid downstream compile issues, expose ':metadata' as an api dependency in group-coordinator.

Also ensure LICENSE-binary is updated for the new Guava dep (per dependencies.gradle note).

Proposed change (within project(':group-coordinator') dependencies):

```diff
-    implementation project(':metadata')
+    api project(':metadata')
```

I'll verify the review comment by examining the relevant code sections. Let me start by inspecting the build configuration and the Group class.


---

🏁 Script executed:

```shell
#!/bin/bash
# Find and inspect the build.gradle file around line 1423 for Guava dependency
fd -e gradle "build.gradle" | head -5
```

Length of output: 12


🏁 Script executed:

```shell
#!/bin/bash
# Search for the Group class and its public static methods
rg -n "class Group" --type=java -A 20 | head -50
```

Length of output: 6189


🏁 Script executed:

```shell
#!/bin/bash
# Check group-coordinator dependencies configuration
fd -e gradle | xargs rg -l "group-coordinator" | head -5
```

Length of output: 28


🏁 Script executed:

```shell
#!/bin/bash
# Look for TopicImage and ClusterImage in public method signatures
rg -nP "public\s+static.*(?:TopicImage|ClusterImage)" --type=java
```

Length of output: 384


🏁 Script executed:

```shell
#!/bin/bash
# Check for LICENSE-binary file and Guava references
fd "LICENSE" -e binary -o -e txt | grep -i license
```

Length of output: 116



```groovy
    testImplementation project(':clients').sourceSets.test.output
    testImplementation project(':server-common').sourceSets.test.output
```
checkstyle/import-control-group-coordinator.xml (1 change: 1 addition & 0 deletions)

@@ -77,6 +77,7 @@

```xml
    <allow pkg="org.apache.kafka.coordinator.common.runtime" />
    <allow pkg="com.google.re2j" />
    <allow pkg="org.apache.kafka.metadata" />
    <allow pkg="com.google.common.hash" />
    <subpackage name="metrics">
        <allow pkg="com.yammer.metrics"/>
        <allow pkg="org.HdrHistogram" />
```
gradle/dependencies.gradle (2 changes: 2 additions & 0 deletions)

@@ -61,6 +61,7 @@ versions += [

```groovy
  classgraph: "4.8.173",
  gradle: "8.10.2",
  grgit: "4.1.1",
  guava: "33.4.0-jre",
  httpclient: "4.5.14",
  jackson: "2.16.2",
  jacoco: "0.8.10",
```

@@ -147,6 +148,7 @@ libs += [

```groovy
  caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
  classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
  commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
  guava: "com.google.guava:guava:$versions.guava",
  jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
  jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
  jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson",
```
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java

@@ -19,11 +19,21 @@

```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.BrokerRegistration;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
```

@@ -209,4 +219,50 @@ void validateOffsetFetch(

```java
    default boolean shouldExpire() {
        return true;
    }

    /**
     * Computes the hash of the topics in a group.
     *
     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
     * @return The hash of the group.
     */
    static long computeGroupHash(Map<String, Long> topicHashes) {
        return Hashing.combineOrdered(
            topicHashes.entrySet()
                .stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> HashCode.fromLong(e.getValue()))
                .toList()
        ).asLong();
    }
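    // Editorial note (not part of the PR): Hashing.combineOrdered is
    // order-sensitive, so the sort by topic name above is what makes the
    // group hash deterministic regardless of map iteration order.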

    /**
     * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
     *
     * @param topicImage The topic image.
     * @param clusterImage The cluster image.
     * @return The hash of the topic.
     */
    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
```

Comment on lines +229 to +246

Static Method Placement

Hash computation methods are placed in the Group interface but handle generic metadata operations. These utility methods have no direct relationship to Group behavior and violate the single-responsibility principle. Consider extracting them to a dedicated HashUtils class for better separation of concerns.

Standards
  • SOLID-SRP
  • Clean-Code-Class-Organization
  • Refactoring-Extract-Class
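
For illustration, a minimal sketch of the suggested extraction; the class name GroupHashUtils and its placement are assumptions, not part of the PR:

```java
package org.apache.kafka.coordinator.group;

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import java.util.Map;

// Hypothetical utility class; name and location are illustrative only.
public final class GroupHashUtils {
    private GroupHashUtils() {
    }

    // Moved verbatim from Group; computeTopicHash would move the same way.
    public static long computeGroupHash(Map<String, Long> topicHashes) {
        return Hashing.combineOrdered(
            topicHashes.entrySet()
                .stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> HashCode.fromLong(e.getValue()))
                .toList()
        ).asLong();
    }
}
```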

```java
        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(topicImage.id().hashCode()) // topic Id
```


high

Using topicImage.id().hashCode() to hash the Uuid is not ideal as it truncates the 128-bit UUID into a 32-bit integer hash code before passing it to the Hasher as a long. This significantly increases the probability of hash collisions. To preserve all the information from the UUID, you should hash both the most and least significant bits of the UUID.

Note that you will also need to update the corresponding tests in GroupTest.java to reflect this change.

Suggested change:

```diff
-            .putLong(topicImage.id().hashCode()) // topic Id
+            .putLong(topicImage.id().getMostSignificantBits()).putLong(topicImage.id().getLeastSignificantBits()) // topic Id
```
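
For context, a small editorial sketch (not from the PR) of the truncation described above; Kafka's Uuid exposes both 64-bit halves directly:

```java
import org.apache.kafka.common.Uuid;

public class UuidHashDemo {
    public static void main(String[] args) {
        Uuid id = Uuid.randomUuid();
        // hashCode() collapses the 128-bit id into a 32-bit int before it is
        // widened to a long, discarding most of the UUID's entropy.
        long truncated = id.hashCode();
        // The two getters preserve all 128 bits across two longs.
        long msb = id.getMostSignificantBits();
        long lsb = id.getLeastSignificantBits();
        System.out.printf("hashCode=%d msb=%d lsb=%d%n", truncated, msb, lsb);
    }
}
```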


Null Pointer Risk

Direct hashCode() call on topicImage.id() without null validation creates a NullPointerException risk: a null topic ID would fail the hash computation with an unhandled runtime exception during group hash calculation.

Suggested change:

```diff
-            .putLong(topicImage.id().hashCode()) // topic Id
+            .putLong(Objects.hashCode(topicImage.id())) // topic Id
```

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Preconditions

```java
            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
            .putInt(topicImage.partitions().size()); // number of partitions

        topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
            topicHasher.putInt(entry.getKey()); // partition id
            String racks = Arrays.stream(entry.getValue().replicas)
                .mapToObj(clusterImage::broker)
                .filter(Objects::nonNull)
                .map(BrokerRegistration::rack)
                .filter(Optional::isPresent)
                .map(Optional::get)
```
Comment on lines +260 to +261


medium

The stream pipeline for collecting rack information can be made more concise. Instead of using .filter(Optional::isPresent).map(Optional::get), you can use .flatMap(Optional::stream). This is available since Java 9 and is more idiomatic for unwrapping Optional values within a stream.
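
For illustration, the rack-collection pipeline rewritten with that idiom (an editorial sketch, not a committed suggestion):

```java
            // Same pipeline as in the diff, with the two-step Optional
            // unwrapping replaced by flatMap(Optional::stream) (Java 9+).
            String racks = Arrays.stream(entry.getValue().replicas)
                .mapToObj(clusterImage::broker)
                .filter(Objects::nonNull)
                .map(BrokerRegistration::rack)
                .flatMap(Optional::stream)
                .sorted()
                .collect(Collectors.joining(";"));
```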

```java
                .sorted()
                .collect(Collectors.joining(";"));
            topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
```

Missing Import Statement

Collectors is used without a visible import statement, which would cause a compilation failure. The file needs:

```java
import java.util.stream.Collectors;
```

Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Maturity

```java
        });
        return topicHasher.hash().asLong();
    }
```
Comment on lines +246 to +267

⚠️ Potential issue | 🟠 Major

Use full 128-bit UUID, not Uuid.hashCode(), to avoid unnecessary collisions

The doc says “hash of the topic id…”. Using topicImage.id().hashCode() reduces the 128-bit UUID to 32 bits, then writes it as a long, increasing collision risk and diverging from the stated intent.

Replace with both UUID halves; optionally avoid building an intermediate rack string.

Apply:

```diff
-        Hasher topicHasher = hf.newHasher()
-            .putByte((byte) 0) // magic byte
-            .putLong(topicImage.id().hashCode()) // topic Id
-            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
-            .putInt(topicImage.partitions().size()); // number of partitions
+        Hasher topicHasher = hf.newHasher()
+            .putByte((byte) 0) // magic byte
+            // topic id (full 128 bits)
+            .putLong(topicImage.id().getMostSignificantBits())
+            .putLong(topicImage.id().getLeastSignificantBits())
+            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+            .putInt(topicImage.partitions().size()); // number of partitions
@@
-            String racks = Arrays.stream(entry.getValue().replicas)
-                .mapToObj(clusterImage::broker)
-                .filter(Objects::nonNull)
-                .map(BrokerRegistration::rack)
-                .filter(Optional::isPresent)
-                .map(Optional::get)
-                .sorted()
-                .collect(Collectors.joining(";"));
-            topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
+            Arrays.stream(entry.getValue().replicas)
+                .mapToObj(clusterImage::broker)
+                .filter(Objects::nonNull)
+                .map(BrokerRegistration::rack)
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .sorted()
+                // optional: dedupe racks per partition to avoid double-counting identical rack strings
+                // .distinct()
+                .forEachOrdered(r -> {
+                    topicHasher.putString(r, StandardCharsets.UTF_8);
+                    topicHasher.putByte((byte) ';');
+                });
```

Note: Tests will need updates to write the full UUID (see test suggestions).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:

```diff
     static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
         HashFunction hf = Hashing.murmur3_128();
         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 0) // magic byte
-            .putLong(topicImage.id().hashCode()) // topic Id
+            // topic id (full 128 bits)
+            .putLong(topicImage.id().getMostSignificantBits())
+            .putLong(topicImage.id().getLeastSignificantBits())
             .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
             .putInt(topicImage.partitions().size()); // number of partitions
         topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
             topicHasher.putInt(entry.getKey()); // partition id
-            String racks = Arrays.stream(entry.getValue().replicas)
-                .mapToObj(clusterImage::broker)
-                .filter(Objects::nonNull)
-                .map(BrokerRegistration::rack)
-                .filter(Optional::isPresent)
-                .map(Optional::get)
-                .sorted()
-                .collect(Collectors.joining(";"));
-            topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
+            Arrays.stream(entry.getValue().replicas)
+                .mapToObj(clusterImage::broker)
+                .filter(Objects::nonNull)
+                .map(BrokerRegistration::rack)
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .sorted()
+                // optional: dedupe racks per partition to avoid double-counting identical rack strings
+                // .distinct()
+                .forEachOrdered(r -> {
+                    topicHasher.putString(r, StandardCharsets.UTF_8);
+                    topicHasher.putByte((byte) ';');
+                });
         });
         return topicHasher.hash().asLong();
     }
```
🤖 Prompt for AI Agents

```text
In group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
around lines 246 to 267, replace the use of topicImage.id().hashCode() (which
reduces the 128-bit UUID to 32 bits) with the full 128-bit UUID by writing both
halves: call topicImage.id().getMostSignificantBits() and
topicImage.id().getLeastSignificantBits() (two putLong calls) so the full UUID
is included in the hasher; also avoid building an intermediate joined rack
string by serializing the sorted racks directly (e.g., write an int count then
iterate sorted rack strings and putString each) or otherwise stream each rack
into the topicHasher to reduce allocations; update tests accordingly to expect
the full UUID serialization.
```

```java
}
```
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (new file)

@@ -0,0 +1,189 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataImage;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

public class GroupTest {
    private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
    private static final String FOO_TOPIC_NAME = "foo";
    private static final String BAR_TOPIC_NAME = "bar";
    private static final int FOO_NUM_PARTITIONS = 2;
    private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
        .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
        .addRacks()
        .build();

    @Test
    void testComputeTopicHash() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertEquals(topicHasher.hash().asLong(), result);
    }
```

Comment on lines +50 to +65

⚠️ Potential issue | 🟠 Major

Align expected hash with full 128-bit UUID (not hashCode)

After fixing computeTopicHash to hash the full UUID, adjust the tests:

```diff
         HashFunction hf = Hashing.murmur3_128();
         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 0) // magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
             .putInt(0) // partition 0
             .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
             .putInt(1) // partition 1
             .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:

```diff
     @Test
     void testComputeTopicHash() {
         long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
         HashFunction hf = Hashing.murmur3_128();
         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 0) // magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
             .putInt(0) // partition 0
             .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
             .putInt(1) // partition 1
             .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
         assertEquals(topicHasher.hash().asLong(), result);
     }
```
🤖 Prompt for AI Agents

```text
In group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 50 to 65, the test currently feeds only topicId.hashCode() into the
hasher but computeTopicHash now hashes the full 128-bit UUID; update the test to
feed the full UUID into the Hasher (replace the single
putLong(FOO_TOPIC_ID.hashCode()) with two putLong calls:
FOO_TOPIC_ID.getMostSignificantBits() and
FOO_TOPIC_ID.getLeastSignificantBits()) so the expected hasher input matches the
implementation and the asserted hash is correct.
```


```java
    @Test
    void testComputeTopicHashWithDifferentMagicByte() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 1) // different magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertNotEquals(topicHasher.hash().asLong(), result);
```
Comment on lines +71 to +81

🛠️ Refactor suggestion | 🟠 Major

Same UUID correction for different-magic test

```diff
         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 1) // different magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:

```diff
         HashFunction hf = Hashing.murmur3_128();
         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 1) // different magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
             .putInt(0) // partition 0
             .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
             .putInt(1) // partition 1
             .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
         assertNotEquals(topicHasher.hash().asLong(), result);
```
🤖 Prompt for AI Agents

```text
In group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 71 to 81, the test uses FOO_TOPIC_ID.hashCode() when feeding the
UUID into the hasher which can change the UUID representation and invalidate the
"different-magic" expectation; replace the single
putLong(FOO_TOPIC_ID.hashCode()) with the UUID's canonical two-long
representation by adding putLong(FOO_TOPIC_ID.getMostSignificantBits()) and
putLong(FOO_TOPIC_ID.getLeastSignificantBits()) so the same UUID bytes are used
across tests while keeping the different magic byte.
```

```java
    }

    @Test
    void testComputeTopicHashWithDifferentPartitionOrder() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            // different partition order
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
        assertNotEquals(topicHasher.hash().asLong(), result);
```
```

Comment on lines +88 to +99

🛠️ Refactor suggestion | 🟠 Major

Same UUID correction for different partition order

```diff
         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 0) // magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
```
🤖 Prompt for AI Agents

```text
In group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 88 to 99, the code puts only FOO_TOPIC_ID.hashCode() into the
hasher which loses UUID entropy and can produce the same hash for different
partition orders; replace that single putLong(FOO_TOPIC_ID.hashCode()) with two
putLong calls for the UUID’s full value
(putLong(FOO_TOPIC_ID.getMostSignificantBits()) and
putLong(FOO_TOPIC_ID.getLeastSignificantBits())) so the complete UUID is
included in the hash and different partition orders produce different results.
```

```java
    }

    @Test
    void testComputeTopicHashWithDifferentRackOrder() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertNotEquals(topicHasher.hash().asLong(), result);
```
```

Comment on lines +106 to +116

🛠️ Refactor suggestion | 🟠 Major

Same UUID correction for different rack order

```diff
         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 0) // magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:

```diff
         HashFunction hf = Hashing.murmur3_128();
         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 0) // magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
             .putInt(0) // partition 0
             .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
             .putInt(1) // partition 1
             .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
         assertNotEquals(topicHasher.hash().asLong(), result);
```
🤖 Prompt for AI Agents

```text
In group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 106 to 116, the test currently expects different UUIDs when
partition rack order changes but the comment indicates the UUID should remain
the same; update the test to assert equality instead of inequality (replace
assertNotEquals(...) with assertEquals(...)) so it verifies that the hash/UUID
is invariant to rack order, or alternatively normalize/sort the rack strings
before building the hasher so the computed hash is order-independent and keep
the assertion accordingly.
```

```java
    }

    @ParameterizedTest
    @MethodSource("differentFieldGenerator")
    void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        assertNotEquals(
            Group.computeTopicHash(
                differentImage.topics().getTopic(topicId),
                differentImage.cluster()
            ),
            result
        );
    }

    private static Stream<Arguments> differentFieldGenerator() {
        Uuid differentTopicId = Uuid.randomUuid();
        return Stream.of(
            Arguments.of(new MetadataImageBuilder() // different topic id
                .addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
                .addRacks()
                .build(),
                differentTopicId
            ),
            Arguments.of(new MetadataImageBuilder() // different topic name
                .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
                .addRacks()
                .build(),
                FOO_TOPIC_ID
            ),
            Arguments.of(new MetadataImageBuilder() // different partitions
                .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
                .addRacks()
                .build(),
                FOO_TOPIC_ID
            ),
            Arguments.of(new MetadataImageBuilder() // different racks
                .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
                .build(),
                FOO_TOPIC_ID
            )
        );
    }

    @Test
    void testComputeGroupHash() {
        long result = Group.computeGroupHash(Map.of(
            BAR_TOPIC_NAME, 123L,
            FOO_TOPIC_NAME, 456L
        ));

        long expected = Hashing.combineOrdered(List.of(
            HashCode.fromLong(123L),
            HashCode.fromLong(456L)
        )).asLong();
        assertEquals(expected, result);
    }

    @Test
    void testComputeGroupHashWithDifferentOrder() {
        long result = Group.computeGroupHash(Map.of(
            BAR_TOPIC_NAME, 123L,
            FOO_TOPIC_NAME, 456L
        ));

        long unexpected = Hashing.combineOrdered(List.of(
            HashCode.fromLong(456L),
            HashCode.fromLong(123L)
        )).asLong();
        assertNotEquals(unexpected, result);
    }
}
```