1818import java .nio .ByteBuffer ;
1919import java .nio .file .Path ;
2020import java .nio .file .attribute .FileTime ;
21- import java .util .Collections ;
22- import java .util .HashSet ;
2321import java .util .Optional ;
24- import java .util .Set ;
2522import java .util .concurrent .atomic .AtomicBoolean ;
2623import java .util .concurrent .atomic .AtomicInteger ;
2724import java .util .concurrent .atomic .AtomicLong ;
@@ -55,7 +52,7 @@ public final class FileAsyncRequestBodySplitHelper {
5552
5653 private volatile boolean isDone = false ;
5754
58- private Set < Long > requestBodyStartPositionsInFlight = Collections . synchronizedSet ( new HashSet <>() );
55+ private AtomicInteger numAsyncRequestBodiesInFlight = new AtomicInteger ( 0 );
5956 private AtomicInteger chunkIndex = new AtomicInteger (0 );
6057 private final FileTime modifiedTimeAtStart ;
6158 private final long sizeAtStart ;
@@ -109,10 +106,9 @@ private void sendAsyncRequestBody(SimplePublisher<AsyncRequestBody> simplePublis
109106
110107 private void doSendAsyncRequestBody (SimplePublisher <AsyncRequestBody > simplePublisher ) {
111108 while (shouldSendMore ()) {
112- long position = chunkSize * chunkIndex .getAndIncrement ();
113- AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody (position , simplePublisher );
109+ AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody (simplePublisher );
114110 simplePublisher .send (currentAsyncRequestBody );
115- requestBodyStartPositionsInFlight . add ( position );
111+ numAsyncRequestBodiesInFlight . incrementAndGet ( );
116112 checkCompletion (simplePublisher , currentAsyncRequestBody );
117113 }
118114 }
@@ -130,12 +126,16 @@ private void checkCompletion(SimplePublisher<AsyncRequestBody> simplePublisher,
130126 }
131127 }
132128
133- private void startNextRequestBody (SimplePublisher <AsyncRequestBody > simplePublisher , long completedPosition ) {
134- requestBodyStartPositionsInFlight .remove (completedPosition );
129+ private void startNextRequestBody (SimplePublisher <AsyncRequestBody > simplePublisher ) {
130+ int d = numAsyncRequestBodiesInFlight .decrementAndGet ();
131+ if (d < 0 ) {
132+ throw new RuntimeException ("Unexpected error occurred. numAsyncRequestBodiesInFlight is negative: " + d );
133+ }
135134 sendAsyncRequestBody (simplePublisher );
136135 }
137136
138- private AsyncRequestBody newFileAsyncRequestBody (long position , SimplePublisher <AsyncRequestBody > simplePublisher ) {
137+ private AsyncRequestBody newFileAsyncRequestBody (SimplePublisher <AsyncRequestBody > simplePublisher ) {
138+ long position = chunkSize * chunkIndex .getAndIncrement ();
139139 long numBytesToReadForThisChunk = Math .min (totalContentLength - position , chunkSize );
140140 FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody .builder ()
141141 .path (path )
@@ -145,7 +145,7 @@ private AsyncRequestBody newFileAsyncRequestBody(long position, SimplePublisher<
145145 .modifiedTimeAtStart (modifiedTimeAtStart )
146146 .sizeAtStart (sizeAtStart )
147147 .build ();
148- return new FileAsyncRequestBodyWrapper (fileAsyncRequestBody , simplePublisher , position );
148+ return new FileAsyncRequestBodyWrapper (fileAsyncRequestBody , simplePublisher );
149149 }
150150
151151 /**
@@ -156,40 +156,45 @@ private boolean shouldSendMore() {
156156 return false ;
157157 }
158158
159- long currentUsedBuffer = (long ) requestBodyStartPositionsInFlight . size () * bufferPerAsyncRequestBody ;
159+ long currentUsedBuffer = (long ) numAsyncRequestBodiesInFlight . get () * bufferPerAsyncRequestBody ;
160160 return currentUsedBuffer + bufferPerAsyncRequestBody <= totalBufferSize ;
161161 }
162162
163163 @ SdkTestInternalApi
164- int numAsyncRequestBodiesInFlight () {
165- return requestBodyStartPositionsInFlight . size () ;
164+ AtomicInteger numAsyncRequestBodiesInFlight () {
165+ return numAsyncRequestBodiesInFlight ;
166166 }
167167
168168 private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody {
169169
170170 private final FileAsyncRequestBody fileAsyncRequestBody ;
171171 private final SimplePublisher <AsyncRequestBody > simplePublisher ;
172- private final long position ;
172+ private final AtomicBoolean isDone = new AtomicBoolean ( false ) ;
173173
174174 FileAsyncRequestBodyWrapper (FileAsyncRequestBody fileAsyncRequestBody ,
175- SimplePublisher <AsyncRequestBody > simplePublisher , long position ) {
175+ SimplePublisher <AsyncRequestBody > simplePublisher ) {
176176 this .fileAsyncRequestBody = fileAsyncRequestBody ;
177177 this .simplePublisher = simplePublisher ;
178- this .position = position ;
179178 }
180179
181180 @ Override
182181 public void subscribe (Subscriber <? super ByteBuffer > s ) {
183- fileAsyncRequestBody .doAfterOnComplete (() -> startNextRequestBody ( simplePublisher , position ) )
182+ fileAsyncRequestBody .doAfterOnComplete (this :: startNextIfNeeded )
184183 // The reason we still need to call startNextRequestBody when the subscription is
185184 // cancelled is that upstream could cancel the subscription even though the stream has
186185 // finished successfully before onComplete. If this happens, doAfterOnComplete callback
187186 // will never be invoked, and if the current buffer is full, the publisher will stop
188187 // sending new FileAsyncRequestBody, leading to uncompleted future.
189- .doAfterOnCancel (() -> startNextRequestBody ( simplePublisher , position ) )
188+ .doAfterOnCancel (this :: startNextIfNeeded )
190189 .subscribe (s );
191190 }
192191
192+ private void startNextIfNeeded () {
193+ if (isDone .compareAndSet (false , true )) {
194+ startNextRequestBody (simplePublisher );
195+ }
196+ }
197+
193198 @ Override
194199 public Optional <Long > contentLength () {
195200 return fileAsyncRequestBody .contentLength ();
0 commit comments