2828import reactor .core .publisher .Operators ;
2929import reactor .util .context .Context ;
3030
31- /** Specific interface for all RSocket store in {@link RSocketPool} */
32- final class ResolvingPooledRSocket extends ResolvingOperator <RSocket >
31+ /** Default implementation of {@link PooledRSocket} stored in {@link RSocketPool} */
32+ final class DefaultPooledRSocket extends ResolvingOperator <RSocket >
3333 implements CoreSubscriber <RSocket >, PooledRSocket {
3434
35- final LoadbalanceTarget loadbalanceTarget ;
35+ final RSocketPool parent ;
36+ final LoadbalanceRSocketSource loadbalanceRSocketSource ;
3637 final Stats stats ;
3738
3839 volatile Subscription s ;
3940
40- static final AtomicReferenceFieldUpdater <ResolvingPooledRSocket , Subscription > S =
41- AtomicReferenceFieldUpdater .newUpdater (ResolvingPooledRSocket .class , Subscription .class , "s" );
41+ static final AtomicReferenceFieldUpdater <DefaultPooledRSocket , Subscription > S =
42+ AtomicReferenceFieldUpdater .newUpdater (DefaultPooledRSocket .class , Subscription .class , "s" );
4243
43- ResolvingPooledRSocket (LoadbalanceTarget loadbalanceTarget , Stats stats ) {
44+ DefaultPooledRSocket (
45+ RSocketPool parent , LoadbalanceRSocketSource loadbalanceRSocketSource , Stats stats ) {
46+ this .parent = parent ;
4447 this .stats = stats ;
45- this .loadbalanceTarget = loadbalanceTarget ;
48+ this .loadbalanceRSocketSource = loadbalanceRSocketSource ;
4649 }
4750
4851 @ Override
@@ -100,21 +103,61 @@ public void onNext(RSocket value) {
100103
101104 @ Override
102105 protected void doSubscribe () {
103- this .loadbalanceTarget .source ().subscribe (this );
106+ this .loadbalanceRSocketSource .source ().subscribe (this );
104107 }
105108
106109 @ Override
107110 protected void doOnValueResolved (RSocket value ) {
111+ stats .setAvailability (1.0 );
108112 value .onClose ().subscribe (null , t -> this .invalidate (), this ::invalidate );
109113 }
110114
111115 @ Override
112116 protected void doOnValueExpired (RSocket value ) {
117+ stats .setAvailability (0.0 );
113118 value .dispose ();
119+ this .dispose ();
120+ }
121+
122+ @ Override
123+ public void dispose () {
124+ super .dispose ();
114125 }
115126
116127 @ Override
117128 protected void doOnDispose () {
129+ final RSocketPool parent = this .parent ;
130+ for (; ; ) {
131+ final PooledRSocket [] sockets = parent .activeSockets ;
132+ final int activeSocketsCount = sockets .length ;
133+
134+ int index = -1 ;
135+ for (int i = 0 ; i < activeSocketsCount ; i ++) {
136+ if (sockets [i ] == this ) {
137+ index = i ;
138+ break ;
139+ }
140+ }
141+
142+ if (index == -1 ) {
143+ break ;
144+ }
145+
146+ final int lastIndex = activeSocketsCount - 1 ;
147+ final PooledRSocket [] newSockets = new PooledRSocket [lastIndex ];
148+ if (index != 0 ) {
149+ System .arraycopy (sockets , 0 , newSockets , 0 , index );
150+ }
151+
152+ if (index != lastIndex ) {
153+ System .arraycopy (sockets , index + 1 , newSockets , index , lastIndex - index );
154+ }
155+
156+ if (RSocketPool .ACTIVE_SOCKETS .compareAndSet (parent , sockets , newSockets )) {
157+ break ;
158+ }
159+ }
160+ stats .setAvailability (0.0 );
118161 Operators .terminate (S , this );
119162 }
120163
@@ -154,47 +197,21 @@ public Stats stats() {
154197 }
155198
156199 @ Override
157- public LoadbalanceTarget supplier () {
158- return loadbalanceTarget ;
200+ public LoadbalanceRSocketSource source () {
201+ return loadbalanceRSocketSource ;
159202 }
160203
161204 @ Override
162205 public double availability () {
163206 return stats .availability ();
164207 }
165208
166- /**
167- * Try to dispose this instance if possible. Otherwise, if there is ongoing requests, mark this as
168- * pending for removal and dispose once all the requests are terminated.<br>
169- * This operation may be cancelled if {@link #markActive()} is invoked prior this instance has
170- * been disposed
171- *
172- * @return {@code true} if this instance was disposed
173- */
174- @ Override
175- public boolean markForRemoval () {
176- // FIXME: provide real logic here
177- this .dispose ();
178- return true ;
179- }
180-
181- /**
182- * Try to restore state of this RSocket to be active after marking as pending removal again.
183- *
184- * @return {@code true} if marked as active. Otherwise, should be treated as it was disposed.
185- */
186- @ Override
187- public boolean markActive () {
188- return false ;
189- }
190-
191209 static final class RequestTrackingMonoInner <RESULT >
192210 extends MonoDeferredResolution <RESULT , RSocket > {
193211
194212 long startTime ;
195213
196- RequestTrackingMonoInner (
197- ResolvingPooledRSocket parent , Payload payload , FrameType requestType ) {
214+ RequestTrackingMonoInner (DefaultPooledRSocket parent , Payload payload , FrameType requestType ) {
198215 super (parent , payload , requestType );
199216 }
200217
@@ -228,7 +245,7 @@ public void accept(RSocket rSocket, Throwable t) {
228245 return ;
229246 }
230247
231- startTime = ((ResolvingPooledRSocket ) parent ).stats .startRequest ();
248+ startTime = ((DefaultPooledRSocket ) parent ).stats .startRequest ();
232249
233250 source .subscribe ((CoreSubscriber ) this );
234251 } else {
@@ -240,7 +257,7 @@ public void accept(RSocket rSocket, Throwable t) {
240257 public void onComplete () {
241258 final long state = this .requested ;
242259 if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
243- final Stats stats = ((ResolvingPooledRSocket ) parent ).stats ;
260+ final Stats stats = ((DefaultPooledRSocket ) parent ).stats ;
244261 final long now = stats .stopRequest (startTime );
245262 stats .record (now - startTime );
246263 super .onComplete ();
@@ -251,7 +268,7 @@ public void onComplete() {
251268 public void onError (Throwable t ) {
252269 final long state = this .requested ;
253270 if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
254- Stats stats = ((ResolvingPooledRSocket ) parent ).stats ;
271+ Stats stats = ((DefaultPooledRSocket ) parent ).stats ;
255272 stats .stopRequest (startTime );
256273 stats .recordError (0.0 );
257274 super .onError (t );
@@ -267,7 +284,7 @@ public void cancel() {
267284
268285 if (state == STATE_SUBSCRIBED ) {
269286 this .s .cancel ();
270- ((ResolvingPooledRSocket ) parent ).stats .stopRequest (startTime );
287+ ((DefaultPooledRSocket ) parent ).stats .stopRequest (startTime );
271288 } else {
272289 this .parent .remove (this );
273290 ReferenceCountUtil .safeRelease (this .payload );
@@ -279,7 +296,7 @@ static final class RequestTrackingFluxInner<INPUT>
279296 extends FluxDeferredResolution <INPUT , RSocket > {
280297
281298 RequestTrackingFluxInner (
282- ResolvingPooledRSocket parent , INPUT fluxOrPayload , FrameType requestType ) {
299+ DefaultPooledRSocket parent , INPUT fluxOrPayload , FrameType requestType ) {
283300 super (parent , fluxOrPayload , requestType );
284301 }
285302
@@ -312,7 +329,7 @@ public void accept(RSocket rSocket, Throwable t) {
312329 return ;
313330 }
314331
315- ((ResolvingPooledRSocket ) parent ).stats .startStream ();
332+ ((DefaultPooledRSocket ) parent ).stats .startStream ();
316333
317334 source .subscribe (this );
318335 } else {
@@ -324,7 +341,7 @@ public void accept(RSocket rSocket, Throwable t) {
324341 public void onComplete () {
325342 final long state = this .requested ;
326343 if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
327- ((ResolvingPooledRSocket ) parent ).stats .stopStream ();
344+ ((DefaultPooledRSocket ) parent ).stats .stopStream ();
328345 super .onComplete ();
329346 }
330347 }
@@ -333,7 +350,7 @@ public void onComplete() {
333350 public void onError (Throwable t ) {
334351 final long state = this .requested ;
335352 if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
336- ((ResolvingPooledRSocket ) parent ).stats .stopStream ();
353+ ((DefaultPooledRSocket ) parent ).stats .stopStream ();
337354 super .onError (t );
338355 }
339356 }
@@ -347,7 +364,7 @@ public void cancel() {
347364
348365 if (state == STATE_SUBSCRIBED ) {
349366 this .s .cancel ();
350- ((ResolvingPooledRSocket ) parent ).stats .stopStream ();
367+ ((DefaultPooledRSocket ) parent ).stats .stopStream ();
351368 } else {
352369 this .parent .remove (this );
353370 if (requestType == FrameType .REQUEST_STREAM ) {
0 commit comments