Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions ice/src/main/java/com/altinity/ice/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,13 @@ void insert(
description =
"/path/to/file where to save list of files to retry"
+ " (useful for retrying partially failed insert using `cat ice.retry | ice insert - --retry-list=ice.retry`)")
String retryList)
throws IOException {
String retryList,
@CommandLine.Option(
names = {"--thread-count"},
description = "Number of threads to use for inserting data",
defaultValue = "-1")
int threadCount)
throws IOException, InterruptedException {
if (s3NoSignRequest && s3CopyObject) {
throw new UnsupportedOperationException(
"--s3-no-sign-request + --s3-copy-object is not supported by AWS (see --help for details)");
Expand Down Expand Up @@ -203,7 +208,8 @@ void insert(
forceTableAuth,
s3NoSignRequest,
s3CopyObject,
retryList);
retryList,
threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount);
}
}

Expand Down
298 changes: 179 additions & 119 deletions ice/src/main/java/com/altinity/ice/internal/cmd/Insert.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import com.altinity.ice.internal.jvm.Stats;
import com.altinity.ice.internal.parquet.Metadata;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -66,26 +68,38 @@ public static void run(
boolean forceTableAuth,
boolean s3NoSignRequest,
boolean s3CopyObject,
String retryListFile)
throws IOException {
String retryListFile,
int threadCount)
throws IOException, InterruptedException {
if (files.length == 0) {
// no work to be done
return;
}
if (forceNoCopy) {
noCopy = true;
}
InsertOptions options =
InsertOptions.builder()
.skipDuplicates(skipDuplicates)
.noCommit(noCommit)
.noCopy(noCopy)
.forceNoCopy(forceNoCopy)
.forceTableAuth(forceTableAuth)
.s3NoSignRequest(s3NoSignRequest)
.s3CopyObject(s3CopyObject)
.threadCount(threadCount)
.build();

final InsertOptions finalOptions =
options.forceNoCopy() ? options.toBuilder().noCopy(true).build() : options;
Table table = catalog.loadTable(nsTable);
try (FileIO tableIO = table.io()) {
final Supplier<S3Client> s3ClientSupplier;
if (forceTableAuth) {
if (finalOptions.forceTableAuth()) {
if (!(tableIO instanceof S3FileIO)) {
throw new UnsupportedOperationException(
"--force-table-auth is currently only supported for s3:// tables");
}
s3ClientSupplier = ((S3FileIO) tableIO)::client;
} else {
s3ClientSupplier = () -> S3.newClient(s3NoSignRequest);
s3ClientSupplier = () -> S3.newClient(finalOptions.s3NoSignRequest());
}
Lazy<S3Client> s3ClientLazy = new Lazy<>(s3ClientSupplier);
try {
Expand Down Expand Up @@ -136,125 +150,62 @@ public static void run(
: null) {
boolean atLeastOneFileAppended = false;

// TODO: parallel
for (final String file : filesExpanded) {
DataFile df;
try {
logger.info("{}: processing", file);
logger.info("{}: jvm: {}", file, Stats.gather());

Function<String, Boolean> checkNotExists =
dataFile -> {
if (tableDataFiles.contains(dataFile)) {
if (skipDuplicates) {
logger.info("{}: duplicate (skipping)", file);
return true;
}
throw new AlreadyExistsException(
String.format("%s is already referenced by the table", dataFile));
}
return false;
};
int numThreads = Math.min(finalOptions.threadCount(), filesExpanded.size());
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
try {
var futures = new ArrayList<Future<DataFile>>();
for (final String file : filesExpanded) {
futures.add(
executor.submit(
() -> {
try {
return processFile(
table,
catalog,
tableIO,
inputIO,
tableDataFiles,
finalOptions,
s3ClientLazy,
dstDataFileSource,
tableSchema,
dataFileNamingStrategy,
file);
} catch (Exception e) {
if (retryLog != null) {
logger.error(
"{}: error (adding to retry list and continuing)", file, e);
retryLog.add(file);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note to self: retryLog::add is thread-safe so this is fine

return null;
} else {
throw e;
}
}
}));
}

InputFile inputFile =
Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO);
ParquetMetadata metadata = Metadata.read(inputFile);
MessageType type = metadata.getFileMetaData().getSchema();
Schema fileSchema =
ParquetSchemaUtil.convert(type); // nameMapping applied (when present)
if (!sameSchema(table, fileSchema)) {
throw new BadRequestException(
String.format("%s's schema doesn't match table's schema", file));
}
// assuming datafiles can be anywhere when table.location() is empty
var noCopyPossible = file.startsWith(table.location()) || forceNoCopy;
// TODO: check before uploading anything
if (noCopy && !noCopyPossible) {
throw new BadRequestException(
file + " cannot be added to catalog without copy"); // TODO: explain
}
long dataFileSizeInBytes;
var dataFile = replacePrefix(file, "s3a://", "s3://");
if (noCopy) {
if (checkNotExists.apply(dataFile)) {
continue;
}
dataFileSizeInBytes = inputFile.getLength();
} else if (s3CopyObject) {
if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) {
throw new BadRequestException(
"--s3-copy-object is only supported between s3:// buckets");
for (var future : futures) {
try {
DataFile df = future.get();
if (df != null) {
atLeastOneFileAppended = true;
appendOp.appendFile(df);
}
String dstDataFile = dstDataFileSource.get(file);
if (checkNotExists.apply(dstDataFile)) {
continue;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while processing files", e);
} catch (ExecutionException e) {
if (retryLog == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like queued up tasks are not canceled;
ExecutorService.close() executed when we exit

try (ExecutorService executor = Executors.newFixedThreadPool(numThreads)) {

on line 157 calls shutdown() (not shutdownNow()), i.e. it's going to block waiting for queued up tasks to complete even though tx won't be committed.

throw new IOException("Error processing files", e.getCause());
}
S3.BucketPath src = S3.bucketPath(dataFile);
S3.BucketPath dst = S3.bucketPath(dstDataFile);
logger.info("{}: fast copying to {}", file, dstDataFile);
CopyObjectRequest copyReq =
CopyObjectRequest.builder()
.sourceBucket(src.bucket())
.sourceKey(src.path())
.destinationBucket(dst.bucket())
.destinationKey(dst.path())
.build();
s3ClientLazy.getValue().copyObject(copyReq);
dataFileSizeInBytes = inputFile.getLength();
dataFile = dstDataFile;
} else {
String dstDataFile = dstDataFileSource.get(file);
if (checkNotExists.apply(dstDataFile)) {
continue;
}
OutputFile outputFile =
tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://"));
// TODO: support transferTo below (note that compression, etc. might be different)
// try (var d = outputFile.create()) { try (var s = inputFile.newStream()) {
// s.transferTo(d); }}
Parquet.ReadBuilder readBuilder =
Parquet.read(inputFile)
.createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
.project(tableSchema); // TODO: ?
// TODO: reuseContainers?
Parquet.WriteBuilder writeBuilder =
Parquet.write(outputFile)
.overwrite(
dataFileNamingStrategy == DataFileNamingStrategy.Name.INPUT_FILENAME)
.createWriterFunc(GenericParquetWriter::buildWriter)
.schema(tableSchema);
logger.info("{}: copying to {}", file, dstDataFile);
// file size may have changed due to different compression, etc.
dataFileSizeInBytes = copy(readBuilder, writeBuilder);
dataFile = dstDataFile;
}
logger.info("{}: adding data file", file);
long recordCount =
metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum();
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig);
df =
new DataFiles.Builder(table.spec())
.withPath(dataFile)
.withFormat("PARQUET")
.withRecordCount(recordCount)
.withFileSizeInBytes(dataFileSizeInBytes)
.withMetrics(metrics)
.build();
} catch (Exception e) { // FIXME
if (retryLog != null) {
logger.error("{}: error (adding to retry list and continuing)", file, e);
retryLog.add(file);
continue;
} else {
throw e;
}
}
atLeastOneFileAppended = true;
appendOp.appendFile(df);
} finally {
executor.awaitTermination(1, TimeUnit.MINUTES);
executor.shutdownNow();
}

if (!noCommit) {
if (!finalOptions.noCommit()) {
// TODO: log
if (atLeastOneFileAppended) {
appendOp.commit();
Expand All @@ -276,6 +227,115 @@ public static void run(
}
}

/**
 * Processes a single input Parquet file for insertion into the Iceberg table and returns the
 * {@link DataFile} to append, or {@code null} when the file is a duplicate and
 * {@code options.skipDuplicates()} is set.
 *
 * <p>Three ingestion modes, selected from {@code options}:
 * <ul>
 *   <li>{@code noCopy()} — reference the file in place (no data movement);</li>
 *   <li>{@code s3CopyObject()} — server-side S3 object copy into the table location;</li>
 *   <li>otherwise — read/rewrite the Parquet data into the table location.</li>
 * </ul>
 *
 * <p>NOTE(review): called concurrently from the executor in {@code run}; this method only reads
 * {@code tableDataFiles} (never mutates it), so two threads mapping to the same destination file
 * could both pass the duplicate check — presumably resolved at commit time; verify.
 *
 * @param table target Iceberg table (location, spec, metrics config are read from it)
 * @param catalog catalog used to resolve the input file's IO
 * @param tableIO IO of the target table; also used for input when {@code inputIO} is null
 * @param inputIO optional IO for reading the input file; may be null
 * @param tableDataFiles paths already referenced by the table, used for duplicate detection
 * @param options immutable per-run insert options
 * @param s3ClientLazy lazily-created S3 client, only materialized on the fast-copy path
 * @param dstDataFileSource maps an input path to its destination path within the table
 * @param tableSchema schema files are validated against and rewritten with
 * @param dataFileNamingStrategy naming strategy; INPUT_FILENAME allows overwriting on rewrite
 * @param file input file path (s3://, s3a://, or whatever {@code Input.newFile} supports)
 * @return the built {@link DataFile}, or {@code null} if skipped as a duplicate
 * @throws IOException on read/copy failure
 * @throws BadRequestException on schema mismatch or unsupported mode/location combination
 * @throws AlreadyExistsException if the file is already referenced and duplicates aren't skipped
 */
private static DataFile processFile(
Table table,
RESTCatalog catalog,
FileIO tableIO,
FileIO inputIO,
Set<String> tableDataFiles,
InsertOptions options,
Lazy<S3Client> s3ClientLazy,
DataFileNamingStrategy dstDataFileSource,
Schema tableSchema,
DataFileNamingStrategy.Name dataFileNamingStrategy,
String file)
throws IOException {
logger.info("{}: processing", file);
logger.info("{}: jvm: {}", file, Stats.gather());

// Returns true when the candidate path should be skipped (already in the table and
// --skip-duplicates was given); throws when duplicates are an error.
Function<String, Boolean> checkNotExists =
dataFile -> {
if (tableDataFiles.contains(dataFile)) {
if (options.skipDuplicates()) {
logger.info("{}: duplicate (skipping)", file);
return true;
}
throw new AlreadyExistsException(
String.format("%s is already referenced by the table", dataFile));
}
return false;
};

InputFile inputFile = Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO);
// Parquet footer read up front: supplies the schema for validation and, later, the row count.
ParquetMetadata metadata = Metadata.read(inputFile);
MessageType type = metadata.getFileMetaData().getSchema();
Schema fileSchema = ParquetSchemaUtil.convert(type); // nameMapping applied (when present)
if (!sameSchema(table, fileSchema)) {
throw new BadRequestException(
String.format("%s's schema doesn't match table's schema", file));
}
// assuming datafiles can be anywhere when table.location() is empty
var noCopyPossible = file.startsWith(table.location()) || options.forceNoCopy();
// TODO: check before uploading anything
if (options.noCopy() && !noCopyPossible) {
throw new BadRequestException(
file + " cannot be added to catalog without copy"); // TODO: explain
}
long dataFileSizeInBytes;
// Iceberg references s3:// paths; normalize a Hadoop-style s3a:// input.
var dataFile = replacePrefix(file, "s3a://", "s3://");
if (options.noCopy()) {
// Reference the existing object in place; size comes straight from the input.
if (checkNotExists.apply(dataFile)) {
return null;
}
dataFileSizeInBytes = inputFile.getLength();
} else if (options.s3CopyObject()) {
// Server-side S3 copy: fast, but only valid bucket-to-bucket.
if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) {
throw new BadRequestException("--s3-copy-object is only supported between s3:// buckets");
}
String dstDataFile = dstDataFileSource.get(file);
if (checkNotExists.apply(dstDataFile)) {
return null;
}
S3.BucketPath src = S3.bucketPath(dataFile);
S3.BucketPath dst = S3.bucketPath(dstDataFile);
logger.info("{}: fast copying to {}", file, dstDataFile);
CopyObjectRequest copyReq =
CopyObjectRequest.builder()
.sourceBucket(src.bucket())
.sourceKey(src.path())
.destinationBucket(dst.bucket())
.destinationKey(dst.path())
.build();
s3ClientLazy.getValue().copyObject(copyReq);
// Byte-for-byte copy, so the input length is the destination length.
dataFileSizeInBytes = inputFile.getLength();
dataFile = dstDataFile;
} else {
// Full rewrite: read records and write them out under the table's schema/settings.
String dstDataFile = dstDataFileSource.get(file);
if (checkNotExists.apply(dstDataFile)) {
return null;
}
OutputFile outputFile = tableIO.newOutputFile(replacePrefix(dstDataFile, "s3://", "s3a://"));
// TODO: support transferTo below (note that compression, etc. might be different)
// try (var d = outputFile.create()) { try (var s = inputFile.newStream()) {
// s.transferTo(d); }}
Parquet.ReadBuilder readBuilder =
Parquet.read(inputFile)
.createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
.project(tableSchema); // TODO: ?
// TODO: reuseContainers?
Parquet.WriteBuilder writeBuilder =
Parquet.write(outputFile)
// Overwrite is only safe when the destination name is derived from the input name.
.overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.INPUT_FILENAME)
.createWriterFunc(GenericParquetWriter::buildWriter)
.schema(tableSchema);
logger.info("{}: copying to {}", file, dstDataFile);
// file size may have changed due to different compression, etc.
dataFileSizeInBytes = copy(readBuilder, writeBuilder);
dataFile = dstDataFile;
}
logger.info("{}: adding data file", file);
// Row count from the footer's block metadata (sum over row groups).
long recordCount = metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum();
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
// NOTE(review): metrics are computed from the INPUT file even on the rewrite path, where the
// written file may differ (compression/row-group layout) — confirm this is intentional.
Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig);
return new DataFiles.Builder(table.spec())
.withPath(dataFile)
.withFormat("PARQUET")
.withRecordCount(recordCount)
.withFileSizeInBytes(dataFileSizeInBytes)
.withMetrics(metrics)
.build();
}

private static boolean sameSchema(Table table, Schema fileSchema) {
boolean sameSchema;
Schema tableSchema = table.schema();
Expand Down
Loading