 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
 import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.threadpool.ThreadPool;

 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
@@ -507,7 +510,11 @@ void writeBlobAtomic(
             return asyncClient.commitBlockList(
                 multiParts.stream().map(MultiPart::blockId).toList(),
                 failIfAlreadyExists == false
-            ).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()));
+            )
+                .doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()))
+                // Note: non-committed uploaded blocks will be deleted by Azure after a week
+                // (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks)
+                .doOnError(e -> logger.error(() -> format("%s: failed to commit %d parts", blobName, multiParts.size()), e));
         })
             .block();
     }
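As a side note on the hunk above: blocks staged with stageBlock() stay invisible until commitBlockList() names them, so a failed commit (the case now logged in doOnError) only costs the staged data, which Azure garbage-collects after about a week. A minimal sketch of that two-phase protocol against the Azure SDK's BlockBlobAsyncClient, assuming a pre-built client; the class and method names here are illustrative, not the repository's actual wiring:

import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;

class BlockUploadSketch {
    static Mono<Void> uploadInTwoBlocks(BlockBlobAsyncClient blockClient, byte[] part1, byte[] part2) {
        // block ids must be base64-encoded strings of equal length within a blob
        String id1 = Base64.getEncoder().encodeToString("part-000001".getBytes(StandardCharsets.UTF_8));
        String id2 = Base64.getEncoder().encodeToString("part-000002".getBytes(StandardCharsets.UTF_8));
        // phase 1: stage the blocks (not yet part of the blob)
        return blockClient.stageBlock(id1, Flux.just(ByteBuffer.wrap(part1)), part1.length)
            .then(blockClient.stageBlock(id2, Flux.just(ByteBuffer.wrap(part2)), part2.length))
            // phase 2: commit the list; overwrite = true corresponds to failIfAlreadyExists == false above
            .then(blockClient.commitBlockList(List.of(id1, id2), true))
            .then();
    }
}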
@@ -562,12 +569,13 @@ private static Mono<String> stageBlock(
             multiPart.blockOffset()
         );
         try {
-            var stream = toSynchronizedInputStream(blobName, provider.apply(multiPart.blockOffset(), multiPart.blockSize()), multiPart);
+            final var stream = provider.apply(multiPart.blockOffset(), multiPart.blockSize());
+            assert stream.markSupported() : "provided input stream must support mark and reset";
             boolean success = false;
             try {
                 var stageBlock = asyncClient.stageBlock(
                     multiPart.blockId(),
-                    toFlux(stream, multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE),
+                    toFlux(wrapInputStream(blobName, stream, multiPart), multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE),
                     multiPart.blockSize()
                 ).doOnSuccess(unused -> {
                     logger.debug(() -> format("%s: part [%s] of size [%s] uploaded", blobName, multiPart.part(), multiPart.blockSize()));
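The new assertion requires the provider to hand back a stream with mark/reset support, because toFlux() (further down) marks the stream so that a failed block upload can be rewound and retried from the same offset. A quick illustration of what does and does not satisfy the assertion; the file path is hypothetical, and note that BufferedInputStream's reset() is only valid within the mark read limit:

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

class MarkSupportSketch {
    public static void main(String[] args) throws IOException {
        try (InputStream raw = new FileInputStream("/tmp/example.bin");
             InputStream buffered = new BufferedInputStream(raw)) {
            System.out.println(raw.markSupported());      // false: cannot be rewound for a retry
            System.out.println(buffered.markSupported()); // true: mark()/reset() can replay a failed chunk
        }
    }
}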
@@ -760,88 +768,106 @@ public synchronized int read() throws IOException {
         // we read the input stream (i.e. when it's rate limited)
     }

-    private static InputStream toSynchronizedInputStream(String blobName, InputStream delegate, MultiPart multipart) {
-        assert delegate.markSupported() : "An InputStream with mark support was expected";
-        // We need to introduce a read barrier in order to provide visibility for the underlying
-        // input stream state as the input stream can be read from different threads.
-        // TODO See if this is still needed
+    /**
+     * Wraps an {@link InputStream} to assert that it is read only by a single thread at a time and to add log traces.
+     */
+    private static InputStream wrapInputStream(final String blobName, final InputStream delegate, final MultiPart multipart) {
         return new FilterInputStream(delegate) {

+            private final AtomicReference<Thread> currentThread = Assertions.ENABLED ? new AtomicReference<>() : null;
             private final boolean isTraceEnabled = logger.isTraceEnabled();

             @Override
-            public synchronized int read(byte[] b, int off, int len) throws IOException {
-                var result = super.read(b, off, len);
-                if (isTraceEnabled) {
-                    logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
-                }
-                return result;
-            }
-
-            @Override
-            public synchronized int read() throws IOException {
-                var result = super.read();
-                if (isTraceEnabled) {
-                    logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
-                }
-                return result;
-            }
-
-            @Override
-            public synchronized void mark(int readlimit) {
-                if (isTraceEnabled) {
-                    logger.trace("{} marks stream {} part {}", Thread.currentThread(), blobName, multipart.part());
-                }
-                super.mark(readlimit);
-            }
-
-            @Override
-            public synchronized void reset() throws IOException {
-                if (isTraceEnabled) {
-                    logger.trace("{} resets stream {} part {}", Thread.currentThread(), blobName, multipart.part());
+            public int read(byte[] b, int off, int len) throws IOException {
+                assert assertThread(null, Thread.currentThread());
+                assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
+                try {
+                    var result = super.read(b, off, len);
+                    if (isTraceEnabled) {
+                        logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
+                    }
+                    return result;
+                } finally {
+                    assert assertThread(Thread.currentThread(), null);
                 }
-                super.reset();
             }

             @Override
-            public synchronized void close() throws IOException {
-                if (isTraceEnabled) {
-                    logger.trace("{} closes stream {} part {}", Thread.currentThread(), blobName, multipart.part());
+            public int read() throws IOException {
+                assert assertThread(null, Thread.currentThread());
+                assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
+                try {
+                    var result = super.read();
+                    if (isTraceEnabled) {
+                        logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
+                    }
+                    return result;
+                } finally {
+                    assert assertThread(Thread.currentThread(), null);
                 }
-                super.close();
             }

-            @Override
-            public String toString() {
-                return blobName + " part [" + multipart.part() + "] of size [" + multipart.blockSize() + ']';
+            private boolean assertThread(Thread current, Thread updated) {
+                final Thread witness = currentThread.compareAndExchange(current, updated);
+                assert witness == current
+                    : "Unable to set current thread to ["
+                        + updated
+                        + "]: expected thread ["
+                        + current
+                        + "] to be the thread currently accessing the input stream for reading, but thread "
+                        + witness
+                        + " is already reading "
+                        + blobName
+                        + " part "
+                        + multipart.part();
+                return true;
             }
         };
     }

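The assertThread() helper above is a lock-free, assertions-only ownership check: entering read() swaps null for the current thread, leaving swaps it back, and any non-matching witness from compareAndExchange proves two threads overlapped inside the stream. The same pattern in isolation (the class name is illustrative, not from the commit):

import java.util.concurrent.atomic.AtomicReference;

class SingleReaderGuard {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    void enter() {
        // claim ownership: only succeeds if no thread currently holds the guard
        Thread witness = owner.compareAndExchange(null, Thread.currentThread());
        assert witness == null : "concurrent access: " + witness + " is already inside";
    }

    void exit() {
        // release ownership: only succeeds if the current thread is the holder
        Thread witness = owner.compareAndExchange(Thread.currentThread(), null);
        assert witness == Thread.currentThread() : "exit by non-owner, owner was " + witness;
    }
}

Because both calls sit behind assert, the guard costs nothing in production; the commit goes a step further and only allocates the AtomicReference at all when Assertions.ENABLED is true.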
-    private static Flux<ByteBuffer> toFlux(InputStream stream, long length, int chunkSize) {
-        assert stream.markSupported() : "An InputStream with mark support was expected";
-        // We need to mark the InputStream as it's possible that we need to retry for the same chunk
+    /**
+     * Converts an input stream to a Flux of ByteBuffer. This method also checks that the stream has provided the expected number of
+     * bytes.
+     *
+     * @param stream the input stream that needs to be converted
+     * @param length the expected length in bytes of the input stream
+     * @param byteBufferSize the size of the ByteBuffers to be created
+     **/
+    private static Flux<ByteBuffer> toFlux(InputStream stream, long length, final int byteBufferSize) {
+        assert stream.markSupported() : "input stream must support mark and reset";
+        // always mark the input stream in case it needs to be retried
         stream.mark(Integer.MAX_VALUE);
+        // defer the creation of the flux until it is subscribed
         return Flux.defer(() -> {
-            // TODO Code in this Flux.defer() can be concurrently executed by multiple threads?
             try {
                 stream.reset();
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                // Flux.defer() catches and propagates the exception
+                throw new UncheckedIOException(e);
             }
+            // the number of bytes read is updated in a thread pool (repository_azure) and later compared to the expected length in
+            // another thread pool (azure_event_loop), so we need this to be atomic
             final var bytesRead = new AtomicLong(0L);
-            // This flux is subscribed by a downstream operator that finally queues the
-            // buffers into netty output queue. Sadly we are not able to get a signal once
-            // the buffer has been flushed, so we have to allocate those and let the GC to
-            // reclaim them (see MonoSendMany). Additionally, that very same operator requests
-            // 128 elements (that's hardcoded) once it's subscribed (later on, it requests
-            // by 64 elements), that's why we provide 64kb buffers.

-            // length is at most 100MB so it's safe to cast back to an integer in this case
-            final int parts = (int) length / chunkSize;
-            final long remaining = length % chunkSize;
-            return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * chunkSize).concatMap(pos -> Mono.fromCallable(() -> {
-                long count = pos + chunkSize > length ? length - pos : chunkSize;
+            assert length <= ByteSizeValue.ofMb(100L).getBytes() : length;
+            // length is at most 100MB so it's safe to cast back to an integer
+            final int parts = Math.toIntExact(length / byteBufferSize);
+            final long remaining = length % byteBufferSize;
+
+            // This flux is subscribed by a downstream subscriber (reactor.netty.channel.MonoSendMany) that queues the buffers into the
+            // netty output queue. Sadly we are not able to get a signal once the buffer has been flushed, so we have to allocate those
+            // and let the GC reclaim them. Additionally, the MonoSendMany subscriber requests 128 elements from the flux when it
+            // subscribes to it (this 128 value is hardcoded in reactor.netty.channel.MonoSend.MAX_SIZE). After 128 byte buffers have
+            // been published by the flux, the MonoSendMany subscriber requests 64 more byte buffers (see
+            // reactor.netty.channel.MonoSend.REFILL_SIZE) and so on.
+            //
+            // So this flux instantiates 128 ByteBuffer objects of DEFAULT_UPLOAD_BUFFERS_SIZE bytes in heap every time the
+            // NettyOutbound in Azure's Netty event loop requests byte buffers to write to the network channel. That represents
+            // 128 * 64kb = 8mb per flux, which is aligned with BlobAsyncClient.BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE. The creation of
+            // the ByteBuffer objects is forked to the repository_azure thread pool, which has a maximum of 15 threads (it can be less
+            // than that for nodes with less than 750mb of heap). It means that at most 15 * 8mb = 120mb is allocated on heap at a time
+            // here (omitting the ones already created and pending garbage collection).
+            return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * byteBufferSize).concatMap(pos -> Mono.fromCallable(() -> {
+                long count = pos + byteBufferSize > length ? length - pos : byteBufferSize;
                 int numOfBytesRead = 0;
                 int offset = 0;
                 int len = (int) count;
@@ -867,9 +893,8 @@ private static Flux<ByteBuffer> toFlux(InputStream stream, long length, int chunkSize)
                 );
             }
         });
-            // We need to subscribe on a different scheduler to avoid blocking the io threads when we read the input stream
+            // subscribe on a different scheduler to avoid blocking the network io threads when reading bytes from disk
         }).subscribeOn(Schedulers.elastic());
-
     }
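To make the slicing arithmetic above concrete, here is a self-contained sketch (assuming only reactor-core on the classpath; class and helper names are illustrative, and the mark/reset retry handling of the real method is omitted) that reproduces the parts/remainder computation and verifies the stream yielded the expected number of bytes:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

class ToFluxSketch {
    static Flux<ByteBuffer> toFlux(InputStream stream, long length, int byteBufferSize) {
        // full buffers plus one trailing buffer for the remainder, as in the hunk above
        final int parts = Math.toIntExact(length / byteBufferSize);
        final long remaining = length % byteBufferSize;
        final AtomicLong bytesRead = new AtomicLong();
        return Flux.range(0, remaining == 0 ? parts : parts + 1)
            .map(i -> (long) i * byteBufferSize)
            .concatMap(pos -> Mono.fromCallable(() -> {
                // last buffer may be shorter than byteBufferSize
                int count = (int) Math.min(byteBufferSize, length - pos);
                byte[] buffer = new byte[count];
                int n = stream.readNBytes(buffer, 0, count);
                bytesRead.addAndGet(n);
                return ByteBuffer.wrap(buffer, 0, n);
            }))
            .doOnComplete(() -> {
                // mirrors the expected-length check the commit's javadoc describes
                if (bytesRead.get() != length) {
                    throw new IllegalStateException("expected " + length + " bytes, read " + bytesRead.get());
                }
            });
    }

    public static void main(String[] args) {
        byte[] data = new byte[150_000]; // 2 full 64kb buffers + 1 remainder of 18,928 bytes
        long total = toFlux(new ByteArrayInputStream(data), data.length, 64 * 1024)
            .map(ByteBuffer::remaining)
            .reduce(0L, Long::sum)
            .block();
        System.out.println(total); // 150000
    }
}

With the commit's 64kb buffers, the 128-element prefetch described in the comment above pins at most 8mb of ByteBuffers per flux before the next refill request arrives.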

     /**