diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index 6ab65bebdca2e..80804a2ac8cb2 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -31,6 +31,10 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.Collection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +51,8 @@ public class MirrorSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class); + private final Map lastReplicatedOffsets = new ConcurrentHashMap<>(); + private KafkaConsumer consumer; private String sourceClusterAlias; private Duration pollTimeout; @@ -123,6 +129,24 @@ public String version() { return new MirrorSourceConnector().version(); } + private void detectTruncation(Collection partitions) { + if (partitions == null || partitions.isEmpty()) return; + try { + Map earliestOffsets = consumer.beginningOffsets(partitions); + for (TopicPartition tp : partitions) { + long expectedNext = lastReplicatedOffsets.getOrDefault(tp, -1L) + 1; + long earliest = earliestOffsets.get(tp); + if (expectedNext >= 0 && expectedNext < earliest) { + log.error("DATA TRUNCATION DETECTED for {}: expectedNext={} earliest={}", + tp, expectedNext, earliest); + throw new RuntimeException("Truncation detected for " + tp); + } + } + } catch (Exception e) { + log.warn("Failed to check truncation: {}", e.getMessage()); + } + } + @Override public List poll() { if (!consumerAccess.tryAcquire()) { @@ -132,14 +156,18 @@ public List poll() { return null; } try { + detectTruncation(consumer.assignment()); + ConsumerRecords records = consumer.poll(pollTimeout); List sourceRecords = new ArrayList<>(records.count()); for (ConsumerRecord record : records) { SourceRecord converted = convertRecord(record); sourceRecords.add(converted); - TopicPartition topicPartition = new TopicPartition(converted.topic(), converted.kafkaPartition()); - metrics.recordAge(topicPartition, System.currentTimeMillis() - record.timestamp()); - metrics.recordBytes(topicPartition, byteSize(record.value())); + + TopicPartition tp = new TopicPartition(record.topic(), record.partition()); + lastReplicatedOffsets.put(tp, record.offset()); // track latest + metrics.recordAge(tp, System.currentTimeMillis() - record.timestamp()); + metrics.recordBytes(tp, byteSize(record.value())); } if (sourceRecords.isEmpty()) { // WorkerSourceTasks expects non-zero batch size diff --git a/gradlew b/gradlew index f4bb3360e17ee..769f26eff50ca 100755 --- a/gradlew +++ b/gradlew @@ -1,7 +1,7 @@ #!/bin/sh # -# Copyright © 2015-2021 the original authors. +# Copyright © 2015-2021 the original authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +# SPDX-License-Identifier: Apache-2.0 +# ############################################################################## # @@ -32,10 +34,10 @@ # Busybox and similar reduced shells will NOT work, because this script # requires all of these POSIX shell features: # * functions; -# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», -# «${var#prefix}», «${var%suffix}», and «$( cmd )»; -# * compound commands having a testable exit status, especially «case»; -# * various built-in commands including «command», «set», and «ulimit». +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». # # Important for patching: # @@ -55,7 +57,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -84,7 +86,7 @@ done # shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} # Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) -APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -112,20 +114,7 @@ case "$( uname )" in #( NONSTOP* ) nonstop=true ;; esac - -# Loop in case we encounter an error. -for attempt in 1 2 3; do - if [ ! -e "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" ]; then - if ! curl -s -S --retry 3 -L -o "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" "https://raw.githubusercontent.com/gradle/gradle/v8.14.3/gradle/wrapper/gradle-wrapper.jar"; then - rm -f "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" - # Pause for a bit before looping in case the server throttled us. - sleep 5 - continue - fi - fi -done - -CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar +CLASSPATH="\\\"\\\"" # Determine the Java command to use to start the JVM. @@ -216,7 +205,7 @@ fi DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' # Collect all arguments for the java command: -# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, # and any embedded shellness will be escaped. # * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be # treated as '${Hostname}' itself on the command line. @@ -224,7 +213,20 @@ DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ -classpath "$CLASSPATH" \ - org.gradle.wrapper.GradleWrapperMain \ + +# Loop in case we encounter an error. +for attempt in 1 2 3; do + if [ ! -e "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" ]; then + if ! curl -s -S --retry 3 -L -o "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" "https://raw.githubusercontent.com/gradle/gradle/v8.14.3/gradle/wrapper/gradle-wrapper.jar"; then + rm -f "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" + # Pause for a bit before looping in case the server throttled us. + sleep 5 + continue + fi + fi +done + + -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \ "$@" # Stop when "xargs" is not available.