Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
Expand All @@ -74,6 +75,7 @@
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.FilterInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -101,6 +103,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -507,7 +510,11 @@ void writeBlobAtomic(
return asyncClient.commitBlockList(
multiParts.stream().map(MultiPart::blockId).toList(),
failIfAlreadyExists == false
).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()));
)
.doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()))
// Note: non-committed uploaded blocks will be deleted by Azure after a week
// (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks)
.doOnError(e -> logger.error(() -> format("%s: failed to commit %d parts", blobName, multiParts.size()), e));
})
.block();
}
Expand Down Expand Up @@ -562,12 +569,13 @@ private static Mono<String> stageBlock(
multiPart.blockOffset()
);
try {
var stream = toSynchronizedInputStream(blobName, provider.apply(multiPart.blockOffset(), multiPart.blockSize()), multiPart);
final var stream = provider.apply(multiPart.blockOffset(), multiPart.blockSize());
assert stream.markSupported() : "provided input stream must support mark and reset";
boolean success = false;
try {
var stageBlock = asyncClient.stageBlock(
multiPart.blockId(),
toFlux(stream, multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE),
toFlux(wrapInputStream(blobName, stream, multiPart), multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE),
multiPart.blockSize()
).doOnSuccess(unused -> {
logger.debug(() -> format("%s: part [%s] of size [%s] uploaded", blobName, multiPart.part(), multiPart.blockSize()));
Expand Down Expand Up @@ -760,88 +768,106 @@ public synchronized int read() throws IOException {
// we read the input stream (i.e. when it's rate limited)
}

private static InputStream toSynchronizedInputStream(String blobName, InputStream delegate, MultiPart multipart) {
assert delegate.markSupported() : "An InputStream with mark support was expected";
// We need to introduce a read barrier in order to provide visibility for the underlying
// input stream state as the input stream can be read from different threads.
// TODO See if this is still needed
/**
* Wraps an {@link InputStream} to assert that it is read only by a single thread at a time and to add log traces.
*/
private static InputStream wrapInputStream(final String blobName, final InputStream delegate, final MultiPart multipart) {
return new FilterInputStream(delegate) {

private final AtomicReference<Thread> currentThread = Assertions.ENABLED ? new AtomicReference<>() : null;
private final boolean isTraceEnabled = logger.isTraceEnabled();

@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
var result = super.read(b, off, len);
if (isTraceEnabled) {
logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
}
return result;
}

@Override
public synchronized int read() throws IOException {
var result = super.read();
if (isTraceEnabled) {
logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
}
return result;
}

@Override
public synchronized void mark(int readlimit) {
if (isTraceEnabled) {
logger.trace("{} marks stream {} part {}", Thread.currentThread(), blobName, multipart.part());
}
super.mark(readlimit);
}

@Override
public synchronized void reset() throws IOException {
if (isTraceEnabled) {
logger.trace("{} resets stream {} part {}", Thread.currentThread(), blobName, multipart.part());
public int read(byte[] b, int off, int len) throws IOException {
assert assertThread(null, Thread.currentThread());
assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
try {
var result = super.read(b, off, len);
if (isTraceEnabled) {
logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
}
return result;
} finally {
assert assertThread(Thread.currentThread(), null);
}
super.reset();
}

@Override
public synchronized void close() throws IOException {
if (isTraceEnabled) {
logger.trace("{} closes stream {} part {}", Thread.currentThread(), blobName, multipart.part());
public int read() throws IOException {
assert assertThread(null, Thread.currentThread());
assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
try {
var result = super.read();
if (isTraceEnabled) {
logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
}
return result;
} finally {
assert assertThread(Thread.currentThread(), null);
}
super.close();
}

@Override
public String toString() {
return blobName + " part [" + multipart.part() + "] of size [" + multipart.blockSize() + ']';
private boolean assertThread(Thread current, Thread updated) {
final Thread witness = currentThread.compareAndExchange(current, updated);
assert witness == current
: "Unable to set current thread to ["
+ updated
+ "]: expected thread ["
+ current
+ "] to be the thread currently accessing the input stream for reading, but thread "
+ witness
+ " is already reading "
+ blobName
+ " part "
+ multipart.part();
return true;
}
};
}

private static Flux<ByteBuffer> toFlux(InputStream stream, long length, int chunkSize) {
assert stream.markSupported() : "An InputStream with mark support was expected";
// We need to mark the InputStream as it's possible that we need to retry for the same chunk
/**
* Converts an input stream to a Flux of ByteBuffer. This method also checks that the stream has provided the expected number of bytes.
*
* @param stream the input stream that needs to be converted
* @param length the expected length in bytes of the input stream
* @param byteBufferSize the size of the ByteBuffers to be created
**/
private static Flux<ByteBuffer> toFlux(InputStream stream, long length, final int byteBufferSize) {
assert stream.markSupported() : "input stream must support mark and reset";
// always marks the input stream in case it needs to be retried
stream.mark(Integer.MAX_VALUE);
// defer the creation of the flux until it is subscribed
return Flux.defer(() -> {
// TODO Code in this Flux.defer() can be concurrently executed by multiple threads?
try {
stream.reset();
} catch (IOException e) {
throw new RuntimeException(e);
// Flux.defer() catches and propagates the exception
throw new UncheckedIOException(e);
}
// the number of bytes read is updated in a thread pool (repository_azure) and later compared to the expected length in another
// thread pool (azure_event_loop), so we need this to be atomic.
final var bytesRead = new AtomicLong(0L);
// This flux is subscribed to by a downstream operator that finally queues the
// buffers into netty output queue. Sadly we are not able to get a signal once
// the buffer has been flushed, so we have to allocate those and let the GC
// reclaim them (see MonoSendMany). Additionally, that very same operator requests
// 128 elements (that's hardcoded) once it's subscribed (later on, it requests
// 64 elements at a time), that's why we provide 64kb buffers.

// length is at most 100MB so it's safe to cast back to an integer in this case
final int parts = (int) length / chunkSize;
final long remaining = length % chunkSize;
return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * chunkSize).concatMap(pos -> Mono.fromCallable(() -> {
long count = pos + chunkSize > length ? length - pos : chunkSize;
assert length <= ByteSizeValue.ofMb(100L).getBytes() : length;
// length is at most 100MB so it's safe to cast back to an integer
final int parts = Math.toIntExact(length / byteBufferSize);
final long remaining = length % byteBufferSize;

// This flux is subscribed by a downstream subscriber (reactor.netty.channel.MonoSendMany) that queues the buffers into netty
// output queue. Sadly we are not able to get a signal once the buffer has been flushed, so we have to allocate those and let
// the GC reclaim them. Additionally, the MonoSendMany subscriber requests 128 elements from the flux when it subscribes to
// it (this value is hardcoded in reactor.netty.channel.MonoSend.MAX_SIZE). After 128 byte buffers have been published by
// the flux, the MonoSendMany subscriber requests 64 more byte buffers (see reactor.netty.channel.MonoSend.REFILL_SIZE) and so
// on.
//
// So this flux instantiates 128 ByteBuffer objects of DEFAULT_UPLOAD_BUFFERS_SIZE bytes on heap every time the NettyOutbound in
// Azure's Netty event loop requests byte buffers to write to the network channel. That represents 128 * 64kb = 8mb per
// flux which is aligned with BlobAsyncClient.BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE. The creation of the ByteBuffer objects is
// forked to the repository_azure thread pool, which has a maximum of 15 threads (most of the time, can be less than that for
// nodes with less than 750mb heap). It means that max. 15 * 8 = 120mb are allocated on heap at a time here (omitting the
// ones already created and pending garbage collection).
return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * byteBufferSize).concatMap(pos -> Mono.fromCallable(() -> {
long count = pos + byteBufferSize > length ? length - pos : byteBufferSize;
int numOfBytesRead = 0;
int offset = 0;
int len = (int) count;
Expand All @@ -867,9 +893,8 @@ private static Flux<ByteBuffer> toFlux(InputStream stream, long length, int chun
);
}
});
// We need to subscribe on a different scheduler to avoid blocking the io threads when we read the input stream
// subscribe on a different scheduler to avoid blocking the network io threads when reading bytes from disk
}).subscribeOn(Schedulers.elastic());

}

/**
Expand Down