@@ -35,13 +35,14 @@ private PresenceAwareRSocket(
3535 this .groupRoute = destination == null || destination .isEmpty ();
3636
3737 onClose ()
38- .doFinally (f -> {
38+ .doFinally (signalType -> {
3939 if (groupRoute ) {
4040 presenceNotifier .stopWatching (accountId , group );
4141 } else {
4242 presenceNotifier .stopWatching (accountId , destination , group );
4343 }
44- });
44+ })
45+ .subscribe ();
4546 }
4647
4748 public static PresenceAwareRSocket wrap (
@@ -55,27 +56,36 @@ public static PresenceAwareRSocket wrap(
5556
5657 @ Override
5758 public Mono <Void > fireAndForget (Payload payload ) {
58- return _notify ().then (source .fireAndForget (payload ));
59+ return _notify ()
60+ .doOnError (t -> payload .release ())
61+ .then (source .fireAndForget (payload ));
5962 }
6063
6164 @ Override
6265 public Mono <Payload > requestResponse (Payload payload ) {
63- return _notify ().then (source .requestResponse (payload ));
66+ return _notify ()
67+ .doOnError (t -> payload .release ())
68+ .then (source .requestResponse (payload ));
6469 }
6570
6671 @ Override
6772 public Flux <Payload > requestStream (Payload payload ) {
68- return _notify ().thenMany (source .requestStream (payload ));
73+ return _notify ()
74+ .doOnError (t -> payload .release ())
75+ .thenMany (source .requestStream (payload ));
6976 }
7077
7178 @ Override
7279 public Flux <Payload > requestChannel (Publisher <Payload > payloads ) {
73- return _notify ().thenMany (source .requestChannel (payloads ));
80+ return _notify ()
81+ .thenMany (source .requestChannel (payloads ));
7482 }
7583
7684 @ Override
7785 public Mono <Void > metadataPush (Payload payload ) {
78- return _notify ().then (source .metadataPush (payload ));
86+ return _notify ()
87+ .doOnError (t -> payload .release ())
88+ .then (source .metadataPush (payload ));
7989 }
8090
8191 private Mono <Void > _notify () {
0 commit comments