Skip to content

Commit 6339b20

Browse files
committed
ice/watch: Tweak handling of deleted files
1 parent ba7fba2 commit 6339b20

File tree

3 files changed

+41
-12
lines changed

3 files changed

+41
-12
lines changed

ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,17 @@
2929
import org.apache.iceberg.catalog.TableIdentifier;
3030
import org.apache.iceberg.exceptions.AlreadyExistsException;
3131
import org.apache.iceberg.exceptions.BadRequestException;
32+
import org.apache.iceberg.exceptions.NotFoundException;
3233
import org.apache.iceberg.io.InputFile;
3334
import org.apache.iceberg.mapping.MappingUtil;
3435
import org.apache.iceberg.mapping.NameMapping;
3536
import org.apache.iceberg.mapping.NameMappingParser;
3637
import org.apache.iceberg.parquet.ParquetSchemaUtil;
3738
import org.apache.iceberg.rest.RESTCatalog;
39+
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
3840
import org.apache.parquet.schema.MessageType;
3941
import software.amazon.awssdk.services.s3.S3Client;
42+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
4043
import software.amazon.awssdk.utils.Lazy;
4144

4245
public final class CreateTable {
@@ -69,7 +72,15 @@ public static void run(
6972
}
7073
try (var inputIO = Input.newIO(schemaFile, null, s3ClientLazy)) {
7174
InputFile inputFile = Input.newFile(schemaFile, catalog, inputIO);
72-
MessageType type = Metadata.read(inputFile).getFileMetaData().getSchema();
75+
76+
ParquetMetadata metadata;
77+
try {
78+
metadata = Metadata.read(inputFile);
79+
} catch (NoSuchKeyException e) { // S3FileInput
80+
throw new NotFoundException(inputFile.location(), e);
81+
} // rethrow NotFoundException
82+
83+
MessageType type = metadata.getFileMetaData().getSchema();
7384
Schema fileSchema = ParquetSchemaUtil.convert(type);
7485
try {
7586
Map<String, String> props = null;

ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.slf4j.LoggerFactory;
8484
import software.amazon.awssdk.services.s3.S3Client;
8585
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
86+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
8687
import software.amazon.awssdk.utils.Lazy;
8788

8889
public final class Insert {
@@ -364,9 +365,14 @@ private static List<DataFile> processFile(
364365
InputFile inputFile = Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO);
365366
ParquetMetadata metadata;
366367
try {
367-
metadata = Metadata.read(inputFile);
368+
try {
369+
metadata = Metadata.read(inputFile);
370+
} catch (NoSuchKeyException e) { // S3FileInput
371+
throw new NotFoundException(inputFile.location(), e);
372+
}
368373
} catch (NotFoundException e) {
369374
if (options.ignoreNotFound) {
375+
logger.info("{}: not found (skipping)", file);
370376
return List.of();
371377
}
372378
throw e;

ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.function.Supplier;
2424
import org.apache.iceberg.catalog.TableIdentifier;
2525
import org.apache.iceberg.exceptions.NoSuchTableException;
26+
import org.apache.iceberg.exceptions.NotFoundException;
2627
import org.apache.iceberg.rest.RESTCatalog;
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
@@ -122,16 +123,27 @@ public static void run(
122123
if (!createTableIfNotExists) {
123124
throw e;
124125
}
125-
CreateTable.run(
126-
catalog,
127-
nsTable,
128-
insertBatch.iterator().next(),
129-
null,
130-
true,
131-
options.s3NoSignRequest(),
132-
null,
133-
null);
134-
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
126+
boolean retryInsert = true;
127+
try {
128+
CreateTable.run(
129+
catalog,
130+
nsTable,
131+
insertBatch.iterator().next(),
132+
null,
133+
true,
134+
options.s3NoSignRequest(),
135+
null,
136+
null);
137+
} catch (NotFoundException nfe) {
138+
if (!options.ignoreNotFound()) {
139+
throw nfe;
140+
}
141+
logger.info("Table not created ({} don't exist)", insertBatch);
142+
retryInsert = false;
143+
}
144+
if (retryInsert) {
145+
Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
146+
}
135147
}
136148
}
137149

0 commit comments

Comments
 (0)