Skip to content

MINOR: Cleanup Metadata Module #20346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,7 @@

package org.apache.kafka.controller;

import java.util.Objects;


class BrokerControlStates {
private final BrokerControlState current;
private final BrokerControlState next;

BrokerControlStates(BrokerControlState current, BrokerControlState next) {
this.current = current;
this.next = next;
}

BrokerControlState current() {
return current;
}

BrokerControlState next() {
return next;
}

@Override
public int hashCode() {
return Objects.hash(current, next);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof BrokerControlStates other)) return false;
return other.current == current && other.next == next;
}

record BrokerControlStates(BrokerControlState current, BrokerControlState next) {
@Override
public String toString() {
return "BrokerControlStates(current=" + current + ", next=" + next + ")";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,7 @@

package org.apache.kafka.controller;

import java.util.Objects;

public class BrokerIdAndEpoch {
private final int id;
private final long epoch;

public BrokerIdAndEpoch(
int id,
long epoch
) {
this.id = id;
this.epoch = epoch;
}

public int id() {
return id;
}

public long epoch() {
return epoch;
}

@Override
public boolean equals(Object o) {
if (o == null || (!(o instanceof BrokerIdAndEpoch other))) return false;
return id == other.id && epoch == other.epoch;
}

@Override
public int hashCode() {
return Objects.hash(id, epoch);
}

public record BrokerIdAndEpoch(int id, long epoch) {
@Override
public String toString() {
return "BrokerIdAndEpoch(id=" + id + ", epoch=" + epoch + ")";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ private void completeReassignmentIfNeeded() {

PartitionReassignmentReplicas.CompletedReassignment completedReassignment = completedReassignmentOpt.get();

targetIsr = completedReassignment.isr;
targetReplicas = completedReassignment.replicas;
targetIsr = completedReassignment.isr();
targetReplicas = completedReassignment.replicas();
targetRemoving = List.of();
targetAdding = List.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,7 @@ Optional<CompletedReassignment> maybeCompleteReassignment(List<Integer> targetIs
);
}

static class CompletedReassignment {
final List<Integer> replicas;
final List<Integer> isr;

public CompletedReassignment(List<Integer> replicas, List<Integer> isr) {
this.replicas = replicas;
this.isr = isr;
}
record CompletedReassignment(List<Integer> replicas, List<Integer> isr) {
}

List<Integer> originalReplicas() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2441,14 +2441,7 @@ private void validatePartitionReplicationFactorUnchanged(PartitionRegistration p
}
}

private static final class IneligibleReplica {
private final int replicaId;
private final String reason;

private IneligibleReplica(int replicaId, String reason) {
this.replicaId = replicaId;
this.reason = reason;
}
private record IneligibleReplica(int replicaId, String reason) {

@Override
public String toString() {
Expand Down
21 changes: 2 additions & 19 deletions metadata/src/main/java/org/apache/kafka/image/AclsImage.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@

/**
* Represents the ACLs in the metadata image.
*
* <p>
* This class is thread-safe.
*/
public final class AclsImage {
public record AclsImage(Map<Uuid, StandardAcl> acls) {
public static final AclsImage EMPTY = new AclsImage(Map.of());

private final Map<Uuid, StandardAcl> acls;

public AclsImage(Map<Uuid, StandardAcl> acls) {
this.acls = Collections.unmodifiableMap(acls);
}
Expand All @@ -46,28 +44,13 @@ public boolean isEmpty() {
return acls.isEmpty();
}

public Map<Uuid, StandardAcl> acls() {
return acls;
}

public void write(ImageWriter writer) {
for (Entry<Uuid, StandardAcl> entry : acls.entrySet()) {
StandardAclWithId aclWithId = new StandardAclWithId(entry.getKey(), entry.getValue());
writer.write(0, aclWithId.toRecord());
}
}

@Override
public int hashCode() {
return acls.hashCode();
}

@Override
public boolean equals(Object o) {
if (!(o instanceof AclsImage other)) return false;
return acls.equals(other.acls);
}

@Override
public String toString() {
return new AclsImageNode(this).stringify();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@

/**
* Represents the client quotas in the metadata image.
*
* <p>
* This class is thread-safe.
*/
public final class ClientQuotasImage {
public record ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> entities) {
public static final ClientQuotasImage EMPTY = new ClientQuotasImage(Map.of());

private final Map<ClientQuotaEntity, ClientQuotaImage> entities;

public ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> entities) {
this.entities = Collections.unmodifiableMap(entities);
}
Expand All @@ -61,11 +59,6 @@ public boolean isEmpty() {
return entities.isEmpty();
}

// Visible for testing
public Map<ClientQuotaEntity, ClientQuotaImage> entities() {
return entities;
}

public void write(ImageWriter writer) {
for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
ClientQuotaEntity entity = entry.getKey();
Expand All @@ -82,14 +75,14 @@ public DescribeClientQuotasResponseData describe(DescribeClientQuotasRequestData
if (component.entityType().isEmpty()) {
throw new InvalidRequestException("Invalid empty entity type.");
} else if (exactMatch.containsKey(component.entityType()) ||
typeMatch.contains(component.entityType())) {
typeMatch.contains(component.entityType())) {
throw new InvalidRequestException("Entity type " + component.entityType() +
" cannot appear more than once in the filter.");
}
if (!(component.entityType().equals(IP) || component.entityType().equals(USER) ||
component.entityType().equals(CLIENT_ID))) {
component.entityType().equals(CLIENT_ID))) {
throw new UnsupportedVersionException("Unsupported entity type " +
component.entityType());
component.entityType());
}
switch (component.matchType()) {
case MATCH_TYPE_EXACT:
Expand Down Expand Up @@ -119,7 +112,7 @@ public DescribeClientQuotasResponseData describe(DescribeClientQuotasRequestData
}
if (exactMatch.containsKey(IP) || typeMatch.contains(IP)) {
if ((exactMatch.containsKey(USER) || typeMatch.contains(USER)) ||
(exactMatch.containsKey(CLIENT_ID) || typeMatch.contains(CLIENT_ID))) {
(exactMatch.containsKey(CLIENT_ID) || typeMatch.contains(CLIENT_ID))) {
throw new InvalidRequestException("Invalid entity filter component " +
"combination. IP filter component should not be used with " +
"user or clientId filter component.");
Expand Down Expand Up @@ -173,17 +166,6 @@ private static EntryData toDescribeEntry(ClientQuotaEntity entity,
return data;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof ClientQuotasImage other)) return false;
return entities.equals(other.entities);
}

@Override
public int hashCode() {
return Objects.hash(entities);
}

@Override
public String toString() {
return new ClientQuotasImageNode(this).stringify();
Expand Down
34 changes: 4 additions & 30 deletions metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,16 @@

import java.util.Collections;
import java.util.Map;
import java.util.Objects;


/**
* Represents the cluster in the metadata image.
*
* <p>
* This class is thread-safe.
*/
public final class ClusterImage {
public record ClusterImage(Map<Integer, BrokerRegistration> brokers, Map<Integer, ControllerRegistration> controllers) {
public static final ClusterImage EMPTY = new ClusterImage(
Map.of(),
Map.of());

private final Map<Integer, BrokerRegistration> brokers;

private final Map<Integer, ControllerRegistration> controllers;
Map.of(),
Map.of());

public ClusterImage(
Map<Integer, BrokerRegistration> brokers,
Expand All @@ -54,18 +48,10 @@ public boolean isEmpty() {
return brokers.isEmpty();
}

public Map<Integer, BrokerRegistration> brokers() {
return brokers;
}

public BrokerRegistration broker(int nodeId) {
return brokers.get(nodeId);
}

public Map<Integer, ControllerRegistration> controllers() {
return controllers;
}

public long brokerEpoch(int brokerId) {
BrokerRegistration brokerRegistration = broker(brokerId);
if (brokerRegistration == null) {
Expand All @@ -89,18 +75,6 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
}
}

@Override
public int hashCode() {
return Objects.hash(brokers, controllers);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof ClusterImage other)) return false;
return brokers.equals(other.brokers) &&
controllers.equals(other.controllers);
}

@Override
public String toString() {
return new ClusterImageNode(this).stringify();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,10 @@

/**
* Represents the configuration of a resource.
*
* <p>
* This class is thread-safe.
*/
public final class ConfigurationImage {
private final ConfigResource resource;

private final Map<String, String> data;

public ConfigurationImage(
ConfigResource resource,
Map<String, String> data
) {
this.resource = resource;
this.data = data;
}

public ConfigResource resource() {
return resource;
}

public Map<String, String> data() {
return data;
}
public record ConfigurationImage(ConfigResource resource, Map<String, String> data) {

public boolean isEmpty() {
return data.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@

/**
* Represents the DelegationToken credentials in the metadata image.
*
* <p>
* This class is thread-safe.
*
* @param tokens Map TokenID to TokenInformation. The TokenID is also contained in the TokenInformation inside the DelegationTokenData
*/
public final class DelegationTokenImage {
public record DelegationTokenImage(Map<String, DelegationTokenData> tokens) {
public static final DelegationTokenImage EMPTY = new DelegationTokenImage(Map.of());

// Map TokenID to TokenInformation.
// The TokenID is also contained in the TokenInformation inside the DelegationTokenData
private final Map<String, DelegationTokenData> tokens;

public DelegationTokenImage(Map<String, DelegationTokenData> tokens) {
this.tokens = Collections.unmodifiableMap(tokens);
}
Expand All @@ -55,31 +53,14 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
List<String> tokenIds = new ArrayList<>(tokens.keySet());
String delegationTokenImageString = "DelegationTokenImage(" + String.join(", ", tokenIds) + ")";
options.handleLoss(delegationTokenImageString);
}
}
}
}

public Map<String, DelegationTokenData> tokens() {
return tokens;
}

public boolean isEmpty() {
return tokens.isEmpty();
}

@Override
public int hashCode() {
return tokens.hashCode();
}

@Override
public boolean equals(Object o) {
if (o == null) return false;
if (!o.getClass().equals(DelegationTokenImage.class)) return false;
DelegationTokenImage other = (DelegationTokenImage) o;
return tokens.equals(other.tokens);
}

@Override
public String toString() {
return new DelegationTokenImageNode(this).stringify();
Expand Down
Loading