Skip to content

Commit bb97a7b

Browse files
committed
Follow up to #77
1 parent 519fcf3 commit bb97a7b

File tree

3 files changed

+53
-35
lines changed

3 files changed

+53
-35
lines changed

ice/src/main/java/com/altinity/ice/cli/Main.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -271,12 +271,12 @@ void insert(
271271
arity = "0..*",
272272
paramLabel = "<files>",
273273
description = "/path/to/file.parquet")
274-
String[] dataFiles,
274+
String[] files,
275275
@CommandLine.Option(
276-
names = "--from-file",
276+
names = "--files-from",
277277
description =
278278
"Read list of parquet files from the specified file (one file per line)")
279-
String fromFile,
279+
String filesFrom,
280280
@CommandLine.Option(
281281
names = "--no-copy",
282282
description = "Add files to catalog without copying them")
@@ -324,6 +324,11 @@ void insert(
324324
"/path/to/file where to save list of files to retry"
325325
+ " (useful for retrying partially failed insert using `cat ice.retry | ice insert - --retry-list=ice.retry`)")
326326
String retryList,
327+
@CommandLine.Option(
328+
names = {"--retry-list-exit-code"},
329+
description =
330+
"Exit code to return when insert produces non-empty --retry-list file (default: 0)")
331+
int retryListExitCode,
327332
@CommandLine.Option(
328333
names = {"--partition"},
329334
description =
@@ -361,27 +366,30 @@ void insert(
361366
throw new UnsupportedOperationException(
362367
"--s3-no-sign-request + --s3-copy-object is not supported by AWS (see --help for details)");
363368
}
369+
boolean filesSet = files != null && files.length > 0;
370+
boolean filesFromSet = !Strings.isNullOrEmpty(filesFrom);
371+
if (!filesSet && !filesFromSet) {
372+
throw new IllegalArgumentException(
373+
"At least one <files> argument or --files-from is required");
374+
}
375+
if (filesSet && filesFromSet) {
376+
throw new IllegalArgumentException(
377+
"<files> arguments and --files-from are mutually exclusive");
378+
}
364379
setAWSRegion(s3Region);
365380
try (RESTCatalog catalog = loadCatalog()) {
366-
// Handle --from-file option
367-
if (!Strings.isNullOrEmpty(fromFile)) {
368-
if (dataFiles != null && dataFiles.length > 0) {
369-
throw new IllegalArgumentException("Cannot specify both --from-file and file arguments");
370-
}
371-
dataFiles = readInputFromFile(fromFile).toArray(new String[0]);
372-
if (dataFiles.length == 0) {
381+
if (filesFromSet) {
382+
files = readInputFromFile(filesFrom).toArray(new String[0]);
383+
if (files.length == 0) {
373384
logger.info("Nothing to insert (file empty)");
374385
return;
375386
}
376-
} else if (dataFiles != null && dataFiles.length == 1 && "-".equals(dataFiles[0])) {
377-
dataFiles = readInput().toArray(new String[0]);
378-
if (dataFiles.length == 0) {
387+
} else if (files.length == 1 && "-".equals(files[0])) {
388+
files = readInput().toArray(new String[0]);
389+
if (files.length == 0) {
379390
logger.info("Nothing to insert (stdin empty)");
380391
return;
381392
}
382-
} else if (dataFiles == null || dataFiles.length == 0) {
383-
throw new IllegalArgumentException(
384-
"No files specified. Provide files as arguments or use --from-file");
385393
}
386394

387395
List<IcePartition> partitions = null;
@@ -405,7 +413,7 @@ void insert(
405413
CreateTable.run(
406414
catalog,
407415
tableId,
408-
dataFiles[0],
416+
files[0],
409417
null,
410418
createTableIfNotExists,
411419
useVendedCredentials,
@@ -436,12 +444,14 @@ void insert(
436444
.build();
437445

438446
if (!watchMode) {
439-
Insert.Result result = Insert.run(catalog, tableId, dataFiles, options);
440-
if (result.anyFilesFailed()) {
447+
Insert.Result result = Insert.run(catalog, tableId, files, options);
448+
if (!result.ok()) {
441449
logger.error(
442-
"{} file(s) failed to insert. Check logs or retry list for details.",
443-
result.failedCount());
444-
System.exit(1);
450+
"{}/{} file(s) failed to insert (see {})",
451+
result.totalNumberOfFiles(),
452+
result.numberOfFilesFailedToInsert(),
453+
retryList);
454+
System.exit(retryListExitCode);
445455
}
446456
} else {
447457
if (!Strings.isNullOrEmpty(watchDebugAddr)) {
@@ -459,7 +469,7 @@ void insert(
459469
}
460470

461471
InsertWatch.run(
462-
catalog, tableId, dataFiles, watch, watchFireOnce, createTableIfNotExists, options);
472+
catalog, tableId, files, watch, watchFireOnce, createTableIfNotExists, options);
463473
}
464474
}
465475
}

ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.util.concurrent.Executors;
3636
import java.util.concurrent.Future;
3737
import java.util.concurrent.TimeUnit;
38-
import java.util.concurrent.atomic.AtomicInteger;
3938
import java.util.function.Function;
4039
import java.util.function.Supplier;
4140
import java.util.stream.Collectors;
@@ -93,15 +92,20 @@ public final class Insert {
9392

9493
private Insert() {}
9594

96-
public record Result(boolean anyFilesProcessed, boolean anyFilesFailed, int failedCount) {}
95+
public record Result(int totalNumberOfFiles, int numberOfFilesFailedToInsert) {
96+
97+
public boolean ok() {
98+
return numberOfFilesFailedToInsert == 0;
99+
}
100+
}
97101

98102
// TODO: refactor
99103
public static Result run(
100104
RESTCatalog catalog, TableIdentifier nsTable, String[] files, Options options)
101105
throws NoSuchTableException, IOException, InterruptedException {
102106
if (files.length == 0) {
103107
// no work to be done
104-
return new Result(false, false, 0);
108+
return new Result(0, 0);
105109
}
106110

107111
Table table = catalog.loadTable(nsTable);
@@ -169,10 +173,10 @@ public static Result run(
169173
? new RetryLog(options.retryListFile)
170174
: null) {
171175
boolean atLeastOneFileAppended = false;
172-
final AtomicInteger failedCount = new AtomicInteger(0);
173176

174177
int numThreads = Math.min(options.threadCount(), filesExpanded.size());
175178
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
179+
int failed = 0;
176180
try {
177181
var futures = new ArrayList<Future<List<DataFile>>>();
178182
for (final String file : filesExpanded) {
@@ -206,7 +210,6 @@ public static Result run(
206210
logger.error(
207211
"{}: error (adding to retry list and continuing)", file, e);
208212
retryLog.add(file);
209-
failedCount.incrementAndGet();
210213
return Collections.emptyList();
211214
} else {
212215
throw new IOException(String.format("Error processing %s", file), e);
@@ -223,10 +226,10 @@ public static Result run(
223226
appendOp.appendFile(df); // Only main thread appends now
224227
}
225228
} catch (ExecutionException e) {
229+
failed++;
226230
if (retryLog == null) {
227231
throw new IOException("Error processing file(s)", e.getCause());
228232
}
229-
failedCount.incrementAndGet();
230233
}
231234
}
232235
} finally {
@@ -252,8 +255,7 @@ public static Result run(
252255
logger.warn("Table commit skipped (--no-commit)");
253256
}
254257

255-
int finalFailedCount = failedCount.get();
256-
return new Result(atLeastOneFileAppended, finalFailedCount > 0, finalFailedCount);
258+
return new Result(filesExpanded.size(), failed);
257259
}
258260
} finally {
259261
if (s3ClientLazy.hasValue()) {

ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,11 @@ public static void run(
122122
try {
123123
Insert.Result result =
124124
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
125-
if (result.anyFilesFailed()) {
126-
logger.warn("{} file(s) failed to insert in this batch", result.failedCount());
125+
if (!result.ok()) {
126+
logger.warn(
127+
"{}/{} file(s) failed to insert in this batch",
128+
result.totalNumberOfFiles(),
129+
result.numberOfFilesFailedToInsert());
127130
}
128131
} catch (NoSuchTableException e) {
129132
if (!createTableIfNotExists) {
@@ -151,8 +154,11 @@ public static void run(
151154
if (retryInsert) {
152155
Insert.Result result =
153156
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
154-
if (result.anyFilesFailed()) {
155-
logger.warn("{} file(s) failed to insert in this batch", result.failedCount());
157+
if (!result.ok()) {
158+
logger.warn(
159+
"{}/{} file(s) failed to insert in this batch",
160+
result.totalNumberOfFiles(),
161+
result.numberOfFilesFailedToInsert());
156162
}
157163
}
158164
}

0 commit comments

Comments
 (0)