18
18
import java .nio .ByteBuffer ;
19
19
import java .util .concurrent .CompletableFuture ;
20
20
import java .util .concurrent .atomic .AtomicBoolean ;
21
+ import java .util .concurrent .atomic .AtomicInteger ;
21
22
import java .util .concurrent .atomic .AtomicLong ;
22
23
import org .reactivestreams .Subscriber ;
23
24
import org .reactivestreams .Subscription ;
@@ -57,7 +58,7 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
57
58
/**
58
59
* Set to true once {@code .onStream()} is called on the upstreamResponseTransformer
59
60
*/
60
- private final AtomicBoolean onStreamCalled = new AtomicBoolean ( false ) ;
61
+ private boolean onStreamCalled ;
61
62
62
63
/**
63
64
* Set to true once {@code .cancel()} is called in the subscription of the downstream subscriber, or if the
@@ -108,16 +109,9 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
108
109
private volatile CompletableFuture <ResultT > upstreamFuture ;
109
110
110
111
/**
111
- * Tracks whether an {@code IndividualTransformer} has been created for the first part yet. Errors will only be retried for
112
- * the first part.
112
+ * Tracks the part number. Errors will only be retried for the first part.
113
113
*/
114
- private final AtomicBoolean isFirstIndividualTransformer = new AtomicBoolean (true );
115
-
116
- /**
117
- * Tracks whether an {@code IndividualPartSubscriber} has been created for the first part yet. Errors will only be retried for
118
- * the first part.
119
- */
120
- private final AtomicBoolean isFirstIndividualSubscriber = new AtomicBoolean (true );
114
+ private final AtomicInteger partNumber = new AtomicInteger (0 );
121
115
122
116
private SplittingTransformer (AsyncResponseTransformer <ResponseT , ResultT > upstreamResponseTransformer ,
123
117
Long maximumBufferSizeInBytes ,
@@ -206,7 +200,7 @@ private boolean doEmit() {
206
200
}
207
201
if (outstandingDemand .get () > 0 ) {
208
202
demand = outstandingDemand .decrementAndGet ();
209
- downstreamSubscriber .onNext (new IndividualTransformer (isFirstIndividualTransformer . compareAndSet ( true , false )));
203
+ downstreamSubscriber .onNext (new IndividualTransformer (partNumber . incrementAndGet ( )));
210
204
}
211
205
}
212
206
return false ;
@@ -224,7 +218,7 @@ private void handleSubscriptionCancel() {
224
218
log .trace (() -> "downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()" );
225
219
return ;
226
220
}
227
- if (!onStreamCalled . get () ) {
221
+ if (!onStreamCalled ) {
228
222
// we never subscribe publisherToUpstream to the upstream, it would not complete
229
223
downstreamSubscriber = null ;
230
224
return ;
@@ -268,19 +262,19 @@ private void handleFutureCancel(Throwable e) {
268
262
* body publisher.
269
263
*/
270
264
private class IndividualTransformer implements AsyncResponseTransformer <ResponseT , ResponseT > {
271
- private final boolean isFirstPart ;
265
+ private final int partNumber ;
272
266
private ResponseT response ;
273
267
private CompletableFuture <ResponseT > individualFuture ;
274
268
275
- IndividualTransformer (boolean isFirstPart ) {
276
- this .isFirstPart = isFirstPart ;
269
+ IndividualTransformer (int partNumber ) {
270
+ this .partNumber = partNumber ;
277
271
}
278
272
279
273
@ Override
280
274
public CompletableFuture <ResponseT > prepare () {
281
275
this .individualFuture = new CompletableFuture <>();
282
276
283
- if (isFirstPart ) {
277
+ if (partNumber == 1 ) {
284
278
if (isCancelled .get ()) {
285
279
return individualFuture ;
286
280
}
@@ -299,7 +293,7 @@ public CompletableFuture<ResponseT> prepare() {
299
293
300
294
@ Override
301
295
public void onResponse (ResponseT response ) {
302
- if (isFirstPart ) {
296
+ if (partNumber == 1 ) {
303
297
log .trace (() -> "calling onResponse on the upstream transformer" );
304
298
upstreamResponseTransformer .onResponse (response );
305
299
}
@@ -312,8 +306,8 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
312
306
return ;
313
307
}
314
308
synchronized (cancelLock ) {
315
- if (isFirstPart ) {
316
- onStreamCalled . set ( true ) ;
309
+ if (partNumber == 1 ) {
310
+ onStreamCalled = true ;
317
311
log .trace (() -> "calling onStream on the upstream transformer" );
318
312
upstreamResponseTransformer .onStream (upstreamSubscriber -> publisherToUpstream .subscribe (
319
313
DelegatingBufferingSubscriber .builder ()
@@ -324,22 +318,25 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
324
318
}
325
319
}
326
320
327
- if (!resultFuture .isDone ()) {
328
- CompletableFutureUtils .forwardResultTo (upstreamFuture , resultFuture );
329
- }
330
-
331
- publisher .subscribe (new IndividualPartSubscriber <>(this .individualFuture , response ,
332
- isFirstIndividualSubscriber .compareAndSet (true , false )));
321
+ CompletableFutureUtils .forwardResultTo (upstreamFuture , resultFuture );
322
+ publisher .subscribe (new IndividualPartSubscriber <>(this .individualFuture , response , partNumber ));
333
323
}
334
324
335
325
@ Override
336
326
public void exceptionOccurred (Throwable error ) {
337
327
log .trace (() -> "calling exceptionOccurred on the upstream transformer" );
338
- upstreamResponseTransformer .exceptionOccurred (error );
339
328
340
- if (! isFirstPart || onStreamCalled . get () ) {
341
- publisherToUpstream . error (error );
329
+ if (partNumber == 1 ) {
330
+ upstreamResponseTransformer . exceptionOccurred (error );
342
331
}
332
+
333
+ // TODO - add comments explaining
334
+ synchronized (cancelLock ) {
335
+ if (partNumber > 1 || onStreamCalled ) {
336
+ publisherToUpstream .error (error );
337
+ }
338
+ }
339
+
343
340
}
344
341
}
345
342
@@ -350,13 +347,13 @@ class IndividualPartSubscriber<T> implements Subscriber<ByteBuffer> {
350
347
351
348
private final CompletableFuture <T > future ;
352
349
private final T response ;
353
- private final boolean isFirstPart ;
350
+ private final int partNumber ;
354
351
private Subscription subscription ;
355
352
356
- IndividualPartSubscriber (CompletableFuture <T > future , T response , boolean isFirstPart ) {
353
+ IndividualPartSubscriber (CompletableFuture <T > future , T response , int partNumber ) {
357
354
this .future = future ;
358
355
this .response = response ;
359
- this .isFirstPart = isFirstPart ;
356
+ this .partNumber = partNumber ;
360
357
}
361
358
362
359
@ Override
@@ -396,7 +393,7 @@ public void onComplete() {
396
393
}
397
394
398
395
private void handleError (Throwable t ) {
399
- if (! isFirstPart ) {
396
+ if (partNumber > 1 ) {
400
397
publisherToUpstream .error (t );
401
398
}
402
399
future .completeExceptionally (t );
0 commit comments