Skip to content

Commit 80698dd

Browse files
bcully, elasticsearchmachine, and DaveCTurner
committed
BlobContainer: add copyBlob method
If a container implements copyBlob, then the copy is performed by the store, without client-side I/O. If the store does not provide a copy operation, then the default implementation throws UnsupportedOperationException. This change provides implementations for the FS and S3 blob containers; more will follow.

Co-authored-by: elasticsearchmachine <[email protected]>
Co-authored-by: David Turner <[email protected]>
(cherry picked from commit c1a71ff)
1 parent f248253 commit 80698dd

File tree

23 files changed

+811
-97
lines changed

23 files changed

+811
-97
lines changed

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.nio.charset.StandardCharsets;
3838
import java.nio.file.NoSuchFileException;
3939
import java.util.ArrayList;
40+
import java.util.Arrays;
4041
import java.util.Base64;
4142
import java.util.Collection;
4243
import java.util.Collections;
@@ -200,7 +201,8 @@ private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHa
200201
private final Set<String> seenRequestIds = ConcurrentCollections.newConcurrentSet();
201202

202203
private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
203-
super(delegate);
204+
// minimal implementation of
205+
super(delegate, Arrays.stream(AzureBlobStore.Operation.values()).map(AzureBlobStore.Operation::getKey).toArray(String[]::new));
204206
}
205207

206208
@Override

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,36 @@ public Map<String, Long> stats() {
637637
return stats.toMap();
638638
}
639639

640+
// partial backport of #113573
// visible for testing
/**
 * Named blob-store operations, each paired with the string key that identifies it.
 */
enum Operation {
    GET_BLOB("GetBlob"),
    LIST_BLOBS("ListBlobs"),
    GET_BLOB_PROPERTIES("GetBlobProperties"),
    PUT_BLOB("PutBlob"),
    PUT_BLOCK("PutBlock"),
    PUT_BLOCK_LIST("PutBlockList");

    private final String key;

    Operation(String key) {
        this.key = key;
    }

    /**
     * @return the string key this operation was declared with
     */
    public String getKey() {
        return key;
    }

    /**
     * Finds the operation whose key is exactly equal to the given string.
     *
     * @param key the key to look up
     * @return the operation declared with that key
     * @throws IllegalArgumentException if no operation was declared with that key
     */
    public static Operation fromKey(String key) {
        for (Operation operation : Operation.values()) {
            if (operation.key.equals(key)) {
                return operation;
            }
        }
        throw new IllegalArgumentException("No matching key: " + key);
    }
}
669+
640670
private static class Stats {
641671

642672
private final AtomicLong getOperations = new AtomicLong();

modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta
339339
public static final Pattern contentRangeMatcher = Pattern.compile("bytes \\d+-(\\d+)/(\\d+)");
340340

341341
GoogleCloudStorageStatsCollectorHttpHandler(final HttpHandler delegate) {
342-
super(delegate);
342+
super(delegate, Arrays.stream(StorageOperation.values()).map(StorageOperation::key).toArray(String[]::new));
343343
}
344344

345345
@Override
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.repositories.gcs;
11+
12+
// partial backport of #125452
/**
 * Named storage operations, each paired with the string key that identifies it.
 */
public enum StorageOperation {
    INSERT("InsertObject"),
    GET("GetObject"),
    LIST("ListObjects");

    final String key;

    StorageOperation(String key) {
        this.key = key;
    }

