1 change: 1 addition & 0 deletions build.gradle
@@ -1420,6 +1420,7 @@ project(':group-coordinator') {
implementation libs.hdrHistogram
implementation libs.re2j
implementation libs.slf4jApi
implementation libs.guava

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
1 change: 1 addition & 0 deletions checkstyle/import-control-group-coordinator.xml
@@ -77,6 +77,7 @@
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
<allow pkg="com.google.re2j" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="com.google.common.hash" />

Suggestion: Narrow the permission surface by marking the package as exact-match to avoid accidentally allowing subpackages under com.google.common.hash. [security]

Suggested change
<allow pkg="com.google.common.hash" />
<allow pkg="com.google.common.hash" exact-match="true" />
Why Change? ⭐

The proposed change only adds the attribute exact-match="true" which is already used elsewhere in this import-control file (e.g. other <allow ... exact-match="true" /> lines), so the attribute is supported by the DTD referenced at the top of the file and will not introduce a syntax error.

This modification narrows the permission surface by allowing only the exact package com.google.common.hash and not its subpackages. It is syntactically valid XML and consistent with the surrounding entries. Assumptions:

  • The import-control DTD used by this project supports the exact-match attribute (evidence: other lines in this file use it).
  • No runtime code requires implicit access to subpackages under com.google.common.hash; if such access is required, this change would be a deliberate tightening of permissions rather than a syntactic bug.

Given these points, the change is safe from a syntax/execution perspective and does not introduce errors into the configuration itself.
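For illustration, a sketch of the effect under standard checkstyle import-control semantics (the subpackage name below is hypothetical):

// With <allow pkg="com.google.common.hash" exact-match="true" />:
import com.google.common.hash.Hashing;       // still allowed: direct member of the package
// import com.google.common.hash.sub.Thing;  // hypothetical subpackage: would now be rejected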

<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.HdrHistogram" />
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
@@ -61,6 +61,7 @@ versions += [
classgraph: "4.8.173",
gradle: "8.10.2",
grgit: "4.1.1",
guava: "33.4.0-jre",
httpclient: "4.5.14",
jackson: "2.16.2",
jacoco: "0.8.10",
@@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
guava: "com.google.guava:guava:$versions.guava",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson",
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -19,11 +19,21 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.BrokerRegistration;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -209,4 +219,50 @@ void validateOffsetFetch(
default boolean shouldExpire() {
return true;
}

/**
 * Computes the hash of the topics in a group.
 *
 * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
 * @return The hash of the group.
 */
static long computeGroupHash(Map<String, Long> topicHashes) {
    return Hashing.combineOrdered(
        topicHashes.entrySet()
            .stream()
            .sorted(Map.Entry.comparingByKey())
            .map(e -> HashCode.fromLong(e.getValue()))
            .toList()
    ).asLong();
}

/**
 * Computes the hash of the topic id, name, number of partitions, and partition racks using Murmur3.
 *
 * @param topicImage   The topic image.
 * @param clusterImage The cluster image.
 * @return The hash of the topic.
 */
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(topicImage.id().hashCode()) // topic Id


medium

Using topicImage.id().hashCode() for hashing the Uuid is not ideal, as it truncates the 128-bit UUID to a 32-bit integer hash, increasing the potential for collisions. Kafka's Uuid exposes getMostSignificantBits() and getLeastSignificantBits(); feeding both longs into the hasher uses the full 128 bits of the UUID and makes the hash more robust.

Please note that this change will require updating the corresponding logic in GroupTest.java.

Suggested change
.putLong(topicImage.id().hashCode()) // topic Id
.putLong(topicImage.id().getMostSignificantBits()) // topic Id (MSB)
.putLong(topicImage.id().getLeastSignificantBits()) // topic Id (LSB)


Suggestion: Use a deterministic string representation of the topic id (e.g. id().toString()) instead of hashCode() when feeding the hasher to avoid collisions and ensure consistent hashing across JVMs. [possible bug]

Suggested change
.putLong(topicImage.id().hashCode()) // topic Id
.putString(topicImage.id().toString(), StandardCharsets.UTF_8) // topic Id as string
Why Change? ⭐

The improved code is syntactically correct and uses only symbols present in the file:

  • Hasher.putString(CharSequence, Charset) is a valid Guava Hasher API and StandardCharsets is already imported.
  • topicImage.id().toString() is a safe replacement for topicImage.id().hashCode(): it produces a deterministic, textual representation of the id (which is typically a UUID or equivalent), avoiding reliance on JVM-specific hashCode implementations and reducing collision risk.
  • The change does not introduce additional null-safety concerns beyond the original (both .hashCode() and .toString() would NPE if id() is null).

Assumptions and rationale:

  • topicImage.id() has a meaningful, stable toString() (which is true for UUIDs and Kafka Uuid types).
  • The change intentionally alters the bytes fed into the hasher to a deterministic textual form; this is a compatible internal change to the hashing input and compiles without additional imports.

Given these verifiable points and that the edit is local and uses existing imports/APIs, this suggestion is classified as verified.


Hardcoded Magic Byte

The magic byte is hardcoded as the literal zero, which violates the organization guideline against hardcoded values. It also reduces hash-algorithm flexibility and prevents versioning of the hash computation logic.

Standards
  • Org-Guideline-Hardcoding of variables
  • CWE-547
  • OWASP-A05


Magic Byte Documentation

The magic byte value lacks documentation explaining its purpose and versioning strategy. Future hash-algorithm changes may require a different magic byte for backward compatibility; extracting it into a constant with a descriptive name improves maintainability (see the sketch after the Standards list below).

Standards
  • Clean-Code-Comments
  • Maintainability-Quality-Documentation
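A minimal sketch of what these two comments are asking for, assuming the constant lives next to the hashing code in the Group interface (the name TOPIC_HASH_MAGIC_BYTE is illustrative, not from the PR):

/**
 * Version byte prepended to every topic hash. If the field layout fed into
 * the hasher below ever changes, bump this value so hashes computed under
 * the old and new layouts cannot collide.
 */
byte TOPIC_HASH_MAGIC_BYTE = (byte) 0; // implicitly static final on an interface

Hasher topicHasher = hf.newHasher()
    .putByte(TOPIC_HASH_MAGIC_BYTE) // named, documented magic byte instead of a bare literal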

        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
        .putInt(topicImage.partitions().size()); // number of partitions

    topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
        topicHasher.putInt(entry.getKey()); // partition id
        String racks = Arrays.stream(entry.getValue().replicas)
            .mapToObj(clusterImage::broker)
            .filter(Objects::nonNull)
            .map(BrokerRegistration::rack)
            .filter(Optional::isPresent)
            .map(Optional::get)
Comment on lines +260 to +261


medium

The pattern .filter(Optional::isPresent).map(Optional::get) can be simplified to .flatMap(Optional::stream) since your project's Java version supports it. This change will make the stream pipeline more concise and idiomatic.

Suggested change
.filter(Optional::isPresent)
.map(Optional::get)
.flatMap(Optional::stream)

            .sorted()
            .collect(Collectors.joining(";"));
        topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"

Missing Import Statement

The code uses Collectors.joining() but is missing an import for java.util.stream.Collectors, which will cause a compilation failure: the stream operation cannot resolve Collectors without the import declaration.

Committable suggestion (add to the import block at the top of the file):
import java.util.stream.Collectors;
Standards
  • Algorithm-Correctness-Dependency-Resolution
  • Logic-Verification-Compilation-Safety

    });
Comment on lines +255 to +265

Stream Processing Inefficiency

Nested stream processing creates multiple intermediate collections for each partition. The inner stream executes for every partition, giving O(n*m) work where n is the number of partitions and m the replication factor, so hash-computation cost grows with both; a sketch of one mitigation follows the Standards list below.

Standards
  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior
  • Optimization-Pattern-Stream-Efficiency
  • Algorithmic-Complexity-Nested-Operations
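One way to address this, sketched under the assumption that broker registrations are stable for the duration of the computation (the rackByBroker name is illustrative, not from the PR): resolve each broker's rack once before the partition loop, so each replica becomes an O(1) map lookup.

// Build the broker -> rack lookup once per topic instead of once per partition
Map<Integer, String> rackByBroker = new HashMap<>();
for (BrokerRegistration broker : clusterImage.brokers().values()) {
    broker.rack().ifPresent(rack -> rackByBroker.put(broker.id(), rack));
}
// Inside the per-partition loop, replica racks are now cheap map hits
String racks = Arrays.stream(entry.getValue().replicas)
    .mapToObj(rackByBroker::get)
    .filter(Objects::nonNull)
    .sorted()
    .collect(Collectors.joining(";"));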

    return topicHasher.hash().asLong();
}
Comment on lines +246 to +267

⚠️ Potential issue | 🔴 Critical

Use full 128-bit UUID; avoid ambiguous rack concatenation.

  • Bug: putLong(topicImage.id().hashCode()) collapses 128-bit UUID to 32-bit int, increasing collision risk. Use most/least bits.
  • Improvement: Joining racks with ";" can collide if rack strings contain ";" (e.g., "a;" + "b" vs "a" + ";b"). Encode lengths instead.

Apply this essential fix for UUID handling:

-        Hasher topicHasher = hf.newHasher()
-            .putByte((byte) 0) // magic byte
-            .putLong(topicImage.id().hashCode()) // topic Id
-            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
-            .putInt(topicImage.partitions().size()); // number of partitions
+        Hasher topicHasher = hf.newHasher()
+            .putByte((byte) 0) // magic byte
+            // topic id: use full 128 bits to avoid collisions
+            .putLong(topicImage.id().getMostSignificantBits())
+            .putLong(topicImage.id().getLeastSignificantBits())
+            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+            .putInt(topicImage.partitions().size()); // number of partitions

Optionally harden rack encoding to avoid separator ambiguity:

-            String racks = Arrays.stream(entry.getValue().replicas)
-                .mapToObj(clusterImage::broker)
-                .filter(Objects::nonNull)
-                .map(BrokerRegistration::rack)
-                .filter(Optional::isPresent)
-                .map(Optional::get)
-                .sorted()
-                .collect(Collectors.joining(";"));
-            topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
+            List<String> racks = Arrays.stream(entry.getValue().replicas)
+                .mapToObj(clusterImage::broker)
+                .filter(Objects::nonNull)
+                .map(BrokerRegistration::rack)
+                .flatMap(Optional::stream)
+                .sorted()
+                .toList();
+            topicHasher.putInt(racks.size());
+            for (String rack : racks) {
+                topicHasher.putInt(rack.length());
+                topicHasher.putString(rack, StandardCharsets.UTF_8);
+            }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(topicImage.id().hashCode()) // topic Id
        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
        .putInt(topicImage.partitions().size()); // number of partitions
    topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
        topicHasher.putInt(entry.getKey()); // partition id
        String racks = Arrays.stream(entry.getValue().replicas)
            .mapToObj(clusterImage::broker)
            .filter(Objects::nonNull)
            .map(BrokerRegistration::rack)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .sorted()
            .collect(Collectors.joining(";"));
        topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
    });
    return topicHasher.hash().asLong();
}

After:

static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        // topic id: use full 128 bits to avoid collisions
        .putLong(topicImage.id().getMostSignificantBits())
        .putLong(topicImage.id().getLeastSignificantBits())
        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
        .putInt(topicImage.partitions().size()); // number of partitions
    topicImage.partitions().entrySet().stream()
        .sorted(Map.Entry.comparingByKey())
        .forEach(entry -> {
            topicHasher.putInt(entry.getKey()); // partition id
            // hardened rack encoding to avoid separator ambiguity
            List<String> racks = Arrays.stream(entry.getValue().replicas)
                .mapToObj(clusterImage::broker)
                .filter(Objects::nonNull)
                .map(BrokerRegistration::rack)
                .flatMap(Optional::stream)
                .sorted()
                .toList();
            topicHasher.putInt(racks.size());
            for (String rack : racks) {
                topicHasher.putInt(rack.length());
                topicHasher.putString(rack, StandardCharsets.UTF_8);
            }
        });
    return topicHasher.hash().asLong();
}

}
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataImage;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

public class GroupTest {
    private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
    private static final String FOO_TOPIC_NAME = "foo";
    private static final String BAR_TOPIC_NAME = "bar";
    private static final int FOO_NUM_PARTITIONS = 2;
    private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
        .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
        .addRacks()
        .build();

    @Test
    void testComputeTopicHash() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertEquals(topicHasher.hash().asLong(), result);
    }
Comment on lines +50 to +65


medium

There is significant code duplication across the testComputeTopicHash* methods. The logic for building the hash is repeated in testComputeTopicHash, testComputeTopicHashWithDifferentMagicByte, testComputeTopicHashWithDifferentPartitionOrder, and testComputeTopicHashWithDifferentRackOrder. This makes the tests brittle and harder to maintain, as any change to the hashing logic in Group.computeTopicHash would require updates in multiple test methods.

Consider refactoring this logic into a private helper method within the test class. This would centralize the hash construction, making the tests cleaner and more maintainable.
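A possible shape for that helper, as a sketch (the name fooTopicHash and its parameters are illustrative, not from the PR); the partition-order test would still need its own hasher, but the fixed-order tests collapse to one-liners:

private static long fooTopicHash(byte magicByte, String partition0Racks, String partition1Racks) {
    // Mirrors Group.computeTopicHash for the two-partition foo topic
    return Hashing.murmur3_128().newHasher()
        .putByte(magicByte) // magic byte
        .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS) // number of partitions
        .putInt(0) // partition 0
        .putString(partition0Racks, StandardCharsets.UTF_8) // racks of partition 0
        .putInt(1) // partition 1
        .putString(partition1Racks, StandardCharsets.UTF_8) // racks of partition 1
        .hash().asLong();
}

// e.g. testComputeTopicHash then reduces to:
assertEquals(fooTopicHash((byte) 0, "rack0;rack1", "rack1;rack2"), result);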


    @Test
    void testComputeTopicHashWithDifferentMagicByte() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 1) // different magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertNotEquals(topicHasher.hash().asLong(), result);
    }
Comment on lines +67 to +82

🛠️ Refactor suggestion | 🟠 Major

Apply same UUID change here.

-            .putByte((byte) 1) // different magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putByte((byte) 1) // different magic byte
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

@Test
void testComputeTopicHashWithDifferentMagicByte() {
    long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 1) // different magic byte
        .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS) // number of partitions
        .putInt(0) // partition 0
        .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
        .putInt(1) // partition 1
        .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
    assertNotEquals(topicHasher.hash().asLong(), result);
}

After:

@Test
void testComputeTopicHashWithDifferentMagicByte() {
    long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID),
        FOO_METADATA_IMAGE.cluster());
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 1) // different magic byte
        .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
        .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS) // number of partitions
        .putInt(0) // partition 0
        .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
        .putInt(1) // partition 1
        .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
    assertNotEquals(topicHasher.hash().asLong(), result);
}
🤖 Prompt for AI Agents
In
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 67 to 82, the test builds a manual topic hasher using
FOO_TOPIC_ID.hashCode(); update it to use the topic UUID components like the
production code: replace the single putLong(FOO_TOPIC_ID.hashCode()) call with
two calls putLong(FOO_TOPIC_ID.getMostSignificantBits()) and
putLong(FOO_TOPIC_ID.getLeastSignificantBits()) so the test uses the UUID's
actual bits when computing the expected hash.


    @Test
    void testComputeTopicHashWithDifferentPartitionOrder() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            // different partition order
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
        assertNotEquals(topicHasher.hash().asLong(), result);
    }
Comment on lines +88 to +100

🛠️ Refactor suggestion | 🟠 Major

Apply same UUID change here.

-            .putByte((byte) 0) // magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putByte((byte) 0) // magic byte
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    // different partition order
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
    .putInt(0) // partition 0
    .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
assertNotEquals(topicHasher.hash().asLong(), result);
}

After:

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
    .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    // different partition order
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
    .putInt(0) // partition 0
    .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
assertNotEquals(topicHasher.hash().asLong(), result);
}
🤖 Prompt for AI Agents
In
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 88 to 100, the test still uses FOO_TOPIC_ID.hashCode() when
serializing the topic UUID; replace that single putLong(FOO_TOPIC_ID.hashCode())
with two explicit long writes for the UUID parts —
putLong(FOO_TOPIC_ID.getMostSignificantBits()) followed by
putLong(FOO_TOPIC_ID.getLeastSignificantBits()) — so the UUID is serialized
consistently with the other changes.


    @Test
    void testComputeTopicHashWithDifferentRackOrder() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertNotEquals(topicHasher.hash().asLong(), result);
    }
Comment on lines +106 to +117

🛠️ Refactor suggestion | 🟠 Major

Apply same UUID change here.

-            .putByte((byte) 0) // magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putByte((byte) 0) // magic byte
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    .putInt(0) // partition 0
    .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);
}

After:

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
    .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    .putInt(0) // partition 0
    .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);
}
🤖 Prompt for AI Agents
In
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 106 to 117, the test currently uses FOO_TOPIC_ID.hashCode() when
feeding the UUID into the hasher; change this to encode the full UUID like
elsewhere by replacing the single putLong(FOO_TOPIC_ID.hashCode()) with two
putLong calls that write FOO_TOPIC_ID.getMostSignificantBits() and
FOO_TOPIC_ID.getLeastSignificantBits() so the complete UUID is used in the
hashing.


    @ParameterizedTest
    @MethodSource("differentFieldGenerator")
    void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        assertNotEquals(
            Group.computeTopicHash(
                differentImage.topics().getTopic(topicId),
                differentImage.cluster()
            ),
            result
        );
    }

    private static Stream<Arguments> differentFieldGenerator() {
        Uuid differentTopicId = Uuid.randomUuid();
        return Stream.of(
            Arguments.of(new MetadataImageBuilder() // different topic id
                    .addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
                    .addRacks()
                    .build(),
                differentTopicId
            ),
            Arguments.of(new MetadataImageBuilder() // different topic name
                    .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
                    .addRacks()
                    .build(),
                FOO_TOPIC_ID
            ),
            Arguments.of(new MetadataImageBuilder() // different partitions
                    .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
                    .addRacks()
                    .build(),
                FOO_TOPIC_ID
            ),
            Arguments.of(new MetadataImageBuilder() // different racks
                    .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
                    .build(),
                FOO_TOPIC_ID
            )
        );
    }

    @Test
    void testComputeGroupHash() {
        long result = Group.computeGroupHash(Map.of(
            BAR_TOPIC_NAME, 123L,
            FOO_TOPIC_NAME, 456L
        ));

        long expected = Hashing.combineOrdered(List.of(
            HashCode.fromLong(123L),
            HashCode.fromLong(456L)
        )).asLong();
        assertEquals(expected, result);
    }

    @Test
    void testComputeGroupHashWithDifferentOrder() {
        long result = Group.computeGroupHash(Map.of(
            BAR_TOPIC_NAME, 123L,
            FOO_TOPIC_NAME, 456L
        ));

        long unexpected = Hashing.combineOrdered(List.of(
            HashCode.fromLong(456L),
            HashCode.fromLong(123L)
        )).asLong();
        assertNotEquals(unexpected, result);
    }
}