Skip to content

Commit d55d77b

Browse files
committed
Added support for passing list of parquet files in insert.
1 parent d56b849 commit d55d77b

File tree

3 files changed

+61
-7
lines changed

3 files changed

+61
-7
lines changed

ice/src/main/java/com/altinity/ice/cli/Main.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,10 +261,15 @@ void insert(
261261
description = "Create table if not exists")
262262
boolean createTableIfNotExists,
263263
@CommandLine.Parameters(
264-
arity = "1..*",
264+
arity = "0..*",
265265
paramLabel = "<files>",
266266
description = "/path/to/file.parquet")
267267
String[] dataFiles,
268+
@CommandLine.Option(
269+
names = "--from-file",
270+
description =
271+
"Read list of parquet files from the specified file (one file per line)")
272+
String fromFile,
268273
@CommandLine.Option(
269274
names = "--no-copy",
270275
description = "Add files to catalog without copying them")
@@ -351,12 +356,25 @@ void insert(
351356
}
352357
setAWSRegion(s3Region);
353358
try (RESTCatalog catalog = loadCatalog()) {
354-
if (dataFiles.length == 1 && "-".equals(dataFiles[0])) {
359+
// Handle --from-file option
360+
if (!Strings.isNullOrEmpty(fromFile)) {
361+
if (dataFiles != null && dataFiles.length > 0) {
362+
throw new IllegalArgumentException("Cannot specify both --from-file and file arguments");
363+
}
364+
dataFiles = readInputFromFile(fromFile).toArray(new String[0]);
365+
if (dataFiles.length == 0) {
366+
logger.info("Nothing to insert (file empty)");
367+
return;
368+
}
369+
} else if (dataFiles != null && dataFiles.length == 1 && "-".equals(dataFiles[0])) {
355370
dataFiles = readInput().toArray(new String[0]);
356371
if (dataFiles.length == 0) {
357372
logger.info("Nothing to insert (stdin empty)");
358373
return;
359374
}
375+
} else if (dataFiles == null || dataFiles.length == 0) {
376+
throw new IllegalArgumentException(
377+
"No files specified. Provide files as arguments or use --from-file");
360378
}
361379

362380
List<IcePartition> partitions = null;
@@ -411,7 +429,13 @@ void insert(
411429
.build();
412430

413431
if (!watchMode) {
414-
Insert.run(catalog, tableId, dataFiles, options);
432+
Insert.Result result = Insert.run(catalog, tableId, dataFiles, options);
433+
if (result.anyFilesFailed()) {
434+
logger.error(
435+
"{} file(s) failed to insert. Check logs or retry list for details.",
436+
result.failedCount());
437+
System.exit(1);
438+
}
415439
} else {
416440
if (!Strings.isNullOrEmpty(watchDebugAddr)) {
417441
JvmMetrics.builder().register();
@@ -453,6 +477,19 @@ private static List<String> readInput() {
453477
return r;
454478
}
455479

480+
private static List<String> readInputFromFile(String filePath) throws IOException {
481+
List<String> r = new ArrayList<>();
482+
try (Scanner scanner = new Scanner(new java.io.File(filePath))) {
483+
while (scanner.hasNextLine()) {
484+
String line = scanner.nextLine();
485+
if (!line.isBlank()) {
486+
r.add(line);
487+
}
488+
}
489+
}
490+
return r;
491+
}
492+
456493
@CommandLine.Command(name = "scan", description = "Scan table.")
457494
void scanTable(
458495
@CommandLine.Parameters(

ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.Executors;
3636
import java.util.concurrent.Future;
3737
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.atomic.AtomicInteger;
3839
import java.util.function.Function;
3940
import java.util.function.Supplier;
4041
import java.util.stream.Collectors;
@@ -92,13 +93,15 @@ public final class Insert {
9293

9394
private Insert() {}
9495

96+
/**
 * Outcome of a {@code run} call, returned so callers can react to per-file failures that were
 * logged and skipped rather than thrown.
 *
 * @param anyFilesProcessed {@code false} when {@code run} was given no files; otherwise the value
 *     of the run's "at least one file appended" flag
 * @param anyFilesFailed {@code true} exactly when {@code failedCount} is greater than zero
 * @param failedCount number of failures counted during the run; {@code 0} when there were none
 */
public record Result(boolean anyFilesProcessed, boolean anyFilesFailed, int failedCount) {}
97+
9598
// TODO: refactor
96-
public static void run(
99+
public static Result run(
97100
RESTCatalog catalog, TableIdentifier nsTable, String[] files, Options options)
98101
throws NoSuchTableException, IOException, InterruptedException {
99102
if (files.length == 0) {
100103
// no work to be done
101-
return;
104+
return new Result(false, false, 0);
102105
}
103106

104107
Table table = catalog.loadTable(nsTable);
@@ -166,6 +169,7 @@ public static void run(
166169
? new RetryLog(options.retryListFile)
167170
: null) {
168171
boolean atLeastOneFileAppended = false;
172+
final AtomicInteger failedCount = new AtomicInteger(0);
169173

170174
int numThreads = Math.min(options.threadCount(), filesExpanded.size());
171175
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
@@ -202,6 +206,7 @@ public static void run(
202206
logger.error(
203207
"{}: error (adding to retry list and continuing)", file, e);
204208
retryLog.add(file);
209+
failedCount.incrementAndGet();
205210
return Collections.emptyList();
206211
} else {
207212
throw new IOException(String.format("Error processing %s", file), e);
@@ -221,6 +226,7 @@ public static void run(
221226
if (retryLog == null) {
222227
throw new IOException("Error processing file(s)", e.getCause());
223228
}
229+
failedCount.incrementAndGet();
224230
}
225231
}
226232
} finally {
@@ -245,6 +251,9 @@ public static void run(
245251
} else {
246252
logger.warn("Table commit skipped (--no-commit)");
247253
}
254+
255+
int finalFailedCount = failedCount.get();
256+
return new Result(atLeastOneFileAppended, finalFailedCount > 0, finalFailedCount);
248257
}
249258
} finally {
250259
if (s3ClientLazy.hasValue()) {

ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ public static void run(
120120
logger.info("Inserting {}", insertBatch);
121121

122122
try {
123-
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
123+
Insert.Result result =
124+
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
125+
if (result.anyFilesFailed()) {
126+
logger.warn("{} file(s) failed to insert in this batch", result.failedCount());
127+
}
124128
} catch (NoSuchTableException e) {
125129
if (!createTableIfNotExists) {
126130
throw e;
@@ -145,7 +149,11 @@ public static void run(
145149
retryInsert = false;
146150
}
147151
if (retryInsert) {
148-
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
152+
Insert.Result result =
153+
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
154+
if (result.anyFilesFailed()) {
155+
logger.warn("{} file(s) failed to insert in this batch", result.failedCount());
156+
}
149157
}
150158
}
151159
}

0 commit comments

Comments
 (0)