Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2258,12 +2258,8 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb

private void publishConnectorTaskConfigs(String connName, List<Map<String, String>> taskProps, Callback<Void> cb) {
List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
// KAFKA-17719: Always rewrite task configs to the config topic, even if unchanged.
// This ensures task config offsets are always greater than the connector config offset,
// preventing log compaction from reordering records into TCC order (task-commit-connector)
// which would cause task configs to be lost on worker restart.
if (log.isDebugEnabled() && !taskConfigsChanged(configState, connName, rawTaskProps)) {
log.debug("Task configs for connector '{}' are unchanged, but rewriting to prevent compaction reorder (KAFKA-17719)", connName);
if (!taskConfigsChanged(configState, connName, rawTaskProps)) {
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

publishConnectorTaskConfigs returns early when task configs are unchanged, but it does not invoke the provided callback. Since callers (e.g., reconfigureConnector) treat the callback as the completion signal, this can leave request/retry logic waiting indefinitely. Call cb.onCompletion(null, null) before returning in the unchanged-configs case.

Suggested change
if (!taskConfigsChanged(configState, connName, rawTaskProps)) {
if (!taskConfigsChanged(configState, connName, rawTaskProps)) {
cb.onCompletion(null, null);

Copilot uses AI. Check for mistakes.
return;
}

if (isLeader()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,10 +550,41 @@ public void removeConnectorConfig(String connector) {
log.debug("Removing connector configuration for connector '{}'", connector);
try {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
List<ProducerKeyValue> keyValues = Arrays.asList(
new ProducerKeyValue(CONNECTOR_KEY(connector), null),
new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
);
List<ProducerKeyValue> keyValues = new ArrayList<>();
keyValues.add(new ProducerKeyValue(CONNECTOR_KEY(connector), null));
keyValues.add(new ProducerKeyValue(TARGET_STATE_KEY(connector), null));

// Write tombstones for all task config keys and the commit key so that log compaction
// will eventually remove them. Without these tombstones, orphaned task configs and commit
// records would persist indefinitely in the config topic after the connector is deleted,
// which is the root cause of KAFKA-16838.
Comment on lines +557 to +560
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method now writes tombstones for individual task config keys. KafkaConfigBackingStore currently treats null-valued task config records as an unexpected error (and does not process them as deletions), which will generate error logs during normal connector deletion. Consider updating task-record handling to treat null values as expected tombstones (remove task configs/deferred entries, and log at debug/trace).

Copilot uses AI. Check for mistakes.
synchronized (lock) {
Integer taskCount = connectorTaskCounts.get(connector);
if (taskCount != null) {
// Also collect task IDs from taskConfigs and deferredTaskUpdates to handle the case
// where task count was reduced (old task keys with higher indices still exist)
Set<Integer> allTaskIds = new HashSet<>();
for (int i = 0; i < taskCount; i++) {
allTaskIds.add(i);
}
for (ConnectorTaskId taskId : taskConfigs.keySet()) {
if (taskId.connector().equals(connector)) {
allTaskIds.add(taskId.task());
}
}
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
if (deferred != null) {
for (ConnectorTaskId taskId : deferred.keySet()) {
allTaskIds.add(taskId.task());
}
}
for (int taskId : allTaskIds) {
keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
}
keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));
}
Comment on lines +563 to +585
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removeConnectorConfig only writes task/commit tombstones when connectorTaskCounts contains an entry. If that entry is missing (e.g., connector deleted before task commit was processed, or state not fully loaded), orphaned commit/task records can still persist. At minimum, consider always writing a tombstone for COMMIT_TASKS_KEY(connector), and deriving task IDs from taskConfigs/deferredTaskUpdates even when taskCount is null.

Suggested change
if (taskCount != null) {
// Also collect task IDs from taskConfigs and deferredTaskUpdates to handle the case
// where task count was reduced (old task keys with higher indices still exist)
Set<Integer> allTaskIds = new HashSet<>();
for (int i = 0; i < taskCount; i++) {
allTaskIds.add(i);
}
for (ConnectorTaskId taskId : taskConfigs.keySet()) {
if (taskId.connector().equals(connector)) {
allTaskIds.add(taskId.task());
}
}
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
if (deferred != null) {
for (ConnectorTaskId taskId : deferred.keySet()) {
allTaskIds.add(taskId.task());
}
}
for (int taskId : allTaskIds) {
keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
}
keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));
}
// Collect task IDs from all available sources so cleanup still works even if
// connectorTaskCounts does not currently contain an entry for this connector.
Set<Integer> allTaskIds = new HashSet<>();
if (taskCount != null) {
for (int i = 0; i < taskCount; i++) {
allTaskIds.add(i);
}
}
for (ConnectorTaskId taskId : taskConfigs.keySet()) {
if (taskId.connector().equals(connector)) {
allTaskIds.add(taskId.task());
}
}
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
if (deferred != null) {
for (ConnectorTaskId taskId : deferred.keySet()) {
allTaskIds.add(taskId.task());
}
}
for (int taskId : allTaskIds) {
keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
}
keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));

Copilot uses AI. Check for mistakes.
}

sendPrivileged(keyValues, timer);

configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -1039,6 +1070,11 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
// which were created with 0.9 Connect will be initialized in the STARTED state.
if (!connectorTargetStates.containsKey(connectorName))
connectorTargetStates.put(connectorName, TargetState.STARTED);

// If a task commit was previously received before this connector config (due to log compaction
// reordering), the task configs were deferred. Now that the connector config has arrived,
// attempt to apply those deferred task configs. See KAFKA-17719.
applyDeferredTaskConfigs(connectorName);
}
}
if (started) {
Expand Down Expand Up @@ -1078,18 +1114,38 @@ private void processTaskConfigRecord(ConnectorTaskId taskId, SchemaAndValue valu
private void processTasksCommitRecord(String connectorName, SchemaAndValue value) {
List<ConnectorTaskId> updatedTasks = new ArrayList<>();
synchronized (lock) {
// Edge case: connector was deleted before these task configs were published,
// but compaction took place and both the original connector config and the
// tombstone message for it have been removed from the config topic
// We should ignore these task configs
Map<String, String> appliedConnectorConfig = connectorConfigs.get(connectorName);
if (appliedConnectorConfig == null) {
processConnectorRemoval(connectorName);
log.debug(
"Ignoring task configs for connector {}; it appears that the connector was deleted previously "
+ "and that log compaction has since removed any trace of its previous configurations "
+ "from the config topic",
connectorName
// The connector config is not present. This can happen when log compaction reorders
// records (KAFKA-17719) such that the task commit appears before the connector config,
// or when orphaned task configs remain from a previously-deleted connector whose
// tombstone has been removed by delete.retention.ms.
//
// For tombstone records (written during connector deletion), simply ignore them.
if (value.value() == null) {
log.debug("Ignoring tombstone commit record for connector '{}' whose config is not present",
connectorName);
return;
}
if (!(value.value() instanceof Map)) {
log.error("Ignoring connector tasks configuration commit for connector '{}' "
+ "because it is in the wrong format: {}", connectorName, className(value.value()));
return;
}
// Defer processing: store the task count and mark the connector as inconsistent.
// If the connector config arrives later (compaction reorder), applyDeferredTaskConfigs()
// will apply the deferred task configs at that point. If no connector config ever arrives
// (deleted connector), the deferred data remains inert.
@SuppressWarnings("unchecked")
int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
connectorTaskCounts.put(connectorName, newTaskCount);
inconsistent.add(connectorName);
log.warn(
"Received task commit for connector '{}' (task count: {}) but its connector config "
+ "is not yet present. This may be due to log compaction reordering records "
+ "in the config topic (KAFKA-17719), or the connector may have been deleted. "
+ "Deferring task config application until the connector config is received.",
connectorName, newTaskCount
);
return;
}
Expand Down Expand Up @@ -1279,12 +1335,64 @@ private void processLoggerLevelRecord(String namespace, SchemaAndValue value) {
}
}

/**
* Attempt to apply deferred task configs that were received before the connector config
* due to log compaction reordering records in the config topic (KAFKA-17719).
*
* <p>When log compaction reorders records, the task configs and commit record may appear
* before the connector config record. In that case, {@link #processTasksCommitRecord} stores
* the task count in {@link #connectorTaskCounts} and marks the connector as inconsistent,
* while the task configs remain in {@link #deferredTaskUpdates}. When the connector config
* finally arrives and this method is called, it attempts to apply those deferred task configs.</p>
*
* <p>This method must be called while holding {@link #lock}.</p>
*
* @param connectorName the name of the connector whose deferred task configs should be applied
*/
private void applyDeferredTaskConfigs(String connectorName) {
Integer pendingTaskCount = connectorTaskCounts.get(connectorName);
if (pendingTaskCount == null || !inconsistent.contains(connectorName)) {
return;
}

Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
Set<Integer> taskIdSet = taskIds(connectorName, deferred);
if (completeTaskIdSet(taskIdSet, pendingTaskCount)) {
if (deferred != null) {
taskConfigs.putAll(deferred);
connectorTaskConfigGenerations.compute(connectorName,
(ignored, generation) -> generation != null ? generation + 1 : 0);
deferred.clear();
}
inconsistent.remove(connectorName);
connectorsPendingFencing.add(connectorName);
Comment on lines +1358 to +1368
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applyDeferredTaskConfigs can apply deferred task configs and update connector state, but it never triggers updateListener.onTaskConfigUpdate for the tasks it applied. If a tasks commit is received before the connector config while the store is started, the earlier commit is deferred (no listener call), and this method later applies the configs without notifying the herder, so tasks may never be reconfigured. Consider collecting the applied task IDs here (or returning them) and invoking onTaskConfigUpdate after releasing the lock, similar to processTasksCommitRecord.

Copilot uses AI. Check for mistakes.

Map<String, String> currentConfig = connectorConfigs.get(connectorName);
if (currentConfig != null) {
appliedConnectorConfigs.put(
connectorName,
new AppliedConnectorConfig(currentConfig)
);
}

log.info("Successfully applied deferred task configs for connector '{}' after receiving "
+ "its connector config (likely due to log compaction reordering, see KAFKA-17719)",
connectorName);
} else {
log.warn("Connector '{}' has deferred task configs but they are incomplete "
+ "(expected {} tasks, got task IDs {}). Connector will remain inconsistent "
+ "until a complete set of task configs is written.",
connectorName, pendingTaskCount, taskIdSet);
}
}

/**
 * Purge every piece of in-memory state tracked for a connector that has been removed,
 * including its deferred task configs and applied-config snapshot.
 *
 * @param connectorName the name of the connector being removed
 */
private void processConnectorRemoval(String connectorName) {
    // Connector-level bookkeeping.
    connectorConfigs.remove(connectorName);
    appliedConnectorConfigs.remove(connectorName);
    connectorTaskCounts.remove(connectorName);
    inconsistent.remove(connectorName);
    // Task-level state: drop both live and deferred task configs for this connector.
    taskConfigs.keySet().removeIf(id -> id.connector().equals(connectorName));
    deferredTaskUpdates.remove(connectorName);
}

private ConnectorTaskId parseTaskId(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -1047,8 +1046,6 @@ public void testDestroyConnector() {
time.sleep(1000L);
assertStatistics("leaderUrl", true, 3, 1, 100, 2100L);

// KAFKA-17719: task configs are always rewritten to prevent compaction reorder
verify(configBackingStore, atLeast(0)).putTaskConfigs(any(), any());
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore, putConnectorCallback);

assertEquals(
Expand Down Expand Up @@ -1098,8 +1095,6 @@ public void testRestartConnector() throws Exception {
verify(worker, times(2)).startConnector(eq(CONN1), any(), any(), eq(herder), eq(TargetState.STARTED), any());
verify(worker, times(2)).connectorTaskConfigs(eq(CONN1), any());
verify(worker).stopAndAwaitConnector(CONN1);
// KAFKA-17719: task configs are always rewritten to prevent compaction reorder
verify(configBackingStore, atLeast(0)).putTaskConfigs(any(), any());
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
}

Expand Down Expand Up @@ -2137,8 +2132,6 @@ public void testJoinLeaderCatchUpFails() throws Exception {
herder.tick();

verify(configBackingStore, times(2)).refresh(anyLong(), any(TimeUnit.class));
// KAFKA-17719: task configs are always rewritten to prevent compaction reorder
verify(configBackingStore, atLeast(0)).putTaskConfigs(any(), any());
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
}

Expand Down Expand Up @@ -2215,8 +2208,6 @@ public void testJoinLeaderCatchUpRetriesForIncrementalCooperative() throws Excep
herder.tick();
assertEquals(before + coordinatorDiscoveryTimeoutMs, time.milliseconds());

// KAFKA-17719: task configs are always rewritten to prevent compaction reorder
verify(configBackingStore, atLeast(0)).putTaskConfigs(any(), any());
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
}

Expand Down Expand Up @@ -2301,8 +2292,6 @@ public void testJoinLeaderCatchUpFailsForIncrementalCooperative() throws Excepti

herder.tick();

// KAFKA-17719: task configs are always rewritten to prevent compaction reorder
verify(configBackingStore, atLeast(0)).putTaskConfigs(any(), any());
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
}

Expand Down Expand Up @@ -2444,8 +2433,6 @@ public void testPutConnectorConfig() throws Exception {

// Once after initial rebalance and assignment; another after config update
verify(worker, times(2)).startConnector(eq(CONN1), any(), any(), eq(herder), eq(TargetState.STARTED), any());
// KAFKA-17719: task configs are always rewritten to prevent compaction reorder
verify(configBackingStore, atLeast(0)).putTaskConfigs(any(), any());
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
}

Expand Down
Loading
Loading