Skip to content

Commit 2dd678d

Browse files
committed
ice: Added thread count as a command line argument, default to number of processors.
1 parent ed4f402 commit 2dd678d

File tree

2 files changed

+63
-32
lines changed

2 files changed

+63
-32
lines changed

ice/src/main/java/com/altinity/ice/Main.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,12 @@ void insert(
171171
description =
172172
"/path/to/file where to save list of files to retry"
173173
+ " (useful for retrying partially failed insert using `cat ice.retry | ice insert - --retry-list=ice.retry`)")
174-
String retryList)
174+
String retryList,
175+
@CommandLine.Option(
176+
names = {"--thread-count"},
177+
description = "Number of threads to use for inserting data",
178+
defaultValue = "-1")
179+
int threadCount)
175180
throws IOException {
176181
if (s3NoSignRequest && s3CopyObject) {
177182
throw new UnsupportedOperationException(
@@ -203,7 +208,8 @@ void insert(
203208
forceTableAuth,
204209
s3NoSignRequest,
205210
s3CopyObject,
206-
retryList);
211+
retryList,
212+
threadCount == -1 ? Runtime.getRuntime().availableProcessors() : threadCount);
207213
}
208214
}
209215

ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,15 @@
77
import com.altinity.ice.internal.jvm.Stats;
88
import com.altinity.ice.internal.parquet.Metadata;
99
import java.io.IOException;
10+
import java.util.ArrayList;
1011
import java.util.Arrays;
1112
import java.util.HashSet;
1213
import java.util.Map;
1314
import java.util.Set;
15+
import java.util.concurrent.ExecutionException;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.Future;
1419
import java.util.function.Function;
1520
import java.util.function.Supplier;
1621
import java.util.stream.Collectors;
@@ -66,7 +71,8 @@ public static void run(
6671
boolean forceTableAuth,
6772
boolean s3NoSignRequest,
6873
boolean s3CopyObject,
69-
String retryListFile)
74+
String retryListFile,
75+
int threadCount)
7076
throws IOException {
7177
if (files.length == 0) {
7278
// no work to be done
@@ -81,6 +87,7 @@ public static void run(
8187
.forceTableAuth(forceTableAuth)
8288
.s3NoSignRequest(s3NoSignRequest)
8389
.s3CopyObject(s3CopyObject)
90+
.threadCount(threadCount)
8491
.build();
8592

8693
final InsertOptions finalOptions =
@@ -146,37 +153,55 @@ public static void run(
146153
: null) {
147154
boolean atLeastOneFileAppended = false;
148155

149-
// TODO: parallel
150-
for (final String file : filesExpanded) {
151-
DataFile df;
152-
try {
153-
df =
154-
processFile(
155-
table,
156-
catalog,
157-
tableIO,
158-
inputIO,
159-
tableDataFiles,
160-
finalOptions,
161-
s3ClientLazy,
162-
dstDataFileSource,
163-
tableSchema,
164-
dataFileNamingStrategy,
165-
file);
166-
if (df == null) {
167-
continue;
168-
}
169-
} catch (Exception e) { // FIXME
170-
if (retryLog != null) {
171-
logger.error("{}: error (adding to retry list and continuing)", file, e);
172-
retryLog.add(file);
173-
continue;
174-
} else {
175-
throw e;
156+
int numThreads = Math.min(finalOptions.threadCount(), filesExpanded.size());
157+
try (ExecutorService executor = Executors.newFixedThreadPool(numThreads)) {
158+
var futures = new ArrayList<Future<DataFile>>();
159+
for (final String file : filesExpanded) {
160+
futures.add(
161+
executor.submit(
162+
() -> {
163+
try {
164+
return processFile(
165+
table,
166+
catalog,
167+
tableIO,
168+
inputIO,
169+
tableDataFiles,
170+
finalOptions,
171+
s3ClientLazy,
172+
dstDataFileSource,
173+
tableSchema,
174+
dataFileNamingStrategy,
175+
file);
176+
} catch (Exception e) {
177+
if (retryLog != null) {
178+
logger.error(
179+
"{}: error (adding to retry list and continuing)", file, e);
180+
retryLog.add(file);
181+
return null;
182+
} else {
183+
throw e;
184+
}
185+
}
186+
}));
187+
}
188+
189+
for (var future : futures) {
190+
try {
191+
DataFile df = future.get();
192+
if (df != null) {
193+
atLeastOneFileAppended = true;
194+
appendOp.appendFile(df);
195+
}
196+
} catch (InterruptedException e) {
197+
Thread.currentThread().interrupt();
198+
throw new IOException("Interrupted while processing files", e);
199+
} catch (ExecutionException e) {
200+
if (retryLog == null) {
201+
throw new IOException("Error processing files", e.getCause());
202+
}
176203
}
177204
}
178-
atLeastOneFileAppended = true;
179-
appendOp.appendFile(df);
180205
}
181206

182207
if (!finalOptions.noCommit()) {

0 commit comments

Comments
 (0)