
Commit f896733

Add documentation to concurrent multipart upload utility methods
In the hope of making more sense of how the flux of byte buffers works. Also removes the synchronization on the input stream. Relates ES-11815
1 parent 2625200 commit f896733
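
The synchronization removed by this change is replaced with assertions that only one thread at a time reads the wrapped stream. A minimal standalone sketch of that assertion pattern (hypothetical class name, not the repository code) could look like this:

import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: instead of synchronizing every read, record the reading thread in an
// AtomicReference and assert that no other thread is reading at the same time.
final class SingleThreadReadAssertingStream extends FilterInputStream {

    private final AtomicReference<Thread> currentThread = new AtomicReference<>();

    SingleThreadReadAssertingStream(InputStream delegate) {
        super(delegate);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        assert assertThread(null, Thread.currentThread()); // claim the stream for this thread
        try {
            return super.read(b, off, len);
        } finally {
            assert assertThread(Thread.currentThread(), null); // release the claim
        }
    }

    private boolean assertThread(Thread expected, Thread updated) {
        final Thread witness = currentThread.compareAndExchange(expected, updated);
        assert witness == expected : "thread " + witness + " is already reading this stream";
        return true;
    }

    public static void main(String[] args) throws IOException {
        try (var in = new SingleThreadReadAssertingStream(new ByteArrayInputStream(new byte[16]))) {
            System.out.println(in.read(new byte[8], 0, 8)); // prints 8; single-threaded reads pass the assertions
        }
    }
}

In the commit itself, the AtomicReference is only allocated when assertions are enabled (Assertions.ENABLED), so production reads carry neither locks nor extra allocation.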

1 file changed, 88 additions and 64 deletions

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 88 additions & 64 deletions
@@ -66,6 +66,7 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
@@ -74,6 +75,7 @@
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
 import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.threadpool.ThreadPool;

 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -101,6 +103,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
@@ -507,7 +510,11 @@ void writeBlobAtomic(
                 return asyncClient.commitBlockList(
                     multiParts.stream().map(MultiPart::blockId).toList(),
                     failIfAlreadyExists == false
-                ).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()));
+                )
+                    .doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()))
+                    // Note: non-committed uploaded blocks will be deleted by Azure after a week
+                    // (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks)
+                    .doOnError(e -> logger.error(() -> format("%s: failed to commit %d parts", blobName, multiParts.size()), e));
             })
             .block();
     }
@@ -562,12 +569,13 @@ private static Mono<String> stageBlock(
             multiPart.blockOffset()
         );
         try {
-            var stream = toSynchronizedInputStream(blobName, provider.apply(multiPart.blockOffset(), multiPart.blockSize()), multiPart);
+            final var stream = provider.apply(multiPart.blockOffset(), multiPart.blockSize());
+            assert stream.markSupported() : "provided input stream must support mark and reset";
             boolean success = false;
             try {
                 var stageBlock = asyncClient.stageBlock(
                     multiPart.blockId(),
-                    toFlux(stream, multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE),
+                    toFlux(wrapInputStream(blobName, stream, multiPart), multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE),
                     multiPart.blockSize()
                 ).doOnSuccess(unused -> {
                     logger.debug(() -> format("%s: part [%s] of size [%s] uploaded", blobName, multiPart.part(), multiPart.blockSize()));
@@ -760,88 +768,105 @@ public synchronized int read() throws IOException {
         // we read the input stream (i.e. when it's rate limited)
     }

-    private static InputStream toSynchronizedInputStream(String blobName, InputStream delegate, MultiPart multipart) {
-        assert delegate.markSupported() : "An InputStream with mark support was expected";
-        // We need to introduce a read barrier in order to provide visibility for the underlying
-        // input stream state as the input stream can be read from different threads.
-        // TODO See if this is still needed
+    /**
+     * Wraps an {@link InputStream} to assert that it is read only by a single thread at a time and to add log traces.
+     */
+    private static InputStream wrapInputStream(final String blobName, final InputStream delegate, final MultiPart multipart) {
         return new FilterInputStream(delegate) {

+            private final AtomicReference<Thread> currentThread = Assertions.ENABLED ? new AtomicReference<>() : null;
             private final boolean isTraceEnabled = logger.isTraceEnabled();

             @Override
-            public synchronized int read(byte[] b, int off, int len) throws IOException {
-                var result = super.read(b, off, len);
-                if (isTraceEnabled) {
-                    logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
-                }
-                return result;
-            }
-
-            @Override
-            public synchronized int read() throws IOException {
-                var result = super.read();
-                if (isTraceEnabled) {
-                    logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
-                }
-                return result;
-            }
-
-            @Override
-            public synchronized void mark(int readlimit) {
-                if (isTraceEnabled) {
-                    logger.trace("{} marks stream {} part {}", Thread.currentThread(), blobName, multipart.part());
-                }
-                super.mark(readlimit);
-            }
-
-            @Override
-            public synchronized void reset() throws IOException {
-                if (isTraceEnabled) {
-                    logger.trace("{} resets stream {} part {}", Thread.currentThread(), blobName, multipart.part());
+            public int read(byte[] b, int off, int len) throws IOException {
+                assert assertThread(null, Thread.currentThread());
+                assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
+                try {
+                    var result = super.read(b, off, len);
+                    if (isTraceEnabled) {
+                        logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
+                    }
+                    return result;
+                } finally {
+                    assert assertThread(Thread.currentThread(), null);
                 }
-                super.reset();
             }

             @Override
-            public synchronized void close() throws IOException {
-                if (isTraceEnabled) {
-                    logger.trace("{} closes stream {} part {}", Thread.currentThread(), blobName, multipart.part());
+            public int read() throws IOException {
+                assert assertThread(null, Thread.currentThread());
+                assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
+                try {
+                    var result = super.read();
+                    if (isTraceEnabled) {
+                        logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
+                    }
+                    return result;
+                } finally {
+                    assert assertThread(Thread.currentThread(), null);
                 }
-                super.close();
             }

-            @Override
-            public String toString() {
-                return blobName + " part [" + multipart.part() + "] of size [" + multipart.blockSize() + ']';
+            private boolean assertThread(Thread current, Thread updated) {
+                final Thread witness = currentThread.compareAndExchange(current, updated);
+                assert witness == current
+                    : "Unable to set current thread to ["
+                        + updated
+                        + "]: expected thread ["
+                        + current
+                        + "] to be the thread currently accessing the input stream for reading, but thread "
+                        + witness
+                        + " is already reading "
+                        + blobName
+                        + " part "
+                        + multipart.part();
+                return true;
             }
         };
     }

-    private static Flux<ByteBuffer> toFlux(InputStream stream, long length, int chunkSize) {
-        assert stream.markSupported() : "An InputStream with mark support was expected";
-        // We need to mark the InputStream as it's possible that we need to retry for the same chunk
+    /**
+     * Converts an input stream to a Flux of ByteBuffer. This method also checks that the stream has provided the expected number of bytes.
+     *
+     * @param stream the input stream that needs to be converted
+     * @param length the expected length in bytes of the input stream
+     * @param byteBufferSize the size of the ByteBuffer to be created
+     **/
+    private static Flux<ByteBuffer> toFlux(InputStream stream, long length, final int byteBufferSize) {
+        assert stream.markSupported() : "input stream must support mark and reset";
+        // always marks the input stream in case it needs to be retried
        stream.mark(Integer.MAX_VALUE);
+        // defer the creation of the flux until it is subscribed
        return Flux.defer(() -> {
-            // TODO Code in this Flux.defer() can be concurrently executed by multiple threads?
             try {
                 stream.reset();
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                // Flux.defer() catches and propagates the exception
+                throw new UncheckedIOException(e);
             }
+            // the number of bytes read is updated in a thread pool (repository_azure) and later compared to the expected length in another
+            // thread pool (azure_event_loop), so we need this to be atomic.
             final var bytesRead = new AtomicLong(0L);
-            // This flux is subscribed by a downstream operator that finally queues the
-            // buffers into netty output queue. Sadly we are not able to get a signal once
-            // the buffer has been flushed, so we have to allocate those and let the GC to
-            // reclaim them (see MonoSendMany). Additionally, that very same operator requests
-            // 128 elements (that's hardcoded) once it's subscribed (later on, it requests
-            // by 64 elements), that's why we provide 64kb buffers.

-            // length is at most 100MB so it's safe to cast back to an integer in this case
-            final int parts = (int) length / chunkSize;
-            final long remaining = length % chunkSize;
-            return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * chunkSize).concatMap(pos -> Mono.fromCallable(() -> {
-                long count = pos + chunkSize > length ? length - pos : chunkSize;
+            assert length <= ByteSizeValue.ofMb(100L).getBytes() : length;
+            // length is at most 100MB so it's safe to cast back to an integer
+            final int parts = Math.toIntExact(length / byteBufferSize);
+            final long remaining = length % byteBufferSize;
+
+            // This flux is subscribed by a downstream subscriber (reactor.netty.channel.MonoSendMany) that queues the buffers into netty
+            // output queue. Sadly we are not able to get a signal once the buffer has been flushed, so we have to allocate those and let
+            // the GC to reclaim them. Additionally, the MonoSendMany subscriber requests 128 elements from the flux when it subscribes to
+            // it. This 128 value is hardcoded in reactor.netty.channel.MonoSend.MAX_SIZE). After 128 byte buffers have been published by
+            // the flux, the MonoSendMany subscriber requests 64 more byte buffers (see reactor.netty.channel.MonoSend.REFILL_SIZE) and so
+            // on.
+            //
+            // So this flux instantiates 128 ByteBuffer objects of DEFAULT_UPLOAD_BUFFERS_SIZE bytes in heap every time the NettyOutbound in
+            // the Azure's Netty event loop requests byte buffers to write to the network channel. That represents 128 * 64kb = 8 mb per
+            // flux. The creation of the ByteBuffer objects are forked to the repository_azure thread pool, which has a maximum of 15
+            // threads (most of the time, can be less than that for nodes with less than 750mb). It means that max. 15 * 8 = 120mb bytes are
+            // allocated on heap at a time here (omitting the ones already created and pending garbage collection).
+            return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * byteBufferSize).concatMap(pos -> Mono.fromCallable(() -> {
+                long count = pos + byteBufferSize > length ? length - pos : byteBufferSize;
                 int numOfBytesRead = 0;
                 int offset = 0;
                 int len = (int) count;
@@ -860,16 +885,15 @@ private static Flux<ByteBuffer> toFlux(InputStream stream, long length, int chunkSize) {
                     );
                 }
                 return ByteBuffer.wrap(buffer);
-            })).doOnComplete(() -> {
+            }), 0 /* no prefetching needed for concatMap as the MonoSendMany subscriber requests 128 elements */).doOnComplete(() -> {
                 if (bytesRead.get() > length) {
                     throw new IllegalStateException(
                         format("Input stream [%s] emitted %d bytes, more than the expected %d bytes.", stream, bytesRead, length)
                     );
                 }
             });
-            // We need to subscribe on a different scheduler to avoid blocking the io threads when we read the input stream
+            // subscribe on a different scheduler to avoid blocking the network io threads when reading bytes from disk
         }).subscribeOn(Schedulers.elastic());
-
     }

     /**

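For orientation, the flux of byte buffers documented in this commit follows the general Reactor pattern below: a mark/reset-capable InputStream of known length is sliced into fixed-size ByteBuffer chunks inside Flux.defer, so the stream can be reset and re-read if a block upload is retried. This is an illustrative, self-contained sketch with assumed names and sizes, not the AzureBlobStore implementation:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch: slice a mark/reset-capable InputStream of known length into fixed-size
// ByteBuffer chunks. Flux.defer resets the stream on every (re)subscription so the
// same chunks can be produced again on a retry.
final class InputStreamChunks {

    static Flux<ByteBuffer> toFlux(InputStream stream, long length, int byteBufferSize) {
        assert stream.markSupported() : "input stream must support mark and reset";
        stream.mark(Integer.MAX_VALUE);
        return Flux.defer(() -> {
            try {
                stream.reset(); // back to the marked position on retries
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            final int parts = Math.toIntExact(length / byteBufferSize);
            final long remaining = length % byteBufferSize;
            return Flux.range(0, remaining == 0 ? parts : parts + 1)
                .map(i -> (long) i * byteBufferSize)
                .concatMap(pos -> Mono.fromCallable(() -> {
                    int count = (int) Math.min(byteBufferSize, length - pos);
                    return ByteBuffer.wrap(stream.readNBytes(count)); // next chunk of at most byteBufferSize bytes
                }), 0); // prefetch 0: chunks are created only when the downstream subscriber requests them
        });
    }

    public static void main(String[] args) {
        byte[] data = new byte[10];
        toFlux(new ByteArrayInputStream(data), data.length, 4)
            .subscribe(buffer -> System.out.println("chunk of " + buffer.remaining() + " bytes"));
        // prints: chunk of 4 bytes, chunk of 4 bytes, chunk of 2 bytes
    }
}

With a prefetch of 0 on concatMap, chunk creation is driven purely by downstream demand; per the comment in the diff, the MonoSendMany subscriber requests 128 buffers up front and then 64 at a time, so each active upload holds at most 128 * 64kb = 8mb of buffers, or roughly 15 * 8mb = 120mb across the repository_azure thread pool.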