3434import java .time .Instant ;
3535
3636public class ConcurrencyLimiter extends AFn implements IObj {
37- final private Keyword KW_QUEUE = Keyword .intern ("queue" );
38- final private Keyword KW_STATISTICS = Keyword .intern ("statistics" );
39- final private Keyword KW_CURRENT_QUEUE_SIZE = Keyword .intern ("current-queue-size" );
40- final private Keyword KW_CURRENT_CONCURRENCY = Keyword .intern ("current-concurrency" );
41- final private Keyword KW_REMAINING_QUEUE_SIZE = Keyword .intern ("remaining-queue-size" );
42- final private Keyword KW_REMAINING_CONCURRENCY = Keyword .intern ("remaining-concurrency" );
43-
4437 private final BlockingQueue <Task > queue ;
4538 private final ExecutorService executor ;
4639 private final Semaphore limit ;
@@ -50,7 +43,6 @@ public class ConcurrencyLimiter extends AFn implements IObj {
5043
5144 protected IFn onQueueCallback ;
5245 protected IFn onRunCallback ;
53- protected IFn onPollCallback ;
5446
5547 public ConcurrencyLimiter (final ExecutorService executor ,
5648 final int maxConcurrency ,
@@ -70,10 +62,6 @@ public void setOnRunCallback(IFn f) {
7062 this .onRunCallback = f ;
7163 }
7264
73- public void setOnPollCallback (IFn f ) {
74- this .onPollCallback = f ;
75- }
76-
7765 public IObj withMeta (IPersistentMap meta ) {
7866 this .metadata = meta ;
7967 return this ;
@@ -95,7 +83,7 @@ public CompletableFuture invoke(Object arg1) {
9583 }
9684
9785 if (this .onQueueCallback != null ) {
98- this .onQueueCallback .invoke ();
86+ this .onQueueCallback .invoke (this );
9987 }
10088
10189 this .executor .submit ((Runnable )this );
@@ -155,15 +143,6 @@ public void run() {
155143 this .executor .submit (task );
156144 }
157145 }
158-
159- if (this .onRunCallback != null ) {
160- var stats = PersistentArrayMap .EMPTY
161- .assoc (KW_CURRENT_CONCURRENCY , this .getCurrentConcurrency ())
162- .assoc (KW_CURRENT_QUEUE_SIZE , this .getCurrentQueueSize ())
163- .assoc (KW_REMAINING_CONCURRENCY , this .getRemainingConcurrency ())
164- .assoc (KW_REMAINING_QUEUE_SIZE , this .getRemainingQueueSize ());
165- this .onRunCallback .invoke (stats );
166- }
167146 }
168147
169148 private static class Task implements Runnable {
@@ -187,8 +166,8 @@ public boolean isCancelled() {
187166
188167 @ SuppressWarnings ("unchecked" )
189168 public void run () {
190- if (this .limiter .onPollCallback != null ) {
191- this .limiter .onPollCallback .invoke (this .createdAt );
169+ if (this .limiter .onRunCallback != null ) {
170+ this .limiter .onRunCallback .invoke (this . limiter , this .createdAt );
192171 }
193172
194173 final CompletionStage future ;
@@ -217,7 +196,7 @@ public void run() {
217196 }
218197 }
219198
220- public static class CapacityLimitReachedException extends RuntimeException {
199+ public static class CapacityLimitReachedException extends RuntimeException {
221200 public CapacityLimitReachedException (String msg ) {
222201 super (msg );
223202 }
0 commit comments