Skip to content

Commit 8fa7e09

Browse files
KAFKA-19434: Startup state stores initialization (#20749)
Instead of creating Standby tasks from the state directory, we open the local stores that exists in the state directory. This resolves the issue raised in [#KAFKA-19434 ](https://issues.apache.org/jira/browse/KAFKA-19434), where store-specific metrics were being duplicated due to tasks being created in the main thread and then assigned to a StreamThread. Additionally, since we can now read the offsets from the store during instance initialization, this clears the way for the implementation of KIP-1035. As of now, the stores are loading the offsets from the checkpoint file, but in a later PR, we will read these offsets from the state store itself. This PR modifies the behavior of Kafka Streams when initializing. Now for each pre-existing store on the state directory: We open the store, read offsets from the checkpoint file and then close it again. The reason why we open the store is because the store will be responsible for tracking the offsets and we will deprecate the checkpoint file. Reviewers: Nikiita Shuplestov<nshupletsov@confluent.io>, Bill Bejeck<bbejeck@apache.org>
1 parent 2d68b60 commit 8fa7e09

File tree

8 files changed

+248
-251
lines changed

8 files changed

+248
-251
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,11 @@ public void shouldPassMetrics(final String topologyType, final String groupProto
284284
streamsApplicationProperties = props(groupProtocol);
285285
final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
286286

287+
shouldPassMetrics(topology, FIRST_INSTANCE_CLIENT);
288+
shouldPassMetrics(topology, SECOND_INSTANCE_CLIENT);
289+
}
290+
291+
private void shouldPassMetrics(final Topology topology, final int clientInstance) throws Exception {
287292
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
288293
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
289294

@@ -295,8 +300,8 @@ public void shouldPassMetrics(final String topologyType, final String groupProto
295300

296301

297302

298-
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics().stream().map(KafkaMetric::metricName).toList();
299-
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
303+
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(clientInstance).passedMetrics().stream().map(KafkaMetric::metricName).toList();
304+
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(clientInstance).passedMetrics.stream().map(KafkaMetric::metricName).toList();
300305

301306

302307
assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -641,9 +641,6 @@ private void maybeSetRunning() {
641641
return;
642642
}
643643

644-
// all (alive) threads have received their assignment, close any remaining startup tasks, they're not needed
645-
stateDirectory.closeStartupTasks();
646-
647644
setState(State.RUNNING);
648645
}
649646

@@ -1379,8 +1376,8 @@ private static HostInfo parseHostInfo(final String endPoint) {
13791376
*/
13801377
public synchronized void start() throws IllegalStateException, StreamsException {
13811378
if (setState(State.REBALANCING)) {
1382-
log.debug("Initializing STANDBY tasks for existing local state");
1383-
stateDirectory.initializeStartupTasks(topologyMetadata, streamsMetrics, logContext);
1379+
log.debug("Initializing store offsets for existing local state");
1380+
stateDirectory.initializeStartupStores(topologyMetadata, logContext, streamsMetrics);
13841381

13851382
log.debug("Starting Streams client");
13861383

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -229,19 +229,6 @@ static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
229229
return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, storeToChangelogTopic, sourcePartitions);
230230
}
231231

232-
/**
233-
* Standby tasks initialized for local state on-startup are only partially initialized, because they are not yet
234-
* assigned to a StreamThread. Once assigned to a StreamThread, we complete their initialization here using the
235-
* assigned StreamThread's context.
236-
*/
237-
void assignToStreamThread(final LogContext logContext,
238-
final Collection<TopicPartition> sourcePartitions) {
239-
this.sourcePartitions.clear();
240-
this.log = logContext.logger(ProcessorStateManager.class);
241-
this.logPrefix = logContext.logPrefix();
242-
this.sourcePartitions.addAll(sourcePartitions);
243-
}
244-
245232
void registerStateStores(final List<StateStore> allStores, final InternalProcessorContext<?, ?> processorContext) {
246233
processorContext.uninitialize();
247234
for (final StateStore store : allStores) {

streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java

Lines changed: 149 additions & 71 deletions
Large diffs are not rendered by default.

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -318,26 +318,34 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
318318
}
319319
}
320320

321-
private Map<Task, Set<TopicPartition>> assignStartupTasks(final Map<TaskId, Set<TopicPartition>> tasksToAssign,
322-
final String threadLogPrefix) {
321+
private Collection<Task> assignActiveTaskFromStartupState(final Map<TaskId, Set<TopicPartition>> tasksToAssign) {
323322
if (stateDirectory.hasStartupTasks()) {
324-
final Map<Task, Set<TopicPartition>> assignedTasks = new HashMap<>(tasksToAssign.size());
323+
final Map<TaskId, Set<TopicPartition>> assignedTasks = new HashMap<>(tasksToAssign.size());
325324
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : tasksToAssign.entrySet()) {
326325
final TaskId taskId = entry.getKey();
327-
final Task task = stateDirectory.removeStartupTask(taskId);
328-
if (task != null) {
329-
// replace our dummy values with the real ones, now we know our thread and assignment
330-
final Set<TopicPartition> inputPartitions = entry.getValue();
331-
task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), inputPartitions);
332-
updateInputPartitionsOfStandbyTaskIfTheyChanged(task, inputPartitions);
333-
334-
assignedTasks.put(task, inputPartitions);
326+
if (stateDirectory.removeStartupState(taskId)) {
327+
assignedTasks.put(taskId, entry.getValue());
335328
}
336329
}
330+
return activeTaskCreator.createTasks(mainConsumer, assignedTasks);
331+
} else {
332+
return Collections.emptySet();
333+
}
334+
}
337335

338-
return assignedTasks;
336+
private Collection<Task> assignStartupTasks(final Map<TaskId, Set<TopicPartition>> tasksToAssign) {
337+
if (stateDirectory.hasStartupTasks()) {
338+
final Map<TaskId, Set<TopicPartition>> assignedTasks = new HashMap<>(tasksToAssign.size());
339+
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : tasksToAssign.entrySet()) {
340+
final TaskId taskId = entry.getKey();
341+
if (stateDirectory.removeStartupState(taskId)) {
342+
final Set<TopicPartition> inputPartitions = entry.getValue();
343+
assignedTasks.put(taskId, inputPartitions);
344+
}
345+
}
346+
return standbyTaskCreator.createTasks(assignedTasks);
339347
} else {
340-
return Collections.emptyMap();
348+
return Collections.emptySet();
341349
}
342350
}
343351

@@ -484,7 +492,7 @@ private void handleTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCre
484492
final Set<Task> tasksToCloseClean,
485493
final Map<TaskId, RuntimeException> failedTasks) {
486494
handleTasksPendingInitialization();
487-
handleStartupTaskReuse(activeTasksToCreate, standbyTasksToCreate, failedTasks);
495+
handleExistingStateForTasks(activeTasksToCreate, standbyTasksToCreate);
488496
handleRestoringAndUpdatingTasks(activeTasksToCreate, standbyTasksToCreate, failedTasks);
489497
handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
490498
}
@@ -502,31 +510,21 @@ private void handleTasksPendingInitialization() {
502510
}
503511
}
504512

