@@ -234,7 +234,9 @@ public HttpResponse addToQueue(SessionRequest request) {
234234 }
235235
236236 Lock writeLock = this .lock .writeLock ();
237- writeLock .lock ();
237+ if (!writeLock .tryLock ()) {
238+ writeLock .lock ();
239+ }
238240 try {
239241 requests .remove (request .getRequestId ());
240242 queue .remove (request );
@@ -268,7 +270,9 @@ Data injectIntoQueue(SessionRequest request) {
268270 Data data = new Data (request .getEnqueued ());
269271
270272 Lock writeLock = lock .writeLock ();
271- writeLock .lock ();
273+ if (!writeLock .tryLock ()) {
274+ writeLock .lock ();
275+ }
272276 try {
273277 requests .put (request .getRequestId (), data );
274278 queue .addLast (request );
@@ -288,7 +292,9 @@ public boolean retryAddToQueue(SessionRequest request) {
288292 contexts .getOrDefault (request .getRequestId (), tracer .getCurrentContext ());
289293 try (Span ignored = context .createSpan ("sessionqueue.retry" )) {
290294 Lock writeLock = lock .writeLock ();
291- writeLock .lock ();
295+ if (!writeLock .tryLock ()) {
296+ writeLock .lock ();
297+ }
292298 try {
293299 if (!requests .containsKey (request .getRequestId ())) {
294300 return false ;
@@ -319,7 +325,9 @@ public Optional<SessionRequest> remove(RequestId reqId) {
319325 Require .nonNull ("Request ID" , reqId );
320326
321327 Lock writeLock = lock .writeLock ();
322- writeLock .lock ();
328+ if (!writeLock .tryLock ()) {
329+ writeLock .lock ();
330+ }
323331 try {
324332 Iterator <SessionRequest > iterator = queue .iterator ();
325333 while (iterator .hasNext ()) {
@@ -340,6 +348,28 @@ public Optional<SessionRequest> remove(RequestId reqId) {
340348 public List <SessionRequest > getNextAvailable (Map <Capabilities , Long > stereotypes ) {
341349 Require .nonNull ("Stereotypes" , stereotypes );
342350
351+ // delay the response to avoid heavy polling via http
352+ long started = System .currentTimeMillis ();
353+ while (8000 > System .currentTimeMillis () - started ) {
354+ Lock readLock = lock .readLock ();
355+ readLock .lock ();
356+
357+ try {
358+ if (!queue .isEmpty ()) {
359+ break ;
360+ }
361+ } finally {
362+ readLock .unlock ();
363+ }
364+
365+ try {
366+ Thread .sleep (10 );
367+ } catch (InterruptedException ex ) {
368+ Thread .currentThread ().interrupt ();
369+ break ;
370+ }
371+ }
372+
343373 Predicate <Capabilities > matchesStereotype =
344374 caps ->
345375 stereotypes .entrySet ().stream ()
@@ -355,7 +385,9 @@ public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes
355385 });
356386
357387 Lock writeLock = lock .writeLock ();
358- writeLock .lock ();
388+ if (!writeLock .tryLock ()) {
389+ writeLock .lock ();
390+ }
359391 try {
360392 List <SessionRequest > availableRequests =
361393 queue .stream ()
@@ -381,7 +413,9 @@ public boolean complete(
381413 try (Span ignored = context .createSpan ("sessionqueue.completed" )) {
382414 Data data ;
383415 Lock writeLock = lock .writeLock ();
384- writeLock .lock ();
416+ if (!writeLock .tryLock ()) {
417+ writeLock .lock ();
418+ }
385419 try {
386420 data = requests .remove (reqId );
387421 queue .removeIf (req -> reqId .equals (req .getRequestId ()));
@@ -401,7 +435,9 @@ public boolean complete(
401435 @ Override
402436 public int clearQueue () {
403437 Lock writeLock = lock .writeLock ();
404- writeLock .lock ();
438+ if (!writeLock .tryLock ()) {
439+ writeLock .lock ();
440+ }
405441
406442 try {
407443 int size = queue .size ();
0 commit comments