Skip to content

Commit 13722c8

Browse files
committed
BlobContainer: add copyBlob method
If a container implements copyBlob, then the copy is performed by the store, without client-side IO. If the store does not provide a copy operation then the default implementation throws UnsupportedOperationException. This change provides implementations for the FS and S3 blob containers. More will follow.
1 parent 930b4ab commit 13722c8

File tree

9 files changed

+212
-5
lines changed

9 files changed

+212
-5
lines changed

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.io.IOException;
7373
import java.io.InputStream;
7474
import java.util.ArrayList;
75+
import java.util.Arrays;
7576
import java.util.Collection;
7677
import java.util.Collections;
7778
import java.util.EnumSet;
@@ -689,7 +690,7 @@ protected class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandler {
689690
private final Map<S3BlobStore.StatsKey, AtomicLong> metricsCount = ConcurrentCollections.newConcurrentMap();
690691

691692
S3StatsCollectorHttpHandler(final HttpHandler delegate) {
692-
super(delegate);
693+
super(delegate, Arrays.stream(S3BlobStore.Operation.values()).map(S3BlobStore.Operation::getKey).toArray(String[]::new));
693694
}
694695

695696
@Override
@@ -732,6 +733,13 @@ public void maybeTrack(final String rawRequest, Headers requestHeaders) {
732733
trackRequest("GetObject");
733734
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.GET_OBJECT, purpose), k -> new AtomicLong())
734735
.incrementAndGet();
736+
} else if (Regex.simpleMatch("PUT /*/*", request)
737+
&& requestComponents.customQueryParameters().containsKey(S3BlobStore.CUSTOM_QUERY_PARAMETER_COPY_SOURCE)) {
738+
trackRequest("CopyObject");
739+
metricsCount.computeIfAbsent(
740+
new S3BlobStore.StatsKey(S3BlobStore.Operation.COPY_OBJECT, purpose),
741+
k -> new AtomicLong()
742+
).incrementAndGet();
735743
} else if (isMultiPartUpload(request)) {
736744
trackRequest("PutMultipartObject");
737745
metricsCount.computeIfAbsent(

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,4 +228,23 @@ public void testReadFromPositionLargerThanBlobLength() {
228228
e -> asInstanceOf(AmazonS3Exception.class, e.getCause()).getStatusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()
229229
);
230230
}
231+
232+
public void testCopy() {
233+
final var sourceBlobName = randomIdentifier();
234+
final var blobBytes = randomBytesReference(randomIntBetween(100, 2_000));
235+
final var targetBlobName = randomIdentifier();
236+
237+
final var repository = getRepository();
238+
239+
final var targetBytes = executeOnBlobStore(repository, sourceBlobContainer -> {
240+
sourceBlobContainer.writeBlob(randomPurpose(), sourceBlobName, blobBytes, true);
241+
242+
final var targetBlobContainer = repository.blobStore().blobContainer(repository.basePath().add("target"));
243+
sourceBlobContainer.copyBlob(randomPurpose(), sourceBlobName, targetBlobContainer, targetBlobName, false);
244+
245+
return sourceBlobContainer.readBlob(randomPurpose(), sourceBlobName).readAllBytes();
246+
});
247+
248+
assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes);
249+
}
231250
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
1515
import com.amazonaws.services.s3.model.AmazonS3Exception;
1616
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
17+
import com.amazonaws.services.s3.model.CopyObjectRequest;
1718
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
1819
import com.amazonaws.services.s3.model.GetObjectRequest;
1920
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
@@ -317,6 +318,57 @@ public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesRefe
317318
writeBlob(purpose, blobName, bytes, failIfAlreadyExists);
318319
}
319320