505-
private void handleStartupTaskReuse(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
506-
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
507-
final Map<TaskId, RuntimeException> failedTasks) {
508-
final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate, logPrefix);
509-
final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate, logPrefix);
510-
511-
// recycle the startup standbys to active, and remove them from the set of actives that need to be created
512-
if (!startupStandbyTasksToRecycle.isEmpty()) {
513-
final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
514-
for (final Map.Entry<Task, Set<TopicPartition>> entry : startupStandbyTasksToRecycle.entrySet()) {
515-
final Task task = entry.getKey();
516-
recycleTaskFromStateUpdater(task, entry.getValue(), tasksToCloseDirty, failedTasks);
517-
activeTasksToCreate.remove(task.id());
518-
}
519-
520-
// if any standby tasks failed to recycle, close them dirty
521-
tasksToCloseDirty.forEach(task ->
522-
closeTaskDirty(task, false)
523-
);
513+
private void handleExistingStateForTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
514+
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
515+
final Collection<Task> activeTasks = assignActiveTaskFromStartupState(activeTasksToCreate);
516+
for (final Task activeTask : activeTasks) {
517+
activeTasksToCreate.remove(activeTask.id());
524518
}
525-
526-
// use startup Standbys as real Standby tasks
527-
if (!startupStandbyTasksToUse.isEmpty()) {
528-
tasks.addPendingTasksToInit(startupStandbyTasksToUse.keySet());
529-
startupStandbyTasksToUse.keySet().forEach(task -> standbyTasksToCreate.remove(task.id()));
519+
final Collection<Task> standbyTasks = assignStartupTasks(standbyTasksToCreate);
520+
for (final Task standbyTask : standbyTasks) {
521+
standbyTasksToCreate.remove(standbyTask.id());
522+
}
523+
if (!activeTasks.isEmpty()) {
524+
tasks.addPendingTasksToInit(activeTasks);
525+
}
526+
if (!standbyTasks.isEmpty()) {
527+
tasks.addPendingTasksToInit(standbyTasks);
530528
}
531529
}
532530

streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -416,32 +416,13 @@ public void shouldInitializeTasksForLocalStateOnStart() {
416416
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
417417
assertEquals(1, constructed.constructed().size());
418418
final StateDirectory stateDirectory = constructed.constructed().get(0);
419-
verify(stateDirectory, times(0)).initializeStartupTasks(any(), any(), any());
419+
verify(stateDirectory, times(0)).initializeStartupStores(any(), any(), any());
420420
streams.start();
421-
verify(stateDirectory, times(1)).initializeStartupTasks(any(), any(), any());
421+
verify(stateDirectory, times(1)).initializeStartupStores(any(), any(), any());
422422
}
423423
}
424424
}
425425

