Skip to content

Commit 519fcf3

Browse files
authored
Merge pull request #77 from Altinity/73-add-bulk-insert-feature
Added support for passing a list of parquet files to insert via --from-file.
2 parents 1eee30c + d55d77b commit 519fcf3

File tree

3 files changed

+61
-7
lines changed

3 files changed

+61
-7
lines changed

ice/src/main/java/com/altinity/ice/cli/Main.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,15 @@ void insert(
268268
description = "Create table if not exists")
269269
boolean createTableIfNotExists,
270270
@CommandLine.Parameters(
271-
arity = "1..*",
271+
arity = "0..*",
272272
paramLabel = "<files>",
273273
description = "/path/to/file.parquet")
274274
String[] dataFiles,
275+
@CommandLine.Option(
276+
names = "--from-file",
277+
description =
278+
"Read list of parquet files from the specified file (one file per line)")
279+
String fromFile,
275280
@CommandLine.Option(
276281
names = "--no-copy",
277282
description = "Add files to catalog without copying them")
@@ -358,12 +363,25 @@ void insert(
358363
}
359364
setAWSRegion(s3Region);
360365
try (RESTCatalog catalog = loadCatalog()) {
361-
if (dataFiles.length == 1 && "-".equals(dataFiles[0])) {
366+
// Handle --from-file option
367+
if (!Strings.isNullOrEmpty(fromFile)) {
368+
if (dataFiles != null && dataFiles.length > 0) {
369+
throw new IllegalArgumentException("Cannot specify both --from-file and file arguments");
370+
}
371+
dataFiles = readInputFromFile(fromFile).toArray(new String[0]);
372+
if (dataFiles.length == 0) {
373+
logger.info("Nothing to insert (file empty)");
374+
return;
375+
}
376+
} else if (dataFiles != null && dataFiles.length == 1 && "-".equals(dataFiles[0])) {
362377
dataFiles = readInput().toArray(new String[0]);
363378
if (dataFiles.length == 0) {
364379
logger.info("Nothing to insert (stdin empty)");
365380
return;
366381
}
382+
} else if (dataFiles == null || dataFiles.length == 0) {
383+
throw new IllegalArgumentException(
384+
"No files specified. Provide files as arguments or use --from-file");
367385
}
368386

369387
List<IcePartition> partitions = null;
@@ -418,7 +436,13 @@ void insert(
418436
.build();
419437

420438
if (!watchMode) {
421-
Insert.run(catalog, tableId, dataFiles, options);
439+
Insert.Result result = Insert.run(catalog, tableId, dataFiles, options);
440+
if (result.anyFilesFailed()) {
441+
logger.error(
442+
"{} file(s) failed to insert. Check logs or retry list for details.",
443+
result.failedCount());
444+
System.exit(1);
445+
}
422446
} else {
423447
if (!Strings.isNullOrEmpty(watchDebugAddr)) {
424448
JvmMetrics.builder().register();
@@ -460,6 +484,19 @@ private static List<String> readInput() {
460484
return r;
461485
}
462486

487+
private static List<String> readInputFromFile(String filePath) throws IOException {
488+
List<String> r = new ArrayList<>();
489+
try (Scanner scanner = new Scanner(new java.io.File(filePath))) {
490+
while (scanner.hasNextLine()) {
491+
String line = scanner.nextLine();
492+
if (!line.isBlank()) {
493+
r.add(line);
494+
}
495+
}
496+
}
497+
return r;
498+
}
499+
463500
@CommandLine.Command(name = "scan", description = "Scan table.")
464501
void scanTable(
465502
@CommandLine.Parameters(

ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.Executors;
3636
import java.util.concurrent.Future;
3737
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.atomic.AtomicInteger;
3839
import java.util.function.Function;
3940
import java.util.function.Supplier;
4041
import java.util.stream.Collectors;
@@ -92,13 +93,15 @@ public final class Insert {
9293

9394
private Insert() {}
9495

96+
/**
 * Summary returned by {@code Insert.run} so callers can react to partial failures (the CLI exits
 * non-zero, watch mode logs a warning).
 *
 * @param anyFilesProcessed value of {@code run}'s "at least one file appended" flag; {@code
 *     false} when {@code run} is called with no files
 * @param anyFilesFailed {@code true} exactly when {@code failedCount > 0}
 * @param failedCount number of failures that {@code run} recorded and continued past; this
 *     appears to happen only when a retry list is configured, since otherwise {@code run} throws
 *     on the first error (NOTE(review): confirm against the full method)
 */
public record Result(boolean anyFilesProcessed, boolean anyFilesFailed, int failedCount) {}
97+
9598
// TODO: refactor
96-
public static void run(
99+
public static Result run(
97100
RESTCatalog catalog, TableIdentifier nsTable, String[] files, Options options)
98101
throws NoSuchTableException, IOException, InterruptedException {
99102
if (files.length == 0) {
100103
// no work to be done
101-
return;
104+
return new Result(false, false, 0);
102105
}
103106

104107
Table table = catalog.loadTable(nsTable);
@@ -166,6 +169,7 @@ public static void run(
166169
? new RetryLog(options.retryListFile)
167170
: null) {
168171
boolean atLeastOneFileAppended = false;
172+
final AtomicInteger failedCount = new AtomicInteger(0);
169173

170174
int numThreads = Math.min(options.threadCount(), filesExpanded.size());
171175
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
@@ -202,6 +206,7 @@ public static void run(
202206
logger.error(
203207
"{}: error (adding to retry list and continuing)", file, e);
204208
retryLog.add(file);
209+
failedCount.incrementAndGet();
205210
return Collections.emptyList();
206211
} else {
207212
throw new IOException(String.format("Error processing %s", file), e);
@@ -221,6 +226,7 @@ public static void run(
221226
if (retryLog == null) {
222227
throw new IOException("Error processing file(s)", e.getCause());
223228
}
229+
failedCount.incrementAndGet();
224230
}
225231
}
226232
} finally {
@@ -245,6 +251,9 @@ public static void run(
245251
} else {
246252
logger.warn("Table commit skipped (--no-commit)");
247253
}
254+
255+
int finalFailedCount = failedCount.get();
256+
return new Result(atLeastOneFileAppended, finalFailedCount > 0, finalFailedCount);
248257
}
249258
} finally {
250259
if (s3ClientLazy.hasValue()) {

ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ public static void run(
120120
logger.info("Inserting {}", insertBatch);
121121

122122
try {
123-
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
123+
Insert.Result result =
124+
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
125+
if (result.anyFilesFailed()) {
126+
logger.warn("{} file(s) failed to insert in this batch", result.failedCount());
127+
}
124128
} catch (NoSuchTableException e) {
125129
if (!createTableIfNotExists) {
126130
throw e;
@@ -145,7 +149,11 @@ public static void run(
145149
retryInsert = false;
146150
}
147151
if (retryInsert) {
148-
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
152+
Insert.Result result =
153+
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
154+
if (result.anyFilesFailed()) {
155+
logger.warn("{} file(s) failed to insert in this batch", result.failedCount());
156+
}
149157
}
150158
}
151159
}

0 commit comments

Comments
 (0)