321+
/**
322+
* Perform server-side copy of a blob
323+
*
324+
* Server-side copy can be done for any size object, but if the object is larger than 5 GB then
325+
* it must be done through a series of part copy operations rather than a single blob copy.
326+
* See <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html">CopyObject</a>.
327+
* @param purpose The purpose of the operation
328+
* @param sourceBlobName The name of the blob to copy from
329+
* @param targetBlobContainer The blob container to copy the blob into
330+
* @param targetBlobName The name of the blob to copy to
331+
* @param failIfAlreadyExists Whether to throw a FileAlreadyExistsException if the target blob already exists
332+
* On S3, if true, throws UnsupportedOperationException because we don't know how
333+
* to do this atomically.
334+
* @throws IOException
335+
*/
336+
@Override
337+
public void copyBlob(
338+
OperationPurpose purpose,
339+
String sourceBlobName,
340+
BlobContainer targetBlobContainer,
341+
String targetBlobName,
342+
boolean failIfAlreadyExists
343+
) throws IOException {
344+
assert BlobContainer.assertPurposeConsistency(purpose, sourceBlobName);
345+
assert BlobContainer.assertPurposeConsistency(purpose, targetBlobName);
346+
if (targetBlobContainer instanceof S3BlobContainer == false) {
347+
throw new IllegalArgumentException("target blob container must be a S3BlobContainer");
348+
}
349+
if (failIfAlreadyExists) {
350+
throw new UnsupportedOperationException("S3 blob container does not support failIfAlreadyExists");
351+
}
352+
353+
final var s3TargetBlobContainer = (S3BlobContainer) targetBlobContainer;
354+
355+
// metadata is inherited from source, but not canned ACL or storage class
356+
final CopyObjectRequest copyRequest = new CopyObjectRequest(
357+
blobStore.bucket(),
358+
buildKey(sourceBlobName),
359+
s3TargetBlobContainer.blobStore.bucket(),
360+
s3TargetBlobContainer.buildKey(targetBlobName)
361+
).withCannedAccessControlList(blobStore.getCannedACL()).withStorageClass(blobStore.getStorageClass());
362+
363+
S3BlobStore.configureRequestForMetrics(copyRequest, blobStore, Operation.COPY_OBJECT, purpose);
364+
365+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
366+
SocketAccess.doPrivilegedVoid(() -> { clientReference.client().copyObject(copyRequest); });
367+
} catch (final AmazonClientException e) {
368+
throw new IOException("Unable to copy object [" + sourceBlobName + "]", e);
369+
}
370+
}
371+
320372
@Override
321373
public DeleteResult delete(OperationPurpose purpose) throws IOException {
322374
final AtomicLong deletedBlobs = new AtomicLong();

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959

6060
class S3BlobStore implements BlobStore {
6161

62+
public static final String CUSTOM_QUERY_PARAMETER_COPY_SOURCE = "x-amz-copy-source";
6263
public static final String CUSTOM_QUERY_PARAMETER_PURPOSE = "x-purpose";
6364

6465
/**
@@ -250,7 +251,7 @@ private boolean assertConsistencyBetweenHttpRequestAndOperation(Request<?> reque
250251
case GET_OBJECT, LIST_OBJECTS -> {
251252
return request.getHttpMethod().name().equals("GET");
252253
}
253-
case PUT_OBJECT -> {
254+
case PUT_OBJECT, COPY_OBJECT -> {
254255
return request.getHttpMethod().name().equals("PUT");
255256
}
256257
case PUT_MULTIPART_OBJECT -> {
@@ -550,7 +551,8 @@ enum Operation {
550551
PUT_OBJECT("PutObject"),
551552
PUT_MULTIPART_OBJECT("PutMultipartObject"),
552553
DELETE_OBJECTS("DeleteObjects"),
553-
ABORT_MULTIPART_OBJECT("AbortMultipartObject");
554+
ABORT_MULTIPART_OBJECT("AbortMultipartObject"),
555+
COPY_OBJECT("CopyObject");
554556

555557
private final String key;
556558

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import com.amazonaws.services.s3.model.CannedAccessControlList;
1616
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
1717
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
18+
import com.amazonaws.services.s3.model.CopyObjectRequest;
19+
import com.amazonaws.services.s3.model.CopyObjectResult;
1820
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
1921
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
2022
import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -358,6 +360,51 @@ public void testExecuteMultipartUploadAborted() {
358360
closeMockClient(blobStore);
359361
}
360362

363+
public void testCopy() throws Exception {
364+
final var sourceBucketName = randomAlphaOfLengthBetween(1, 10);
365+
final var sourceBlobName = randomAlphaOfLengthBetween(1, 10);
366+
final var targetBlobName = randomAlphaOfLengthBetween(1, 10);
367+
368+
final StorageClass storageClass = randomFrom(StorageClass.values());
369+
final CannedAccessControlList cannedAccessControlList = randomBoolean() ? randomFrom(CannedAccessControlList.values()) : null;
370+
371+
final var blobStore = mock(S3BlobStore.class);
372+
when(blobStore.bucket()).thenReturn(sourceBucketName);
373+
when(blobStore.getStorageClass()).thenReturn(storageClass);
374+
if (cannedAccessControlList != null) {
375+
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
376+
}
377+
378+
final var sourceBlobPath = BlobPath.EMPTY.add(randomAlphaOfLengthBetween(1, 10));
379+
final var sourceBlobContainer = new S3BlobContainer(sourceBlobPath, blobStore);
380+
381+
final var targetBlobPath = BlobPath.EMPTY.add(randomAlphaOfLengthBetween(1, 10));
382+
final var targetBlobContainer = new S3BlobContainer(targetBlobPath, blobStore);
383+
384+
final var client = configureMockClient(blobStore);
385+
386+
final ArgumentCaptor<CopyObjectRequest> captor = ArgumentCaptor.forClass(CopyObjectRequest.class);
387+
when(client.copyObject(captor.capture())).thenReturn(new CopyObjectResult());
388+
389+
sourceBlobContainer.copyBlob(randomPurpose(), sourceBlobName, targetBlobContainer, targetBlobName, false);
390+
391+
final CopyObjectRequest request = captor.getValue();
392+
assertEquals(sourceBucketName, request.getSourceBucketName());
393+
assertEquals(sourceBlobPath.buildAsString() + sourceBlobName, request.getSourceKey());
394+
assertEquals(sourceBucketName, request.getDestinationBucketName());
395+
assertEquals(targetBlobPath.buildAsString() + targetBlobName, request.getDestinationKey());
396+
assertEquals(storageClass.toString(), request.getStorageClass());
397+
assertEquals(cannedAccessControlList, request.getCannedAccessControlList());
398+
399+
// failIfAlreadyExists is not currently supported on S3
400+
assertThrows(
401+
UnsupportedOperationException.class,
402+
() -> sourceBlobContainer.copyBlob(randomPurpose(), sourceBlobName, targetBlobContainer, targetBlobName, true)
403+
);
404+
405+
closeMockClient(blobStore);
406+
}
407+
361408
private static AmazonS3 configureMockClient(S3BlobStore blobStore) {
362409
final AmazonS3 client = mock(AmazonS3.class);
363410
try (AmazonS3Reference clientReference = new AmazonS3Reference(client)) {

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,31 @@ default void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesRef
177177
writeBlobAtomic(purpose, blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
178178
}
179179

180+
/**
181+
* Copy a blob into a new blob container and name.
182+
* If copy is unavailable then throws UnsupportedOperationException.
183+
* It may be unavailable either because the blob container has no copy implementation
184+
* or because the target blob container is not on the same store as the source.
185+
*
186+
* @param purpose The purpose of the operation
187+
* @param sourceBlobName The name of the blob to copy from
188+
* @param targetBlobContainer The blob container to copy the blob into
189+
* @param targetBlobName The name of the blob to copy to
190+
* @param failIfAlreadyExists Whether to throw a FileAlreadyExistsException if the target blob already exists
191+
* @throws NoSuchFileException If the source blob does not exist
192+
* @throws FileAlreadyExistsException If failIfAlreadyExists is true and the target blob already exists
193+
* @throws IOException If the operation generates an IO error (e.g., due to local copy fallback)
194+
*/
195+
default void copyBlob(
196+
OperationPurpose purpose,
197+
String sourceBlobName,
198+
BlobContainer targetBlobContainer,
199+
String targetBlobName,
200+
boolean failIfAlreadyExists
201+
) throws IOException {
202+
throw new UnsupportedOperationException("this blob container does not support copy");
203+
}
204+
180205
/**
181206
* Deletes this container and all its contents from the repository.
182207
*

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,34 @@ public void writeBlobAtomic(OperationPurpose purpose, final String blobName, Byt
349349
}
350350
}
351351

352+
@Override
353+
public void copyBlob(
354+
OperationPurpose purpose,
355+
String sourceBlobName,
356+
BlobContainer targetBlobContainer,
357+
String targetBlobName,
358+
boolean failIfAlreadyExists
359+
) throws IOException {
360+
if (targetBlobContainer instanceof FsBlobContainer == false) {
361+
throw new IllegalArgumentException("targetBlobContainer must be a FsBlobContainer");
362+
}
363+
final FsBlobContainer targetContainer = (FsBlobContainer) targetBlobContainer;
364+
final Path sourceBlobPath = path.resolve(sourceBlobName);
365+
final String tempBlob = tempBlobName(targetBlobName);
366+
final Path tempBlobPath = targetContainer.path.resolve(tempBlob);
367+
Files.copy(sourceBlobPath, tempBlobPath, StandardCopyOption.REPLACE_EXISTING);
368+
try {
369+
targetContainer.moveBlobAtomic(purpose, tempBlob, targetBlobName, failIfAlreadyExists);
370+
} catch (IOException ex) {
371+
try {
372+
targetContainer.deleteBlobsIgnoringIfNotExists(purpose, Iterators.single(tempBlob));
373+
} catch (IOException e) {
374+
ex.addSuppressed(e);
375+
}
376+
throw ex;
377+
}
378+
}
379+
352380
private static void writeToPath(BytesReference bytes, Path tempBlobPath) throws IOException {
353381
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
354382
bytes.writeTo(outputStream);

server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,27 @@ private static void checkAtomicWrite() throws IOException {
377377
}
378378
}
379379

380+
public void testCopy() throws IOException {
381+
final var path = PathUtils.get(createTempDir().toString());
382+
final var store = new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false);
383+
final var sourcePath = BlobPath.EMPTY.add("source");
384+
final var sourceContainer = store.blobContainer(sourcePath);
385+
final var targetPath = BlobPath.EMPTY.add("target");
386+
final var targetContainer = store.blobContainer(targetPath);
387+
388+
final String blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT);
389+
final var contents = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512)));
390+
sourceContainer.writeBlobAtomic(randomPurpose(), blobName, contents, true);
391+
sourceContainer.copyBlob(randomPurpose(), blobName, targetContainer, blobName, true);
392+
assertThrows(
393+
FileAlreadyExistsException.class,
394+
() -> sourceContainer.copyBlob(randomPurpose(), blobName, targetContainer, blobName, true)
395+
);
396+
397+
var targetContents = new BytesArray(targetContainer.readBlob(randomPurpose(), blobName).readAllBytes());
398+
assertArrayEquals(contents.array(), targetContents.array());
399+
}
400+
380401
static class MockFileSystemProvider extends FilterFileSystemProvider {
381402

382403
final Consumer<Long> onRead;

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,8 @@ public void testRequestStats() throws Exception {
228228
}
229229
}).filter(Objects::nonNull).map(Repository::stats).reduce(RepositoryStats::merge).get();
230230

231-
// Since no abort request is made, filter it out from the stats (also ensure it is 0) before comparing to the mock counts
232231
Map<String, Long> sdkRequestCounts = repositoryStats.actionStats.entrySet()
233232
.stream()
234-
.filter(entry -> false == ("AbortMultipartObject".equals(entry.getKey()) && entry.getValue().requests() == 0L))
235233
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().requests()));
236234

237235
final Map<String, Long> mockCalls = getMockRequestCounts();
@@ -360,6 +358,13 @@ public HttpStatsCollectorHandler(HttpHandler delegate) {
360358
this.delegate = delegate;
361359
}
362360

361+
/**
 * Creates a stats-collecting handler around {@code delegate} and registers a count of zero
 * for every name in {@code operations}.
 *
 * @param delegate   the handler being wrapped
 * @param operations the operation names to pre-register with a zero count
 */
public HttpStatsCollectorHandler(HttpHandler delegate, String[] operations) {
    this.delegate = delegate;
    for (final String operationName : operations) {
        // Seed each known operation with an explicit zero entry.
        operationCount.put(operationName, 0L);
    }
}
367+
363368
@Override
364369
public HttpHandler getDelegate() {
365370
return delegate;

0 commit comments

Comments
 (0)