@@ -66,6 +66,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
@@ -74,6 +75,7 @@
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.FilterInputStream;
import java.io.IOException;
@@ -101,6 +103,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
@@ -507,7 +510,11 @@ void writeBlobAtomic(
return asyncClient.commitBlockList(
multiParts.stream().map(MultiPart::blockId).toList(),
failIfAlreadyExists == false
).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()));
)
.doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()))
// Note: non-committed uploaded blocks will be deleted by Azure after a week
// (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks)
.doOnError(e -> logger.error(() -> format("%s: failed to commit %d parts", blobName, multiParts.size()), e));
})
.block();
}
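
For orientation, a minimal sketch of the stage-then-commit flow the method above relies on, assuming an Azure BlockBlobAsyncClient and base64 block ids prepared elsewhere; the helper name and parameters are hypothetical and the error handling of the real code is omitted:

import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical helper, not the actual repository code: stage each block in order, then commit the block list.
static Mono<Void> stageAndCommit(
    BlockBlobAsyncClient client,
    List<String> blockIds,          // base64 block ids, one per part
    List<Flux<ByteBuffer>> parts,   // payload of each part as a flux of byte buffers
    List<Long> partSizes,           // size in bytes of each part
    boolean overwrite               // the real code passes failIfAlreadyExists == false here
) {
    return Flux.range(0, blockIds.size())
        // blocks that are staged but never committed are cleaned up by Azure after a week
        .concatMap(i -> client.stageBlock(blockIds.get(i), parts.get(i), partSizes.get(i)))
        .then(client.commitBlockList(blockIds, overwrite))
        .then();
}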
@@ -562,12 +569,13 @@ private static Mono<String> stageBlock(
multiPart.blockOffset()
);
try {
var stream = toSynchronizedInputStream(blobName, provider.apply(multiPart.blockOffset(), multiPart.blockSize()), multiPart);
final var stream = provider.apply(multiPart.blockOffset(), multiPart.blockSize());
assert stream.markSupported() : "provided input stream must support mark and reset";
boolean success = false;
try {
var stageBlock = asyncClient.stageBlock(
multiPart.blockId(),
toFlux(stream, multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE),
toFlux(wrapInputStream(blobName, stream, multiPart), multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE),
multiPart.blockSize()
).doOnSuccess(unused -> {
logger.debug(() -> format("%s: part [%s] of size [%s] uploaded", blobName, multiPart.part(), multiPart.blockSize()));
@@ -760,88 +768,105 @@ public synchronized int read() throws IOException {
// we read the input stream (i.e. when it's rate limited)
}

private static InputStream toSynchronizedInputStream(String blobName, InputStream delegate, MultiPart multipart) {
assert delegate.markSupported() : "An InputStream with mark support was expected";
// We need to introduce a read barrier in order to provide visibility for the underlying
// input stream state as the input stream can be read from different threads.
// TODO See if this is still needed
/**
* Wraps an {@link InputStream} to assert that it is read only by a single thread at a time and to add log traces.
*/
private static InputStream wrapInputStream(final String blobName, final InputStream delegate, final MultiPart multipart) {
return new FilterInputStream(delegate) {

private final AtomicReference<Thread> currentThread = Assertions.ENABLED ? new AtomicReference<>() : null;
private final boolean isTraceEnabled = logger.isTraceEnabled();

@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
var result = super.read(b, off, len);
if (isTraceEnabled) {
logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
}
return result;
}

@Override
public synchronized int read() throws IOException {
var result = super.read();
if (isTraceEnabled) {
logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
}
return result;
}

@Override
public synchronized void mark(int readlimit) {
if (isTraceEnabled) {
logger.trace("{} marks stream {} part {}", Thread.currentThread(), blobName, multipart.part());
}
super.mark(readlimit);
}

@Override
public synchronized void reset() throws IOException {
if (isTraceEnabled) {
logger.trace("{} resets stream {} part {}", Thread.currentThread(), blobName, multipart.part());
public int read(byte[] b, int off, int len) throws IOException {
assert assertThread(null, Thread.currentThread());
assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
try {
var result = super.read(b, off, len);
if (isTraceEnabled) {
logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
}
return result;
} finally {
assert assertThread(Thread.currentThread(), null);
}
super.reset();
}

@Override
public synchronized void close() throws IOException {
if (isTraceEnabled) {
logger.trace("{} closes stream {} part {}", Thread.currentThread(), blobName, multipart.part());
public int read() throws IOException {
assert assertThread(null, Thread.currentThread());
assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
try {
var result = super.read();
if (isTraceEnabled) {
logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
}
return result;
} finally {
assert assertThread(Thread.currentThread(), null);
}
super.close();
}

@Override
public String toString() {
return blobName + " part [" + multipart.part() + "] of size [" + multipart.blockSize() + ']';
private boolean assertThread(Thread current, Thread updated) {
final Thread witness = currentThread.compareAndExchange(current, updated);
assert witness == current
: "Unable to set current thread to ["
+ updated
+ "]: expected thread ["
+ current
+ "] to be the thread currently accessing the input stream for reading, but thread "
+ witness
+ " is already reading "
+ blobName
+ " part "
+ multipart.part();
return true;
}
};
}
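
The assertThread handshake above can be shown in isolation. A minimal sketch, with a hypothetical class name, of the same AtomicReference.compareAndExchange pattern used to assert that only one thread at a time reads the stream:

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-alone illustration, not the actual repository code.
final class SingleReaderGuard {

    private final AtomicReference<Thread> currentThread = new AtomicReference<>();

    // Swaps the expected reader for the updated one; trips the assertion if another thread is mid-read.
    private boolean assertThread(Thread expected, Thread updated) {
        final Thread witness = currentThread.compareAndExchange(expected, updated);
        assert witness == expected : "thread " + witness + " is already reading";
        return true;
    }

    int guardedRead(InputStream in) throws IOException {
        assert assertThread(null, Thread.currentThread());       // enter: no reader expected
        try {
            return in.read();
        } finally {
            assert assertThread(Thread.currentThread(), null);   // exit: this thread must still be the reader
        }
    }
}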

private static Flux<ByteBuffer> toFlux(InputStream stream, long length, int chunkSize) {
assert stream.markSupported() : "An InputStream with mark support was expected";
// We need to mark the InputStream as it's possible that we need to retry for the same chunk
/**
* Converts an input stream to a Flux of ByteBuffer. This method also checks that the stream has provided the expected number of bytes.
*
* @param stream the input stream that needs to be converted
* @param length the expected length in bytes of the input stream
* @param byteBufferSize the size of the ByteBuffer to be created

Review comment (Contributor), suggested change:
-     * @param byteBufferSize the size of the ByteBuffer to be created
+     * @param byteBufferSize the size of the ByteBuffers to be created
Reply (Member Author): Changed in 680f4b6

**/
private static Flux<ByteBuffer> toFlux(InputStream stream, long length, final int byteBufferSize) {
assert stream.markSupported() : "input stream must support mark and reset";
// always marks the input stream in case it needs to be retried
stream.mark(Integer.MAX_VALUE);
// defer the creation of the flux until it is subscribed
return Flux.defer(() -> {
// TODO Code in this Flux.defer() can be concurrently executed by multiple threads?
try {
stream.reset();
} catch (IOException e) {
throw new RuntimeException(e);
// Flux.defer() catches and propagates the exception
throw new UncheckedIOException(e);
}
// the number of bytes read is updated in a thread pool (repository_azure) and later compared to the expected length in another
// thread pool (azure_event_loop), so we need this to be atomic.
final var bytesRead = new AtomicLong(0L);
// This flux is subscribed by a downstream operator that finally queues the
// buffers into netty output queue. Sadly we are not able to get a signal once
// the buffer has been flushed, so we have to allocate those and let the GC to
// reclaim them (see MonoSendMany). Additionally, that very same operator requests
// 128 elements (that's hardcoded) once it's subscribed (later on, it requests
// by 64 elements), that's why we provide 64kb buffers.

// length is at most 100MB so it's safe to cast back to an integer in this case
final int parts = (int) length / chunkSize;
final long remaining = length % chunkSize;
return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * chunkSize).concatMap(pos -> Mono.fromCallable(() -> {
long count = pos + chunkSize > length ? length - pos : chunkSize;
assert length <= ByteSizeValue.ofMb(100L).getBytes() : length;
// length is at most 100MB so it's safe to cast back to an integer
final int parts = Math.toIntExact(length / byteBufferSize);
final long remaining = length % byteBufferSize;
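// Worked example (hypothetical values): with length = 100 MB (104_857_600 bytes) and byteBufferSize = 64 KB (65_536 bytes),
// parts = 1600 and remaining = 0, so the flux below emits exactly 1600 byte buffers; with length = 10 MB + 1 byte,
// parts = 160 and remaining = 1, so it emits 161 buffers, the last one holding a single byte.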

// This flux is subscribed by a downstream subscriber (reactor.netty.channel.MonoSendMany) that queues the buffers into the
// netty output queue. Sadly we are not able to get a signal once the buffer has been flushed, so we have to allocate those
// and let the GC reclaim them. Additionally, the MonoSendMany subscriber requests 128 elements from the flux when it
// subscribes to it (this 128 value is hardcoded in reactor.netty.channel.MonoSend.MAX_SIZE). After 128 byte buffers have been
// published by the flux, the MonoSendMany subscriber requests 64 more byte buffers (see reactor.netty.channel.MonoSend.REFILL_SIZE)
// and so on.
//
// So this flux instantiates 128 ByteBuffer objects of DEFAULT_UPLOAD_BUFFERS_SIZE bytes in heap every time the NettyOutbound in
// the Azure's Netty event loop requests byte buffers to write to the network channel. That represents 128 * 64kb = 8 mb per
// flux. The creation of the ByteBuffer objects are forked to the repository_azure thread pool, which has a maximum of 15
// threads (most of the time, can be less than that for nodes with less than 750mb). It means that max. 15 * 8 = 120mb bytes are
// allocated on heap at a time here (omitting the ones already created and pending garbage collection).

Review comment (Contributor), suggested change:
- // threads (most of the time, can be less than that for nodes with less than 750mb). It means that max. 15 * 8 = 120mb bytes are
+ // threads (most of the time, can be less than that for nodes with less than 750mb heap). It means that max. 15 * 8 = 120mb bytes are
?
Reply (Member Author): Added in 680f4b6

return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * byteBufferSize).concatMap(pos -> Mono.fromCallable(() -> {
long count = pos + byteBufferSize > length ? length - pos : byteBufferSize;
int numOfBytesRead = 0;
int offset = 0;
int len = (int) count;
@@ -860,16 +885,15 @@ private static Flux<ByteBuffer> toFlux(InputStream stream, long length, int chun
);
}
return ByteBuffer.wrap(buffer);
})).doOnComplete(() -> {
}), 0 /* no prefetching needed for concatMap as the MonoSendMany subscriber requests 128 elements */).doOnComplete(() -> {

Review comment (Contributor): I am not entirely sure I understand this comment. Is it an important thing or a nit? It seems like we'd generate 128 elements regardless of this being 0 or the default? Is there a difference in when?
Reply (Member Author): It's really a nit; I've been puzzled about the byte buffers being fetched by 32 instead of the 128 from the initial comment. This prefetching does not have much impact in our case, I think.

if (bytesRead.get() > length) {
throw new IllegalStateException(
format("Input stream [%s] emitted %d bytes, more than the expected %d bytes.", stream, bytesRead, length)
);
}
});
// We need to subscribe on a different scheduler to avoid blocking the io threads when we read the input stream
// subscribe on a different scheduler to avoid blocking the network io threads when reading bytes from disk
}).subscribeOn(Schedulers.elastic());

}
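
To summarise the shape of toFlux, here is a condensed, hypothetical sketch of the same technique: mark the stream once, reset it inside Flux.defer so a retried subscription re-reads from the start, emit fixed-size byte buffers through concatMap with prefetch 0, and subscribe on a separate scheduler so the blocking reads stay off the network io threads. The byte-count bookkeeping and short-read handling of the real code are omitted, and Schedulers.boundedElastic() stands in for the Schedulers.elastic() used above:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical, simplified variant of the conversion above, not the actual repository code.
static Flux<ByteBuffer> toBuffers(InputStream stream, long length, int bufferSize) {
    assert stream.markSupported() : "input stream must support mark and reset";
    stream.mark(Integer.MAX_VALUE);                          // remember the start so a retry can re-read the same bytes
    return Flux.defer(() -> {
        try {
            stream.reset();                                  // every (re)subscription starts again from the mark
        } catch (IOException e) {
            throw new UncheckedIOException(e);               // Flux.defer propagates this as an error signal
        }
        final int buffers = Math.toIntExact((length + bufferSize - 1) / bufferSize);
        return Flux.range(0, buffers).concatMap(i -> Mono.fromCallable(() -> {
            final int toRead = (int) Math.min(bufferSize, length - (long) i * bufferSize);
            return ByteBuffer.wrap(stream.readNBytes(toRead)); // blocking read, hence subscribeOn below
        }), 0 /* no prefetch: a buffer is only created when the downstream subscriber requests it */);
    }).subscribeOn(Schedulers.boundedElastic());             // keep the blocking reads off the netty event loop
}

With prefetch 0, the upstream range is pulled only as fast as the MonoSendMany subscriber requests buffers (128 up front, then 64 at a time), which bounds the transient heap used per upload to roughly 128 * bufferSize.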

/**