Skip to content

Commit e3e429d

Browse files
committed
added retryable task test && refactored util module
1 parent 15053da commit e3e429d

File tree

5 files changed

+208
-12
lines changed

5 files changed

+208
-12
lines changed

coordination/src/main/java/tech/ydb/coordination/recipes/util/Listenable.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,33 @@
33
import java.util.concurrent.ExecutorService;
44
import java.util.function.Consumer;
55

6+
/**
7+
* Generic interface for objects that allow adding and removing listeners for events of type T.
8+
*
9+
* @param <T> the type of event data that listeners will receive
10+
*/
611
public interface Listenable<T> {
12+
/**
13+
* Adds a listener that will be notified synchronously when the event occurs.
14+
*
15+
* @param listener the listener to add, must not be null
16+
* @throws NullPointerException if listener is null
17+
*/
718
void addListener(Consumer<T> listener);
819

920
/**
10-
* Listener call will be processed in executor
11-
* @param listener
12-
* @param executor
21+
* Adds a listener that will be notified asynchronously using the provided executor.
22+
*
23+
* @param listener the listener to add, must not be null
24+
* @param executor the executor to use for asynchronous notification, must not be null
25+
* @throws NullPointerException if listener or executor is null
1326
*/
1427
void addListener(Consumer<T> listener, ExecutorService executor);
1528

29+
/**
30+
* Removes the specified listener.
31+
*
32+
* @param listener the listener to remove
33+
*/
1634
void removeListener(Consumer<T> listener);
1735
}
Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,87 @@
11
package tech.ydb.coordination.recipes.util;
22

33
import java.util.Map;
4+
import java.util.Objects;
45
import java.util.concurrent.ConcurrentHashMap;
56
import java.util.concurrent.ExecutorService;
67
import java.util.function.Consumer;
78

89
import org.slf4j.Logger;
910
import org.slf4j.LoggerFactory;
1011

