2525import java .util .concurrent .ThreadFactory ;
2626import java .util .concurrent .atomic .AtomicBoolean ;
2727import java .util .concurrent .atomic .AtomicInteger ;
28+ import java .util .concurrent .locks .Lock ;
29+ import java .util .concurrent .locks .ReentrantLock ;
2830import java .util .function .Supplier ;
2931import java .util .stream .IntStream ;
3032import org .slf4j .Logger ;
@@ -42,12 +44,13 @@ class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
4244 private final int minSize ;
4345 private final int clientPerExecutor ;
4446 private final Supplier <Executor > executorFactory ;
47+ private final Lock lock = new ReentrantLock ();
4548
4649 DefaultExecutorServiceFactory (int minSize , int clientPerExecutor , String prefix ) {
4750 this .minSize = minSize ;
4851 this .clientPerExecutor = clientPerExecutor ;
4952 this .threadFactory = threadFactory (prefix );
50- this .executorFactory = () -> newExecutor () ;
53+ this .executorFactory = this :: newExecutor ;
5154 List <Executor > l = new ArrayList <>(this .minSize );
5255 IntStream .range (0 , this .minSize ).forEach (ignored -> l .add (this .executorFactory .get ()));
5356 executors = new CopyOnWriteArrayList <>(l );
@@ -111,29 +114,39 @@ private Executor newExecutor() {
111114 }
112115
113116 @ Override
114- public synchronized ExecutorService get () {
115- if (closed .get ()) {
116- throw new IllegalStateException ("Executor service factory is closed" );
117- } else {
118- maybeResize (this .executors , this .minSize , this .clientPerExecutor , this .executorFactory );
119- LOGGER .debug ("Looking least used executor in {}" , this .executors );
120- Executor executor = this .executors .stream ().min (EXECUTOR_COMPARATOR ).get ();
121- LOGGER .debug ("Least used executor is {}" , executor );
122- executor .incrementUsage ();
123- return executor .executorService ;
117+ public ExecutorService get () {
118+ this .lock .lock ();
119+ try {
120+ if (closed .get ()) {
121+ throw new IllegalStateException ("Executor service factory is closed" );
122+ } else {
123+ maybeResize (this .executors , this .minSize , this .clientPerExecutor , this .executorFactory );
124+ LOGGER .debug ("Looking least used executor in {}" , this .executors );
125+ Executor executor = this .executors .stream ().min (EXECUTOR_COMPARATOR ).get ();
126+ LOGGER .debug ("Least used executor is {}" , executor );
127+ executor .incrementUsage ();
128+ return executor .executorService ;
129+ }
130+ } finally {
131+ this .lock .unlock ();
124132 }
125133 }
126134
127135 @ Override
128- public synchronized void clientClosed (ExecutorService executorService ) {
129- if (!closed .get ()) {
130- Executor executor = find (executorService );
131- if (executor == null ) {
132- LOGGER .info ("Could not find executor service wrapper" );
133- } else {
134- executor .decrementUsage ();
135- maybeResize (this .executors , this .minSize , this .clientPerExecutor , this .executorFactory );
136+ public void clientClosed (ExecutorService executorService ) {
137+ this .lock .lock ();
138+ try {
139+ if (!closed .get ()) {
140+ Executor executor = find (executorService );
141+ if (executor == null ) {
142+ LOGGER .info ("Could not find executor service wrapper" );
143+ } else {
144+ executor .decrementUsage ();
145+ maybeResize (this .executors , this .minSize , this .clientPerExecutor , this .executorFactory );
146+ }
136147 }
148+ } finally {
149+ this .lock .unlock ();
137150 }
138151 }
139152
@@ -148,17 +161,26 @@ private Executor find(ExecutorService executorService) {
148161
149162 @ Override
150163 public synchronized void close () {
151- if (closed .compareAndSet (false , true )) {
152- this .executors .forEach (executor -> executor .executorService .shutdownNow ());
164+ this .lock .lock ();
165+ try {
166+ if (closed .compareAndSet (false , true )) {
167+ this .executors .forEach (executor -> executor .executorService .shutdownNow ());
168+ }
169+ } finally {
170+ this .lock .unlock ();
153171 }
154172 }
155173
156174 static class Executor {
157175
176+ private static final AtomicInteger ID_SEQUENCE = new AtomicInteger ();
177+
158178 private final ExecutorService executorService ;
159179 private AtomicInteger usage = new AtomicInteger (0 );
180+ private final int id ;
160181
161182 Executor (ExecutorService executorService ) {
183+ this .id = ID_SEQUENCE .getAndIncrement ();
162184 this .executorService = executorService ;
163185 }
164186
@@ -192,7 +214,7 @@ private void close() {
192214
193215 @ Override
194216 public String toString () {
195- return "Executor{" + "usage=" + usage .get () + '}' ;
217+ return "Executor{" + "id=" + id + ", usage=" + usage .get () + '}' ;
196218 }
197219 }
198220}
0 commit comments