1 change: 1 addition & 0 deletions build.gradle
@@ -1420,6 +1420,7 @@ project(':group-coordinator') {
implementation libs.hdrHistogram
implementation libs.re2j
implementation libs.slf4jApi
implementation libs.guava

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
1 change: 1 addition & 0 deletions checkstyle/import-control-group-coordinator.xml
@@ -77,6 +77,7 @@
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
<allow pkg="com.google.re2j" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="com.google.common.hash" />
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.HdrHistogram" />
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
@@ -61,6 +61,7 @@ versions += [
classgraph: "4.8.173",
gradle: "8.10.2",
grgit: "4.1.1",
guava: "33.4.0-jre",
⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

Verify Guava version and consider lightweight alternatives.

Guava 33.4.0-jre is being added as a dependency. While Guava is a well-maintained library, it's quite large (~2.5 MB) and this PR only uses its hashing utilities. Consider:

  1. Verifying this is an appropriate version
  2. Evaluating whether a lighter-weight hashing library might suffice

Run the following script to check the Guava version and security status:

Additionally, consider searching for lightweight alternatives:

Also applies to: 151-151


🏁 Script executed:

#!/bin/bash
# Check latest Guava version
echo "=== Checking latest Guava version ==="
curl -s https://repo1.maven.org/maven2/com/google/guava/guava/maven-metadata.xml | grep -o '<version>[^<]*</version>' | tail -5

# Check for known vulnerabilities
echo -e "\n=== Checking for security advisories ==="
gh api graphql -f query='
{
  securityVulnerabilities(first: 10, ecosystem: MAVEN, package: "com.google.guava:guava") {
    nodes {
      advisory {
        summary
        severity
        publishedAt
      }
      vulnerableVersionRange
      firstPatchedVersion {
        identifier
      }
    }
  }
}'

Length of output: 972


🌐 Web query:

What are lightweight alternatives to Guava for Murmur3 hashing in Java?

💡 Result:

Here are small/lightweight Java alternatives to Guava’s Murmur3, with short notes and Maven/artifact references:

  1. Apache Commons Codec — commons-codec:commons-codec — includes MurmurHash3 (32/128-bit) (small, well-maintained). (commons.apache.org) [1]

  2. io7m jmurmur — com.io7m.jmurmur:jmurmur — pure-Java Murmur3 (32/64), minimal dependency footprint, on Maven Central. (io7m.com) [2]

  3. Apache DataSketches (memory module) — org.apache.datasketches:datasketches-memory — MurmurHash3 implementation used by DataSketches (no Guava). Good if you already use DataSketches. (apache.github.io) [3]

  4. eprst/murmur3 (github / Maven) — com.github.eprst:murmur3 — small, fast Murmur3 port (Unlicense). (mvnrepository.com) [4]

  5. kotlinx-murmurhash / com.goncalossilva (Kotlin Multiplatform) — usable from JVM for MurmurHash3; lightweight if you accept a small KMP lib. (libraries.io) [5]

If you want, I can give example Maven coordinates + tiny usage snippet for any of these.
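
For reference, a minimal usage sketch of the Commons Codec option listed above, assuming commons-codec 1.14+ (where the hash32x86/hash128x64 variants exist). Unlike Guava it hashes a finished byte array rather than offering an incremental Hasher, so it is not a drop-in replacement for the API this PR uses without assembling the input bytes manually:

import org.apache.commons.codec.digest.MurmurHash3;

import java.nio.charset.StandardCharsets;

public class MurmurHash3Sketch {
    public static void main(String[] args) {
        byte[] data = "foo".getBytes(StandardCharsets.UTF_8);

        // 128-bit Murmur3 (x64 variant); returns the two 64-bit halves.
        long[] hash128 = MurmurHash3.hash128x64(data);
        System.out.printf("128-bit: %016x%016x%n", hash128[0], hash128[1]);

        // 32-bit Murmur3 (x86 variant).
        int hash32 = MurmurHash3.hash32x86(data);
        System.out.printf("32-bit: %08x%n", hash32);
    }
}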



🏁 Script executed:

#!/bin/bash
# Check if Guava is already in dependencies or newly added
echo "=== Searching for existing Guava usage in codebase ==="
rg -i "guava" --type gradle -C2 | head -20

echo -e "\n=== Checking for Murmur3 or hashing imports ==="
rg "Murmur3|guava.*hash|import.*guava" --type java -n | head -15

echo -e "\n=== Check gradle dependencies file structure ==="
head -80 gradle/dependencies.gradle | tail -30

Length of output: 5248


🏁 Script executed:

#!/bin/bash
# Check JAR size of Guava 33.4.0-jre vs alternatives
echo "=== Guava dependency size comparison ==="
cat <<'EOF' | python3
import json
import urllib.request

libs = {
    "Guava 33.4.0": "com.google.guava:guava:33.4.0-jre",
    "Commons Codec 1.17": "commons-codec:commons-codec:1.17",
    "io7m jmurmur": "com.io7m.jmurmur:jmurmur:7.0.0"
}

for name, coord in libs.items():
    try:
        # Query Maven central for artifact size
        parts = coord.split(":")
        group = parts[0].replace(".", "/")
        artifact = parts[1]
        version = parts[2]
        url = f"https://repo1.maven.org/maven2/{group}/{artifact}/{version}/{artifact}-{version}.jar"
        
        req = urllib.request.Request(url, method='HEAD')
        with urllib.request.urlopen(req) as resp:
            size_mb = int(resp.headers.get('Content-Length', 0)) / (1024 * 1024)
            print(f"{name:25} {size_mb:6.2f} MB")
    except Exception as e:
        print(f"{name:25} [unavailable]")
EOF

Length of output: 160


🏁 Script executed:

#!/bin/bash
# Find actual Guava imports (com.google.common.*)
echo "=== Actual Guava imports in Java source ==="
rg "import com\.google\.common" --type java -n | head -20

echo -e "\n=== Guava class usage (Hash, Hashing, etc.) ==="
rg "com\.google\.common\.(hash|base|collect)" --type java -C1 -n | head -25

echo -e "\n=== Check if Guava is new or existing by searching all gradle files ==="
rg "guava" gradle/ -n

Length of output: 2611


Update Guava to the latest version (currently 33.5.0-jre).

Guava 33.4.0-jre is outdated. The codebase uses Guava's hashing API (com.google.common.hash.* for HashFunction, Hasher, HashCode, Hashing) in group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java. While there are no active security vulnerabilities in version 33.4.0-jre, newer versions are available (33.4.8-jre as a patch, or 33.5.0-jre as the current release).

Consider upgrading to at least 33.4.8-jre or preferably 33.5.0-jre for bug fixes and improvements.

Regarding lightweight alternatives: while options like Apache Commons Codec exist for specific hash functions, they would not be direct drop-in replacements for Guava's full hashing API without refactoring the Group.java implementation.

httpclient: "4.5.14",
jackson: "2.16.2",
jacoco: "0.8.10",
@@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
guava: "com.google.guava:guava:$versions.guava",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson",
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -19,11 +19,21 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.BrokerRegistration;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -209,4 +219,50 @@ void validateOffsetFetch(
default boolean shouldExpire() {
return true;
}

/**
* Computes the hash of the topics in a group.
*
* @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
* @return The hash of the group.
*/
static long computeGroupHash(Map<String, Long> topicHashes) {
return Hashing.combineOrdered(
topicHashes.entrySet()
.stream()
.sorted(Map.Entry.comparingByKey())
.map(e -> HashCode.fromLong(e.getValue()))
.toList()
).asLong();
}

/**
* Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
*
* @param topicImage The topic image.
* @param clusterImage The cluster image.
* @return The hash of the topic.
*/
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte
.putLong(topicImage.id().hashCode()) // topic Id


high

Using hashCode() on a Uuid converts the 128-bit UUID into a 32-bit integer. This loses a significant amount of information and increases the likelihood of hash collisions. It's better to include the full UUID in the hash calculation by hashing its most and least significant bits.

Suggested change
.putLong(topicImage.id().hashCode()) // topic Id
.putLong(topicImage.id().getMostSignificantBits()) // topic Id
.putLong(topicImage.id().getLeastSignificantBits())

.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
.putInt(topicImage.partitions().size()); // number of partitions

topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
topicHasher.putInt(entry.getKey()); // partition id
String racks = Arrays.stream(entry.getValue().replicas)
.mapToObj(clusterImage::broker)
.filter(Objects::nonNull)
.map(BrokerRegistration::rack)
.filter(Optional::isPresent)
.map(Optional::get)
Comment on lines +260 to +261


medium

This stream operation can be simplified. Instead of filter(Optional::isPresent).map(Optional::get), you can use flatMap(Optional::stream) which is available since Java 9 and achieves the same result more concisely.

Suggested change
.filter(Optional::isPresent)
.map(Optional::get)
.flatMap(Optional::stream)

.sorted()
.collect(Collectors.joining(";"));
topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
Comment on lines +256 to +264

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

Potential issue with semicolon delimiter in rack names.

The rack collection logic joins rack names with semicolons (line 263). If a rack name contains a semicolon, this could cause ambiguity or unexpected hash values.

Consider either:

  1. Validating that rack names don't contain semicolons elsewhere in the codebase
  2. Using a different delimiter that's guaranteed not to appear in rack names
  3. Escaping semicolons in rack names before joining

Verify whether rack names can contain semicolons:


🏁 Script executed:

#!/bin/bash
# Search for rack name validation or constraints
rg -n "rack.*validation|rack.*allowed|rack.*pattern" --type java

# Search for rack name usage patterns
rg -n "rack\(\)" --type java -A 2 -B 2

Length of output: 28037



🏁 Script executed:

#!/bin/bash
# Search for BrokerRegistrationRequest to see how rack is defined/validated
rg -n "class BrokerRegistrationRequest" --type java -A 20

# Search for where rack is validated or constrained
rg -n "rack.*setRack|setRack.*rack" --type java -B 3 -A 3

# Look for broker registration validation
rg -n "validateRack|validateBroker.*Rack|rack.*validate" --type java

Length of output: 80265


🏁 Script executed:

#!/bin/bash
# Search for rack in request/response data structures
fd -e java -x grep -l "rack" {} \; | grep -i "request\|response\|data" | head -20 | xargs rg -n "rack" -B 2 -A 2

# Search for any documentation or constraints on rack
rg -n "rack.*must|rack.*should|rack.*cannot|rack.*format" --type java -i

Length of output: 63341


🏁 Script executed:

#!/bin/bash
# Search for BrokerRegistrationRequest schema or API definition
# This will help understand if rack has any character constraints at the protocol level
curl -s "https://raw.githubusercontent.com/apache/kafka/trunk/clients/src/main/java/org/apache/kafka/common/message/BrokerRegistrationRequest.json" 2>/dev/null | head -100

Length of output: 14


🌐 Web query:

Kafka BrokerRegistration rack field constraints allowed characters

💡 Result:

Short answer: Kafka imposes no strict character whitelist for the broker "rack" value — it is treated as an opaque string (stored in broker registration/metadata and carried in protocol messages). Avoid using control characters and characters that would break any external systems you integrate with (e.g. JSON/URLs/filepaths, Kubernetes label values) but Kafka itself does not validate/limit the characters. [1][2][3]

Sources:

  • Kafka protocol / metadata docs (rack is a string). [1]
  • Broker registration schema (rack: "string", optional). [2]
  • Community notes / discussion confirming rack is an opaque string (no enforced charset). [3]

Potential issue with semicolon delimiter in rack names.

The rack collection logic joins rack names with semicolons (line 263). If a rack name contains a semicolon, this could cause hash collisions or unexpected hash values (e.g., racks "a;b" and "a" + "b" would produce identical output).

While Kafka does not enforce character constraints on rack names at the protocol level, this delimiter choice creates a subtle risk if operators configure rack names containing semicolons.

Consider either:

  1. Validating that rack names don't contain semicolons at broker startup
  2. Using a different delimiter guaranteed not to appear in rack names (e.g., null byte or escaped format)
  3. Escaping semicolons in rack names before joining
🤖 Prompt for AI Agents
In group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
around lines 256 to 264 the code joins rack names with a semicolon which can
cause collisions or wrong hashes if a rack name contains ";" — fix by making the
serialization unambiguous: either validate/reject rack names containing ";" at
broker startup, or change the join to an escaped or length-prefixed form (e.g.,
escape ";" characters in rack names before joining or serialize each rack as
length + value, or join with a delimiter guaranteed not to appear such as the
null character), and update the topicHasher input accordingly so hashes are
stable and collision-free.
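
A minimal sketch of options 2/3 above, assuming the caller already holds the sorted rack list; the class and method names (RackHashingSketch, putRacks) are illustrative only, not part of this PR. Prefixing each rack value with its byte length keeps the encoding unambiguous no matter which characters a rack name contains:

import com.google.common.hash.Hasher;

import java.nio.charset.StandardCharsets;
import java.util.List;

final class RackHashingSketch {
    // Feed racks into the hasher so that e.g. ["a;b"] and ["a", "b"]
    // produce different byte sequences and cannot collide.
    static void putRacks(Hasher topicHasher, List<String> sortedRacks) {
        topicHasher.putInt(sortedRacks.size());           // number of racks
        for (String rack : sortedRacks) {
            byte[] rackBytes = rack.getBytes(StandardCharsets.UTF_8);
            topicHasher.putInt(rackBytes.length);         // byte-length prefix
            topicHasher.putBytes(rackBytes);              // rack value
        }
    }
}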

});
return topicHasher.hash().asLong();
}
Comment on lines +246 to +267

🛠️ Refactor suggestion | 🟠 Major

Document hash versioning strategy and format stability.

The method uses a hardcoded magic byte (0) but lacks documentation about:

  • How format changes will be handled if the hashing algorithm needs to evolve
  • Whether this hash must remain stable across Kafka versions
  • What guarantees are provided about hash stability

This is critical for understanding the long-term implications of this implementation.

Add clear documentation:

 /**
  * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
  *
+ * The hash format is versioned via a magic byte (currently 0). The hash includes:
+ * - Magic byte (0)
+ * - Topic ID hashCode
+ * - Topic name (UTF-8)
+ * - Partition count
+ * - For each partition (sorted): partition ID and semicolon-delimited sorted racks
+ *
+ * This hash is deterministic and stable for the same topic metadata. Changes to the
+ * hashing algorithm MUST use a different magic byte to maintain compatibility.
+ *
  * @param topicImage   The topic image.
  * @param clusterImage The cluster image.
  * @return The hash of the topic.
  */
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte
.putLong(topicImage.id().hashCode()) // topic Id
.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
.putInt(topicImage.partitions().size()); // number of partitions
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
topicHasher.putInt(entry.getKey()); // partition id
String racks = Arrays.stream(entry.getValue().replicas)
.mapToObj(clusterImage::broker)
.filter(Objects::nonNull)
.map(BrokerRegistration::rack)
.filter(Optional::isPresent)
.map(Optional::get)
.sorted()
.collect(Collectors.joining(";"));
topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
});
return topicHasher.hash().asLong();
}
/**
* Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
*
* The hash format is versioned via a magic byte (currently 0). The hash includes:
* - Magic byte (0)
* - Topic ID hashCode
* - Topic name (UTF-8)
* - Partition count
* - For each partition (sorted): partition ID and semicolon-delimited sorted racks
*
* This hash is deterministic and stable for the same topic metadata. Changes to the
* hashing algorithm MUST use a different magic byte to maintain compatibility.
*
* @param topicImage The topic image.
* @param clusterImage The cluster image.
* @return The hash of the topic.
*/
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte
.putLong(topicImage.id().hashCode()) // topic Id
.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
.putInt(topicImage.partitions().size()); // number of partitions
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
topicHasher.putInt(entry.getKey()); // partition id
String racks = Arrays.stream(entry.getValue().replicas)
.mapToObj(clusterImage::broker)
.filter(Objects::nonNull)
.map(BrokerRegistration::rack)
.filter(Optional::isPresent)
.map(Optional::get)
.sorted()
.collect(Collectors.joining(";"));
topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
});
return topicHasher.hash().asLong();
}
🤖 Prompt for AI Agents
In group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
around lines 246 to 267, the computeTopicHash method uses a hardcoded magic byte
and lacks documentation about versioning, stability guarantees, and how to
evolve the hash format; add Javadoc above the method that documents the magic
byte meaning, the intended versioning strategy (e.g., bump magic byte when
changing field encoding or algorithm), whether the hash must be stable across
Kafka releases, and migration/compatibility expectations, and include a note
about charset, separator semantics, and that callers must not rely on numeric
stability unless explicitly guaranteed.

}
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (new file)
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataImage;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

public class GroupTest {
private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
private static final String FOO_TOPIC_NAME = "foo";
private static final String BAR_TOPIC_NAME = "bar";
private static final int FOO_NUM_PARTITIONS = 2;
private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.addRacks()
.build();

@Test
void testComputeTopicHash() {
long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte
.putLong(FOO_TOPIC_ID.hashCode()) // topic Id


high

Using hashCode() on a Uuid is not ideal as it loses information from the 128-bit UUID, which increases the risk of collisions. It's better to hash the full 128 bits of the UUID. This change should be applied to all other places in this test file where hashCode() is used for a Uuid (lines 74, 91, 109).

Suggested change
.putLong(FOO_TOPIC_ID.hashCode()) // topic Id
.putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id
.putLong(FOO_TOPIC_ID.getLeastSignificantBits())

.putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
.putInt(FOO_NUM_PARTITIONS) // number of partitions
.putInt(0) // partition 0
.putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
.putInt(1) // partition 1
.putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
Comment on lines +54 to +63


medium

The logic for building the Hasher is duplicated across testComputeTopicHash, testComputeTopicHashWithDifferentMagicByte, testComputeTopicHashWithDifferentPartitionOrder, and testComputeTopicHashWithDifferentRackOrder. To improve maintainability, consider extracting this logic into a private helper method. This would reduce code duplication and make the tests easier to read and update.
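
For illustration, one shape such a helper could take, reusing the test's existing constants; the name fooTopicHasher and its magicByte parameter are hypothetical, and tests that vary partition or rack order would still need their own sequences or extra parameters:

private static Hasher fooTopicHasher(byte magicByte) {
    return Hashing.murmur3_128().newHasher()
        .putByte(magicByte)                                // magic byte
        .putLong(FOO_TOPIC_ID.hashCode())                  // topic id
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS)                        // number of partitions
        .putInt(0)                                         // partition 0
        .putString("rack0;rack1", StandardCharsets.UTF_8)  // racks of partition 0
        .putInt(1)                                         // partition 1
        .putString("rack1;rack2", StandardCharsets.UTF_8); // racks of partition 1
}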

assertEquals(topicHasher.hash().asLong(), result);
}

@Test
void testComputeTopicHashWithDifferentMagicByte() {
long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 1) // different magic byte
.putLong(FOO_TOPIC_ID.hashCode()) // topic Id
.putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
.putInt(FOO_NUM_PARTITIONS) // number of partitions
.putInt(0) // partition 0
.putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
.putInt(1) // partition 1
.putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);
}

@Test
void testComputeTopicHashWithDifferentPartitionOrder() {
long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte
.putLong(FOO_TOPIC_ID.hashCode()) // topic Id
.putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
.putInt(FOO_NUM_PARTITIONS) // number of partitions
// different partition order
.putInt(1) // partition 1
.putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
.putInt(0) // partition 0
.putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
assertNotEquals(topicHasher.hash().asLong(), result);
}

@Test
void testComputeTopicHashWithDifferentRackOrder() {
long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte
.putLong(FOO_TOPIC_ID.hashCode()) // topic Id
.putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
.putInt(FOO_NUM_PARTITIONS) // number of partitions
.putInt(0) // partition 0
.putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
.putInt(1) // partition 1
.putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);
}

@ParameterizedTest
@MethodSource("differentFieldGenerator")
void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) {
long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

assertNotEquals(
Group.computeTopicHash(
differentImage.topics().getTopic(topicId),
differentImage.cluster()
),
result
);
}

private static Stream<Arguments> differentFieldGenerator() {
Uuid differentTopicId = Uuid.randomUuid();
return Stream.of(
Arguments.of(new MetadataImageBuilder() // different topic id
.addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.addRacks()
.build(),
differentTopicId
),
Arguments.of(new MetadataImageBuilder() // different topic name
.addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
.addRacks()
.build(),
FOO_TOPIC_ID
),
Arguments.of(new MetadataImageBuilder() // different partitions
.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
.addRacks()
.build(),
FOO_TOPIC_ID
),
Arguments.of(new MetadataImageBuilder() // different racks
.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.build(),
FOO_TOPIC_ID
)
);
}

@Test
void testComputeGroupHash() {
long result = Group.computeGroupHash(Map.of(
BAR_TOPIC_NAME, 123L,
FOO_TOPIC_NAME, 456L
));

long expected = Hashing.combineOrdered(List.of(
HashCode.fromLong(123L),
HashCode.fromLong(456L)
)).asLong();
assertEquals(expected, result);
}

@Test
void testComputeGroupHashWithDifferentOrder() {
long result = Group.computeGroupHash(Map.of(
BAR_TOPIC_NAME, 123L,
FOO_TOPIC_NAME, 456L
));

long unexpected = Hashing.combineOrdered(List.of(
HashCode.fromLong(456L),
HashCode.fromLong(123L)
)).asLong();
assertNotEquals(unexpected, result);
}
}