12+
/**
13+
* Thread-safe container for managing and notifying listeners.
14+
*
15+
* @param <T> the type of event data that listeners will receive
16+
*/
1117
public class ListenableContainer<T> implements Listenable<T> {
1218
private static final Logger logger = LoggerFactory.getLogger(ListenableContainer.class);
1319

20+
// Maps original listeners to potentially wrapped listeners
1421
private final Map<Consumer<T>, Consumer<T>> listenersMapping = new ConcurrentHashMap<>();
1522

23+
/**
24+
* Notifies all registered listeners with the provided data.
25+
* Exceptions thrown by listeners are caught and logged.
26+
*
27+
* @param data the data to send to listeners
28+
* @throws NullPointerException if data is null
29+
*/
1630
public void notifyListeners(T data) {
31+
Objects.requireNonNull(data, "Data cannot be null");
32+
1733
listenersMapping.values().forEach(listener -> {
1834
try {
1935
listener.accept(data);
2036
} catch (Exception ex) {
21-
logger.error("Listener threw exception", ex);
37+
logger.error("Listener threw exception during notification", ex);
2238
}
2339
});
2440
}
2541

2642
@Override
27-
public void addListener(Consumer listener) {
43+
public void addListener(Consumer<T> listener) {
44+
Objects.requireNonNull(listener, "Listener cannot be null");
45+
2846
if (listenersMapping.containsKey(listener)) {
47+
logger.debug("Listener already registered, skipping");
2948
return;
3049
}
3150

3251
listenersMapping.put(listener, listener);
3352
}
3453

3554
@Override
36-
public void addListener(Consumer listener, ExecutorService executor) {
55+
public void addListener(Consumer<T> listener, ExecutorService executor) {
56+
Objects.requireNonNull(listener, "Listener cannot be null");
57+
Objects.requireNonNull(executor, "Executor cannot be null");
58+
3759
if (listenersMapping.containsKey(listener)) {
60+
logger.debug("Listener already registered, skipping");
3861
return;
3962
}
4063

41-
Consumer<T> wrapper = new Consumer<T>() {
42-
@Override
43-
public void accept(T data) {
44-
executor.submit(() -> listener.accept(data));
64+
Consumer<T> wrapper = data -> {
65+
try {
66+
executor.submit(() -> {
67+
try {
68+
listener.accept(data);
69+
} catch (Exception ex) {
70+
logger.error("Asynchronous listener threw exception", ex);
71+
}
72+
});
73+
} catch (Exception ex) {
74+
logger.error("Failed to submit listener task to executor", ex);
4575
}
4676
};
77+
4778
listenersMapping.put(listener, wrapper);
4879
}
4980

5081
@Override
51-
public void removeListener(Consumer listener) {
82+
public void removeListener(Consumer<T> listener) {
83+
Objects.requireNonNull(listener, "Listener cannot be null");
84+
5285
listenersMapping.remove(listener);
5386
}
5487
}

coordination/src/main/java/tech/ydb/coordination/recipes/util/RetryableTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public CompletableFuture<Status> execute() {
4040
return result;
4141
}
4242

43-
private void attemptTask(CompletableFuture<Status> result) {
43+
void attemptTask(CompletableFuture<Status> result) {
4444
try {
4545
taskSupplier.get().whenComplete((status, throwable) -> {
4646
if (throwable != null) {

coordination/src/main/java/tech/ydb/coordination/recipes/util/SessionListenableProvider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
import tech.ydb.coordination.CoordinationSession;
44

5+
/**
6+
* Provides access to a Listenable for session state changes.
7+
*/
58
public interface SessionListenableProvider {
9+
/**
10+
* Gets the Listenable for session state changes.
11+
*
12+
* @return the Listenable instance for session state changes, never null
13+
*/
614
Listenable<CoordinationSession.State> getSessionListenable();
715
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package tech.ydb.coordination.recipes.util;
2+
3+
import org.junit.Before;
4+
import org.junit.Test;
5+
import org.junit.runner.RunWith;
6+
import org.mockito.Mock;
7+
import org.mockito.junit.MockitoJUnitRunner;
8+
import tech.ydb.common.retry.RetryPolicy;
9+
import tech.ydb.core.Status;
10+
import tech.ydb.core.StatusCode;
11+
12+
import java.util.concurrent.CompletableFuture;
13+
import java.util.concurrent.ScheduledExecutorService;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.function.Supplier;
16+
17+
import static org.junit.Assert.*;
18+
import static org.mockito.Mockito.*;
19+
20+
@RunWith(MockitoJUnitRunner.class)
21+
public class RetryableTaskTest {
22+
@Mock
23+
private Supplier<CompletableFuture<Status>> taskSupplier;
24+
25+
@Mock
26+
private ScheduledExecutorService executor;
27+
28+
@Mock
29+
private RetryPolicy retryPolicy;
30+
31+
private RetryableTask retryableTask;
32+
private final String taskName = "testTask";
33+
34+
@Before
35+
public void setUp() {
36+
retryableTask = new RetryableTask(taskName, taskSupplier, executor, retryPolicy);
37+
}
38+
39+
@Test
40+
public void testExecute_SuccessOnFirstAttempt() {
41+
Status successStatus = Status.SUCCESS;
42+
CompletableFuture<Status> future = CompletableFuture.completedFuture(successStatus);
43+
44+
when(taskSupplier.get()).thenReturn(future);
45+
46+
CompletableFuture<Status> result = retryableTask.execute();
47+
48+
assertTrue(result.isDone());
49+
assertEquals(successStatus, result.join());
50+
}
51+
52+
@Test
53+
public void testExecute_FailureWithRetries() {
54+
Status failureStatus = Status.of(StatusCode.CLIENT_INTERNAL_ERROR);
55+
RuntimeException exception = new RuntimeException("Operation failed");
56+
57+
// First attempt fails
58+
CompletableFuture<Status> failedFuture = new CompletableFuture<>();
59+
failedFuture.completeExceptionally(exception);
60+
61+
when(taskSupplier.get())
62+
.thenReturn(failedFuture)
63+
.thenReturn(CompletableFuture.completedFuture(failureStatus));
64+
65+
when(retryPolicy.nextRetryMs(anyInt(), anyLong()))
66+
.thenReturn(100L) // First retry after 100ms
67+
.thenReturn(-1L); // No more retries
68+
69+
CompletableFuture<Status> result = retryableTask.execute();
70+
71+
// Verify retry was scheduled
72+
verify(executor).schedule(any(Runnable.class), eq(100L), eq(TimeUnit.MILLISECONDS));
73+
74+
// Simulate retry execution
75+
retryableTask.attemptTask(result);
76+
77+
assertTrue(result.isDone());
78+
assertTrue(result.isCompletedExceptionally());
79+
}
80+
81+
@Test
82+
public void testExecute_SuccessAfterRetry() {
83+
Status successStatus = Status.SUCCESS;
84+
RuntimeException exception = new RuntimeException("Temporary failure");
85+
86+
// First attempt fails
87+
CompletableFuture<Status> failedFuture = new CompletableFuture<>();
88+
failedFuture.completeExceptionally(exception);
89+
90+
when(taskSupplier.get())
91+
.thenReturn(failedFuture)
92+
.thenReturn(CompletableFuture.completedFuture(successStatus));
93+
94+
when(retryPolicy.nextRetryMs(anyInt(), anyLong()))
95+
.thenReturn(0L); // Immediate retry
96+
97+
CompletableFuture<Status> result = retryableTask.execute();
98+
99+
// Verify immediate retry was scheduled
100+
verify(executor).execute(any(Runnable.class));
101+
102+
// Simulate retry execution
103+
retryableTask.attemptTask(result);
104+
105+
assertTrue(result.isDone());
106+
assertEquals(successStatus, result.join());
107+
}
108+
109+
@Test
110+
public void testExecute_NoMoreRetries() {
111+
RuntimeException exception = new RuntimeException("Permanent failure");
112+
113+
CompletableFuture<Status> failedFuture = new CompletableFuture<>();
114+
failedFuture.completeExceptionally(exception);
115+
116+
when(taskSupplier.get()).thenReturn(failedFuture);
117+
when(retryPolicy.nextRetryMs(anyInt(), anyLong())).thenReturn(-1L); // No more retries
118+
119+
CompletableFuture<Status> result = retryableTask.execute();
120+
121+
assertTrue(result.isDone());
122+
assertTrue(result.isCompletedExceptionally());
123+
}
124+
125+
@Test
126+
public void testExecute_TaskSupplierThrowsException() {
127+
RuntimeException exception = new RuntimeException("Supplier failure");
128+
129+
when(taskSupplier.get()).thenThrow(exception);
130+
when(retryPolicy.nextRetryMs(anyInt(), anyLong())).thenReturn(-1L); // No more retries
131+
132+
CompletableFuture<Status> result = retryableTask.execute();
133+
134+
assertTrue(result.isDone());
135+
assertTrue(result.isCompletedExceptionally());
136+
}
137+
}

0 commit comments

Comments
 (0)