Skip to content

Commit 9178f89

Browse files
see-quickUbuntu
authored andcommitted
MINOR: Refactor on FeaturesPublisher and ScramPublisher (apache#20522)
This PR is a follow-up from apache#20468. Basically makes two things: 1. Moves the variable to the catch block as it is used only there. 2. Refactor FeaturesPublisher to handle exceptions the same as ScramPublisher or other publishers :) Reviewers: Chia-Ping Tsai <[email protected]> --------- Signed-off-by: see-quick <[email protected]>
1 parent 4908f97 commit 9178f89

File tree

4 files changed

+21
-16
lines changed

4 files changed

+21
-16
lines changed

core/src/main/scala/kafka/server/ControllerServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ class ControllerServer(
145145

146146
metadataCachePublisher = new KRaftMetadataCachePublisher(metadataCache)
147147

148-
featuresPublisher = new FeaturesPublisher(logContext)
148+
featuresPublisher = new FeaturesPublisher(logContext, sharedServer.metadataPublishingFaultHandler)
149149

150150
registrationsPublisher = new ControllerRegistrationsPublisher()
151151

core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ class BrokerMetadataPublisher(
224224
dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage)
225225

226226
// Apply SCRAM delta.
227-
scramPublisher.onMetadataUpdate(delta, newImage)
227+
scramPublisher.onMetadataUpdate(delta, newImage, manifest)
228228

229229
// Apply DelegationToken delta.
230230
delegationTokenPublisher.onMetadataUpdate(delta, newImage)

metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.image.loader.LoaderManifest;
2424
import org.apache.kafka.image.publisher.MetadataPublisher;
2525
import org.apache.kafka.server.common.FinalizedFeatures;
26+
import org.apache.kafka.server.fault.FaultHandler;
2627

2728
import org.slf4j.Logger;
2829

@@ -31,12 +32,15 @@
3132

3233
public class FeaturesPublisher implements MetadataPublisher {
3334
private final Logger log;
35+
private final FaultHandler faultHandler;
3436
private volatile FinalizedFeatures finalizedFeatures = FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION);
3537

3638
public FeaturesPublisher(
37-
LogContext logContext
39+
LogContext logContext,
40+
FaultHandler faultHandler
3841
) {
39-
log = logContext.logger(FeaturesPublisher.class);
42+
this.log = logContext.logger(FeaturesPublisher.class);
43+
this.faultHandler = faultHandler;
4044
}
4145

4246
public FinalizedFeatures features() {
@@ -54,15 +58,20 @@ public void onMetadataUpdate(
5458
MetadataImage newImage,
5559
LoaderManifest manifest
5660
) {
57-
if (delta.featuresDelta() != null) {
58-
FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(newImage.features().metadataVersionOrThrow(),
61+
try {
62+
if (delta.featuresDelta() != null) {
63+
FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(newImage.features().metadataVersionOrThrow(),
5964
newImage.features().finalizedVersions(),
6065
newImage.provenance().lastContainedOffset()
61-
);
62-
if (!newFinalizedFeatures.equals(finalizedFeatures)) {
63-
log.info("Loaded new metadata {}.", newFinalizedFeatures);
64-
finalizedFeatures = newFinalizedFeatures;
66+
);
67+
if (!newFinalizedFeatures.equals(finalizedFeatures)) {
68+
log.info("Loaded new metadata {}.", newFinalizedFeatures);
69+
finalizedFeatures = newFinalizedFeatures;
70+
}
6571
}
72+
} catch (Throwable t) {
73+
faultHandler.handleFault("Uncaught exception while publishing SCRAM changes from MetadataDelta up to "
74+
+ newImage.highestOffsetAndEpoch().offset(), t);
6675
}
6776
}
6877
}

metadata/src/main/java/org/apache/kafka/metadata/publisher/ScramPublisher.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,6 @@ public final String name() {
4444

4545
@Override
4646
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
47-
onMetadataUpdate(delta, newImage);
48-
}
49-
50-
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
51-
String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
5247
try {
5348
// Apply changes to SCRAM credentials.
5449
ScramDelta scramDelta = delta.scramDelta();
@@ -63,7 +58,8 @@ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
6358
});
6459
}
6560
} catch (Throwable t) {
66-
faultHandler.handleFault("Uncaught exception while publishing SCRAM changes from " + deltaName, t);
61+
faultHandler.handleFault("Uncaught exception while publishing SCRAM changes from MetadataDelta up to "
62+
+ newImage.highestOffsetAndEpoch().offset(), t);
6763
}
6864
}
6965
}

0 commit comments

Comments
 (0)