    /**
     * @return the string key this operation was declared with
     */
    public String key() {
        return key;
    }
}

modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,4 +228,56 @@ public void testReadFromPositionLargerThanBlobLength() {
228228
e -> asInstanceOf(AmazonS3Exception.class, e.getCause()).getStatusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()
229229
);
230230
}
231+
232+
public void testCopy() {
233+
final var sourceBlobName = randomIdentifier();
234+
final var blobBytes = randomBytesReference(randomIntBetween(100, 2_000));
235+
final var destinationBlobName = randomIdentifier();
236+
237+
final var repository = getRepository();
238+
239+
final var targetBytes = executeOnBlobStore(repository, sourceBlobContainer -> {
240+
sourceBlobContainer.writeBlob(randomPurpose(), sourceBlobName, blobBytes, true);
241+
242+
final var destinationBlobContainer = repository.blobStore().blobContainer(repository.basePath().add("target"));
243+
destinationBlobContainer.copyBlob(
244+
randomPurpose(),
245+
sourceBlobContainer,
246+
sourceBlobName,
247+
destinationBlobName,
248+
blobBytes.length()
249+
);
250+
251+
return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes();
252+
});
253+
254+
assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes);
255+
}
256+
257+
public void testMultipartCopy() {
258+
final var sourceBlobName = randomIdentifier();
259+
// executeMultipart requires a minimum part size of 5 MiB
260+
final var blobBytes = randomBytesReference(randomIntBetween(5 * 1024 * 1024, 10 * 1024 * 1024));
261+
final var destinationBlobName = randomIdentifier();
262+
263+
final var repository = getRepository();
264+
265+
final var targetBytes = executeOnBlobStore(repository, sourceBlobContainer -> {
266+
sourceBlobContainer.writeBlob(randomPurpose(), sourceBlobName, blobBytes, true);
267+
268+
final S3BlobContainer destinationBlobContainer = (S3BlobContainer) repository.blobStore()
269+
.blobContainer(repository.basePath().add("target"));
270+
destinationBlobContainer.executeMultipartCopy(
271+
randomPurpose(),
272+
(S3BlobContainer) sourceBlobContainer,
273+
sourceBlobName,
274+
destinationBlobName,
275+
blobBytes.length()
276+
);
277+
278+
return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes();
279+
});
280+
281+
assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes);
282+
}
231283
}

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import java.io.IOException;
7171
import java.io.InputStream;
7272
import java.util.ArrayList;
73+
import java.util.Arrays;
7374
import java.util.Collection;
7475
import java.util.Collections;
7576
import java.util.EnumSet;
@@ -600,6 +601,12 @@ long getLargeBlobThresholdInBytes() {
600601
return ByteSizeUnit.MB.toBytes(1L);
601602
}
602603

604+
@Override
605+
long getMaxCopySizeBeforeMultipart() {
606+
// on my laptop 10K exercises this better but larger values should be fine for nightlies
607+
return ByteSizeUnit.MB.toBytes(1L);
608+
}
609+
603610
@Override
604611
void ensureMultiPartUploadSize(long blobSize) {}
605612
};
@@ -667,7 +674,7 @@ protected class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandler {
667674
private final Map<S3BlobStore.StatsKey, AtomicLong> metricsCount = ConcurrentCollections.newConcurrentMap();
668675

669676
S3StatsCollectorHttpHandler(final HttpHandler delegate) {
670-
super(delegate);
677+
super(delegate, Arrays.stream(S3BlobStore.Operation.values()).map(S3BlobStore.Operation::getKey).toArray(String[]::new));
671678
}
672679

673680
private S3HttpHandler.S3Request parseRequest(HttpExchange exchange) {
@@ -715,9 +722,17 @@ public void maybeTrack(HttpExchange exchange) {
715722
k -> new AtomicLong()
716723
).incrementAndGet();
717724
} else if (request.isPutObjectRequest()) {
718-
trackRequest("PutObject");
719-
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
720-
.incrementAndGet();
725+
if (exchange.getRequestHeaders().containsKey(S3BlobStore.CUSTOM_QUERY_PARAMETER_COPY_SOURCE)) {
726+
trackRequest("CopyObject");
727+
metricsCount.computeIfAbsent(
728+
new S3BlobStore.StatsKey(S3BlobStore.Operation.COPY_OBJECT, purpose),
729+
k -> new AtomicLong()
730+
).incrementAndGet();
731+
} else {
732+
trackRequest("PutObject");
733+
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
734+
.incrementAndGet();
735+
}
721736
} else if (request.isMultiObjectDeleteRequest()) {
722737
trackRequest("DeleteObjects");
723738
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.DELETE_OBJECTS, purpose), k -> new AtomicLong())

0 commit comments

Comments
 (0)