Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,6 +51,8 @@ public class MirrorSourceTask extends SourceTask {

private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class);

private final Map<TopicPartition, Long> lastReplicatedOffsets = new ConcurrentHashMap<>();

private KafkaConsumer<byte[], byte[]> consumer;
private String sourceClusterAlias;
private Duration pollTimeout;
Expand Down Expand Up @@ -123,6 +129,24 @@ public String version() {
return new MirrorSourceConnector().version();
}

private void detectTruncation(Collection<TopicPartition> partitions) {
if (partitions == null || partitions.isEmpty()) return;
try {
Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(partitions);
for (TopicPartition tp : partitions) {
long expectedNext = lastReplicatedOffsets.getOrDefault(tp, -1L) + 1;
long earliest = earliestOffsets.get(tp);
if (expectedNext >= 0 && expectedNext < earliest) {
log.error("DATA TRUNCATION DETECTED for {}: expectedNext={} earliest={}",
tp, expectedNext, earliest);
throw new RuntimeException("Truncation detected for " + tp);
}
}
} catch (Exception e) {
log.warn("Failed to check truncation: {}", e.getMessage());
}
}

@Override
public List<SourceRecord> poll() {
if (!consumerAccess.tryAcquire()) {
Expand All @@ -132,14 +156,18 @@ public List<SourceRecord> poll() {
return null;
}
try {
detectTruncation(consumer.assignment());

ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
for (ConsumerRecord<byte[], byte[]> record : records) {
SourceRecord converted = convertRecord(record);
sourceRecords.add(converted);
TopicPartition topicPartition = new TopicPartition(converted.topic(), converted.kafkaPartition());
metrics.recordAge(topicPartition, System.currentTimeMillis() - record.timestamp());
metrics.recordBytes(topicPartition, byteSize(record.value()));

TopicPartition tp = new TopicPartition(record.topic(), record.partition());
lastReplicatedOffsets.put(tp, record.offset()); // track latest
metrics.recordAge(tp, System.currentTimeMillis() - record.timestamp());
metrics.recordBytes(tp, byteSize(record.value()));
}
if (sourceRecords.isEmpty()) {
// WorkerSourceTasks expects non-zero batch size
Expand Down
48 changes: 25 additions & 23 deletions gradlew
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/sh

#
# Copyright © 2015-2021 the original authors.
# Copyright 2015-2021 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#

##############################################################################
#
Expand All @@ -32,10 +34,10 @@
# Busybox and similar reduced shells will NOT work, because this script
# requires all of these POSIX shell features:
# * functions;
# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
# * compound commands having a testable exit status, especially «case»;
# * various built-in commands including «command», «set», and «ulimit».
# * expansions $var�, �${var}�, �${var:-default}�, �${var+SET},
# ${var#prefix}�, �${var%suffix}, and $( cmd );
# * compound commands having a testable exit status, especially case;
# * various built-in commands including command�, �set, and ulimit.
#
# Important for patching:
#
Expand All @@ -55,7 +57,7 @@
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
Expand Down Expand Up @@ -84,7 +86,7 @@ done
# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit

# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum
Expand Down Expand Up @@ -112,20 +114,7 @@ case "$( uname )" in #(
NONSTOP* ) nonstop=true ;;
esac


# Loop in case we encounter an error.
for attempt in 1 2 3; do
if [ ! -e "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" ]; then
if ! curl -s -S --retry 3 -L -o "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" "https://raw.githubusercontent.com/gradle/gradle/v8.14.3/gradle/wrapper/gradle-wrapper.jar"; then
rm -f "$APP_HOME/gradle/wrapper/gradle-wrapper.jar"
# Pause for a bit before looping in case the server throttled us.
sleep 5
continue
fi
fi
done

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
CLASSPATH="\\\"\\\""


# Determine the Java command to use to start the JVM.
Expand Down Expand Up @@ -216,15 +205,28 @@ fi
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'

# Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line.

set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
-classpath "$CLASSPATH" \
org.gradle.wrapper.GradleWrapperMain \

# Loop in case we encounter an error.
for attempt in 1 2 3; do
if [ ! -e "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" ]; then
if ! curl -s -S --retry 3 -L -o "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" "https://raw.githubusercontent.com/gradle/gradle/v8.14.3/gradle/wrapper/gradle-wrapper.jar"; then
rm -f "$APP_HOME/gradle/wrapper/gradle-wrapper.jar"
# Pause for a bit before looping in case the server throttled us.
sleep 5
continue
fi
fi
done

-jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \
"$@"

# Stop when "xargs" is not available.
Expand Down