Skip to content

Commit fa383af

Browse files
authored
Small changes in concurrent multipart upload interfaces (#128977)
Small changes in BlobContainer interface and wrapper. Relates ES-11815
1 parent 49f8e5c commit fa383af

File tree

5 files changed

+55
-7
lines changed

5 files changed

+55
-7
lines changed

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.apache.logging.log4j.Logger;
1616
import org.elasticsearch.ExceptionsHelper;
1717
import org.elasticsearch.action.ActionListener;
18-
import org.elasticsearch.common.CheckedBiFunction;
1918
import org.elasticsearch.common.Strings;
2019
import org.elasticsearch.common.blobstore.BlobContainer;
2120
import org.elasticsearch.common.blobstore.BlobPath;
@@ -116,7 +115,7 @@ public void writeBlobAtomic(
116115
OperationPurpose purpose,
117116
String blobName,
118117
long blobSize,
119-
CheckedBiFunction<Long, Long, InputStream, IOException> provider,
118+
BlobMultiPartInputStreamProvider provider,
120119
boolean failIfAlreadyExists
121120
) throws IOException {
122121
blobStore.writeBlobAtomic(purpose, buildKey(blobName), blobSize, provider, failIfAlreadyExists);

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.logging.log4j.Logger;
5151
import org.apache.logging.log4j.core.util.Throwables;
5252
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
53-
import org.elasticsearch.common.CheckedBiFunction;
5453
import org.elasticsearch.common.UUIDs;
5554
import org.elasticsearch.common.blobstore.BlobContainer;
5655
import org.elasticsearch.common.blobstore.BlobPath;
@@ -477,7 +476,7 @@ void writeBlobAtomic(
477476
final OperationPurpose purpose,
478477
final String blobName,
479478
final long blobSize,
480-
final CheckedBiFunction<Long, Long, InputStream, IOException> provider,
479+
final BlobContainer.BlobMultiPartInputStreamProvider provider,
481480
final boolean failIfAlreadyExists
482481
) throws IOException {
483482
try {
@@ -559,7 +558,7 @@ private static Mono<String> stageBlock(
559558
BlockBlobAsyncClient asyncClient,
560559
String blobName,
561560
MultiPart multiPart,
562-
CheckedBiFunction<Long, Long, InputStream, IOException> provider
561+
BlobContainer.BlobMultiPartInputStreamProvider provider
563562
) {
564563
logger.debug(
565564
"{}: staging part [{}] of size [{}] from offset [{}]",

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.common.blobstore;
1111

1212
import org.elasticsearch.action.ActionListener;
13-
import org.elasticsearch.common.CheckedBiFunction;
1413
import org.elasticsearch.common.blobstore.support.BlobMetadata;
1514
import org.elasticsearch.common.bytes.BytesArray;
1615
import org.elasticsearch.common.bytes.BytesReference;
@@ -153,11 +152,42 @@ default boolean supportsConcurrentMultipartUploads() {
153152
return false;
154153
}
155154

155+
/**
156+
* Provides an {@link InputStream} to read a part of the blob content.
157+
*/
158+
interface BlobMultiPartInputStreamProvider {
159+
/**
160+
* Provides an {@link InputStream} to read a part of the blob content.
161+
*
162+
* @param offset the offset in the blob content to start reading bytes from
163+
* @param length the number of bytes to read
164+
* @return an {@link InputStream} to read a part of the blob content.
165+
* @throws IOException if something goes wrong opening the input stream
166+
*/
167+
InputStream apply(long offset, long length) throws IOException;
168+
}
169+
170+
/**
171+
* Reads the blob's content by calling an input stream provider multiple times, in order to split the blob's content into multiple
172+
* parts that can be written to the container concurrently before being assembled into the final blob, using an atomic write operation
173+
* if the implementation supports it. The number and the size of the parts depends of the implementation.
174+
*
175+
* Note: the method {link {@link #supportsConcurrentMultipartUploads()}} must be checked before calling this method.
176+
*
177+
* @param purpose The purpose of the operation
178+
* @param blobName The name of the blob to write the contents of the input stream to.
179+
* @param provider The input stream provider that is used to read the blob content
180+
* @param blobSize The size of the blob to be written, in bytes. Must be the amount of bytes in the input stream. It is
181+
* implementation dependent whether this value is used in writing the blob to the repository.
182+
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
183+
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
184+
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
185+
*/
156186
default void writeBlobAtomic(
157187
OperationPurpose purpose,
158188
String blobName,
159189
long blobSize,
160-
CheckedBiFunction<Long, Long, InputStream, IOException> provider,
190+
BlobMultiPartInputStreamProvider provider,
161191
boolean failIfAlreadyExists
162192
) throws IOException {
163193
throw new UnsupportedOperationException();

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ public FsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) {
8989
this.path = path;
9090
}
9191

92+
public Path getPath() {
93+
return path;
94+
}
95+
9296
@Override
9397
public Map<String, BlobMetadata> listBlobs(OperationPurpose purpose) throws IOException {
9498
return listBlobsByPrefix(purpose, null);

server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,22 @@ public void writeMetadataBlob(
8888
delegate.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer);
8989
}
9090

91+
@Override
92+
public boolean supportsConcurrentMultipartUploads() {
93+
return delegate.supportsConcurrentMultipartUploads();
94+
}
95+
96+
@Override
97+
public void writeBlobAtomic(
98+
OperationPurpose purpose,
99+
String blobName,
100+
long blobSize,
101+
BlobMultiPartInputStreamProvider provider,
102+
boolean failIfAlreadyExists
103+
) throws IOException {
104+
delegate.writeBlobAtomic(purpose, blobName, blobSize, provider, failIfAlreadyExists);
105+
}
106+
91107
@Override
92108
public void writeBlobAtomic(
93109
OperationPurpose purpose,

0 commit comments

Comments
 (0)