Skip to content

Commit c1a71ff

Browse files
bcullyelasticsearchmachineDaveCTurner
authored
BlobContainer: add copyBlob method (#125737)
* BlobContainer: add copyBlob method If a container implements copyBlob, then the copy is performed by the store, without client-side IO. If the store does not provide a copy operation then the default implementation throws UnsupportedOperationException. This change provides implementations for the FS and S3 blob containers. More will follow. Co-authored-by: elasticsearchmachine <[email protected]> Co-authored-by: David Turner <[email protected]>
1 parent 44507cc commit c1a71ff

File tree

21 files changed

+754
-97
lines changed

21 files changed

+754
-97
lines changed

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.nio.charset.StandardCharsets;
4949
import java.nio.file.NoSuchFileException;
5050
import java.util.ArrayList;
51+
import java.util.Arrays;
5152
import java.util.Base64;
5253
import java.util.Collection;
5354
import java.util.Collections;
@@ -241,7 +242,7 @@ protected String requestUniqueId(final HttpExchange exchange) {
241242
private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHandler {
242243

243244
private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
244-
super(delegate);
245+
super(delegate, Arrays.stream(AzureBlobStore.Operation.values()).map(AzureBlobStore.Operation::getKey).toArray(String[]::new));
245246
}
246247

247248
@Override

modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ protected boolean canFailRequest(final HttpExchange exchange) {
364364
private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpStatsCollectorHandler {
365365

366366
GoogleCloudStorageStatsCollectorHttpHandler(final HttpHandler delegate) {
367-
super(delegate);
367+
super(delegate, Arrays.stream(StorageOperation.values()).map(StorageOperation::key).toArray(String[]::new));
368368
}
369369

370370
@Override

modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,4 +228,56 @@ public void testReadFromPositionLargerThanBlobLength() {
228228
e -> asInstanceOf(AmazonS3Exception.class, e.getCause()).getStatusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()
229229
);
230230
}
231+
232+
public void testCopy() {
233+
final var sourceBlobName = randomIdentifier();
234+
final var blobBytes = randomBytesReference(randomIntBetween(100, 2_000));
235+
final var destinationBlobName = randomIdentifier();
236+
237+
final var repository = getRepository();
238+
239+
final var targetBytes = executeOnBlobStore(repository, sourceBlobContainer -> {
240+
sourceBlobContainer.writeBlob(randomPurpose(), sourceBlobName, blobBytes, true);
241+
242+
final var destinationBlobContainer = repository.blobStore().blobContainer(repository.basePath().add("target"));
243+
destinationBlobContainer.copyBlob(
244+
randomPurpose(),
245+
sourceBlobContainer,
246+
sourceBlobName,
247+
destinationBlobName,
248+
blobBytes.length()
249+
);
250+
251+
return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes();
252+
});
253+
254+
assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes);
255+
}
256+
257+
public void testMultipartCopy() {
258+
final var sourceBlobName = randomIdentifier();
259+
// executeMultipart requires a minimum part size of 5 MiB
260+
final var blobBytes = randomBytesReference(randomIntBetween(5 * 1024 * 1024, 10 * 1024 * 1024));
261+
final var destinationBlobName = randomIdentifier();
262+
263+
final var repository = getRepository();
264+
265+
final var targetBytes = executeOnBlobStore(repository, sourceBlobContainer -> {
266+
sourceBlobContainer.writeBlob(randomPurpose(), sourceBlobName, blobBytes, true);
267+
268+
final S3BlobContainer destinationBlobContainer = (S3BlobContainer) repository.blobStore()
269+
.blobContainer(repository.basePath().add("target"));
270+
destinationBlobContainer.executeMultipartCopy(
271+
randomPurpose(),
272+
(S3BlobContainer) sourceBlobContainer,
273+
sourceBlobName,
274+
destinationBlobName,
275+
blobBytes.length()
276+
);
277+
278+
return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes();
279+
});
280+
281+
assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes);
282+
}
231283
}

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import java.io.IOException;
7272
import java.io.InputStream;
7373
import java.util.ArrayList;
74+
import java.util.Arrays;
7475
import java.util.Collection;
7576
import java.util.Collections;
7677
import java.util.EnumSet;
@@ -621,6 +622,12 @@ long getLargeBlobThresholdInBytes() {
621622
return ByteSizeUnit.MB.toBytes(1L);
622623
}
623624

625+
@Override
626+
long getMaxCopySizeBeforeMultipart() {
627+
// on my laptop 10K exercises this better but larger values should be fine for nightlies
628+
return ByteSizeUnit.MB.toBytes(1L);
629+
}
630+
624631
@Override
625632
void ensureMultiPartUploadSize(long blobSize) {}
626633
};
@@ -688,7 +695,7 @@ protected class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandler {
688695
private final Map<S3BlobStore.StatsKey, AtomicLong> metricsCount = ConcurrentCollections.newConcurrentMap();
689696

690697
S3StatsCollectorHttpHandler(final HttpHandler delegate) {
691-
super(delegate);
698+
super(delegate, Arrays.stream(S3BlobStore.Operation.values()).map(S3BlobStore.Operation::getKey).toArray(String[]::new));
692699
}
693700

694701
private S3HttpHandler.S3Request parseRequest(HttpExchange exchange) {
@@ -736,9 +743,17 @@ public void maybeTrack(HttpExchange exchange) {
736743
k -> new AtomicLong()
737744
).incrementAndGet();
738745
} else if (request.isPutObjectRequest()) {
739-
trackRequest("PutObject");
740-
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
741-
.incrementAndGet();
746+
if (exchange.getRequestHeaders().containsKey(S3BlobStore.CUSTOM_QUERY_PARAMETER_COPY_SOURCE)) {
747+
trackRequest("CopyObject");
748+
metricsCount.computeIfAbsent(
749+
new S3BlobStore.StatsKey(S3BlobStore.Operation.COPY_OBJECT, purpose),
750+
k -> new AtomicLong()
751+
).incrementAndGet();
752+
} else {
753+
trackRequest("PutObject");
754+
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
755+
.incrementAndGet();
756+
}
742757
} else if (request.isMultiObjectDeleteRequest()) {
743758
trackRequest("DeleteObjects");
744759
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.DELETE_OBJECTS, purpose), k -> new AtomicLong())

0 commit comments

Comments
 (0)