fix(connect): correct KAFKA-17719 fix to prevent rebalance storm#3286
woshigaopp wants to merge 4 commits into `1.6` from `fix/KAFKA-17719-full-tombstone-v2`
Conversation
This reverts commit 83c949f.
Root cause: KAFKA-16838 fix added a null check in processTasksCommitRecord()
that calls processConnectorRemoval() when connectorConfigs.get() returns null.
This incorrectly triggers for live connectors when log compaction reorders
records such that task configs + commit appear before the connector config.
Fix approach (full tombstone + defer):
1. removeConnectorConfig() now writes tombstones for ALL related keys
(connector, target-state, all task configs, commit) so that log compaction
will eventually remove all data for deleted connectors. This eliminates
the orphaned task config problem (KAFKA-16838) at the source.
2. processTasksCommitRecord() no longer calls processConnectorRemoval() when
connector config is absent. Instead, it defers processing by storing the
task count and marking the connector as inconsistent. This handles both:
- Compaction reorder (KAFKA-17719): connector config arrives later,
applyDeferredTaskConfigs() applies the deferred task configs.
- Deleted connector with tombstone compacted away: deferred data remains
inert since no connector config will ever arrive.
3. processConnectorConfigRecord() calls applyDeferredTaskConfigs() when a
connector config arrives, to handle the compaction reorder case.
4. processConnectorRemoval() now also clears the inconsistent set.
This approach requires no in-memory state to distinguish between deleted
connectors and compaction reorder - all decisions are based on the actual
data present in the config topic.
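The defer-then-apply decision described in points 2 and 3 can be sketched with plain maps. This is a minimal illustration only; every class, field, and method name below is a hypothetical stand-in, not the actual KafkaConfigBackingStore API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: if a tasks-commit record is read before the connector config
// (compaction reorder), defer the task configs instead of treating the
// connector as removed; apply them once the connector config arrives.
public class DeferredCommitSketch {
    public final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
    public final Map<String, Map<Integer, Map<String, String>>> taskConfigs = new HashMap<>();
    public final Map<String, Map<Integer, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
    public final Set<String> inconsistent = new HashSet<>();

    // A tasks-commit record arrives; with no connector config yet, defer.
    public void onTasksCommit(String connector, Map<Integer, Map<String, String>> pending) {
        if (connectorConfigs.get(connector) == null) {
            deferredTaskUpdates.put(connector, pending);
            inconsistent.add(connector); // NOT a removal: just mark inconsistent
        } else {
            taskConfigs.put(connector, pending);
        }
    }

    // A connector config record arrives later: apply anything deferred for it.
    // For a truly deleted connector this is never called, so deferred data stays inert.
    public void onConnectorConfig(String connector, Map<String, String> config) {
        connectorConfigs.put(connector, config);
        Map<Integer, Map<String, String>> deferred = deferredTaskUpdates.remove(connector);
        if (deferred != null) {
            taskConfigs.put(connector, deferred);
            inconsistent.remove(connector);
        }
    }
}
```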
Pull request overview
This PR addresses KAFKA-17719 by preventing task-config rewrites on every rebalance (which can trigger rebalance storms under the eager protocol) and instead handling task/commit vs connector-config ordering issues in the backing store. It also extends connector deletion to write full tombstones to avoid leaving orphaned task configs/commit records behind.
Changes:
- Revert the “always rewrite task configs” behavior in `DistributedHerder.publishConnectorTaskConfigs()` to avoid rebalance storms when configs are unchanged.
- Add deferred task-config application in `KafkaConfigBackingStore` when a task commit is observed before the connector config (compaction/retry reorder scenarios).
- Write connector deletion “full tombstones” (connector + target state + tasks + commit) and add tests covering reordering and deletion/tombstone scenarios.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java` | Restores the early return when task configs are unchanged to prevent repeated rewrites/rebalance storms. |
| `connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java` | Defers and later applies task configs when commits arrive before connector configs; writes full tombstones on deletion. |
| `connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java` | Adds unit tests for compaction/reorder cases and full-tombstone deletion behavior. |
| `connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java` | Removes assertions that expected task configs to always be rewritten. |
```diff
-        // which would cause task configs to be lost on worker restart.
-        if (log.isDebugEnabled() && !taskConfigsChanged(configState, connName, rawTaskProps)) {
-            log.debug("Task configs for connector '{}' are unchanged, but rewriting to prevent compaction reorder (KAFKA-17719)", connName);
+        if (!taskConfigsChanged(configState, connName, rawTaskProps)) {
```
publishConnectorTaskConfigs returns early when task configs are unchanged, but it does not invoke the provided callback. Since callers (e.g., reconfigureConnector) treat the callback as the completion signal, this can leave request/retry logic waiting indefinitely. Call cb.onCompletion(null, null) before returning in the unchanged-configs case.
Suggested change:

```diff
 if (!taskConfigsChanged(configState, connName, rawTaskProps)) {
+    cb.onCompletion(null, null);
```
```java
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
Set<Integer> taskIdSet = taskIds(connectorName, deferred);
if (completeTaskIdSet(taskIdSet, pendingTaskCount)) {
    if (deferred != null) {
        taskConfigs.putAll(deferred);
        connectorTaskConfigGenerations.compute(connectorName,
                (ignored, generation) -> generation != null ? generation + 1 : 0);
        deferred.clear();
    }
    inconsistent.remove(connectorName);
    connectorsPendingFencing.add(connectorName);
```
applyDeferredTaskConfigs can apply deferred task configs and update connector state, but it never triggers updateListener.onTaskConfigUpdate for the tasks it applied. If a tasks commit is received before the connector config while the store is started, the earlier commit is deferred (no listener call), and this method later applies the configs without notifying the herder, so tasks may never be reconfigured. Consider collecting the applied task IDs here (or returning them) and invoking onTaskConfigUpdate after releasing the lock, similar to processTasksCommitRecord.
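The reviewer's suggestion, collecting the applied task IDs while holding the lock and firing the listener only after releasing it, could look roughly like this. The class and method names are hypothetical, not Connect's real API; only the lock-then-notify ordering is the point:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch: apply deferred configs under the lock, record which task IDs
// changed, then invoke the update callback outside the lock so the herder
// is notified without risking lock-ordering deadlocks.
public class NotifyAfterLockSketch {
    private final Object lock = new Object();
    public final List<String> applied = new ArrayList<>();

    public void applyDeferred(List<String> deferredTaskIds, Consumer<List<String>> onTaskConfigUpdate) {
        List<String> appliedIds;
        synchronized (lock) {
            // apply the configs and remember which task IDs were touched
            applied.addAll(deferredTaskIds);
            appliedIds = new ArrayList<>(deferredTaskIds);
        }
        // notify only after releasing the lock, mirroring processTasksCommitRecord
        if (!appliedIds.isEmpty()) {
            onTaskConfigUpdate.accept(appliedIds);
        }
    }
}
```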
```java
if (taskCount != null) {
    // Also collect task IDs from taskConfigs and deferredTaskUpdates to handle the case
    // where task count was reduced (old task keys with higher indices still exist)
    Set<Integer> allTaskIds = new HashSet<>();
    for (int i = 0; i < taskCount; i++) {
        allTaskIds.add(i);
    }
    for (ConnectorTaskId taskId : taskConfigs.keySet()) {
        if (taskId.connector().equals(connector)) {
            allTaskIds.add(taskId.task());
        }
    }
    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
    if (deferred != null) {
        for (ConnectorTaskId taskId : deferred.keySet()) {
            allTaskIds.add(taskId.task());
        }
    }
    for (int taskId : allTaskIds) {
        keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
    }
    keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));
}
```
removeConnectorConfig only writes task/commit tombstones when connectorTaskCounts contains an entry. If that entry is missing (e.g., connector deleted before task commit was processed, or state not fully loaded), orphaned commit/task records can still persist. At minimum, consider always writing a tombstone for COMMIT_TASKS_KEY(connector), and deriving task IDs from taskConfigs/deferredTaskUpdates even when taskCount is null.
Suggested change:

```java
// Collect task IDs from all available sources so cleanup still works even if
// connectorTaskCounts does not currently contain an entry for this connector.
Set<Integer> allTaskIds = new HashSet<>();
if (taskCount != null) {
    for (int i = 0; i < taskCount; i++) {
        allTaskIds.add(i);
    }
}
for (ConnectorTaskId taskId : taskConfigs.keySet()) {
    if (taskId.connector().equals(connector)) {
        allTaskIds.add(taskId.task());
    }
}
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
if (deferred != null) {
    for (ConnectorTaskId taskId : deferred.keySet()) {
        allTaskIds.add(taskId.task());
    }
}
for (int taskId : allTaskIds) {
    keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
}
keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));
```
```java
// Write tombstones for all task config keys and the commit key so that log compaction
// will eventually remove them. Without these tombstones, orphaned task configs and commit
// records would persist indefinitely in the config topic after the connector is deleted,
// which is the root cause of KAFKA-16838.
```
This method now writes tombstones for individual task config keys. KafkaConfigBackingStore currently treats null-valued task config records as an unexpected error (and does not process them as deletions), which will generate error logs during normal connector deletion. Consider updating task-record handling to treat null values as expected tombstones (remove task configs/deferred entries, and log at debug/trace).
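A minimal sketch of the suggested null-handling, assuming a simplified record-processing method (the class, method, and field names are illustrative, not the actual KafkaConfigBackingStore code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: treat a null-valued task config record as an expected tombstone
// (quietly drop the stored config) instead of logging it as an error, since
// full-tombstone deletion now writes such records during normal operation.
public class TaskTombstoneSketch {
    public final Map<String, Map<String, String>> taskConfigs = new HashMap<>();

    public void onTaskConfigRecord(String taskKey, Map<String, String> value) {
        if (value == null) {
            // expected during connector deletion: remove state, log at debug/trace
            taskConfigs.remove(taskKey);
            return;
        }
        taskConfigs.put(taskKey, value);
    }
}
```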
```java
        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0), TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(1));
deserialized.put(CONFIGS_SERIALIZED.get(2), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
deserialized.put(CONFIGS_SERIALIZED.get(3), null); // tombstone: structToMap(null) → null → SchemaAndValue(null, null)
```
The tombstone simulation uses a non-null record.value() byte[] that is mapped to a null Struct in the converter stub. Using an actual Kafka tombstone (ConsumerRecord with null value bytes) would better exercise the real converter/toConnectData path for tombstones and reduce the chance of masking null-handling issues.
Suggested change:

```diff
-        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
+        null, new RecordHeaders(), Optional.empty()));
 LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
 deserialized.put(CONFIGS_SERIALIZED.get(0), TASK_CONFIG_STRUCTS.get(0));
 deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(1));
 deserialized.put(CONFIGS_SERIALIZED.get(2), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
-deserialized.put(CONFIGS_SERIALIZED.get(3), null); // tombstone: structToMap(null) → null → SchemaAndValue(null, null)
+deserialized.put(null, null); // tombstone: record.value() is null and converter returns SchemaAndValue(null, null)
```
Summary
- Reverts `83c949f515`, which caused E2E test failures
- New fix is on the `fix/KAFKA-17719-full-tombstone-v2` branch

Problem
The original fix removed the `return` statement in `publishConnectorTaskConfigs()`, causing task configs to be rewritten on every rebalance even when unchanged. This triggers a rebalance storm under the `eager` protocol. Observed: 83 rebalances in 10 minutes; connectors never stabilize.

Fix Approach
- `DistributedHerder`: restore the early return when task configs are unchanged
- `KafkaConfigBackingStore`: handle compaction reorder

Test Plan
- `eager` protocol

🤖 Generated with Claude Code