From f8967333fb567040be23a19c40afcd583df12070 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 3 Jun 2025 14:35:02 +0200 Subject: [PATCH 1/2] Add documentation to concurrent multipart upload utility methods In the hope to make more sense of how the flux of byte buffer works. Also remove the synchronization on the input stream. Relates ES-11815 --- .../repositories/azure/AzureBlobStore.java | 152 ++++++++++-------- 1 file changed, 88 insertions(+), 64 deletions(-) diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index d2a52a95c66e7..41c1057fb885f 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -66,6 +66,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; @@ -74,6 +75,7 @@ import org.elasticsearch.repositories.azure.AzureRepository.Repository; import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.ThreadPool; import java.io.FilterInputStream; import java.io.IOException; @@ -101,6 +103,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiPredicate; import java.util.stream.Collectors; @@ -507,7 +510,11 @@ void writeBlobAtomic( return asyncClient.commitBlockList( multiParts.stream().map(MultiPart::blockId).toList(), failIfAlreadyExists == false - ).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size())); + ) + .doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size())) + // Note: non-committed uploaded blocks will be deleted by Azure after a week + // (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks) + .doOnError(e -> logger.error(() -> format("%s: failed to commit %d parts", blobName, multiParts.size()), e)); }) .block(); } @@ -562,12 +569,13 @@ private static Mono stageBlock( multiPart.blockOffset() ); try { - var stream = toSynchronizedInputStream(blobName, provider.apply(multiPart.blockOffset(), multiPart.blockSize()), multiPart); + final var stream = provider.apply(multiPart.blockOffset(), multiPart.blockSize()); + assert stream.markSupported() : "provided input stream must support mark and reset"; boolean success = false; try { var stageBlock = asyncClient.stageBlock( multiPart.blockId(), - toFlux(stream, multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE), + toFlux(wrapInputStream(blobName, stream, multiPart), multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE), multiPart.blockSize() ).doOnSuccess(unused -> { logger.debug(() -> format("%s: part [%s] of size [%s] uploaded", blobName, multiPart.part(), multiPart.blockSize())); @@ -760,88 +768,105 @@ public synchronized int read() throws IOException { // we read the input stream (i.e. when it's rate limited) } - private static InputStream toSynchronizedInputStream(String blobName, InputStream delegate, MultiPart multipart) { - assert delegate.markSupported() : "An InputStream with mark support was expected"; - // We need to introduce a read barrier in order to provide visibility for the underlying - // input stream state as the input stream can be read from different threads. - // TODO See if this is still needed + /** + * Wraps an {@link InputStream} to assert that it is read only by a single thread at a time and to add log traces. + */ + private static InputStream wrapInputStream(final String blobName, final InputStream delegate, final MultiPart multipart) { return new FilterInputStream(delegate) { + private final AtomicReference currentThread = Assertions.ENABLED ? new AtomicReference<>() : null; private final boolean isTraceEnabled = logger.isTraceEnabled(); @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - var result = super.read(b, off, len); - if (isTraceEnabled) { - logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part()); - } - return result; - } - - @Override - public synchronized int read() throws IOException { - var result = super.read(); - if (isTraceEnabled) { - logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part()); - } - return result; - } - - @Override - public synchronized void mark(int readlimit) { - if (isTraceEnabled) { - logger.trace("{} marks stream {} part {}", Thread.currentThread(), blobName, multipart.part()); - } - super.mark(readlimit); - } - - @Override - public synchronized void reset() throws IOException { - if (isTraceEnabled) { - logger.trace("{} resets stream {} part {}", Thread.currentThread(), blobName, multipart.part()); + public int read(byte[] b, int off, int len) throws IOException { + assert assertThread(null, Thread.currentThread()); + assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME); + try { + var result = super.read(b, off, len); + if (isTraceEnabled) { + logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part()); + } + return result; + } finally { + assert assertThread(Thread.currentThread(), null); } - super.reset(); } @Override - public synchronized void close() throws IOException { - if (isTraceEnabled) { - logger.trace("{} closes stream {} part {}", Thread.currentThread(), blobName, multipart.part()); + public int read() throws IOException { + assert assertThread(null, Thread.currentThread()); + assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME); + try { + var result = super.read(); + if (isTraceEnabled) { + logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part()); + } + return result; + } finally { + assert assertThread(Thread.currentThread(), null); } - super.close(); } - @Override - public String toString() { - return blobName + " part [" + multipart.part() + "] of size [" + multipart.blockSize() + ']'; + private boolean assertThread(Thread current, Thread updated) { + final Thread witness = currentThread.compareAndExchange(current, updated); + assert witness == current + : "Unable to set current thread to [" + + updated + + "]: expected thread [" + + current + + "] to be the thread currently accessing the input stream for reading, but thread " + + witness + + " is already reading " + + blobName + + " part " + + multipart.part(); + return true; } }; } - private static Flux toFlux(InputStream stream, long length, int chunkSize) { - assert stream.markSupported() : "An InputStream with mark support was expected"; - // We need to mark the InputStream as it's possible that we need to retry for the same chunk + /** + * Converts an input stream to a Flux of ByteBuffer. This method also checks that the stream has provided the expected number of bytes. + * + * @param stream the input stream that needs to be converted + * @param length the expected length in bytes of the input stream + * @param byteBufferSize the size of the ByteBuffer to be created + **/ + private static Flux toFlux(InputStream stream, long length, final int byteBufferSize) { + assert stream.markSupported() : "input stream must support mark and reset"; + // always marks the input stream in case it needs to be retried stream.mark(Integer.MAX_VALUE); + // defer the creation of the flux until it is subscribed return Flux.defer(() -> { - // TODO Code in this Flux.defer() can be concurrently executed by multiple threads? try { stream.reset(); } catch (IOException e) { - throw new RuntimeException(e); + // Flux.defer() catches and propagates the exception + throw new UncheckedIOException(e); } + // the number of bytes read is updated in a thread pool (repository_azure) and later compared to the expected length in another + // thread pool (azure_event_loop), so we need this to be atomic. final var bytesRead = new AtomicLong(0L); - // This flux is subscribed by a downstream operator that finally queues the - // buffers into netty output queue. Sadly we are not able to get a signal once - // the buffer has been flushed, so we have to allocate those and let the GC to - // reclaim them (see MonoSendMany). Additionally, that very same operator requests - // 128 elements (that's hardcoded) once it's subscribed (later on, it requests - // by 64 elements), that's why we provide 64kb buffers. - // length is at most 100MB so it's safe to cast back to an integer in this case - final int parts = (int) length / chunkSize; - final long remaining = length % chunkSize; - return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * chunkSize).concatMap(pos -> Mono.fromCallable(() -> { - long count = pos + chunkSize > length ? length - pos : chunkSize; + assert length <= ByteSizeValue.ofMb(100L).getBytes() : length; + // length is at most 100MB so it's safe to cast back to an integer + final int parts = Math.toIntExact(length / byteBufferSize); + final long remaining = length % byteBufferSize; + + // This flux is subscribed by a downstream subscriber (reactor.netty.channel.MonoSendMany) that queues the buffers into netty + // output queue. Sadly we are not able to get a signal once the buffer has been flushed, so we have to allocate those and let + // the GC to reclaim them. Additionally, the MonoSendMany subscriber requests 128 elements from the flux when it subscribes to + // it. This 128 value is hardcoded in reactor.netty.channel.MonoSend.MAX_SIZE). After 128 byte buffers have been published by + // the flux, the MonoSendMany subscriber requests 64 more byte buffers (see reactor.netty.channel.MonoSend.REFILL_SIZE) and so + // on. + // + // So this flux instantiates 128 ByteBuffer objects of DEFAULT_UPLOAD_BUFFERS_SIZE bytes in heap every time the NettyOutbound in + // the Azure's Netty event loop requests byte buffers to write to the network channel. That represents 128 * 64kb = 8 mb per + // flux. The creation of the ByteBuffer objects are forked to the repository_azure thread pool, which has a maximum of 15 + // threads (most of the time, can be less than that for nodes with less than 750mb). It means that max. 15 * 8 = 120mb bytes are + // allocated on heap at a time here (omitting the ones already created and pending garbage collection). + return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * byteBufferSize).concatMap(pos -> Mono.fromCallable(() -> { + long count = pos + byteBufferSize > length ? length - pos : byteBufferSize; int numOfBytesRead = 0; int offset = 0; int len = (int) count; @@ -860,16 +885,15 @@ private static Flux toFlux(InputStream stream, long length, int chun ); } return ByteBuffer.wrap(buffer); - })).doOnComplete(() -> { + }), 0 /* no prefetching needed for concatMap as the MonoSendMany subscriber requests 128 elements */).doOnComplete(() -> { if (bytesRead.get() > length) { throw new IllegalStateException( format("Input stream [%s] emitted %d bytes, more than the expected %d bytes.", stream, bytesRead, length) ); } }); - // We need to subscribe on a different scheduler to avoid blocking the io threads when we read the input stream + // subscribe on a different scheduler to avoid blocking the network io threads when reading bytes from disk }).subscribeOn(Schedulers.elastic()); - } /** From 680f4b618fb0d41a42dc7840f1d2a8b28a9064b0 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 3 Jun 2025 17:20:55 +0200 Subject: [PATCH 2/2] feedback --- .../repositories/azure/AzureBlobStore.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 41c1057fb885f..818cc4c0cd560 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -830,7 +830,7 @@ private boolean assertThread(Thread current, Thread updated) { * * @param stream the input stream that needs to be converted * @param length the expected length in bytes of the input stream - * @param byteBufferSize the size of the ByteBuffer to be created + * @param byteBufferSize the size of the ByteBuffers to be created **/ private static Flux toFlux(InputStream stream, long length, final int byteBufferSize) { assert stream.markSupported() : "input stream must support mark and reset"; @@ -862,9 +862,10 @@ private static Flux toFlux(InputStream stream, long length, final in // // So this flux instantiates 128 ByteBuffer objects of DEFAULT_UPLOAD_BUFFERS_SIZE bytes in heap every time the NettyOutbound in // the Azure's Netty event loop requests byte buffers to write to the network channel. That represents 128 * 64kb = 8 mb per - // flux. The creation of the ByteBuffer objects are forked to the repository_azure thread pool, which has a maximum of 15 - // threads (most of the time, can be less than that for nodes with less than 750mb). It means that max. 15 * 8 = 120mb bytes are - // allocated on heap at a time here (omitting the ones already created and pending garbage collection). + // flux which is aligned with BlobAsyncClient.BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE. The creation of the ByteBuffer objects are + // forked to the repository_azure thread pool, which has a maximum of 15 threads (most of the time, can be less than that for + // nodes with less than 750mb heap). It means that max. 15 * 8 = 120mb bytes are allocated on heap at a time here (omitting the + // ones already created and pending garbage collection). return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * byteBufferSize).concatMap(pos -> Mono.fromCallable(() -> { long count = pos + byteBufferSize > length ? length - pos : byteBufferSize; int numOfBytesRead = 0; @@ -885,7 +886,7 @@ private static Flux toFlux(InputStream stream, long length, final in ); } return ByteBuffer.wrap(buffer); - }), 0 /* no prefetching needed for concatMap as the MonoSendMany subscriber requests 128 elements */).doOnComplete(() -> { + })).doOnComplete(() -> { if (bytesRead.get() > length) { throw new IllegalStateException( format("Input stream [%s] emitted %d bytes, more than the expected %d bytes.", stream, bytesRead, length)