Skip to content

Commit f357174

Browse files
authored
[flink] FlinkRunner initializes the same split twice (#31313) (#33606)
* [flink] FlinkRunner initializes the same split twice (#31313)
1 parent 2af6058 commit f357174

File tree

4 files changed: 76 additions and 20 deletions.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"runFor": "#33146"
3+
"runFor": "#33606"
44
}

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource;
3030
import org.apache.beam.sdk.io.BoundedSource;
3131
import org.apache.beam.sdk.io.UnboundedSource;
32-
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
33-
import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource;
3432
import org.apache.flink.api.common.eventtime.Watermark;
3533
import org.apache.flink.api.connector.source.Boundedness;
3634
import org.apache.flink.api.connector.source.Source;
@@ -73,18 +71,6 @@ public static <T> FlinkUnboundedSource<T> unbounded(
7371
return new FlinkUnboundedSource<>(stepName, source, serializablePipelineOptions, numSplits);
7472
}
7573

76-
public static FlinkUnboundedSource<byte[]> unboundedImpulse(long shutdownSourceAfterIdleMs) {
77-
FlinkPipelineOptions flinkPipelineOptions = FlinkPipelineOptions.defaults();
78-
flinkPipelineOptions.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs);
79-
return new FlinkUnboundedSource<>(
80-
"Impulse",
81-
new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
82-
new BeamImpulseSource()),
83-
new SerializablePipelineOptions(flinkPipelineOptions),
84-
1,
85-
record -> BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
86-
}
87-
8874
public static FlinkBoundedSource<byte[]> boundedImpulse() {
8975
return new FlinkBoundedSource<>(
9076
"Impulse",
@@ -117,7 +103,8 @@ public Boundedness getBoundedness() {
117103

118104
@Override
119105
public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
120-
createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
106+
createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) {
107+
121108
return new FlinkSourceSplitEnumerator<>(
122109
enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
123110
}
@@ -126,11 +113,11 @@ public Boundedness getBoundedness() {
126113
public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
127114
restoreEnumerator(
128115
SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,
129-
Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)
130-
throws Exception {
116+
Map<Integer, List<FlinkSourceSplit<T>>> checkpoint) {
117+
131118
FlinkSourceSplitEnumerator<T> enumerator =
132119
new FlinkSourceSplitEnumerator<>(
133-
enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
120+
enumContext, beamSource, serializablePipelineOptions.get(), numSplits, true);
134121
checkpoint.forEach(
135122
(subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));
136123
return enumerator;

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,40 @@ public FlinkSourceSplitEnumerator(
6363
Source<T> beamSource,
6464
PipelineOptions pipelineOptions,
6565
int numSplits) {
66+
67+
this(context, beamSource, pipelineOptions, numSplits, false);
68+
}
69+
70+
public FlinkSourceSplitEnumerator(
71+
SplitEnumeratorContext<FlinkSourceSplit<T>> context,
72+
Source<T> beamSource,
73+
PipelineOptions pipelineOptions,
74+
int numSplits,
75+
boolean splitsInitialized) {
76+
6677
this.context = context;
6778
this.beamSource = beamSource;
6879
this.pipelineOptions = pipelineOptions;
6980
this.numSplits = numSplits;
7081
this.pendingSplits = new HashMap<>(numSplits);
71-
this.splitsInitialized = false;
82+
this.splitsInitialized = splitsInitialized;
83+
84+
LOG.info(
85+
"Created new enumerator with parallelism {}, source {}, numSplits {}, initialized {}",
86+
context.currentParallelism(),
87+
beamSource,
88+
numSplits,
89+
splitsInitialized);
7290
}
7391

7492
@Override
7593
public void start() {
94+
if (!splitsInitialized) {
95+
initializeSplits();
96+
}
97+
}
98+
99+
private void initializeSplits() {
76100
context.callAsync(
77101
() -> {
78102
try {

runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumeratorTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
import java.io.IOException;
2525
import java.util.List;
26+
import java.util.Map;
27+
import java.util.stream.Collectors;
2628
import org.apache.beam.runners.flink.FlinkPipelineOptions;
2729
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource;
2830
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
@@ -130,6 +132,49 @@ public void testAddSplitsBack() throws IOException {
130132
}
131133
}
132134

135+
@Test
136+
public void testAddSplitsBackAfterRescale() throws Exception {
137+
final int numSubtasks = 2;
138+
final int numSplits = 10;
139+
final int totalNumRecords = 10;
140+
TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> testContext =
141+
new TestingSplitEnumeratorContext<>(numSubtasks);
142+
TestBoundedCountingSource testSource =
143+
new TestBoundedCountingSource(numSplits, totalNumRecords);
144+
final Map<Integer, List<FlinkSourceSplit<KV<Integer, Integer>>>> assignment;
145+
try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> splitEnumerator =
146+
new FlinkSourceSplitEnumerator<>(
147+
testContext, testSource, FlinkPipelineOptions.defaults(), numSplits)) {
148+
splitEnumerator.start();
149+
for (int i = 0; i < numSubtasks; i++) {
150+
testContext.registerReader(i, String.valueOf(i));
151+
splitEnumerator.addReader(i);
152+
}
153+
testContext.getExecutorService().triggerAll();
154+
assignment =
155+
testContext.getSplitAssignments().entrySet().stream()
156+
.map(e -> KV.of(e.getKey(), e.getValue().getAssignedSplits()))
157+
.collect(Collectors.toMap(KV::getKey, KV::getValue));
158+
}
159+
160+
// add tasks back
161+
testContext = new TestingSplitEnumeratorContext<>(numSubtasks);
162+
try (FlinkSourceSplitEnumerator<KV<Integer, Integer>> splitEnumerator =
163+
new FlinkSourceSplitEnumerator<>(
164+
testContext, testSource, FlinkPipelineOptions.defaults(), numSplits, true)) {
165+
splitEnumerator.start();
166+
assignment.forEach(
167+
(splitId, assignedSplits) -> splitEnumerator.addSplitsBack(assignedSplits, splitId));
168+
testContext.registerReader(0, "0");
169+
splitEnumerator.addReader(0);
170+
testContext.getExecutorService().triggerAll();
171+
172+
List<FlinkSourceSplit<KV<Integer, Integer>>> splitsForReader =
173+
testContext.getSplitAssignments().get(0).getAssignedSplits();
174+
assertEquals(numSplits / numSubtasks, splitsForReader.size());
175+
}
176+
}
177+
133178
private void assignSplits(
134179
TestingSplitEnumeratorContext<FlinkSourceSplit<KV<Integer, Integer>>> context,
135180
Source<KV<Integer, Integer>> source,

0 commit comments

Comments
 (0)