3434import java .util .concurrent .atomic .AtomicBoolean ;
3535import java .util .concurrent .atomic .AtomicLong ;
3636import org .reactivestreams .Publisher ;
37- import org .reactivestreams .Subscriber ;
3837import org .reactivestreams .Subscription ;
3938import org .slf4j .Logger ;
4039import org .slf4j .LoggerFactory ;
4140import reactor .core .CoreSubscriber ;
4241import reactor .core .publisher .Flux ;
4342import reactor .core .publisher .Mono ;
4443import reactor .core .publisher .MonoProcessor ;
44+ import reactor .core .publisher .Operators ;
45+ import reactor .util .context .Context ;
4546import reactor .util .retry .Retry ;
4647
4748/**
@@ -667,26 +668,28 @@ private class WeightedSocket implements LoadBalancerSocketMetrics, RSocket {
667668 @ Override
668669 public Mono <Payload > requestResponse (Payload payload ) {
669670 return rSocketMono .flatMap (
670- source -> {
671- return Mono .from (
672- subscriber ->
673- source
674- .requestResponse (payload )
675- .subscribe (new LatencySubscriber <>(subscriber , this )));
676- });
671+ source ->
672+ Mono .from (
673+ subscriber ->
674+ source
675+ .requestResponse (payload )
676+ .subscribe (
677+ new LatencySubscriber <>(
678+ Operators .toCoreSubscriber (subscriber ), this ))));
677679 }
678680
679681 @ Override
680682 public Flux <Payload > requestStream (Payload payload ) {
681683
682684 return rSocketMono .flatMapMany (
683- source -> {
684- return Flux .from (
685- subscriber ->
686- source
687- .requestStream (payload )
688- .subscribe (new CountingSubscriber <>(subscriber , this )));
689- });
685+ source ->
686+ Flux .from (
687+ subscriber ->
688+ source
689+ .requestStream (payload )
690+ .subscribe (
691+ new CountingSubscriber <>(
692+ Operators .toCoreSubscriber (subscriber ), this ))));
690693 }
691694
692695 @ Override
@@ -698,7 +701,9 @@ public Mono<Void> fireAndForget(Payload payload) {
698701 subscriber ->
699702 source
700703 .fireAndForget (payload )
701- .subscribe (new CountingSubscriber <>(subscriber , this )));
704+ .subscribe (
705+ new CountingSubscriber <>(
706+ Operators .toCoreSubscriber (subscriber ), this )));
702707 });
703708 }
704709
@@ -710,21 +715,24 @@ public Mono<Void> metadataPush(Payload payload) {
710715 subscriber ->
711716 source
712717 .metadataPush (payload )
713- .subscribe (new CountingSubscriber <>(subscriber , this )));
718+ .subscribe (
719+ new CountingSubscriber <>(
720+ Operators .toCoreSubscriber (subscriber ), this )));
714721 });
715722 }
716723
717724 @ Override
718725 public Flux <Payload > requestChannel (Publisher <Payload > payloads ) {
719726
720727 return rSocketMono .flatMapMany (
721- source -> {
722- return Flux .from (
723- subscriber ->
724- source
725- .requestChannel (payloads )
726- .subscribe (new CountingSubscriber <>(subscriber , this )));
727- });
728+ source ->
729+ Flux .from (
730+ subscriber ->
731+ source
732+ .requestChannel (payloads )
733+ .subscribe (
734+ new CountingSubscriber <>(
735+ Operators .toCoreSubscriber (subscriber ), this ))));
728736 }
729737
730738 synchronized double getPredictedLatency () {
@@ -867,18 +875,23 @@ public long lastTimeUsedMillis() {
867875 * Subscriber wrapper used for request/response interaction model, measure and collect latency
868876 * information.
869877 */
870- private class LatencySubscriber <U > implements Subscriber <U > {
871- private final Subscriber <U > child ;
878+ private class LatencySubscriber <U > implements CoreSubscriber <U > {
879+ private final CoreSubscriber <U > child ;
872880 private final WeightedSocket socket ;
873881 private final AtomicBoolean done ;
874882 private long start ;
875883
876- LatencySubscriber (Subscriber <U > child , WeightedSocket socket ) {
884+ LatencySubscriber (CoreSubscriber <U > child , WeightedSocket socket ) {
877885 this .child = child ;
878886 this .socket = socket ;
879887 this .done = new AtomicBoolean (false );
880888 }
881889
890+ @ Override
891+ public Context currentContext () {
892+ return child .currentContext ();
893+ }
894+
882895 @ Override
883896 public void onSubscribe (Subscription s ) {
884897 start = incr ();
@@ -931,15 +944,20 @@ public void onComplete() {
931944 * Subscriber wrapper used for stream like interaction model, it only counts the number of
932945 * active streams
933946 */
934- private class CountingSubscriber <U > implements Subscriber <U > {
935- private final Subscriber <U > child ;
947+ private class CountingSubscriber <U > implements CoreSubscriber <U > {
948+ private final CoreSubscriber <U > child ;
936949 private final WeightedSocket socket ;
937950
938- CountingSubscriber (Subscriber <U > child , WeightedSocket socket ) {
951+ CountingSubscriber (CoreSubscriber <U > child , WeightedSocket socket ) {
939952 this .child = child ;
940953 this .socket = socket ;
941954 }
942955
956+ @ Override
957+ public Context currentContext () {
958+ return child .currentContext ();
959+ }
960+
943961 @ Override
944962 public void onSubscribe (Subscription s ) {
945963 socket .pendingStreams .incrementAndGet ();
0 commit comments