
Conversation

@visz11
Collaborator

@visz11 visz11 commented Oct 16, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • New Features

    • Added group and topic hash computation capabilities to the coordinator module.
  • Chores

    • Added Guava library as a new dependency.
    • Updated import control policies to permit hashing utilities.

@codeant-ai

codeant-ai bot commented Oct 16, 2025

CodeAnt AI is reviewing your PR.


Thanks for using CodeAnt! 🎉

We're free for open-source projects. If you're enjoying it, help us grow by sharing.


@github-actions github-actions bot added the build label Oct 16, 2025
@gemini-code-assist

Summary of Changes

Hello @visz11, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces new hashing utilities within the group-coordinator module to generate deterministic hashes for Kafka topics and consumer groups. This functionality is crucial for internal state management, allowing for efficient comparison and tracking of topic and group configurations. The changes include adding the Guava library as a dependency and implementing two static methods, computeTopicHash and computeGroupHash, along with thorough unit tests to ensure their correctness and determinism.

Highlights

  • New Hashing Functionality: Introduced two new static methods, computeTopicHash and computeGroupHash, within the Group class to generate deterministic hashes for Kafka topics and consumer groups, respectively.
  • Guava Dependency Added: The Google Guava library (version 33.4.0-jre) has been added as a new dependency to the group-coordinator module to leverage its robust hashing utilities, specifically Murmur3-128.
  • Deterministic Topic Hashing: The computeTopicHash method uses Murmur3-128 to hash a topic's ID, name, number of partitions, and the sorted racks of its replicas, ensuring a consistent hash value for identical topic configurations.
  • Deterministic Group Hashing: The computeGroupHash method combines individual topic hashes in a sorted order to produce a deterministic hash for an entire consumer group, facilitating reliable state comparison (a sketch of how the two methods compose follows this list).
  • Comprehensive Unit Tests: A new test file, GroupTest.java, has been added with extensive unit tests covering various scenarios for both computeTopicHash and computeGroupHash, verifying their determinism and sensitivity to changes in input fields.
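
For orientation, here is a minimal sketch of how the two methods compose, based on the signatures quoted in the reviews below. The helper class and the lookup of topics by name are illustrative assumptions, not part of the PR:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;

public final class GroupHashExample {
    // Computes one hash per subscribed topic, then folds them into a single group hash.
    static long hashSubscribedTopics(MetadataImage image, Iterable<String> topicNames) {
        ClusterImage cluster = image.cluster();
        Map<String, Long> topicHashes = new HashMap<>();
        for (String name : topicNames) {
            TopicImage topic = image.topics().getTopic(name); // assumed lookup by name
            if (topic != null) {
                topicHashes.put(name, Group.computeTopicHash(topic, cluster));
            }
        }
        // computeGroupHash sorts entries by topic name before combining,
        // so the map's iteration order does not matter.
        return Group.computeGroupHash(topicHashes);
    }
}
```
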
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize Gemini Code Assist for the GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@codeant-ai

codeant-ai bot commented Oct 16, 2025

CodeAnt AI finished reviewing your PR.

@coderabbitai

coderabbitai bot commented Oct 16, 2025

Walkthrough

This pull request introduces Guava library integration into the group-coordinator module and adds two new static utility methods to the Group interface for computing deterministic hashes of topic and group metadata. Dependencies and import policies are updated accordingly.

Changes

Cohort / File(s): Summary

  • Build and Dependency Configuration (build.gradle, gradle/dependencies.gradle, checkstyle/import-control-group-coordinator.xml): Adds Guava 33.4.0-jre as a public dependency to group-coordinator; registers the version and library mapping in Gradle dependencies; updates the import-control policy to allow com.google.common.hash imports.
  • Group Hashing Utilities (group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java): Introduces two new static methods: computeTopicHash() computes a Murmur3-based hash from topic metadata and partition rack data, and computeGroupHash() combines topic hashes using ordered combination. Adds imports for Guava hashing, image types, and charset utilities.
  • Hash Computation Tests (group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java): New test class validating topic and group hash computations, with parameterized tests confirming hash sensitivity to the magic byte, partition order, rack order, and topic metadata variations. Verifies deterministic hashing behavior.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

The changes span multiple concerns: build configuration (straightforward), new public API methods with hashing logic (requires careful verification of algorithm correctness and determinism), and comprehensive test coverage. The heterogeneity of file types and the addition of hashing operations to a public interface warrant careful review, though repetition and pattern consistency in the tests reduce complexity.

Poem

🐰 A dash of Guava, hashes so neat,
Topics and groups now march to a beat,
Murmur3 murmurs with racks all arranged,
Order preserved, nothing rearranged!
Tests confirm every hash is just right!

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)

  • Description Check (⚠️ Warning): The pull request description is entirely incomplete and consists only of the placeholder template instructions, which have not been replaced with an actual description of the changes. The author has not provided any explanation of the implementation details, the rationale for adding these hashing utilities, or a testing strategy summary as required by the repository template. This leaves reviewers without essential context about the purpose and scope of the changes. Resolution: Replace the placeholder template text with a detailed description that includes the motivation for adding these hash computation methods, how they work, and a summary of the testing strategy. The description should explain the purpose of computeGroupHash and computeTopicHash, clarify why Guava hashing was chosen, and describe how the unit tests in GroupTest validate the correctness and determinism of the hash computations.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 27.27%, which is insufficient; the required threshold is 80.00%. Resolution: You can run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (1 passed)

  • Title Check (✅ Passed): The pull request title "KAFKA-17747: Add compute topic and group hash" is clear, concise, and directly related to the main changes in the changeset. The changes introduce two new static hashing methods (computeGroupHash and computeTopicHash) to the Group interface, along with necessary dependencies and test coverage, which the title accurately captures. The title is specific enough for a reviewer scanning the history to understand the primary addition without being overly verbose.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@refacto-visz

refacto-visz bot commented Oct 16, 2025

KAFKA-17747: Add compute topic and group hash

TL;DR: Adds hash computation utilities for topics and groups using Guava's Murmur3 hashing with comprehensive test coverage.


Refacto PR Summary

Implements deterministic hash computation methods for Kafka topic metadata and group configurations using Google Guava's Murmur3 algorithm. Adds static methods to compute topic hashes based on ID, name, partitions, and rack topology, plus group hash computation from topic hash collections.

This PR introduces hash utilities for Kafka's group coordinator to enable consistent topic and group identification across cluster operations. The implementation uses Murmur3-128 hashing to create deterministic fingerprints of topic metadata, including partition count and rack distribution, while group hashes combine ordered topic hashes for reproducible group identification. The change includes comprehensive test coverage validating hash consistency, field sensitivity, and ordering requirements.

Change Highlights

  • build.gradle: Adds Guava 33.4.0-jre dependency to group-coordinator project
  • gradle/dependencies.gradle: Defines Guava version and library reference
  • checkstyle/import-control-group-coordinator.xml: Allows com.google.common.hash imports
  • Group.java: Implements computeTopicHash() and computeGroupHash() static methods
  • GroupTest.java: Comprehensive test suite validating hash computation behavior

Sequence Diagram

sequenceDiagram
    participant GC as Group Coordinator
    participant G as Group Interface
    participant TI as Topic Image
    participant CI as Cluster Image
    participant H as Guava Hasher
    
    GC->>G: computeTopicHash(topicImage, clusterImage)
    G->>H: newHasher().putByte(0)
    G->>TI: id(), name(), partitions()
    G->>H: putLong(topicId).putString(name).putInt(numPartitions)
    loop For each partition (sorted by id)
        G->>CI: broker(replicaId).rack()
        G->>H: putInt(partitionId).putString(sortedRacks)
    end
    H-->>G: hash().asLong()
    G-->>GC: Topic hash
    
    GC->>G: computeGroupHash(topicHashes)
    G->>G: Sort topic hashes by key
    G->>H: combineOrdered(hashCodes)
    H-->>G: Combined hash
    G-->>GC: Group hash

Testing Guide

  1. Topic hash consistency: Create identical topic metadata, verify computeTopicHash() returns same value across multiple calls
  2. Field sensitivity: Modify topic ID, name, partition count, or rack configuration individually, confirm hash changes for each modification
  3. Ordering independence: Verify partition processing order doesn't affect final hash (method sorts internally)
  4. Group hash computation: Create a topic hash map, verify computeGroupHash() produces deterministic results regardless of input map ordering (see the sketch after this list)
  5. Integration test: Use real TopicImage and ClusterImage objects from metadata, validate hash computation with actual Kafka metadata structures
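
A minimal JUnit sketch of item 4, assuming only the Group interface from this PR (the topic hash values here are arbitrary placeholders):

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.coordinator.group.Group;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class GroupHashDeterminismTest {
    @Test
    void computeGroupHashIgnoresMapInsertionOrder() {
        Map<String, Long> forward = new LinkedHashMap<>();
        forward.put("foo", 1L); // placeholder topic hashes
        forward.put("bar", 2L);

        Map<String, Long> reversed = new LinkedHashMap<>();
        reversed.put("bar", 2L);
        reversed.put("foo", 1L);

        // computeGroupHash sorts by topic name before Hashing.combineOrdered,
        // so insertion order must not change the result.
        assertEquals(Group.computeGroupHash(forward), Group.computeGroupHash(reversed));
    }
}
```
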

@visz11
Collaborator Author

visz11 commented Oct 16, 2025

/refacto-visz

@refacto-visz

refacto-visz bot commented Oct 16, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces utility methods to compute hashes for topics and groups, which will likely be used for consistency checks or routing. The implementation uses Guava's hashing library. My review focuses on improving the robustness of the hashing logic and code clarity. Specifically, the hashing of Uuids should use the full 128 bits to avoid potential collisions, and the stream processing logic can be simplified using more modern Java stream features. The changes are well-tested, but the tests will need updates to reflect the suggested change in the hashing logic. Overall, this is a good addition.

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(topicImage.id().hashCode()) // topic Id


high

Using topicImage.id().hashCode() to hash the Uuid is not ideal as it truncates the 128-bit UUID into a 32-bit integer hash code before passing it to the Hasher as a long. This significantly increases the probability of hash collisions. To preserve all the information from the UUID, you should hash both the most and least significant bits of the UUID.

Note that you will also need to update the corresponding tests in GroupTest.java to reflect this change.

Suggested change
.putLong(topicImage.id().hashCode()) // topic Id
.putLong(topicImage.id().getMostSignificantBits()).putLong(topicImage.id().getLeastSignificantBits()) // topic Id

Comment on lines +260 to +261
.filter(Optional::isPresent)
.map(Optional::get)


medium

The stream pipeline for collecting rack information can be made more concise. Instead of using .filter(Optional::isPresent).map(Optional::get), you can use .flatMap(Optional::stream). This is available since Java 9 and is more idiomatic for unwrapping Optional values within a stream.
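
Applied to the rack-collection pipeline from this PR, the suggestion would read as follows (a sketch; the surrounding variables are as in the quoted code):

```java
String racks = Arrays.stream(entry.getValue().replicas)
    .mapToObj(clusterImage::broker)
    .filter(Objects::nonNull)
    .map(BrokerRegistration::rack)
    .flatMap(Optional::stream) // replaces .filter(Optional::isPresent).map(Optional::get)
    .sorted()
    .collect(Collectors.joining(";"));
```
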

@refacto-visz
Copy link

refacto-visz bot commented Oct 16, 2025

Code Review: Group Hash Implementation

👍 Well Done
Comprehensive Test Coverage

Thorough hash function validation with edge case testing and deterministic design

Appropriate Hash Choice

Uses the fast, well-distributed Murmur3 hash (non-cryptographic, which is appropriate for this fingerprinting use case) with efficient performance characteristics

📁 Selected files for review (5)
  • build.gradle
  • checkstyle/import-control-group-coordinator.xml
  • gradle/dependencies.gradle
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
🎯 Custom Instructions
✅ Applied Instructions
Organization Guidelines
  • Keep pull requests small and focused (prefer < 400 lines of code).
  • All CI/CD checks, linting, and unit tests must pass before merge.
  • Use feature flags for new functionality and include a clear rollback plan.
  • Follow the company security checklist:
    • No hard-coded secrets or credentials.
    • Validate all external inputs.
    • Use parameterized queries for DB access.

Scope: All files

📝 Additional Comments
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (5)
Stream Null Safety

Stream operations lack null safety for the entry.getValue().replicas array access. A null replicas array causes a NullPointerException during stream processing, so the hash computation crashes and group-coordination reliability suffers.

Standards:

  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling
Stream Collection Inefficiency

Stream processing with intermediate collection creation (.toList()) adds unnecessary memory allocation overhead. Feeding the stream to Hashing.combineOrdered through an Iterable adapter would eliminate the intermediate list. For large topic maps, the extra temporary objects increase GC pressure (a sketch follows the standards list below).

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Resource-Utilization
  • Memory-Allocation-Optimization
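
As a sketch (assuming the same imports as the method shown in this review), the stream can be handed to Hashing.combineOrdered as an Iterable via a method reference, avoiding the intermediate list; note combineOrdered still requires a non-empty input, as before:

```java
static long computeGroupHash(Map<String, Long> topicHashes) {
    // stream::iterator adapts the stream to Iterable<HashCode> lazily,
    // so no intermediate List is materialized.
    return Hashing.combineOrdered(
        topicHashes.entrySet()
            .stream()
            .sorted(Map.Entry.comparingByKey())
            .map(e -> HashCode.fromLong(e.getValue()))
            ::iterator
    ).asLong();
}
```
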
Nested Stream Operations

Complex nested stream operations with multiple intermediate transformations create processing overhead. Each filter and map operation processes elements sequentially with potential object creation. For topics with many partitions and replicas, this multiplies computational cost, affecting hash calculation performance.

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior
  • Algorithmic-Complexity-Linear-Optimization
Hash Collision Risk

Using hashCode() on the Uuid object introduces hash collision risk in the topic hash computation: hashCode() reduces the 128-bit UUID to a 32-bit integer, increasing collision probability. Consider feeding the full UUID bytes into the hash input to maintain uniqueness guarantees.

Standards:

  • Algorithm-Correctness-Hash-Functions
  • Mathematical-Accuracy-Collision-Prevention
Magic Number Usage

The hard-coded magic byte value reduces maintainability once hash format versioning is needed. Consider extracting it to a named constant like HASH_FORMAT_VERSION for better readability and future extensibility (a sketch follows the standards list below).

Standards:

  • Clean-Code-Meaningful-Names
  • Maintainability-Quality-Readability
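
A minimal sketch of the extraction (the constant name is illustrative, as is HASH_FORMAT_VERSION above):

```java
import java.nio.charset.StandardCharsets;

import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

final class TopicHashFormat {
    // Illustrative name; bump this when the serialized layout of the topic hash changes.
    static final byte TOPIC_HASH_MAGIC_BYTE = 0;

    static Hasher newTopicHasher(String topicName) {
        return Hashing.murmur3_128().newHasher()
            .putByte(TOPIC_HASH_MAGIC_BYTE) // versioned magic byte instead of a bare literal
            .putString(topicName, StandardCharsets.UTF_8);
    }
}
```
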

            .map(Optional::get)
            .sorted()
            .collect(Collectors.joining(";"));
        topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"


Missing Import Statement

Collectors is used without a visible import statement, which causes a compilation failure and prevents a successful build.

Committable Suggestion
Suggested change (add to the import block of Group.java):

import java.util.stream.Collectors;
Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Maturity

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(topicImage.id().hashCode()) // topic Id


Null Pointer Risk

Direct hashCode() call on topicImage.id() without null validation creates a NullPointerException risk: a null topic ID would fail the hash computation with an unhandled runtime exception during group hash calculation.

Committable Suggestion
Suggested change

            .putLong(topicImage.id().hashCode()) // topic Id
            .putLong(Objects.hashCode(topicImage.id())) // topic Id
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Preconditions

Comment on lines +229 to +246
static long computeGroupHash(Map<String, Long> topicHashes) {
    return Hashing.combineOrdered(
        topicHashes.entrySet()
            .stream()
            .sorted(Map.Entry.comparingByKey())
            .map(e -> HashCode.fromLong(e.getValue()))
            .toList()
    ).asLong();
}

/**
 * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
 *
 * @param topicImage   The topic image.
 * @param clusterImage The cluster image.
 * @return The hash of the topic.
 */
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {


Static Method Placement

Hash computation methods are placed in the Group interface but handle generic metadata operations. These utility methods have no direct relationship to Group behavior and violate the SRP. Consider extracting them to a dedicated HashUtils class for better separation of concerns (a sketch follows the standards list below).

Standards
  • SOLID-SRP
  • Clean-Code-Class-Organization
  • Refactoring-Extract-Class
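
A sketch of the suggested extraction (the class name and placement are hypothetical; the bodies move over unchanged from Group, shown here for computeGroupHash):

```java
package org.apache.kafka.coordinator.group;

import java.util.Map;

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

// Hypothetical utility holder so the Group interface stays free of generic
// metadata-hashing logic.
final class GroupHashUtils {
    private GroupHashUtils() {
    }

    static long computeGroupHash(Map<String, Long> topicHashes) {
        return Hashing.combineOrdered(
            topicHashes.entrySet()
                .stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> HashCode.fromLong(e.getValue()))
                .toList()
        ).asLong();
    }

    // static long computeTopicHash(TopicImage, ClusterImage) would move here as well.
}
```
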


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 36cb999.

📒 Files selected for processing (5)
  • build.gradle (1 hunks)
  • checkstyle/import-control-group-coordinator.xml (1 hunks)
  • gradle/dependencies.gradle (2 hunks)
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (2 hunks)
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (3)
metadata/src/main/java/org/apache/kafka/image/ClusterImage.java (1)
  • ClusterImage (36-108)
metadata/src/main/java/org/apache/kafka/image/TopicImage.java (1)
  • TopicImage (37-92)
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java (1)
  • BrokerRegistration (44-398)
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (2)
clients/src/main/java/org/apache/kafka/common/Uuid.java (1)
  • Uuid (32-196)
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java (1)
  • MetadataImage (33-200)
🔇 Additional comments (4)
checkstyle/import-control-group-coordinator.xml (1)

80-81: Import policy update looks good

Allowing com.google.common.hash only within coordinator/group is appropriately scoped.

gradle/dependencies.gradle (2)

64-65: Guava version mapping added

Version pin and alias look fine.


151-152: Dependency entry added; remember LICENSE-binary update

Add Guava to LICENSE-binary and verify dependency-check suppressions if needed.

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (1)

229-237: Deterministic ordered combination for group hash

Sorting by topic name before Hashing.combineOrdered achieves stable results. LGTM.

implementation libs.hdrHistogram
implementation libs.re2j
implementation libs.slf4jApi
implementation libs.guava


🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

Scope Guava correctly; consider exposing metadata as api for the new public methods

  • Guava usage is internal to hashing and not exposed in signatures — keeping it as implementation is correct. LGTM.
  • New public static methods on Group use TopicImage and ClusterImage types. To avoid downstream compile issues, expose ':metadata' as an api dependency in group-coordinator.

Also ensure LICENSE-binary is updated for the new Guava dep (per dependencies.gradle note).

Proposed change (within project(':group-coordinator') dependencies):

-    implementation project(':metadata')
+    api project(':metadata')

I'll verify the review comment by examining the relevant code sections. Let me start by inspecting the build configuration and the Group class.


---

🏁 Script executed:

```shell
#!/bin/bash
# Find and inspect the build.gradle file around line 1423 for Guava dependency
fd -e gradle "build.gradle" | head -5
```

Length of output: 12


🏁 Script executed:

```shell
#!/bin/bash
# Search for the Group class and its public static methods
rg -n "class Group" --type=java -A 20 | head -50
```

Length of output: 6189


🏁 Script executed:

```shell
#!/bin/bash
# Check group-coordinator dependencies configuration
fd -e gradle | xargs rg -l "group-coordinator" | head -5
```

Length of output: 28


🏁 Script executed:

```shell
#!/bin/bash
# Look for TopicImage and ClusterImage in public method signatures
rg -nP "public\s+static.*(?:TopicImage|ClusterImage)" --type=java
```

Length of output: 384


🏁 Script executed:

```shell
#!/bin/bash
# Check for LICENSE-binary file and Guava references
fd "LICENSE" -e binary -o -e txt | grep -i license
```

Length of output: 116


Comment on lines +246 to +267
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(topicImage.id().hashCode()) // topic Id
        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
        .putInt(topicImage.partitions().size()); // number of partitions

    topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
        topicHasher.putInt(entry.getKey()); // partition id
        String racks = Arrays.stream(entry.getValue().replicas)
            .mapToObj(clusterImage::broker)
            .filter(Objects::nonNull)
            .map(BrokerRegistration::rack)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .sorted()
            .collect(Collectors.joining(";"));
        topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
    });
    return topicHasher.hash().asLong();
}


⚠️ Potential issue | 🟠 Major

Use full 128-bit UUID, not Uuid.hashCode(), to avoid unnecessary collisions

The doc says “hash of the topic id…”. Using topicImage.id().hashCode() reduces 128-bit UUID to 32 bits, then writes it as a long, increasing collision risk and diverging from the stated intent.

Replace with both UUID halves; optionally avoid building an intermediate rack string.

Apply:

-        Hasher topicHasher = hf.newHasher()
-            .putByte((byte) 0) // magic byte
-            .putLong(topicImage.id().hashCode()) // topic Id
-            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
-            .putInt(topicImage.partitions().size()); // number of partitions
+        Hasher topicHasher = hf.newHasher()
+            .putByte((byte) 0) // magic byte
+            // topic id (full 128 bits)
+            .putLong(topicImage.id().getMostSignificantBits())
+            .putLong(topicImage.id().getLeastSignificantBits())
+            .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+            .putInt(topicImage.partitions().size()); // number of partitions
@@
-            String racks = Arrays.stream(entry.getValue().replicas)
-                .mapToObj(clusterImage::broker)
-                .filter(Objects::nonNull)
-                .map(BrokerRegistration::rack)
-                .filter(Optional::isPresent)
-                .map(Optional::get)
-                .sorted()
-                .collect(Collectors.joining(";"));
-            topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
+            Arrays.stream(entry.getValue().replicas)
+                .mapToObj(clusterImage::broker)
+                .filter(Objects::nonNull)
+                .map(BrokerRegistration::rack)
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .sorted()
+                // optional: dedupe racks per partition to avoid double-counting identical rack strings
+                // .distinct()
+                .forEachOrdered(r -> {
+                    topicHasher.putString(r, StandardCharsets.UTF_8);
+                    topicHasher.putByte((byte) ';');
+                });

Note: Tests will need updates to write the full UUID (see test suggestions).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(topicImage.id().hashCode()) // topic Id
        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
        .putInt(topicImage.partitions().size()); // number of partitions
    topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
        topicHasher.putInt(entry.getKey()); // partition id
        String racks = Arrays.stream(entry.getValue().replicas)
            .mapToObj(clusterImage::broker)
            .filter(Objects::nonNull)
            .map(BrokerRegistration::rack)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .sorted()
            .collect(Collectors.joining(";"));
        topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
    });
    return topicHasher.hash().asLong();
}

After:

static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        // topic id (full 128 bits)
        .putLong(topicImage.id().getMostSignificantBits())
        .putLong(topicImage.id().getLeastSignificantBits())
        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
        .putInt(topicImage.partitions().size()); // number of partitions
    topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
        topicHasher.putInt(entry.getKey()); // partition id
        Arrays.stream(entry.getValue().replicas)
            .mapToObj(clusterImage::broker)
            .filter(Objects::nonNull)
            .map(BrokerRegistration::rack)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .sorted()
            // optional: dedupe racks per partition to avoid double-counting identical rack strings
            // .distinct()
            .forEachOrdered(r -> {
                topicHasher.putString(r, StandardCharsets.UTF_8);
                topicHasher.putByte((byte) ';');
            });
    });
    return topicHasher.hash().asLong();
}
🤖 Prompt for AI Agents
In group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
around lines 246 to 267, replace the use of topicImage.id().hashCode() (which
reduces the 128-bit UUID to 32 bits) with the full 128-bit UUID by writing both
halves: call topicImage.id().getMostSignificantBits() and
topicImage.id().getLeastSignificantBits() (two putLong calls) so the full UUID
is included in the hasher; also avoid building an intermediate joined rack
string by serializing the sorted racks directly (e.g., write an int count then
iterate sorted rack strings and putString each) or otherwise stream each rack
into the topicHasher to reduce allocations; update tests accordingly to expect
the full UUID serialization.

Comment on lines +50 to +65
@Test
void testComputeTopicHash() {
    long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS) // number of partitions
        .putInt(0) // partition 0
        .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
        .putInt(1) // partition 1
        .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
    assertEquals(topicHasher.hash().asLong(), result);
}


⚠️ Potential issue | 🟠 Major

Align expected hash with full 128-bit UUID (not hashCode)

After fixing computeTopicHash to hash the full UUID, adjust the tests:

         HashFunction hf = Hashing.murmur3_128();
         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 0) // magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
             .putInt(0) // partition 0
             .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
             .putInt(1) // partition 1
             .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

@Test
void testComputeTopicHash() {
    long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS) // number of partitions
        .putInt(0) // partition 0
        .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
        .putInt(1) // partition 1
        .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
    assertEquals(topicHasher.hash().asLong(), result);
}

After:

@Test
void testComputeTopicHash() {
    long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
        .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS) // number of partitions
        .putInt(0) // partition 0
        .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
        .putInt(1) // partition 1
        .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
    assertEquals(topicHasher.hash().asLong(), result);
}
🤖 Prompt for AI Agents
In
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 50 to 65, the test currently feeds only topicId.hashCode() into the
hasher but computeTopicHash now hashes the full 128-bit UUID; update the test to
feed the full UUID into the Hasher (replace the single
putLong(FOO_TOPIC_ID.hashCode()) with two putLong calls:
FOO_TOPIC_ID.getMostSignificantBits() and
FOO_TOPIC_ID.getLeastSignificantBits()) so the expected hasher input matches the
implementation and the asserted hash is correct.

Comment on lines +71 to +81
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 1) // different magic byte
    .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    .putInt(0) // partition 0
    .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);


🛠️ Refactor suggestion | 🟠 Major

Same UUID correction for different-magic test

         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 1) // different magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 1) // different magic byte
    .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    .putInt(0) // partition 0
    .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);

After:

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 1) // different magic byte
    .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
    .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    .putInt(0) // partition 0
    .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);
🤖 Prompt for AI Agents
In
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 71 to 81, the test feeds FOO_TOPIC_ID.hashCode() into the hasher,
which truncates the UUID and will no longer match the implementation once it
hashes the full UUID; replace the single putLong(FOO_TOPIC_ID.hashCode()) with
the UUID's canonical two-long representation,
putLong(FOO_TOPIC_ID.getMostSignificantBits()) and
putLong(FOO_TOPIC_ID.getLeastSignificantBits()), so the same UUID bytes are used
across tests while keeping the different magic byte.

Comment on lines +88 to +99
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    // different partition order
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
    .putInt(0) // partition 0
    .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
assertNotEquals(topicHasher.hash().asLong(), result);


🛠️ Refactor suggestion | 🟠 Major

Same UUID correction for different partition order

         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 0) // magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
🤖 Prompt for AI Agents
In
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 88 to 99, the code puts only FOO_TOPIC_ID.hashCode() into the
hasher, which loses UUID entropy and diverges from the implementation once it
hashes the full UUID; replace that single putLong(FOO_TOPIC_ID.hashCode()) with
two putLong calls for the UUID's full value
(putLong(FOO_TOPIC_ID.getMostSignificantBits()) and
putLong(FOO_TOPIC_ID.getLeastSignificantBits())) so the complete UUID is
included and the expected hash stays aligned with the implementation.

Comment on lines +106 to +116
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    .putInt(0) // partition 0
    .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);


🛠️ Refactor suggestion | 🟠 Major

Same UUID correction for different rack order

         Hasher topicHasher = hf.newHasher()
             .putByte((byte) 0) // magic byte
-            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+            .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+            .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
             .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
             .putInt(FOO_NUM_PARTITIONS) // number of partitions
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    .putInt(0) // partition 0
    .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);

After:

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
    .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    .putInt(0) // partition 0
    .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);
🤖 Prompt for AI Agents
In
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
around lines 106 to 116, the expected hash is built from
FOO_TOPIC_ID.hashCode(); once computeTopicHash hashes the full 128-bit UUID,
replace the single putLong(FOO_TOPIC_ID.hashCode()) with
putLong(FOO_TOPIC_ID.getMostSignificantBits()) and
putLong(FOO_TOPIC_ID.getLeastSignificantBits()) so the hasher input matches the
implementation, and keep the assertNotEquals, since this test verifies that a
different rack order produces a different hash.

