Skip to content

Commit 323e2a8

Browse files
committed
once event source
1 parent a389ecf commit 323e2a8

File tree

3 files changed

+72
-2
lines changed

3 files changed

+72
-2
lines changed

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ public class TimerEvent extends AbstractEvent {
77
public TimerEvent(String relatedCustomResourceUid, TimerEventSource eventSource) {
88
super(relatedCustomResourceUid, eventSource);
99
}
10+
1011
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSource.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class TimerEventSource extends AbstractEventSource {
1616

1717
private final Timer timer = new Timer();
1818

19+
private final Map<String, EventProducerTimeTask> onceTasks = new ConcurrentHashMap<>();
1920
private final Map<String, EventProducerTimeTask> timerTasks = new ConcurrentHashMap<>();
2021

2122
public void schedule(CustomResource customResource, long delay, long period) {
@@ -28,14 +29,34 @@ public void schedule(CustomResource customResource, long delay, long period) {
2829
timer.schedule(task, delay, period);
2930
}
3031

32+
public void scheduleOnce(CustomResource customResource, long delay) {
33+
String resourceUid = KubernetesResourceUtils.getUID(customResource);
34+
if (onceTasks.containsKey(resourceUid)) {
35+
cancelOnceSchedule(resourceUid);
36+
}
37+
EventProducerTimeTask task = new EventProducerTimeTask(resourceUid);
38+
onceTasks.put(resourceUid, task);
39+
timer.schedule(task, delay);
40+
}
41+
3142
@Override
3243
public void eventSourceDeRegisteredForResource(String customResourceUid) {
3344
cancelSchedule(customResourceUid);
45+
cancelOnceSchedule(customResourceUid);
3446
}
3547

3648
public void cancelSchedule(String customResourceUid) {
3749
TimerTask timerTask = timerTasks.remove(customResourceUid);
38-
timerTask.cancel();
50+
if (timerTask != null) {
51+
timerTask.cancel();
52+
}
53+
}
54+
55+
public void cancelOnceSchedule(String customResourceUid) {
56+
TimerTask timerTask = onceTasks.remove(customResourceUid);
57+
if (timerTask != null) {
58+
timerTask.cancel();
59+
}
3960
}
4061

4162
public class EventProducerTimeTask extends TimerTask {

operator-framework/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/TimerEventSourceTest.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ class TimerEventSourceTest {
2323

2424
private TimerEventSource timerEventSource;
2525
private EventHandler eventHandlerMock = mock(EventHandler.class);
26-
private EventSourceManager eventSourceManagerMock = mock(EventSourceManager.class);
2726

2827
@BeforeEach
2928
public void setup() {
@@ -58,4 +57,53 @@ public void deRegistersPeriodicalEventSources() throws InterruptedException {
5857
.handleEvent(any());
5958
}
6059

60+
@Test
61+
public void schedulesOnce() throws InterruptedException {
62+
CustomResource customResource = TestUtils.testCustomResource();
63+
64+
timerEventSource.scheduleOnce(customResource, PERIOD);
65+
66+
Thread.sleep(2 * PERIOD + TESTING_TIME_SLACK);
67+
verify(eventHandlerMock, times(1))
68+
.handleEvent(any());
69+
}
70+
71+
@Test
72+
public void canCancelOnce() throws InterruptedException {
73+
CustomResource customResource = TestUtils.testCustomResource();
74+
75+
timerEventSource.scheduleOnce(customResource, PERIOD);
76+
timerEventSource.cancelOnceSchedule(KubernetesResourceUtils.getUID(customResource));
77+
78+
Thread.sleep(PERIOD + TESTING_TIME_SLACK);
79+
verify(eventHandlerMock, never())
80+
.handleEvent(any());
81+
}
82+
83+
@Test
84+
public void canRescheduleOnceEvent() throws InterruptedException {
85+
CustomResource customResource = TestUtils.testCustomResource();
86+
87+
timerEventSource.scheduleOnce(customResource, PERIOD);
88+
timerEventSource.scheduleOnce(customResource, 2 * PERIOD);
89+
90+
Thread.sleep(PERIOD + TESTING_TIME_SLACK);
91+
verify(eventHandlerMock, never())
92+
.handleEvent(any());
93+
Thread.sleep(PERIOD + TESTING_TIME_SLACK);
94+
verify(eventHandlerMock, times(1))
95+
.handleEvent(any());
96+
}
97+
98+
@Test
99+
public void deRegistersOnceEventSources() throws InterruptedException {
100+
CustomResource customResource = TestUtils.testCustomResource();
101+
102+
timerEventSource.scheduleOnce(customResource, PERIOD);
103+
timerEventSource.eventSourceDeRegisteredForResource(getUID(customResource));
104+
Thread.sleep(PERIOD + TESTING_TIME_SLACK);
105+
106+
verify(eventHandlerMock, never())
107+
.handleEvent(any());
108+
}
61109
}

0 commit comments

Comments
 (0)