Skip to content

Commit 24ab9b4

Browse files
committed
ice: Use multipart uploads instead of copyObject to overcome s3:CopyObject 5 GiB limit
1 parent fcff741 commit 24ab9b4

File tree

5 files changed

+196
-13
lines changed

5 files changed

+196
-13
lines changed

ice/src/main/java/com/altinity/ice/cli/Main.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,14 @@ void insert(
246246
@CommandLine.Option(
247247
names = "--s3-copy-object",
248248
description =
249-
"Avoid download/upload by using https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html for copying S3 objects."
249+
"Avoid download/upload by using https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html or https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html for copying S3 objects."
250250
+ " Note that AWS does not support copying objects anonymously (i.e. you can't use this flag to copy objects from public buckets like https://registry.opendata.aws/aws-public-blockchain/).")
251251
boolean s3CopyObject,
252+
@CommandLine.Option(
253+
names = "--s3-multipart-upload-thread-count",
254+
description = "Number of threads to use for uploading multiparts.",
255+
defaultValue = "4")
256+
int s3MultipartUploadThreadCount,
252257
@CommandLine.Option(names = "--no-commit", description = "Skip transaction commit")
253258
boolean noCommit,
254259
@CommandLine.Option(
@@ -350,6 +355,7 @@ void insert(
350355
.forceTableAuth(forceTableAuth)
351356
.s3NoSignRequest(s3NoSignRequest)
352357
.s3CopyObject(s3CopyObject)
358+
.s3MultipartUploadThreadCount(s3MultipartUploadThreadCount)
353359
.assumeSorted(assumeSorted)
354360
.ignoreNotFound(watchMode)
355361
.retryListFile(retryList)

ice/src/main/java/com/altinity/ice/cli/internal/cmd/CreateTable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
4040
import org.apache.parquet.schema.MessageType;
4141
import software.amazon.awssdk.services.s3.S3Client;
42+
import software.amazon.awssdk.services.s3.internal.crossregion.S3CrossRegionSyncClient;
4243
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
4344
import software.amazon.awssdk.utils.Lazy;
4445

@@ -60,7 +61,8 @@ public static void run(
6061
return;
6162
}
6263

63-
Lazy<S3Client> s3ClientLazy = new Lazy<>(() -> S3.newClient(s3NoSignRequest));
64+
Lazy<S3Client> s3ClientLazy =
65+
new Lazy<>(() -> new S3CrossRegionSyncClient(S3.newClient(s3NoSignRequest)));
6466

6567
if (schemaFile.startsWith("s3://") && schemaFile.contains("*")) {
6668
var b = S3.bucketPath(schemaFile);

ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
import com.altinity.ice.cli.internal.iceberg.io.Input;
1717
import com.altinity.ice.cli.internal.iceberg.parquet.Metadata;
1818
import com.altinity.ice.cli.internal.retry.RetryLog;
19+
import com.altinity.ice.cli.internal.s3.CopyObjectMultipart;
1920
import com.altinity.ice.cli.internal.s3.S3;
21+
import com.altinity.ice.internal.iceberg.io.SchemeFileIO;
2022
import com.altinity.ice.internal.strings.Strings;
2123
import java.io.IOException;
2224
import java.util.ArrayList;
@@ -82,6 +84,7 @@
8284
import org.slf4j.Logger;
8385
import org.slf4j.LoggerFactory;
8486
import software.amazon.awssdk.services.s3.S3Client;
87+
import software.amazon.awssdk.services.s3.internal.crossregion.S3CrossRegionSyncClient;
8588
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
8689
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
8790
import software.amazon.awssdk.utils.Lazy;
@@ -109,15 +112,21 @@ public static void run(
109112
try (FileIO tableIO = table.io()) {
110113
final Supplier<S3Client> s3ClientSupplier;
111114
if (options.forceTableAuth()) {
112-
if (!(tableIO instanceof S3FileIO)) {
115+
FileIO underlyingTableIO = tableIO;
116+
if (tableIO instanceof SchemeFileIO x) {
117+
underlyingTableIO = x.io(table.location());
118+
}
119+
if (!(underlyingTableIO instanceof S3FileIO)) {
113120
throw new UnsupportedOperationException(
114-
"--force-table-auth is currently only supported for s3:// tables");
121+
"--force-table-auth is currently only supported for s3:// tables"
122+
+ tableIO.getClass());
115123
}
116-
s3ClientSupplier = ((S3FileIO) tableIO)::client;
124+
s3ClientSupplier = ((S3FileIO) underlyingTableIO)::client;
117125
} else {
118126
s3ClientSupplier = () -> S3.newClient(options.s3NoSignRequest());
119127
}
120-
Lazy<S3Client> s3ClientLazy = new Lazy<>(s3ClientSupplier);
128+
Lazy<S3Client> s3ClientLazy =
129+
new Lazy<>(() -> new S3CrossRegionSyncClient(s3ClientSupplier.get()));
121130
try {
122131
var filesExpanded =
123132
Arrays.stream(files)
@@ -442,15 +451,20 @@ private static List<DataFile> processFile(
442451
}
443452
S3.BucketPath src = S3.bucketPath(dataFile);
444453
S3.BucketPath dst = S3.bucketPath(dstDataFile);
445-
logger.info("{}: fast copying to {}", file, dstDataFile);
454+
logger.info("{}: performing S3 server-side copy to {}", file, dstDataFile);
446455
CopyObjectRequest copyReq =
447456
CopyObjectRequest.builder()
448457
.sourceBucket(src.bucket())
449458
.sourceKey(src.path())
450459
.destinationBucket(dst.bucket())
451460
.destinationKey(dst.path())
452461
.build();
453-
s3ClientLazy.getValue().copyObject(copyReq);
462+
CopyObjectMultipart.run(
463+
s3ClientLazy.getValue(),
464+
copyReq,
465+
CopyObjectMultipart.Options.builder()
466+
.s3MultipartUploadThreadCount(options.s3MultipartUploadThreadCount)
467+
.build());
454468
dataFileSizeInBytes = inputFile.getLength();
455469
dataFile = dstDataFile;
456470
} else if (partitionSpec.isPartitioned() && partitionKey == null) {
@@ -738,6 +752,7 @@ public record Options(
738752
boolean forceTableAuth,
739753
boolean s3NoSignRequest,
740754
boolean s3CopyObject,
755+
int s3MultipartUploadThreadCount,
741756
boolean assumeSorted,
742757
boolean ignoreNotFound,
743758
@Nullable String retryListFile,
@@ -750,19 +765,20 @@ public static Builder builder() {
750765
}
751766

752767
public static final class Builder {
753-
DataFileNamingStrategy.Name dataFileNamingStrategy;
768+
private DataFileNamingStrategy.Name dataFileNamingStrategy;
754769
private boolean skipDuplicates;
755770
private boolean noCommit;
756771
private boolean noCopy;
757772
private boolean forceNoCopy;
758773
private boolean forceTableAuth;
759774
private boolean s3NoSignRequest;
760775
private boolean s3CopyObject;
776+
private int s3MultipartUploadThreadCount;
761777
private boolean assumeSorted;
762778
private boolean ignoreNotFound;
763-
String retryListFile;
764-
List<Main.IcePartition> partitionList = List.of();
765-
List<Main.IceSortOrder> sortOrderList = List.of();
779+
private String retryListFile;
780+
private List<Main.IcePartition> partitionList = List.of();
781+
private List<Main.IceSortOrder> sortOrderList = List.of();
766782
private int threadCount = Runtime.getRuntime().availableProcessors();
767783

768784
private Builder() {}
@@ -807,6 +823,11 @@ public Builder s3CopyObject(boolean s3CopyObject) {
807823
return this;
808824
}
809825

826+
public Builder s3MultipartUploadThreadCount(int s3MultipartUploadThreadCount) {
827+
this.s3MultipartUploadThreadCount = s3MultipartUploadThreadCount;
828+
return this;
829+
}
830+
810831
public Builder assumeSorted(boolean assumeSorted) {
811832
this.assumeSorted = assumeSorted;
812833
return this;
@@ -847,6 +868,7 @@ public Options build() {
847868
forceTableAuth,
848869
s3NoSignRequest,
849870
s3CopyObject,
871+
s3MultipartUploadThreadCount,
850872
assumeSorted,
851873
ignoreNotFound,
852874
retryListFile,

ice/src/main/java/com/altinity/ice/cli/internal/s3/CopyObjectMultipart.java

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.altinity.ice.cli.internal.s3;
11+
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import java.util.concurrent.CompletionException;
15+
import java.util.concurrent.ExecutionException;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.Future;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
import software.amazon.awssdk.services.s3.S3Client;
22+
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
23+
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
24+
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
25+
import software.amazon.awssdk.services.s3.model.CompletedPart;
26+
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
27+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
28+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
29+
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
30+
import software.amazon.awssdk.services.s3.model.UploadPartCopyResponse;
31+
32+
public class CopyObjectMultipart {
33+
34+
private static final Logger logger = LoggerFactory.getLogger(CopyObjectMultipart.class);
35+
36+
// S3::CopyObject has 5GiB file limit. This version doesn't.
37+
public static void run(S3Client s3, CopyObjectRequest req, Options opts) {
38+
long partSize = 256L * 1024L * 1024L; // 256 MiB
39+
40+
long objectSize =
41+
s3.headObject(r -> r.bucket(req.sourceBucket()).key(req.sourceKey())).contentLength();
42+
43+
if (objectSize < partSize) {
44+
logger.info(
45+
"Copying {}/{} to {}/{}",
46+
req.sourceBucket(),
47+
req.sourceKey(),
48+
req.destinationBucket(),
49+
req.destinationKey());
50+
51+
s3.copyObject(req);
52+
return;
53+
}
54+
55+
CreateMultipartUploadResponse createResponse =
56+
s3.createMultipartUpload(
57+
CreateMultipartUploadRequest.builder()
58+
.bucket(req.destinationBucket())
59+
.key(req.destinationKey())
60+
.build());
61+
62+
String uploadId = createResponse.uploadId();
63+
int numberOfParts = (int) Math.ceilDiv(objectSize, partSize);
64+
65+
List<Future<CompletedPart>> futures = new ArrayList<>();
66+
try (ExecutorService executor =
67+
Executors.newFixedThreadPool(opts.s3MultipartUploadThreadCount)) {
68+
for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) {
69+
final int p = partNumber;
70+
final long rangeStart = (p - 1) * partSize;
71+
final long rangeEnd = Math.min(rangeStart + partSize - 1, objectSize - 1);
72+
73+
futures.add(
74+
executor.submit(
75+
() -> {
76+
logger.info(
77+
"Copying {}/{}[{}:{}] to {}/{} (part {} of {})",
78+
req.sourceBucket(),
79+
req.sourceKey(),
80+
rangeStart,
81+
rangeEnd,
82+
req.destinationBucket(),
83+
req.destinationKey(),
84+
p,
85+
numberOfParts);
86+
87+
UploadPartCopyResponse re =
88+
s3.uploadPartCopy(
89+
UploadPartCopyRequest.builder()
90+
.sourceBucket(req.sourceBucket())
91+
.sourceKey(req.sourceKey())
92+
.destinationBucket(req.destinationBucket())
93+
.destinationKey(req.destinationKey())
94+
.uploadId(uploadId)
95+
.partNumber(p)
96+
.copySourceRange("bytes=" + rangeStart + "-" + rangeEnd)
97+
.build());
98+
99+
return CompletedPart.builder()
100+
.partNumber(p)
101+
.eTag(re.copyPartResult().eTag())
102+
.build();
103+
}));
104+
}
105+
}
106+
107+
List<CompletedPart> completedParts = new ArrayList<>();
108+
for (Future<CompletedPart> f : futures) {
109+
try {
110+
completedParts.add(f.get());
111+
} catch (InterruptedException | ExecutionException e) {
112+
// Cleanup.
113+
s3.abortMultipartUpload(
114+
AbortMultipartUploadRequest.builder()
115+
.bucket(req.destinationBucket())
116+
.key(req.destinationKey())
117+
.uploadId(uploadId)
118+
.build());
119+
throw new CompletionException(e);
120+
}
121+
}
122+
123+
s3.completeMultipartUpload(
124+
CompleteMultipartUploadRequest.builder()
125+
.bucket(req.destinationBucket())
126+
.key(req.destinationKey())
127+
.uploadId(uploadId)
128+
.multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
129+
.build());
130+
}
131+
132+
public record Options(int s3MultipartUploadThreadCount) {
133+
134+
public static Options.Builder builder() {
135+
return new Options.Builder();
136+
}
137+
138+
public static final class Builder {
139+
private int s3MultipartUploadThreadCount;
140+
141+
private Builder() {}
142+
143+
public Options.Builder s3MultipartUploadThreadCount(int s3MultipartUploadThreadCount) {
144+
this.s3MultipartUploadThreadCount = s3MultipartUploadThreadCount;
145+
return this;
146+
}
147+
148+
public Options build() {
149+
return new Options(s3MultipartUploadThreadCount);
150+
}
151+
}
152+
}
153+
}

ice/src/main/java/com/altinity/ice/internal/iceberg/io/SchemeFileIO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public void close() {
138138
}
139139
}
140140

141-
private DelegateFileIO io(String location) {
141+
public DelegateFileIO io(String location) {
142142
String s = scheme(location);
143143
String impl = SCHEME_TO_FILE_IO.get(s);
144144
Preconditions.checkNotNull(

0 commit comments

Comments
 (0)