Skip to content

Commit 70c5164

Browse files
authored
Cherrypick "MINOR : Handle error for client telemetry push (#19881)" (#20176)
Update catch to handle compression errors Before : ![image](https://github.com/user-attachments/assets/c5ca121e-ba0c-4664-91f1-20b54abf67cc) After ``` Sent message: KR Message 376 [kafka-producer-network-thread | kr-kafka-producer] INFO org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - KR: Failed to compress telemetry payload for compression: zstd, sending uncompressed data Sent message: KR Message 377 ``` Reviewers: Apoorv Mittal <[email protected]>, Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent eefee6d commit 70c5164

File tree

2 files changed

+3
-4
lines changed

2 files changed

+3
-4
lines changed

clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343

44-
import java.io.IOException;
4544
import java.nio.ByteBuffer;
4645
import java.time.Duration;
4746
import java.util.Collections;
@@ -718,8 +717,8 @@ private Optional<Builder<?>> createPushRequest(ClientTelemetrySubscription local
718717
ByteBuffer compressedPayload;
719718
try {
720719
compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
721-
} catch (IOException e) {
722-
log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
720+
} catch (Throwable e) {
721+
log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
723722
compressedPayload = ByteBuffer.wrap(payload.toByteArray());
724723
compressionType = CompressionType.NONE;
725724
}

server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest re
217217
long exportTimeStartMs = time.hiResClockMs();
218218
receiverPlugin.exportMetrics(requestContext, request);
219219
clientMetricsStats.recordPluginExport(clientInstanceId, time.hiResClockMs() - exportTimeStartMs);
220-
} catch (Exception exception) {
220+
} catch (Throwable exception) {
221221
clientMetricsStats.recordPluginErrorCount(clientInstanceId);
222222
clientInstance.lastKnownError(Errors.INVALID_RECORD);
223223
log.error("Error exporting client metrics to the plugin for client instance id: {}", clientInstanceId, exception);

0 commit comments

Comments
 (0)