diff --git a/ice/src/main/java/com/altinity/ice/Main.java b/ice/src/main/java/com/altinity/ice/Main.java index a26ed39f..38ffbd4e 100644 --- a/ice/src/main/java/com/altinity/ice/Main.java +++ b/ice/src/main/java/com/altinity/ice/Main.java @@ -171,8 +171,13 @@ void insert( description = "/path/to/file where to save list of files to retry" + " (useful for retrying partially failed insert using `cat ice.retry | ice insert - --retry-list=ice.retry`)") - String retryList) - throws IOException { + String retryList, + @CommandLine.Option( + names = {"--thread-count"}, + description = "Number of threads to use for inserting data", + defaultValue = "-1") + int threadCount) + throws IOException, InterruptedException { if (s3NoSignRequest && s3CopyObject) { throw new UnsupportedOperationException( "--s3-no-sign-request + --s3-copy-object is not supported by AWS (see --help for details)"); @@ -203,7 +208,8 @@ void insert( forceTableAuth, s3NoSignRequest, s3CopyObject, - retryList); + retryList, + threadCount < 1 ? 
Runtime.getRuntime().availableProcessors() : threadCount); } } diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java index 2a47fc28..748c96dd 100644 --- a/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java @@ -7,10 +7,12 @@ import com.altinity.ice.internal.jvm.Stats; import com.altinity.ice.internal.parquet.Metadata; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.*; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -66,26 +68,38 @@ public static void run( boolean forceTableAuth, boolean s3NoSignRequest, boolean s3CopyObject, - String retryListFile) - throws IOException { + String retryListFile, + int threadCount) + throws IOException, InterruptedException { if (files.length == 0) { // no work to be done return; } - if (forceNoCopy) { - noCopy = true; - } + InsertOptions options = + InsertOptions.builder() + .skipDuplicates(skipDuplicates) + .noCommit(noCommit) + .noCopy(noCopy) + .forceNoCopy(forceNoCopy) + .forceTableAuth(forceTableAuth) + .s3NoSignRequest(s3NoSignRequest) + .s3CopyObject(s3CopyObject) + .threadCount(threadCount) + .build(); + + final InsertOptions finalOptions = + options.forceNoCopy() ? 
options.toBuilder().noCopy(true).build() : options; Table table = catalog.loadTable(nsTable); try (FileIO tableIO = table.io()) { final Supplier s3ClientSupplier; - if (forceTableAuth) { + if (finalOptions.forceTableAuth()) { if (!(tableIO instanceof S3FileIO)) { throw new UnsupportedOperationException( "--force-table-auth is currently only supported for s3:// tables"); } s3ClientSupplier = ((S3FileIO) tableIO)::client; } else { - s3ClientSupplier = () -> S3.newClient(s3NoSignRequest); + s3ClientSupplier = () -> S3.newClient(finalOptions.s3NoSignRequest()); } Lazy s3ClientLazy = new Lazy<>(s3ClientSupplier); try { @@ -136,125 +150,62 @@ public static void run( : null) { boolean atLeastOneFileAppended = false; - // TODO: parallel - for (final String file : filesExpanded) { - DataFile df; - try { - logger.info("{}: processing", file); - logger.info("{}: jvm: {}", file, Stats.gather()); - - Function checkNotExists = - dataFile -> { - if (tableDataFiles.contains(dataFile)) { - if (skipDuplicates) { - logger.info("{}: duplicate (skipping)", file); - return true; - } - throw new AlreadyExistsException( - String.format("%s is already referenced by the table", dataFile)); - } - return false; - }; + int numThreads = Math.min(finalOptions.threadCount(), filesExpanded.size()); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + try { + var futures = new ArrayList>(); + for (final String file : filesExpanded) { + futures.add( + executor.submit( + () -> { + try { + return processFile( + table, + catalog, + tableIO, + inputIO, + tableDataFiles, + finalOptions, + s3ClientLazy, + dstDataFileSource, + tableSchema, + dataFileNamingStrategy, + file); + } catch (Exception e) { + if (retryLog != null) { + logger.error( + "{}: error (adding to retry list and continuing)", file, e); + retryLog.add(file); + return null; + } else { + throw e; + } + } + })); + } - InputFile inputFile = - Input.newFile(file, catalog, inputIO == null ? 
tableIO : inputIO); - ParquetMetadata metadata = Metadata.read(inputFile); - MessageType type = metadata.getFileMetaData().getSchema(); - Schema fileSchema = - ParquetSchemaUtil.convert(type); // nameMapping applied (when present) - if (!sameSchema(table, fileSchema)) { - throw new BadRequestException( - String.format("%s's schema doesn't match table's schema", file)); - } - // assuming datafiles can be anywhere when table.location() is empty - var noCopyPossible = file.startsWith(table.location()) || forceNoCopy; - // TODO: check before uploading anything - if (noCopy && !noCopyPossible) { - throw new BadRequestException( - file + " cannot be added to catalog without copy"); // TODO: explain - } - long dataFileSizeInBytes; - var dataFile = replacePrefix(file, "s3a://", "s3://"); - if (noCopy) { - if (checkNotExists.apply(dataFile)) { - continue; - } - dataFileSizeInBytes = inputFile.getLength(); - } else if (s3CopyObject) { - if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) { - throw new BadRequestException( - "--s3-copy-object is only supported between s3:// buckets"); + for (var future : futures) { + try { + DataFile df = future.get(); + if (df != null) { + atLeastOneFileAppended = true; + appendOp.appendFile(df); } - String dstDataFile = dstDataFileSource.get(file); - if (checkNotExists.apply(dstDataFile)) { - continue; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while processing files", e); + } catch (ExecutionException e) { + if (retryLog == null) { + throw new IOException("Error processing files", e.getCause()); } - S3.BucketPath src = S3.bucketPath(dataFile); - S3.BucketPath dst = S3.bucketPath(dstDataFile); - logger.info("{}: fast copying to {}", file, dstDataFile); - CopyObjectRequest copyReq = - CopyObjectRequest.builder() - .sourceBucket(src.bucket()) - .sourceKey(src.path()) - .destinationBucket(dst.bucket()) - .destinationKey(dst.path()) - .build(); - 
s3ClientLazy.getValue().copyObject(copyReq); - dataFileSizeInBytes = inputFile.getLength(); - dataFile = dstDataFile; - } else { - String dstDataFile = dstDataFileSource.get(file); - if (checkNotExists.apply(dstDataFile)) { - continue; - } - OutputFile outputFile = - tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://")); - // TODO: support transferTo below (note that compression, etc. might be different) - // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) { - // s.transferTo(d); }} - Parquet.ReadBuilder readBuilder = - Parquet.read(inputFile) - .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) - .project(tableSchema); // TODO: ? - // TODO: reuseContainers? - Parquet.WriteBuilder writeBuilder = - Parquet.write(outputFile) - .overwrite( - dataFileNamingStrategy == DataFileNamingStrategy.Name.INPUT_FILENAME) - .createWriterFunc(GenericParquetWriter::buildWriter) - .schema(tableSchema); - logger.info("{}: copying to {}", file, dstDataFile); - // file size may have changed due to different compression, etc. 
- dataFileSizeInBytes = copy(readBuilder, writeBuilder); - dataFile = dstDataFile; - } - logger.info("{}: adding data file", file); - long recordCount = - metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum(); - MetricsConfig metricsConfig = MetricsConfig.forTable(table); - Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig); - df = - new DataFiles.Builder(table.spec()) - .withPath(dataFile) - .withFormat("PARQUET") - .withRecordCount(recordCount) - .withFileSizeInBytes(dataFileSizeInBytes) - .withMetrics(metrics) - .build(); - } catch (Exception e) { // FIXME - if (retryLog != null) { - logger.error("{}: error (adding to retry list and continuing)", file, e); - retryLog.add(file); - continue; - } else { - throw e; - } } - atLeastOneFileAppended = true; - appendOp.appendFile(df); + } finally { + executor.shutdown(); + executor.awaitTermination(1, TimeUnit.MINUTES); } - if (!noCommit) { + if (!finalOptions.noCommit()) { // TODO: log if (atLeastOneFileAppended) { appendOp.commit(); @@ -276,6 +227,115 @@ public static void run( } } + private static DataFile processFile( + Table table, + RESTCatalog catalog, + FileIO tableIO, + FileIO inputIO, + Set tableDataFiles, + InsertOptions options, + Lazy s3ClientLazy, + DataFileNamingStrategy dstDataFileSource, + Schema tableSchema, + DataFileNamingStrategy.Name dataFileNamingStrategy, + String file) + throws IOException { + logger.info("{}: processing", file); + logger.info("{}: jvm: {}", file, Stats.gather()); + + Function checkNotExists = + dataFile -> { + if (tableDataFiles.contains(dataFile)) { + if (options.skipDuplicates()) { + logger.info("{}: duplicate (skipping)", file); + return true; + } + throw new AlreadyExistsException( + String.format("%s is already referenced by the table", dataFile)); + } + return false; + }; + + InputFile inputFile = Input.newFile(file, catalog, inputIO == null ?
tableIO : inputIO); + ParquetMetadata metadata = Metadata.read(inputFile); + MessageType type = metadata.getFileMetaData().getSchema(); + Schema fileSchema = ParquetSchemaUtil.convert(type); // nameMapping applied (when present) + if (!sameSchema(table, fileSchema)) { + throw new BadRequestException( + String.format("%s's schema doesn't match table's schema", file)); + } + // assuming datafiles can be anywhere when table.location() is empty + var noCopyPossible = file.startsWith(table.location()) || options.forceNoCopy(); + // TODO: check before uploading anything + if (options.noCopy() && !noCopyPossible) { + throw new BadRequestException( + file + " cannot be added to catalog without copy"); // TODO: explain + } + long dataFileSizeInBytes; + var dataFile = replacePrefix(file, "s3a://", "s3://"); + if (options.noCopy()) { + if (checkNotExists.apply(dataFile)) { + return null; + } + dataFileSizeInBytes = inputFile.getLength(); + } else if (options.s3CopyObject()) { + if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) { + throw new BadRequestException("--s3-copy-object is only supported between s3:// buckets"); + } + String dstDataFile = dstDataFileSource.get(file); + if (checkNotExists.apply(dstDataFile)) { + return null; + } + S3.BucketPath src = S3.bucketPath(dataFile); + S3.BucketPath dst = S3.bucketPath(dstDataFile); + logger.info("{}: fast copying to {}", file, dstDataFile); + CopyObjectRequest copyReq = + CopyObjectRequest.builder() + .sourceBucket(src.bucket()) + .sourceKey(src.path()) + .destinationBucket(dst.bucket()) + .destinationKey(dst.path()) + .build(); + s3ClientLazy.getValue().copyObject(copyReq); + dataFileSizeInBytes = inputFile.getLength(); + dataFile = dstDataFile; + } else { + String dstDataFile = dstDataFileSource.get(file); + if (checkNotExists.apply(dstDataFile)) { + return null; + } + OutputFile outputFile = tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://")); + // TODO: support transferTo below 
(note that compression, etc. might be different) + // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) { + // s.transferTo(d); }} + Parquet.ReadBuilder readBuilder = + Parquet.read(inputFile) + .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) + .project(tableSchema); // TODO: ? + // TODO: reuseContainers? + Parquet.WriteBuilder writeBuilder = + Parquet.write(outputFile) + .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.INPUT_FILENAME) + .createWriterFunc(GenericParquetWriter::buildWriter) + .schema(tableSchema); + logger.info("{}: copying to {}", file, dstDataFile); + // file size may have changed due to different compression, etc. + dataFileSizeInBytes = copy(readBuilder, writeBuilder); + dataFile = dstDataFile; + } + logger.info("{}: adding data file", file); + long recordCount = metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum(); + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig); + return new DataFiles.Builder(table.spec()) + .withPath(dataFile) + .withFormat("PARQUET") + .withRecordCount(recordCount) + .withFileSizeInBytes(dataFileSizeInBytes) + .withMetrics(metrics) + .build(); + } + private static boolean sameSchema(Table table, Schema fileSchema) { boolean sameSchema; Schema tableSchema = table.schema(); diff --git a/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java b/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java new file mode 100644 index 00000000..da85292c --- /dev/null +++ b/ice/src/main/java/com/altinity/ice/internal/cmd/InsertOptions.java @@ -0,0 +1,93 @@ +package com.altinity.ice.internal.cmd; + +public record InsertOptions( + boolean skipDuplicates, + boolean noCommit, + boolean noCopy, + boolean forceNoCopy, + boolean forceTableAuth, + boolean s3NoSignRequest, + boolean s3CopyObject, + int threadCount) { + + public static Builder builder() { + 
return new Builder(); + } + + public Builder toBuilder() { + return builder() + .skipDuplicates(skipDuplicates) + .noCommit(noCommit) + .noCopy(noCopy) + .forceNoCopy(forceNoCopy) + .forceTableAuth(forceTableAuth) + .s3NoSignRequest(s3NoSignRequest) + .s3CopyObject(s3CopyObject) + .threadCount(threadCount); + } + + public static final class Builder { + private boolean skipDuplicates; + private boolean noCommit; + private boolean noCopy; + private boolean forceNoCopy; + private boolean forceTableAuth; + private boolean s3NoSignRequest; + private boolean s3CopyObject; + private int threadCount = Runtime.getRuntime().availableProcessors(); + + private Builder() {} + + public Builder skipDuplicates(boolean skipDuplicates) { + this.skipDuplicates = skipDuplicates; + return this; + } + + public Builder noCommit(boolean noCommit) { + this.noCommit = noCommit; + return this; + } + + public Builder noCopy(boolean noCopy) { + this.noCopy = noCopy; + return this; + } + + public Builder forceNoCopy(boolean forceNoCopy) { + this.forceNoCopy = forceNoCopy; + return this; + } + + public Builder forceTableAuth(boolean forceTableAuth) { + this.forceTableAuth = forceTableAuth; + return this; + } + + public Builder s3NoSignRequest(boolean s3NoSignRequest) { + this.s3NoSignRequest = s3NoSignRequest; + return this; + } + + public Builder s3CopyObject(boolean s3CopyObject) { + this.s3CopyObject = s3CopyObject; + return this; + } + + public Builder threadCount(int threadCount) { + this.threadCount = threadCount; + return this; + } + + public InsertOptions build() { + return new InsertOptions( + skipDuplicates, + noCommit, + noCopy, + forceNoCopy, + forceTableAuth, + s3NoSignRequest, + s3CopyObject, + threadCount); + } + } +}