Skip to content

Commit 7023b45

Browse files
authored
List diff instead of entire set for Kafka Supervisor partition mismatch (#18234)
* Clearer lag metrics * Reduce verbosity of Kafka mismatch log * Log KafkaTopicPartition instead of integer * Use object mapper to serialize set difference * Change format argument to include error message * Correctly describe writeValueAsString as serialize
1 parent 78bf55d commit 7023b45

File tree

1 file changed

+14
-6
lines changed
  • extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor

1 file changed

+14
-6
lines changed

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import com.fasterxml.jackson.databind.ObjectMapper;
2525
import com.google.common.annotations.VisibleForTesting;
2626
import com.google.common.base.Objects;
27+
import com.google.common.collect.Sets;
2728
import org.apache.druid.common.utils.IdUtils;
2829
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
2930
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
31+
import org.apache.druid.error.DruidException;
3032
import org.apache.druid.indexing.common.task.Task;
3133
import org.apache.druid.indexing.common.task.TaskResource;
3234
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -265,12 +267,18 @@ protected Map<KafkaTopicPartition, Long> getPartitionRecordLag()
265267
return null;
266268
}
267269

268-
if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
269-
log.warn(
270-
"Kafka partitions[%s] do not match task partitions[%s]",
271-
latestSequenceFromStream.keySet(),
272-
highestCurrentOffsets.keySet()
273-
);
270+
Set<KafkaTopicPartition> kafkaPartitions = latestSequenceFromStream.keySet();
271+
Set<KafkaTopicPartition> taskPartitions = highestCurrentOffsets.keySet();
272+
if (!kafkaPartitions.equals(taskPartitions)) {
273+
try {
274+
log.warn("Mismatched kafka and task partitions: Missing Task Partitions %s, Missing Kafka Partitions %s",
275+
sortingMapper.writeValueAsString(Sets.difference(kafkaPartitions, taskPartitions)),
276+
sortingMapper.writeValueAsString(Sets.difference(taskPartitions, kafkaPartitions)));
277+
}
278+
catch (JsonProcessingException e) {
279+
throw DruidException.defensive("Failed to serialize KafkaTopicPartition when getting partition record lag: %s",
280+
e.getMessage());
281+
}
274282
}
275283

276284
return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets);

0 commit comments

Comments
 (0)