Skip to content

Commit a2e6de4

Browse files
jihoonson, gianm
authored and committed
Fix the potential race between SplittableInputSource.getNumSplits() and SplittableInputSource.createSplits() in TaskMonitor (apache#8924)
* Fix the potential race between SplittableInputSource.getNumSplits() and SplittableInputSource.createSplits() in TaskMonitor
* Fix docs and javadoc
* Add unit tests for large or small estimated num splits
* add override
1 parent e0eb85a commit a2e6de4

File tree

22 files changed

+399
-145
lines changed

22 files changed

+399
-145
lines changed

core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public Stream<InputSplit> createSplits(InputFormat inputFormat, @Nullable SplitH
6868
}
6969

7070
@Override
71-
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
71+
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
7272
{
7373
if (firehoseFactory.isSplittable()) {
7474
return firehoseFactory.getNumSplits(splitHintSpec);

core/src/main/java/org/apache/druid/data/input/SplitHintSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* @see FiniteFirehoseFactory#getSplits(SplitHintSpec)
3434
* @see FiniteFirehoseFactory#getNumSplits(SplitHintSpec)
3535
* @see SplittableInputSource#createSplits
36-
* @see SplittableInputSource#getNumSplits
36+
* @see SplittableInputSource#estimateNumSplits
3737
*/
3838
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
3939
@JsonSubTypes(value = {

core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable S
8686
}
8787

8888
@Override
89-
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
89+
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
9090
{
9191
return uris.size();
9292
}

core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public Stream<InputSplit<File>> createSplits(InputFormat inputFormat, @Nullable
7777
}
7878

7979
@Override
80-
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
80+
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
8181
{
8282
return Iterators.size(getFileIterator());
8383
}

core/src/main/java/org/apache/druid/data/input/impl/SplittableInputSource.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,27 @@ default boolean isSplittable()
4444
/**
4545
* Creates a {@link Stream} of {@link InputSplit}s. The returned stream is supposed to be evaluated lazily to avoid
4646
* consuming too much memory.
47-
* Note that this interface also has {@link #getNumSplits} which is related to this method. The implementations
47+
* Note that this interface also has {@link #estimateNumSplits} which is related to this method. The implementations
4848
* should be careful to <i>NOT</i> cache the created splits in memory.
4949
*
5050
* Implementations can consider {@link InputFormat#isSplittable()} and {@link SplitHintSpec} to create splits
51-
* in the same way with {@link #getNumSplits}.
51+
* in the same way with {@link #estimateNumSplits}.
5252
*/
5353
Stream<InputSplit<T>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException;
5454

5555
/**
56-
* Returns the total number of splits to be created via {@link #createSplits}.
57-
* This method can be expensive since it needs to iterate all directories or whatever substructure
58-
* to find all input objects.
56+
* Returns an estimated total number of splits to be created via {@link #createSplits}. The estimated number of splits
57+
* doesn't have to be accurate and can be different from the actual number of InputSplits returned from
58+
* {@link #createSplits}. This will be used to estimate the progress of a phase in parallel indexing.
59+
* See TaskMonitor for more details of the progress estimation.
60+
*
61+
* This method can be expensive if an implementation iterates all directories or whatever substructure
62+
* to find all input entities.
5963
*
6064
* Implementations can consider {@link InputFormat#isSplittable()} and {@link SplitHintSpec} to find splits
6165
* in the same way with {@link #createSplits}.
6266
*/
63-
int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException;
67+
int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException;
6468

6569
/**
6670
* Helper method for ParallelIndexSupervisorTask.

docs/ingestion/native-batch.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ Returns the name of the current phase if the task running in the parallel mode.
274274

275275
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/progress`
276276

277-
Returns the current progress if the supervisor task is running in the parallel mode.
277+
Returns the estimated progress of the current phase if the supervisor task is running in the parallel mode.
278278

279279
An example of the result is
280280

@@ -285,7 +285,7 @@ An example of the result is
285285
"failed":0,
286286
"complete":0,
287287
"total":10,
288-
"expectedSucceeded":10
288+
"estimatedExpectedSucceeded":10
289289
}
290290
```
291291

extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable S
6969
}
7070

7171
@Override
72-
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
72+
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
7373
{
7474
return uris.size();
7575
}

extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public Stream<InputSplit<Path>> createSplits(InputFormat inputFormat, @Nullable
174174
}
175175

176176
@Override
177-
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
177+
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
178178
{
179179
cachePathsIfNeeded();
180180
return cachedPaths.size();

extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public void hasCorrectSplits() throws IOException
255255
@Test
256256
public void hasCorrectNumberOfSplits() throws IOException
257257
{
258-
int numSplits = target.getNumSplits(null, null);
258+
int numSplits = target.estimateNumSplits(null, null);
259259
Assert.assertEquals(NUM_FILE, numSplits);
260260
}
261261
}
@@ -295,7 +295,7 @@ public void hasCorrectSplits() throws IOException
295295
@Test
296296
public void hasCorrectNumberOfSplits() throws IOException
297297
{
298-
int numSplits = target.getNumSplits(null, null);
298+
int numSplits = target.estimateNumSplits(null, null);
299299
Assert.assertEquals(0, numSplits);
300300
}
301301
}

indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ Iterator<SubTaskSpec<T>> subTaskSpecIterator() throws IOException
7474
}
7575

7676
@Override
77-
final int getTotalNumSubTasks() throws IOException
77+
final int estimateTotalNumSubTasks() throws IOException
7878
{
79-
return baseInputSource.getNumSplits(
79+
return baseInputSource.estimateNumSplits(
8080
ingestionSchema.getIOConfig().getInputFormat(),
8181
getTuningConfig().getSplitHintSpec()
8282
);

0 commit comments

Comments
 (0)