36
36
37
37
private static final class CachedWorkerPool {
38
38
private final long keepAliveTime ;
39
- private final ConcurrentLinkedQueue <PoolWorker > expiringQueue ;
39
+ private final ConcurrentLinkedQueue <ThreadWorker > expiringWorkerQueue ;
40
40
private final ScheduledExecutorService evictExpiredWorkerExecutor ;
41
41
42
42
CachedWorkerPool (long keepAliveTime , TimeUnit unit ) {
43
43
this .keepAliveTime = unit .toNanos (keepAliveTime );
44
- this .expiringQueue = new ConcurrentLinkedQueue <PoolWorker >();
44
+ this .expiringWorkerQueue = new ConcurrentLinkedQueue <ThreadWorker >();
45
45
46
46
evictExpiredWorkerExecutor = Executors .newScheduledThreadPool (1 , EVICTOR_THREAD_FACTORY );
47
47
evictExpiredWorkerExecutor .scheduleWithFixedDelay (
@@ -58,35 +58,35 @@ public void run() {
58
58
60L , TimeUnit .SECONDS
59
59
);
60
60
61
- PoolWorker get () {
62
- while (!expiringQueue .isEmpty ()) {
63
- PoolWorker poolWorker = expiringQueue .poll ();
64
- if (poolWorker != null ) {
65
- return poolWorker ;
61
+ ThreadWorker get () {
62
+ while (!expiringWorkerQueue .isEmpty ()) {
63
+ ThreadWorker threadWorker = expiringWorkerQueue .poll ();
64
+ if (threadWorker != null ) {
65
+ return threadWorker ;
66
66
}
67
67
}
68
68
69
69
// No cached worker found, so create a new one.
70
- return new PoolWorker (WORKER_THREAD_FACTORY );
70
+ return new ThreadWorker (WORKER_THREAD_FACTORY );
71
71
}
72
72
73
- void release (PoolWorker poolWorker ) {
73
+ void release (ThreadWorker threadWorker ) {
74
74
// Refresh expire time before putting worker back in pool
75
- poolWorker .setExpirationTime (now () + keepAliveTime );
75
+ threadWorker .setExpirationTime (now () + keepAliveTime );
76
76
77
- expiringQueue . add ( poolWorker );
77
+ expiringWorkerQueue . offer ( threadWorker );
78
78
}
79
79
80
80
void evictExpiredWorkers () {
81
- if (!expiringQueue .isEmpty ()) {
81
+ if (!expiringWorkerQueue .isEmpty ()) {
82
82
long currentTimestamp = now ();
83
83
84
- Iterator <PoolWorker > poolWorkerIterator = expiringQueue .iterator ();
85
- while (poolWorkerIterator .hasNext ()) {
86
- PoolWorker poolWorker = poolWorkerIterator .next ();
87
- if (poolWorker .getExpirationTime () <= currentTimestamp ) {
88
- poolWorkerIterator .remove ();
89
- poolWorker .unsubscribe ();
84
+ Iterator <ThreadWorker > threadWorkerIterator = expiringWorkerQueue .iterator ();
85
+ while (threadWorkerIterator .hasNext ()) {
86
+ ThreadWorker threadWorker = threadWorkerIterator .next ();
87
+ if (threadWorker .getExpirationTime () <= currentTimestamp ) {
88
+ threadWorkerIterator .remove ();
89
+ threadWorker .unsubscribe ();
90
90
} else {
91
91
// Queue is ordered with the worker that will expire first in the beginning, so when we
92
92
// find a non-expired worker we can stop evicting.
@@ -108,20 +108,20 @@ public Worker createWorker() {
108
108
109
109
private static class EventLoopWorker extends Scheduler .Worker {
110
110
private final CompositeSubscription innerSubscription = new CompositeSubscription ();
111
- private final PoolWorker poolWorker ;
111
+ private final ThreadWorker threadWorker ;
112
112
volatile int once ;
113
113
static final AtomicIntegerFieldUpdater <EventLoopWorker > ONCE_UPDATER
114
114
= AtomicIntegerFieldUpdater .newUpdater (EventLoopWorker .class , "once" );
115
115
116
- EventLoopWorker (PoolWorker poolWorker ) {
117
- this .poolWorker = poolWorker ;
116
+ EventLoopWorker (ThreadWorker threadWorker ) {
117
+ this .threadWorker = threadWorker ;
118
118
}
119
119
120
120
@ Override
121
121
public void unsubscribe () {
122
122
if (ONCE_UPDATER .compareAndSet (this , 0 , 1 )) {
123
123
// unsubscribe should be idempotent, so only do this once
124
- CachedWorkerPool .INSTANCE .release (poolWorker );
124
+ CachedWorkerPool .INSTANCE .release (threadWorker );
125
125
}
126
126
innerSubscription .unsubscribe ();
127
127
}
@@ -143,17 +143,17 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
143
143
return Subscriptions .empty ();
144
144
}
145
145
146
- NewThreadScheduler .NewThreadWorker .ScheduledAction s = poolWorker .scheduleActual (action , delayTime , unit );
146
+ NewThreadScheduler .NewThreadWorker .ScheduledAction s = threadWorker .scheduleActual (action , delayTime , unit );
147
147
innerSubscription .add (s );
148
148
s .addParent (innerSubscription );
149
149
return s ;
150
150
}
151
151
}
152
152
153
- private static final class PoolWorker extends NewThreadScheduler .NewThreadWorker {
153
+ private static final class ThreadWorker extends NewThreadScheduler .NewThreadWorker {
154
154
private long expirationTime ;
155
155
156
- PoolWorker (ThreadFactory threadFactory ) {
156
+ ThreadWorker (ThreadFactory threadFactory ) {
157
157
super (threadFactory );
158
158
this .expirationTime = 0L ;
159
159
}
0 commit comments