Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 the Eclipse Milo Authors
* Copyright (c) 2025 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -14,10 +14,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -176,6 +173,10 @@ public void resume() {
public List<Task> shutdown(boolean awaitQuiescence) throws InterruptedException {
taskQueueLock.lock();
try {
if (shutdown) {
return List.of();
}

shutdown = true;

if (taskQueue.isEmpty() && pending == 0) {
Expand All @@ -196,8 +197,8 @@ public List<Task> shutdown(boolean awaitQuiescence) throws InterruptedException
taskQueueLock.unlock();
}

// if we made it this far we have pending tasks to await
assert shutdownLatch != null;
// if we made it this far, we have pending tasks to await
Objects.requireNonNull(shutdownLatch);
shutdownLatch.await();

taskQueueLock.lock();
Expand All @@ -224,8 +225,16 @@ private void maybePollAndExecute() {
taskQueueLock.lock();
try {
if (pending < maxConcurrentTasks && !paused && !shutdown && !taskQueue.isEmpty()) {
executor.execute(Objects.requireNonNull(taskQueue.poll()));
pending++;
TaskWrapper task = Objects.requireNonNull(taskQueue.poll());

try {
executor.execute(task);
pending++;
} catch (RejectedExecutionException e) {
if (task.callback != null) {
task.callback.completeExceptionally(e);
}
}
}
} finally {
taskQueueLock.unlock();
Expand Down Expand Up @@ -259,10 +268,14 @@ public void run() {
task.execute();

if (callback != null) {
executor.execute(() -> callback.complete(Unit.VALUE));
notifyCallback(callback);
}
} catch (Throwable throwable) {
logger.warn("Uncaught Throwable during Task execution.", throwable);

if (callback != null) {
notifyCallbackExceptionally(callback, throwable);
}
}

TaskWrapper inlineTask = null;
Expand All @@ -276,7 +289,7 @@ public void run() {
shutdownLatch.countDown();
}
} else {
// pending count remains the same
// the pending count remains the same
inlineTask = taskQueue.poll();
}
} finally {
Expand All @@ -286,12 +299,18 @@ public void run() {
if (inlineTask != null) {
try {
inlineTask.task.execute();

if (inlineTask.callback != null) {
CompletableFuture<Unit> callback = inlineTask.callback;
executor.execute(() -> callback.complete(Unit.VALUE));
notifyCallback(callback);
}
} catch (Throwable throwable) {
logger.warn("Uncaught Throwable during Task execution.", throwable);

if (inlineTask.callback != null) {
CompletableFuture<Unit> callback = inlineTask.callback;
notifyCallbackExceptionally(callback, throwable);
}
}

taskQueueLock.lock();
Expand All @@ -303,14 +322,42 @@ public void run() {
shutdownLatch.countDown();
}
} else {
// pending count remains the same
executor.execute(Objects.requireNonNull(taskQueue.poll()));
// the pending count remains the same
TaskWrapper nextTask = Objects.requireNonNull(taskQueue.poll());
try {
executor.execute(nextTask);
} catch (RejectedExecutionException e) {
pending--;

if (nextTask.callback != null) {
nextTask.callback.completeExceptionally(e);
}
}
}
} finally {
taskQueueLock.unlock();
}
}
}

private void notifyCallback(CompletableFuture<Unit> callback) {
try {
executor.execute(() -> callback.complete(Unit.VALUE));
} catch (RejectedExecutionException e) {
// complete the notification inline, still a success, because the task was executed.
callback.complete(Unit.VALUE);
}
}

private void notifyCallbackExceptionally(
CompletableFuture<Unit> callback, Throwable throwable) {
try {
executor.execute(() -> callback.completeExceptionally(throwable));
} catch (RejectedExecutionException e) {
// complete the notification inline, using the original failure.
callback.completeExceptionally(throwable);
}
}
}

public interface Task {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 the Eclipse Milo Authors
* Copyright (c) 2025 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -10,19 +10,12 @@

package org.eclipse.milo.opcua.stack.core.util;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;

import java.util.ArrayList;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.jspecify.annotations.NonNull;
import org.junit.jupiter.api.Test;

class TaskQueueTest {
Expand Down Expand Up @@ -252,6 +245,66 @@ void executionCallback() throws ExecutionException, InterruptedException {
}
}

@Test
void callbackCompletesExceptionallyWhenTaskThrows() throws Exception {
var taskExecutor = new TaskQueue(executor);

var task =
new TestTask() {
@Override
public void execute() {
throw new RuntimeException("Task execution failed");
}
};

CompletionStage<Unit> callback = taskExecutor.submit(task);
assertNotNull(callback);

// Expected: callback completes exceptionally when the task throws
assertThrows(
ExecutionException.class,
() -> callback.toCompletableFuture().get(250, TimeUnit.MILLISECONDS),
"Callback should complete exceptionally when task throws");
}

@Test
void callbackCompletesWhenSchedulingCallbackFails() throws Exception {
// Executor that runs the first task submission normally but throws on later executions
Executor flakyExecutor =
new Executor() {
final AtomicInteger calls = new AtomicInteger();

@Override
public void execute(@NonNull Runnable command) {
if (calls.getAndIncrement() == 0) {
// Schedule the TaskQueue's first task execution on the real executor
executor.execute(command);
} else {
// Simulate executor failing to schedule the callback completion runnable
throw new RejectedExecutionException("simulated execution rejection");
}
}
};

var taskExecutor = TaskQueue.newBuilder().setExecutor(flakyExecutor).build();

var task = new TestTask();
var callback = taskExecutor.submit(task);
assertNotNull(callback);

// Ensure the task itself executed successfully
task.awaitExecution();

// Give a brief moment for any (failing) callback scheduling attempt
Thread.sleep(25);

// Expected: even if scheduling the callback completion throws, the callback should be
// completed.
assertDoesNotThrow(
() -> callback.toCompletableFuture().get(250, TimeUnit.MILLISECONDS),
"Callback should complete even when scheduling the completion throws");
}

private final AtomicInteger sequence = new AtomicInteger(0);

private class TestTask implements TaskQueue.Task {
Expand All @@ -274,6 +327,14 @@ void awaitExecution() {
throw new RuntimeException(e);
}
}

boolean awaitExecution(long timeout, TimeUnit unit) {
try {
return executed.await(timeout, unit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

private class TriggeredTestTask extends TestTask {
Expand Down
Loading