Skip to content

Commit 560cc58

Browse files
committed
Add FS based testing implementation for concurrent multipart uploads
1 parent 64460df commit 560cc58

File tree

5 files changed

+56
-4
lines changed

5 files changed

+56
-4
lines changed

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.apache.logging.log4j.Logger;
1616
import org.elasticsearch.ExceptionsHelper;
1717
import org.elasticsearch.action.ActionListener;
18-
import org.elasticsearch.common.CheckedBiFunction;
1918
import org.elasticsearch.common.Strings;
2019
import org.elasticsearch.common.blobstore.BlobContainer;
2120
import org.elasticsearch.common.blobstore.BlobPath;
@@ -116,7 +115,7 @@ public void writeBlobAtomic(
116115
OperationPurpose purpose,
117116
String blobName,
118117
long blobSize,
119-
CheckedBiFunction<Long, Long, InputStream, IOException> provider,
118+
BlobMultiPartInputStreamProvider provider,
120119
boolean failIfAlreadyExists
121120
) throws IOException {
122121
blobStore.writeBlobAtomic(purpose, buildKey(blobName), blobSize, provider, failIfAlreadyExists);

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,11 +153,44 @@ default boolean supportsConcurrentMultipartUploads() {
153153
return false;
154154
}
155155

156+
/**
157+
* Provides an {@link InputStream} to read a part of the blob content.
158+
*/
159+
@FunctionalInterface
160+
interface BlobMultiPartInputStreamProvider extends CheckedBiFunction<Long, Long, InputStream, IOException> {
161+
/**
162+
* Provides an {@link InputStream} to read a part of the blob content.
163+
*
164+
* @param offset the offset in the blob content to start reading bytes from
165+
* @param length the number of bytes to read
166+
* @return an {@link InputStream} to read a part of the blob content.
167+
* @throws IOException if something goes wrong opening the input stream
168+
*/
169+
@Override
170+
InputStream apply(Long offset, Long length) throws IOException;
171+
}
172+
173+
/**
174+
* Reads the blob's content by calling an input stream provider multiple times, in order to split the blob's content into multiple
175+
* parts that can be written to the container concurrently before being assembled into the final blob, using an atomic write operation
176+
* if the implementation supports it. The number and the size of the parts depends of the implementation.
177+
*
178+
* Note: the method {link {@link #supportsConcurrentMultipartUploads()}} must be checked before calling this method.
179+
*
180+
* @param purpose The purpose of the operation
181+
* @param blobName The name of the blob to write the contents of the input stream to.
182+
* @param provider The input stream provider that is used to read the blob content
183+
* @param blobSize The size of the blob to be written, in bytes. Must be the amount of bytes in the input stream. It is
184+
* implementation dependent whether this value is used in writing the blob to the repository.
185+
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
186+
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
187+
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
188+
*/
156189
default void writeBlobAtomic(
157190
OperationPurpose purpose,
158191
String blobName,
159192
long blobSize,
160-
CheckedBiFunction<Long, Long, InputStream, IOException> provider,
193+
BlobMultiPartInputStreamProvider provider,
161194
boolean failIfAlreadyExists
162195
) throws IOException {
163196
throw new UnsupportedOperationException();

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ public FsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) {
8989
this.path = path;
9090
}
9191

92+
public Path getPath() {
93+
return path;
94+
}
95+
9296
@Override
9397
public Map<String, BlobMetadata> listBlobs(OperationPurpose purpose) throws IOException {
9498
return listBlobsByPrefix(purpose, null);

server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,22 @@ public void writeMetadataBlob(
8888
delegate.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer);
8989
}
9090

91+
@Override
92+
public boolean supportsConcurrentMultipartUploads() {
93+
return delegate.supportsConcurrentMultipartUploads();
94+
}
95+
96+
@Override
97+
public void writeBlobAtomic(
98+
OperationPurpose purpose,
99+
String blobName,
100+
long blobSize,
101+
BlobMultiPartInputStreamProvider provider,
102+
boolean failIfAlreadyExists
103+
) throws IOException {
104+
delegate.writeBlobAtomic(purpose, blobName, blobSize, provider, failIfAlreadyExists);
105+
}
106+
91107
@Override
92108
public void writeBlobAtomic(
93109
OperationPurpose purpose,

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
440440

441441
private final NamedXContentRegistry namedXContentRegistry;
442442

443-
protected final BigArrays bigArrays;
443+
protected BigArrays bigArrays;
444444

445445
/**
446446
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for

0 commit comments

Comments
 (0)