Skip to content

Commit b82956a

Browse files
committed
Support scanning manifest files in parallel.
1 parent b128911 commit b82956a

File tree

5 files changed

+238
-17
lines changed

5 files changed

+238
-17
lines changed

core/src/main/java/com/netflix/iceberg/BaseTableScan.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616

1717
package com.netflix.iceberg;
1818

19-
import com.google.common.base.Function;
2019
import com.google.common.base.Objects;
2120
import com.google.common.collect.Iterables;
2221
import com.netflix.iceberg.expressions.Expression;
2322
import com.netflix.iceberg.expressions.Expressions;
2423
import com.netflix.iceberg.expressions.ResidualEvaluator;
24+
import com.netflix.iceberg.util.ParallelIterable;
2525
import java.util.Collection;
2626
import java.util.Collections;
2727

28+
import static com.netflix.iceberg.util.ThreadPools.getPlannerPool;
29+
import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
30+
2831
/**
2932
* Base class for {@link TableScan} implementations.
3033
*/
@@ -64,9 +67,9 @@ public TableScan filter(Expression expr) {
6467
public Iterable<FileScanTask> planFiles() {
6568
Snapshot snapshot = ops.current().currentSnapshot();
6669
if (snapshot != null) {
67-
return Iterables.concat(Iterables.transform(
70+
Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
6871
snapshot.manifests(),
69-
(Function<String, Iterable<FileScanTask>>) manifest -> {
72+
manifest -> {
7073
ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
7174
String schemaString = SchemaParser.toJson(reader.spec().schema());
7275
String specString = PartitionSpecParser.toJson(reader.spec());
@@ -75,7 +78,14 @@ public Iterable<FileScanTask> planFiles() {
7578
reader.filterRows(rowFilter).select(columns),
7679
file -> new BaseFileScanTask(file, schemaString, specString, residuals)
7780
);
78-
}));
81+
});
82+
83+
if (snapshot.manifests().size() > 1) {
84+
return new ParallelIterable<>(readers, getPlannerPool(), getWorkerPool());
85+
} else {
86+
return Iterables.concat(readers);
87+
}
88+
7989
} else {
8090
return Collections.emptyList();
8191
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2017 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.netflix.iceberg.util;
18+
19+
/**
 * Helpers for re-throwing a captured {@link Throwable} with the most specific type possible.
 */
public class ExceptionUtil {
  /**
   * Re-throws {@code e}, preserving its type when that type may legally propagate.
   * <p>
   * Unchecked throwables ({@link RuntimeException} and {@link Error}) are re-thrown unchanged. A
   * throwable that is an instance of {@code exceptionClass} is re-thrown as {@code E}. Anything
   * else is wrapped in a new {@link RuntimeException} with {@code e} as its cause.
   * <p>
   * This method never returns normally.
   *
   * @param e the throwable to re-throw
   * @param exceptionClass the checked exception type the caller is allowed to propagate
   * @param <E> the allowed exception type
   * @throws E if {@code e} is an instance of {@code exceptionClass}
   */
  static <E extends Exception> void castAndThrow(
      Throwable e, Class<E> exceptionClass) throws E {
    // unchecked throwables always propagate as-is
    if (e instanceof RuntimeException) {
      throw (RuntimeException) e;
    }

    if (e instanceof Error) {
      throw (Error) e;
    }

    // the caller declared this type, so it can be thrown directly; Class.cast is a checked
    // cast, which avoids the unchecked (E) conversion
    if (exceptionClass.isInstance(e)) {
      throw exceptionClass.cast(e);
    }

    // any other checked throwable has to be wrapped to get past the method signature
    throw new RuntimeException(e);
  }
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2017 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.netflix.iceberg.util;
18+
19+
import java.util.Iterator;
20+
import java.util.NoSuchElementException;
21+
import java.util.concurrent.ConcurrentLinkedQueue;
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Future;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.TimeoutException;
27+
28+
public class ParallelIterable<T> implements Iterable<T> {
29+
private final Iterable<Iterable<T>> iterables;
30+
private final ExecutorService trackingPool;
31+
private final ExecutorService workerPool;
32+
33+
public ParallelIterable(Iterable<Iterable<T>> iterables,
34+
ExecutorService trackingPool,
35+
ExecutorService workerPool) {
36+
this.iterables = iterables;
37+
this.trackingPool = trackingPool;
38+
this.workerPool = workerPool;
39+
}
40+
41+
@Override
42+
public Iterator<T> iterator() {
43+
return new ParallelIterator<>(iterables, trackingPool, workerPool);
44+
}
45+
46+
private static class ParallelIterator<T> implements Iterator<T> {
47+
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
48+
private final Future<?> taskFuture;
49+
50+
public ParallelIterator(Iterable<Iterable<T>> iterables,
51+
ExecutorService trackingPool,
52+
ExecutorService workerPool) {
53+
this.taskFuture = trackingPool.submit(() -> {
54+
Tasks.foreach(iterables)
55+
.noRetry().stopOnFailure().throwFailureWhenFinished()
56+
.executeWith(workerPool)
57+
.run(iterable -> {
58+
for (T item : iterable) {
59+
queue.add(item);
60+
}
61+
});
62+
return true;
63+
});
64+
}
65+
66+
@Override
67+
public synchronized boolean hasNext() {
68+
// this cannot conclude that there are no more records until tasks have finished. while some
69+
// are running, return true when there is at least one item to return.
70+
while (!taskFuture.isDone()) {
71+
if (queue.size() > 0) {
72+
return true;
73+
}
74+
75+
try {
76+
taskFuture.get(10, TimeUnit.MILLISECONDS);
77+
break;
78+
79+
} catch (InterruptedException e) {
80+
Thread.currentThread().interrupt();
81+
throw new RuntimeException(e);
82+
} catch (ExecutionException e) {
83+
ExceptionUtil.castAndThrow(e.getCause(), RuntimeException.class);
84+
} catch (TimeoutException e) {
85+
// continue looping to check the queue size and wait again
86+
}
87+
}
88+
89+
// when tasks are no longer running, return whether the queue has items
90+
return queue.size() > 0;
91+
}
92+
93+
@Override
94+
public synchronized T next() {
95+
// use hasNext to block until there is an available record
96+
if (!hasNext()) {
97+
throw new NoSuchElementException();
98+
}
99+
return queue.poll();
100+
}
101+
}
102+
103+
}

core/src/main/java/com/netflix/iceberg/util/Tasks.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Collections;
2626
import java.util.Iterator;
2727
import java.util.List;
28+
import java.util.NoSuchElementException;
2829
import java.util.Queue;
2930
import java.util.concurrent.CancellationException;
3031
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -33,6 +34,7 @@
3334
import java.util.concurrent.Future;
3435
import java.util.concurrent.ThreadLocalRandom;
3536
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.TimeoutException;
3638
import java.util.concurrent.atomic.AtomicBoolean;
3739

3840

@@ -523,19 +525,7 @@ private static <E extends Exception> void throwOne(
523525
}
524526
}
525527

526-
Tasks.<E>castAndThrow(e, allowedException);
528+
ExceptionUtil.<E>castAndThrow(e, allowedException);
527529
}
528530

529-
@SuppressWarnings("unchecked")
530-
private static <E extends Exception> void castAndThrow(
531-
Throwable e, Class<E> exceptionClass) throws E {
532-
if (e instanceof RuntimeException) {
533-
throw (RuntimeException) e;
534-
} else if (e instanceof Error) {
535-
throw (Error) e;
536-
} else if (exceptionClass.isInstance(e)) {
537-
throw (E) e;
538-
}
539-
throw new RuntimeException(e);
540-
}
541531
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2017 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.netflix.iceberg.util;
18+
19+
import com.google.common.util.concurrent.MoreExecutors;
20+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ThreadPoolExecutor;
24+
25+
public class ThreadPools {
26+
public static final String PLANNER_THREAD_POOL_SIZE_PROP = "iceberg.planner.num-threads";
27+
public static final String WORKER_THREAD_POOL_SIZE_PROP = "iceberg.worker.num-threads";
28+
29+
private static ExecutorService PLANNER_POOL = MoreExecutors.getExitingExecutorService(
30+
(ThreadPoolExecutor) Executors.newFixedThreadPool(
31+
getPoolSize(PLANNER_THREAD_POOL_SIZE_PROP, 4),
32+
new ThreadFactoryBuilder()
33+
.setDaemon(true)
34+
.setNameFormat("iceberg-planner-pool-%d")
35+
.build()));
36+
37+
private static ExecutorService WORKER_POOL = MoreExecutors.getExitingExecutorService(
38+
(ThreadPoolExecutor) Executors.newFixedThreadPool(
39+
getPoolSize(WORKER_THREAD_POOL_SIZE_PROP, Runtime.getRuntime().availableProcessors()),
40+
new ThreadFactoryBuilder()
41+
.setDaemon(true)
42+
.setNameFormat("iceberg-worker-pool-%d")
43+
.build()));
44+
45+
/**
46+
* Return an {@link ExecutorService} that uses the "planner" thread-pool.
47+
* <p>
48+
* The size of the planner pool limits the number of concurrent planning operations in the base
49+
* table implementation.
50+
* <p>
51+
* The size of this thread-pool is controlled by the Java system property
52+
* {@code iceberg.planner.num-threads}.
53+
*
54+
* @return an {@link ExecutorService} that uses the planner pool
55+
*/
56+
public static ExecutorService getPlannerPool() {
57+
return PLANNER_POOL;
58+
}
59+
60+
/**
61+
* Return an {@link ExecutorService} that uses the "worker" thread-pool.
62+
* <p>
63+
* The size of the worker pool limits the number of tasks concurrently reading manifests in the
64+
* base table implementation across all concurrent planning operations.
65+
* <p>
66+
* The size of this thread-pool is controlled by the Java system property
67+
* {@code iceberg.worker.num-threads}.
68+
*
69+
* @return an {@link ExecutorService} that uses the worker pool
70+
*/
71+
public static ExecutorService getWorkerPool() {
72+
return WORKER_POOL;
73+
}
74+
75+
private static int getPoolSize(String systemProperty, int defaultSize) {
76+
String value = System.getProperty(systemProperty);
77+
if (value != null) {
78+
try {
79+
return Integer.parseUnsignedInt(value);
80+
} catch (NumberFormatException e) {
81+
// will return the default
82+
}
83+
}
84+
return defaultSize;
85+
}
86+
}

0 commit comments

Comments
 (0)