426-
@Test
427-
public void shouldCloseStartupTasksAfterFirstRebalance() throws Exception {
428-
prepareStreams();
429-
final AtomicReference<StreamThread.State> state1 = prepareStreamThread(streamThreadOne, 1);
430-
final AtomicReference<StreamThread.State> state2 = prepareStreamThread(streamThreadTwo, 2);
431-
prepareThreadState(streamThreadOne, state1);
432-
prepareThreadState(streamThreadTwo, state2);
433-
try (final MockedConstruction<StateDirectory> constructed = mockConstruction(StateDirectory.class,
434-
(mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
435-
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
436-
assertEquals(1, constructed.constructed().size());
437-
final StateDirectory stateDirectory = constructed.constructed().get(0);
438-
streams.setStateListener(streamsStateListener);
439-
streams.start();
440-
waitForCondition(() -> streams.state() == State.RUNNING, "Streams never started.");
441-
verify(stateDirectory, times(1)).closeStartupTasks();
442-
}
443-
}
444-
}
445426

446427
@Test
447428
public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws Exception {

streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java

Lines changed: 24 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.apache.kafka.streams.processor.internals;
1818

1919
import org.apache.kafka.common.TopicPartition;
20-
import org.apache.kafka.common.metrics.Metrics;
2120
import org.apache.kafka.common.utils.LogCaptureAppender;
2221
import org.apache.kafka.common.utils.LogContext;
2322
import org.apache.kafka.common.utils.MockTime;
@@ -28,7 +27,6 @@
2827
import org.apache.kafka.streams.processor.StateStore;
2928
import org.apache.kafka.streams.processor.TaskId;
3029
import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
31-
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3230
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
3331
import org.apache.kafka.test.MockKeyValueStore;
3432
import org.apache.kafka.test.TestUtils;
@@ -80,7 +78,6 @@
8078
import static org.hamcrest.CoreMatchers.endsWith;
8179
import static org.hamcrest.CoreMatchers.equalTo;
8280
import static org.hamcrest.CoreMatchers.hasItem;
83-
import static org.hamcrest.CoreMatchers.instanceOf;
8481
import static org.hamcrest.CoreMatchers.is;
8582
import static org.hamcrest.CoreMatchers.not;
8683
import static org.hamcrest.MatcherAssert.assertThat;
@@ -855,113 +852,65 @@ public void shouldReadFutureProcessFileFormat() throws Exception {
855852
}
856853

857854
@Test
858-
public void shouldNotInitializeStandbyTasksWhenNoLocalState() {
855+
public void shouldNotInitializeStartupStateWhenNoLocalState() {
859856
final TaskId taskId = new TaskId(0, 0);
860-
initializeStartupTasks(new TaskId(0, 0), false);
857+
initializeStartupStores(new TaskId(0, 0), false);
861858
assertFalse(directory.hasStartupTasks());
862-
assertNull(directory.removeStartupTask(taskId));
859+
assertFalse(directory.removeStartupState(taskId));
863860
assertFalse(directory.hasStartupTasks());
864861
}
865862

866863
@Test
867-
public void shouldInitializeStandbyTasksForLocalState() {
864+
public void shouldInitializeStartupStateForLocalState() {
868865
final TaskId taskId = new TaskId(0, 0);
869-
initializeStartupTasks(new TaskId(0, 0), true);
866+
initializeStartupStores(new TaskId(0, 0), true);
870867
assertTrue(directory.hasStartupTasks());
871-
assertNotNull(directory.removeStartupTask(taskId));
868+
assertTrue(directory.removeStartupState(taskId));
872869
assertFalse(directory.hasStartupTasks());
873-
assertNull(directory.removeStartupTask(taskId));
870+
assertFalse(directory.removeStartupState(taskId));
874871
}
875872

876873
@Test
877-
public void shouldNotAssignStartupTasksWeDontHave() {
874+
public void shouldNotAssignStartupStateWeDontHave() {
878875
final TaskId taskId = new TaskId(0, 0);
879-
initializeStartupTasks(taskId, false);
880-
final Task task = directory.removeStartupTask(taskId);
881-
assertNull(task);
882-
}
883-
884-
private class FakeStreamThread extends Thread {
885-
private final TaskId taskId;
886-
private final AtomicReference<Task> result;
887-
888-
private FakeStreamThread(final TaskId taskId, final AtomicReference<Task> result) {
889-
this.taskId = taskId;
890-
this.result = result;
891-
}
892-
893-
@Override
894-
public void run() {
895-
result.set(directory.removeStartupTask(taskId));
896-
}
876+
initializeStartupStores(taskId, false);
877+
assertFalse(directory.removeStartupState(taskId));
897878
}
898879

899880
@Test
900-
public void shouldAssignStartupTaskToStreamThread() throws InterruptedException {
881+
public void shouldUnlockStartupStateOnClose() {
901882
final TaskId taskId = new TaskId(0, 0);
902-
903-
initializeStartupTasks(taskId, true);
904-
905-
// main thread owns the newly initialized tasks
906-
assertThat(directory.lockOwner(taskId), is(Thread.currentThread()));
907-
908-
// spawn off a "fake" StreamThread, so we can verify the lock was updated to the correct thread
909-
final AtomicReference<Task> result = new AtomicReference<>();
910-
final Thread streamThread = new FakeStreamThread(taskId, result);
911-
streamThread.start();
912-
streamThread.join();
913-
final Task task = result.get();
914-
915-
assertNotNull(task);
916-
assertThat(task, instanceOf(StandbyTask.class));
917-
918-
// verify the owner of the task directory lock has been shifted over to our assigned StreamThread
919-
assertThat(directory.lockOwner(taskId), is(instanceOf(FakeStreamThread.class)));
920-
}
921-
922-
@Test
923-
public void shouldUnlockStartupTasksOnClose() {
924-
final TaskId taskId = new TaskId(0, 0);
925-
initializeStartupTasks(taskId, true);
883+
initializeStartupStores(taskId, true);
926884

927885
assertEquals(Thread.currentThread(), directory.lockOwner(taskId));
928-
directory.closeStartupTasks();
929-
assertNull(directory.lockOwner(taskId));
930-
}
931-
932-
@Test
933-
public void shouldCloseStartupTasksOnDirectoryClose() {
934-
final StateStore store = initializeStartupTasks(new TaskId(0, 0), true);
935-
936-
assertTrue(directory.hasStartupTasks());
937-
assertTrue(store.isOpen());
938-
939886
directory.close();
940-
941-
assertFalse(directory.hasStartupTasks());
942-
assertFalse(store.isOpen());
887+
assertNull(directory.lockOwner(taskId));
943888
}
944889

945890
@Test
946-
public void shouldNotCloseStartupTasksOnAutoCleanUp() {
891+
public void shouldCloseStartupStateOnAutoCleanUp() {
947892
// we need to set this because the auto-cleanup uses the last-modified time from the filesystem,
948893
// which can't be mocked
949894
time.setCurrentTimeMs(System.currentTimeMillis());
895+
TaskId taskId = new TaskId(0, 0);
950896

951-
final StateStore store = initializeStartupTasks(new TaskId(0, 0), true);
897+
final StateStore store = initializeStartupStores(taskId, true);
952898

953899
assertTrue(directory.hasStartupTasks());
954-
assertTrue(store.isOpen());
900+
assertFalse(store.isOpen());
955901

956902
time.sleep(10000);
903+
// We need to manually unlock the task because the cleanup process only
904+
// cleans tasks that are no-longer owned by the current thread
905+
directory.unlock(taskId);
957906

958907
directory.cleanRemovedTasks(1000);
959908

960-
assertTrue(directory.hasStartupTasks());
961-
assertTrue(store.isOpen());
909+
assertFalse(directory.hasStartupTasks());
910+
assertFalse(store.isOpen());
962911
}
963912

964-
private StateStore initializeStartupTasks(final TaskId taskId, final boolean createTaskDir) {
913+
private StateStore initializeStartupStores(final TaskId taskId, final boolean createTaskDir) {
965914
directory.initializeProcessId();
966915
final TopologyMetadata metadata = Mockito.mock(TopologyMetadata.class);
967916
final TopologyConfig topologyConfig = new TopologyConfig(config);
@@ -987,7 +936,7 @@ private StateStore initializeStartupTasks(final TaskId taskId, final boolean cre
987936
Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
988937
Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());
989938

990-
directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", time), new LogContext("test"));
939+
directory.initializeStartupStores(metadata, new LogContext("test"), null);
991940

992941
return store;
993942
}

0 commit comments

Comments
 (0)