22
33import io .netty .util .concurrent .FastThreadLocal ;
44import lombok .AllArgsConstructor ;
5+ import lombok .extern .slf4j .Slf4j ;
56import org .jctools .queues .MpmcArrayQueue ;
67import reactor .core .scheduler .Schedulers ;
78import reactor .function .Function6 ;
89
910import java .util .Queue ;
11+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
1012import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
1113import java .util .function .Consumer ;
1214import java .util .function .Supplier ;
1315
16+ @ Slf4j
1417class RecyclerImpl <T > extends FastThreadLocal <RecyclerImpl .ThreadLocalRecyclable <T >> implements Recycler <T > {
1518
1619 private final Supplier <T > factory ;
1720 private final Consumer <T > rest ;
1821
1922 private final Queue <T > queue ;
20- private final int maxSize ;
2123
2224 public RecyclerImpl (int size , Supplier <T > factory , Consumer <T > rest ) {
2325 if (size < 2 ) {
@@ -32,53 +34,52 @@ public RecyclerImpl(int size, Supplier<T> factory, Consumer<T> rest) {
3234
3335 this .factory = factory ;
3436 this .rest = rest ;
35- this .maxSize = size ;
3637 this .queue = new MpmcArrayQueue <>(size );
3738 }
3839
3940 @ Override
4041 protected ThreadLocalRecyclable <T > initialValue () throws Exception {
41- return new ThreadLocalRecyclable <T >(factory .get (), false );
42+ return new ThreadLocalRecyclable <T >(this , factory .get (), null );
4243 }
4344
4445 @ Override
4546 protected void onRemoval (ThreadLocalRecyclable <T > value ) {
4647 rest .accept (value .value );
4748 }
4849
50+ private void doReset (T val ) {
51+ try {
52+ rest .accept (val );
53+ } catch (Throwable e ) {
54+ log .warn ("reset object [{}] failed" , val , e );
55+ }
56+ }
57+
4958 @ Override
5059 public <A , A1 , A2 , A3 , A4 , R > R doWith (A arg0 , A1 arg1 , A2 arg2 , A3 arg3 , A4 arg4 , Function6 <T , A , A1 , A2 , A3 , A4 , R > call ) {
5160 // 非阻塞线程里 优先使用ThreadLocal池
5261 if (Schedulers .isInNonBlockingThread ()) {
5362 ThreadLocalRecyclable <T > ref = this .get ();
5463 // 使用中,回调里又执行了?
55- if (! ref .using ) {
64+ if (ref .use () ) {
5665 try {
57- ref .using = true ;
5866 return call .apply (ref .value , arg0 , arg1 , arg2 , arg3 , arg4 );
5967 } finally {
60- ref .using = false ;
61- rest . accept ( ref .value );
68+ doReset ( ref .value ) ;
69+ ref .recycle ( );
6270 }
6371 }
6472 }
6573 // 在阻塞线程中,使用队列的方式,防止在虚拟线程等场景下创建大量对象导致性能反而降低.
6674 T t = queue .poll ();
67- boolean recycle = true ;
6875 if (t == null ) {
6976 t = factory .get ();
70- // 如果队列已满,不回收新创建的对象
71- if (queue .size () >= maxSize ) {
72- recycle = false ;
73- }
7477 }
7578 try {
7679 return call .apply (t , arg0 , arg1 , arg2 , arg3 , arg4 );
7780 } finally {
78- rest .accept (t );
79- if (recycle ) {
80- queue .offer (t );
81- }
81+ doReset (t );
82+ queue .offer (t );
8283 }
8384 }
8485
@@ -88,21 +89,43 @@ public Recyclable<T> take(boolean synchronous) {
8889 // 同步的,尝试使用ThreadLocal
8990 if (synchronous && Schedulers .isInNonBlockingThread ()) {
9091 ThreadLocalRecyclable <T > ref = this .get ();
91- if (!ref .using ) {
92- ref .using = true ;
93- return ref ;
92+ if (ref .use ()) {
93+ return new OnceRecyclable <>(ref );
9494 }
9595 }
9696 T t = queue .poll ();
97- boolean recycle = true ;
9897 if (t == null ) {
9998 t = factory .get ();
100- // 如果队列已满,不回收新创建的对象
101- if (queue .size () >= maxSize ) {
102- recycle = false ;
99+ }
100+ return new QueueRecyclable <>(this , t );
101+ }
102+
103+ @ AllArgsConstructor
104+ static class OnceRecyclable <T > implements Recyclable <T > {
105+ @ SuppressWarnings ("all" )
106+ static final AtomicReferenceFieldUpdater <OnceRecyclable , Recyclable >
107+ REF = AtomicReferenceFieldUpdater .newUpdater (OnceRecyclable .class , Recyclable .class , "recyclable" );
108+
109+ private volatile Recyclable <T > recyclable ;
110+
111+ @ Override
112+ public T get () {
113+ @ SuppressWarnings ("unchecked" )
114+ Recyclable <T > recyclable = REF .get (this );
115+ if (recyclable == null ) {
116+ throw new IllegalStateException ("Object is recycled!" );
117+ }
118+ return recyclable .get ();
119+ }
120+
121+ @ Override
122+ public void recycle () {
123+ @ SuppressWarnings ("unchecked" )
124+ Recyclable <T > recyclable = REF .getAndSet (this , null );
125+ if (recyclable != null ) {
126+ recyclable .recycle ();
103127 }
104128 }
105- return new QueueRecyclable <>(this , recycle , t );
106129 }
107130
108131 @ AllArgsConstructor
@@ -112,7 +135,6 @@ static class QueueRecyclable<T> implements Recyclable<T> {
112135 VALUE = AtomicReferenceFieldUpdater .newUpdater (QueueRecyclable .class , Object .class , "value" );
113136
114137 final RecyclerImpl <T > main ;
115- final boolean doRecycle ;
116138 volatile T value ;
117139
118140 @ Override
@@ -130,27 +152,40 @@ public void recycle() {
130152 @ SuppressWarnings ("all" )
131153 T val = (T ) VALUE .getAndSet (this , null );
132154 if (val != null ) {
133- main .rest .accept (val );
134- if (doRecycle ) {
135- main .queue .offer (val );
136- }
155+ main .doReset (val );
156+ main .queue .offer (val );
137157 }
138158 }
139159 }
140160
141161 @ AllArgsConstructor
142- static class ThreadLocalRecyclable <T > implements Recyclable <T > {
143- private T value ;
144- private boolean using ;
162+ static class ThreadLocalRecyclable <T > implements Recyclable <T > {
163+ @ SuppressWarnings ("all" )
164+ static final AtomicReferenceFieldUpdater <ThreadLocalRecyclable , Thread >
165+ USING = AtomicReferenceFieldUpdater .newUpdater (ThreadLocalRecyclable .class , Thread .class , "using" );
166+ private final RecyclerImpl <T > main ;
167+ private final T value ;
168+ private volatile Thread using ;
145169
146170 @ Override
147171 public T get () {
148172 return value ;
149173 }
150174
175+ boolean use () {
176+ return USING .compareAndSet (this , null , Thread .currentThread ());
177+ }
178+
151179 @ Override
152180 public void recycle () {
153- using = false ;
181+ main .doReset (value );
182+ Thread current = Thread .currentThread ();
183+ Thread hold = USING .getAndSet (this , null );
184+ if (hold != null ) {
185+ if (hold != current ) {
186+ log .warn ("Recycle object cross thread! request by {},recycle by {}" , hold , current );
187+ }
188+ }
154189 }
155190 }
156191}
0 commit comments