Skip to content

Commit 6f6f2db

Browse files
committed
Fix adding headers to log change method, some variables final, update return type
1 parent 8fa7e09 commit 6f6f2db

File tree

4 files changed

+8
-7
lines changed

4 files changed

+8
-7
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.streams.processor.internals;
1818

1919
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.header.Headers;
2021
import org.apache.kafka.common.utils.Bytes;
2122
import org.apache.kafka.common.utils.LogContext;
2223
import org.apache.kafka.common.utils.Time;
@@ -805,7 +806,7 @@ private static class StartupContext extends AbstractProcessorContext<Object, Obj
805806
private final StateManager stateManager;
806807
final StreamsMetricsImpl metricsImpl;
807808

808-
public StartupContext(final TaskId taskId, final StreamsConfig config, final StateManager stateManager, final StreamsMetricsImpl metricsImpl, ThreadCache cache) {
809+
public StartupContext(final TaskId taskId, final StreamsConfig config, final StateManager stateManager, final StreamsMetricsImpl metricsImpl, final ThreadCache cache) {
809810
super(taskId, config, metricsImpl, cache);
810811
this.stateManager = stateManager;
811812
this.metricsImpl = metricsImpl;
@@ -831,7 +832,7 @@ public void registerCacheFlushListener(final String namespace, final ThreadCache
831832
}
832833

833834
@Override
834-
public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp, final Position position) {
835+
public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp, final Headers headers, final Position position) {
835836
throw new IllegalStateException("Should not be called");
836837
}
837838

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ private Collection<Task> assignActiveTaskFromStartupState(final Map<TaskId, Set<
327327
assignedTasks.put(taskId, entry.getValue());
328328
}
329329
}
330-
return activeTaskCreator.createTasks(mainConsumer, assignedTasks);
330+
return new ArrayList<>(activeTaskCreator.createTasks(mainConsumer, assignedTasks));
331331
} else {
332332
return Collections.emptySet();
333333
}
@@ -343,7 +343,7 @@ private Collection<Task> assignStartupTasks(final Map<TaskId, Set<TopicPartition
343343
assignedTasks.put(taskId, inputPartitions);
344344
}
345345
}
346-
return standbyTaskCreator.createTasks(assignedTasks);
346+
return new ArrayList<>(standbyTaskCreator.createTasks(assignedTasks));
347347
} else {
348348
return Collections.emptySet();
349349
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ public void shouldCloseStartupStateOnAutoCleanUp() {
892892
// we need to set this because the auto-cleanup uses the last-modified time from the filesystem,
893893
// which can't be mocked
894894
time.setCurrentTimeMs(System.currentTimeMillis());
895-
TaskId taskId = new TaskId(0, 0);
895+
final TaskId taskId = new TaskId(0, 0);
896896

897897
final StateStore store = initializeStartupStores(taskId, true);
898898

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4902,7 +4902,7 @@ public void shouldCreateActiveTaskFromStartupStateStore() {
49024902
when(stateUpdater.tasks()).thenReturn(Collections.singleton(activeTask));
49034903
when(stateUpdater.standbyTasks()).thenReturn(Collections.emptySet());
49044904

4905-
InOrder inOrder = inOrder(activeTaskCreator);
4905+
final InOrder inOrder = inOrder(activeTaskCreator);
49064906
inOrder.verify(activeTaskCreator).createTasks(same(consumer), eq(Map.of(taskId00, taskId00Partitions)));
49074907
inOrder.verify(activeTaskCreator).createTasks(consumer, emptyMap());
49084908

@@ -4941,7 +4941,7 @@ public void shouldCreateStandbyTaskFromStartupStateStore() {
49414941

49424942
// ensure we didn't construct any new Tasks, or recycle an existing Task; we only used the one we already have
49434943
verify(activeTaskCreator, times(2)).createTasks(any(), eq(Collections.emptyMap()));
4944-
InOrder inOrder = inOrder(standbyTaskCreator);
4944+
final InOrder inOrder = inOrder(standbyTaskCreator);
49454945
inOrder.verify(standbyTaskCreator).createTasks(Map.of(taskId00, taskId00Partitions));
49464946
inOrder.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
49474947
verifyNoMoreInteractions(activeTaskCreator);

0 commit comments

Comments
 (0)