Skip to content

Commit 3600cd1

Browse files
committed
Merge branch 'master' into stevegury/availability
2 parents a7cb3d2 + f62e73d commit 3600cd1

File tree

1 file changed

+15
-17
lines changed

1 file changed

+15
-17
lines changed

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ public void onNext(Frame requestFrame) {
259259
} else if (requestFrame.getType() == FrameType.CANCEL) {
260260
Subscription s;
261261
synchronized (Responder.this) {
262-
s = cancellationSubscriptions.get(requestFrame.getStreamId());
262+
s = cancellationSubscriptions.get(streamId);
263263
}
264264
if (s != null) {
265265
s.cancel();
@@ -268,7 +268,7 @@ public void onNext(Frame requestFrame) {
268268
} else if (requestFrame.getType() == FrameType.REQUEST_N) {
269269
SubscriptionArbiter inFlightSubscription;
270270
synchronized (Responder.this) {
271-
inFlightSubscription = inFlight.get(requestFrame.getStreamId());
271+
inFlightSubscription = inFlight.get(streamId);
272272
}
273273
if (inFlightSubscription != null) {
274274
long requestN = Frame.RequestN.requestN(requestFrame);
@@ -399,6 +399,7 @@ private Publisher<Frame> handleRequestResponse(
399399
final RequestHandler requestHandler,
400400
final Int2ObjectHashMap<Subscription> cancellationSubscriptions) {
401401

402+
final int streamId = requestFrame.getStreamId();
402403
return child -> {
403404
Subscription s = new Subscription() {
404405

@@ -408,8 +409,6 @@ private Publisher<Frame> handleRequestResponse(
408409
@Override
409410
public void request(long n) {
410411
if (n > 0 && started.compareAndSet(false, true)) {
411-
final int streamId = requestFrame.getStreamId();
412-
413412
try {
414413
Publisher<Payload> responsePublisher =
415414
requestHandler.handleRequestResponse(requestFrame);
@@ -477,13 +476,13 @@ public void cancel() {
477476

478477
private void cleanup() {
479478
synchronized(Responder.this) {
480-
cancellationSubscriptions.remove(requestFrame.getStreamId());
479+
cancellationSubscriptions.remove(streamId);
481480
}
482481
}
483482

484483
};
485484
synchronized(Responder.this) {
486-
cancellationSubscriptions.put(requestFrame.getStreamId(), s);
485+
cancellationSubscriptions.put(streamId, s);
487486
}
488487
child.onSubscribe(s);
489488
};
@@ -541,7 +540,7 @@ private Publisher<Frame> _handleRequestStream(
541540
final Int2ObjectHashMap<Subscription> cancellationSubscriptions,
542541
final Int2ObjectHashMap<SubscriptionArbiter> inFlight,
543542
final boolean allowCompletion) {
544-
543+
final int streamId = requestFrame.getStreamId();
545544
return child -> {
546545
Subscription s = new Subscription() {
547546

@@ -556,7 +555,6 @@ public void request(long n) {
556555
}
557556
if (started.compareAndSet(false, true)) {
558557
arbiter.addTransportRequest(n);
559-
final int streamId = requestFrame.getStreamId();
560558

561559
try {
562560
Publisher<Payload> responses =
@@ -630,14 +628,14 @@ public void cancel() {
630628

631629
private void cleanup() {
632630
synchronized(Responder.this) {
633-
inFlight.remove(requestFrame.getStreamId());
634-
cancellationSubscriptions.remove(requestFrame.getStreamId());
631+
inFlight.remove(streamId);
632+
cancellationSubscriptions.remove(streamId);
635633
}
636634
}
637635

638636
};
639637
synchronized(Responder.this) {
640-
cancellationSubscriptions.put(requestFrame.getStreamId(), s);
638+
cancellationSubscriptions.put(streamId, s);
641639
}
642640
child.onSubscribe(s);
643641

@@ -704,8 +702,9 @@ private Publisher<Frame> handleRequestChannel(Frame requestFrame,
704702
Int2ObjectHashMap<SubscriptionArbiter> inFlight) {
705703

706704
UnicastSubject<Payload> channelSubject;
705+
final int streamId = requestFrame.getStreamId();
707706
synchronized(Responder.this) {
708-
channelSubject = channels.get(requestFrame.getStreamId());
707+
channelSubject = channels.get(streamId);
709708
}
710709
if (channelSubject == null) {
711710
return child -> {
@@ -722,7 +721,6 @@ public void request(long n) {
722721
}
723722
if (started.compareAndSet(false, true)) {
724723
arbiter.addTransportRequest(n);
725-
final int streamId = requestFrame.getStreamId();
726724

727725
// first request on this channel
728726
UnicastSubject<Payload> channelRequests =
@@ -816,14 +814,14 @@ public void cancel() {
816814

817815
private void cleanup() {
818816
synchronized(Responder.this) {
819-
inFlight.remove(requestFrame.getStreamId());
820-
cancellationSubscriptions.remove(requestFrame.getStreamId());
817+
inFlight.remove(streamId);
818+
cancellationSubscriptions.remove(streamId);
821819
}
822820
}
823821

824822
};
825823
synchronized(Responder.this) {
826-
cancellationSubscriptions.put(requestFrame.getStreamId(), s);
824+
cancellationSubscriptions.put(streamId, s);
827825
}
828826
child.onSubscribe(s);
829827

@@ -848,7 +846,7 @@ private void cleanup() {
848846
// handle time-gap issues like this?
849847
// TODO validate with unit tests.
850848
return PublisherUtils.errorFrame(
851-
requestFrame.getStreamId(), new RuntimeException("Channel unavailable"));
849+
streamId, new RuntimeException("Channel unavailable"));
852850
}
853851
}
854852
}

0 commit comments

Comments
 (0)