Skip to content

Commit b7b2676

Browse files
authored
Cherrypick "MINOR : Handle error for client telemetry push (apache#19881)" (apache#20179)
Update catch to handle compression errors Before : ![image](https://github.com/user-attachments/assets/c5ca121e-ba0c-4664-91f1-20b54abf67cc) After ``` Sent message: KR Message 376 [kafka-producer-network-thread | kr-kafka-producer] INFO org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter - KR: Failed to compress telemetry payload for compression: zstd, sending uncompressed data Sent message: KR Message 377 ``` Reviewers: Apoorv Mittal <[email protected]>, Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent f7f249e commit b7b2676

File tree

2 files changed

+3
-4
lines changed

2 files changed

+3
-4
lines changed

clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343

44-
import java.io.IOException;
4544
import java.time.Duration;
4645
import java.util.Collections;
4746
import java.util.List;
@@ -719,8 +718,8 @@ private Optional<Builder<?>> createPushRequest(ClientTelemetrySubscription local
719718
byte[] compressedPayload;
720719
try {
721720
compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
722-
} catch (IOException e) {
723-
log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
721+
} catch (Throwable e) {
722+
log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
724723
compressedPayload = payload;
725724
compressionType = CompressionType.NONE;
726725
}

server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest re
211211
long exportTimeStartMs = time.hiResClockMs();
212212
receiverPlugin.exportMetrics(requestContext, request);
213213
clientMetricsStats.recordPluginExport(clientInstanceId, time.hiResClockMs() - exportTimeStartMs);
214-
} catch (Exception exception) {
214+
} catch (Throwable exception) {
215215
clientMetricsStats.recordPluginErrorCount(clientInstanceId);
216216
clientInstance.lastKnownError(Errors.INVALID_RECORD);
217217
log.error("Error exporting client metrics to the plugin for client instance id: {}", clientInstanceId, exception);

0 commit comments

Comments
 (0)