Skip to content

Commit a97fb66

Browse files
MINOR: add testRestoreCompactedDeletedConnector back to KafkaConfigBackingStoreTest (apache#18392)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 058f0a9 commit a97fb66

File tree

1 file changed

+53
-1
lines changed

1 file changed

+53
-1
lines changed

connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import java.util.List;
7070
import java.util.Map;
7171
import java.util.Optional;
72+
import java.util.Set;
7273
import java.util.concurrent.CompletableFuture;
7374
import java.util.concurrent.ExecutionException;
7475
import java.util.concurrent.Future;
@@ -174,7 +175,8 @@ public class KafkaConfigBackingStoreTest {
174175
.put("state.v2", "STOPPED");
175176
private static final List<Struct> CONNECTOR_TASK_COUNT_RECORD_STRUCTS = Arrays.asList(
176177
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
177-
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
178+
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9),
179+
new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 2)
178180
);
179181

180182
// The exact format doesn't matter here since both conversions are mocked
@@ -818,6 +820,56 @@ public void testRestoreZeroTasks() {
818820
verify(configLog).stop();
819821
}
820822

823+
@Test
824+
public void testRestoreCompactedDeletedConnector() {
825+
// When a connector is deleted, we emit a tombstone record for its config (with key
826+
// "connector-<name>") and its target state (with key "target-state-<name>"), but not
827+
// for its task configs
828+
// As a result, we need to carefully handle the case where task configs are present in
829+
// the config topic for a connector, but there is no accompanying config for the
830+
// connector itself
831+
832+
int offset = 0;
833+
List<ConsumerRecord<String, byte[]>> existingRecords = List.of(
834+
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
835+
TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
836+
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
837+
TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
838+
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
839+
COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
840+
new ConsumerRecord<>(TOPIC, 0, offset++, 0L, TimestampType.CREATE_TIME, 0, 0,
841+
CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
842+
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
843+
deserialized.put(CONFIGS_SERIALIZED.get(0), TASK_CONFIG_STRUCTS.get(0));
844+
deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
845+
deserialized.put(CONFIGS_SERIALIZED.get(2), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
846+
deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(2));
847+
logOffset = offset;
848+
expectStart(existingRecords, deserialized);
849+
when(configLog.partitionCount()).thenReturn(1);
850+
851+
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
852+
verifyConfigure();
853+
configStorage.start();
854+
855+
// Should see no connectors and no task configs
856+
ClusterConfigState configState = configStorage.snapshot();
857+
assertEquals(Set.of(), configState.connectors());
858+
assertEquals(0, configState.taskCount(CONNECTOR_1_NAME));
859+
assertNull(configState.rawTaskConfig(TASK_IDS.get(0)));
860+
assertNull(configState.rawTaskConfig(TASK_IDS.get(1)));
861+
862+
// Probe internal collections just to be sure
863+
assertEquals(Map.of(), configState.connectorConfigs);
864+
assertEquals(Map.of(), configState.taskConfigs);
865+
assertEquals(Map.of(), configState.connectorTaskCounts);
866+
867+
// Exception: we still include task count records, for the unlikely-but-possible case
868+
// where there are still zombie instances of the tasks for this long-deleted connector
869+
// running somewhere on the cluster
870+
assertEquals(2, (int) configState.taskCountRecord(CONNECTOR_1_NAME));
871+
}
872+
821873
@Test
822874
public void testRecordToRestartRequest() {
823875
ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),

0 commit comments

Comments
 (0)