@@ -48,6 +48,7 @@ public class ConcurrencyLimiter extends AFn implements IObj {
4848 private final int maxQueueSize ;
4949 private IPersistentMap metadata = PersistentArrayMap .EMPTY ;
5050
51+ protected IFn onQueueCallback ;
5152 protected IFn onRunCallback ;
5253 protected IFn onPollCallback ;
5354
@@ -61,6 +62,10 @@ public ConcurrencyLimiter(final ExecutorService executor,
6162 this .limit = new Semaphore (maxConcurrency );
6263 }
6364
65+ public void setOnQueueCallback (IFn f ) {
66+ this .onQueueCallback = f ;
67+ }
68+
6469 public void setOnRunCallback (IFn f ) {
6570 this .onRunCallback = f ;
6671 }
@@ -89,6 +94,10 @@ public CompletableFuture invoke(Object arg1) {
8994 return result ;
9095 }
9196
97+ if (this .onQueueCallback != null ) {
98+ this .onQueueCallback .invoke ();
99+ }
100+
92101 this .executor .submit ((Runnable )this );
93102 return result ;
94103 }
@@ -196,10 +205,10 @@ public void run() {
196205 return ;
197206 }
198207
199- future .whenComplete ((result , t ) -> {
200- if (t != null ) {
208+ future .whenComplete ((result , cause ) -> {
209+ if (cause != null ) {
201210 this .limiter .releaseAndSchedule ();
202- this .result .completeExceptionally ((Throwable )t );
211+ this .result .completeExceptionally ((Throwable )cause );
203212 } else {
204213 this .limiter .releaseAndSchedule ();
205214 this .result .complete (result );
0 commit comments