Skip to content

Commit 95e690f

Browse files
zzzmingeolivelli
authored andcommitted
use retry-able error codes for fetch failures
1 parent f65b9bc commit 95e690f

File tree

1 file changed

+8
-4
lines changed

1 file changed

+8
-4
lines changed

proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -793,8 +793,9 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
793793
.whenComplete((result, error) -> {
794794
// TODO: report errors for specific partitions and continue for non failed lookups
795795
if (error != null) {
796+
log.error("Cannot lookup brokers for a fetch request error {}", error);
796797
FetchResponse fetchResponse = buildFetchErrorResponse(fetchRequest,
797-
fetchData, Errors.UNKNOWN_SERVER_ERROR);
798+
fetchData, Errors.FETCH_SESSION_ID_NOT_FOUND);
798799
resultFuture.complete(fetchResponse);
799800
} else {
800801
boolean multipleBrokers = false;
@@ -822,7 +823,7 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
822823
}).exceptionally(badError -> {
823824
log.error("bad error for FULL fetch", badError);
824825
FetchResponse fetchResponse = buildFetchErrorResponse(fetchRequest,
825-
fetchData, Errors.UNKNOWN_SERVER_ERROR);
826+
fetchData, Errors.FETCH_SESSION_TOPIC_ID_ERROR);
826827
resultFuture.complete(fetchResponse);
827828
return null;
828829
});
@@ -842,8 +843,10 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
842843
// add the topicPartition with timeout error if it's not existed in responseMap
843844
fetchData.keySet().forEach(topicPartition -> {
844845
if (!responseMap.containsKey(topicPartition)) {
846+
log.error(metadataNamespace + " Request {}: not found response for {}",
847+
fetch.getHeader(), topicPartition);
845848
responseMap.put(topicPartition,
846-
getFetchPartitionDataWithError(Errors.UNKNOWN_SERVER_ERROR));
849+
getFetchPartitionDataWithError(Errors.FETCH_SESSION_TOPIC_ID_ERROR));
847850
}
848851
});
849852
if (log.isDebugEnabled()) {
@@ -961,7 +964,7 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
961964
log.error("bad error while fetching for {} from {}",
962965
fetchData.keySet(), badError, kopBroker);
963966
fetchData.keySet().forEach(topicPartition ->
964-
errorsConsumer.accept(topicPartition, Errors.UNKNOWN_SERVER_ERROR)
967+
errorsConsumer.accept(topicPartition, Errors.FETCH_SESSION_TOPIC_ID_ERROR)
965968
);
966969
return null;
967970
}).whenComplete((ignore1, ignore2) -> {
@@ -1595,6 +1598,7 @@ protected void handleDeleteRecords(KafkaHeaderAndRequest deleteRecords,
15951598
.whenComplete((result, error) -> {
15961599
// TODO: report errors for specific partitions and continue for non failed lookups
15971600
if (error != null) {
1601+
log.error("delete records error {}", error);
15981602
Map<TopicPartition, Errors> errorsMap =
15991603
partitionOffsets
16001604
.keySet()

0 commit comments

Comments
 (0)