Skip to content

Commit 8078c7b

Browse files
committed
682 Add tests
1 parent 6837e08 commit 8078c7b

File tree

4 files changed

+1202
-0
lines changed

4 files changed

+1202
-0
lines changed

server/libs/platform/platform-coordinator/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,6 @@ dependencies {
1919
implementation(project(":server:libs:platform:platform-notification:platform-notification-api"))
2020
implementation(project(":server:libs:platform:platform-webhook:platform-webhook-api"))
2121
implementation(project(":server:libs:platform:platform-worker"))
22+
23+
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
2224
}
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
/*
2+
* Copyright 2025 ByteChef
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytechef.platform.coordinator.job;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.when;
22+
23+
import com.bytechef.atlas.configuration.service.WorkflowService;
24+
import com.bytechef.atlas.coordinator.event.JobStatusApplicationEvent;
25+
import com.bytechef.atlas.coordinator.event.StopJobEvent;
26+
import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent;
27+
import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent;
28+
import com.bytechef.atlas.coordinator.event.TaskStartedApplicationEvent;
29+
import com.bytechef.atlas.coordinator.message.route.TaskCoordinatorMessageRoute;
30+
import com.bytechef.atlas.execution.domain.Job;
31+
import com.bytechef.atlas.execution.domain.TaskExecution;
32+
import com.bytechef.atlas.execution.service.ContextService;
33+
import com.bytechef.atlas.execution.service.JobService;
34+
import com.bytechef.atlas.execution.service.TaskExecutionService;
35+
import com.bytechef.atlas.file.storage.TaskFileStorage;
36+
import com.bytechef.atlas.worker.task.handler.TaskHandlerRegistry;
37+
import com.bytechef.commons.util.ConvertUtils;
38+
import com.bytechef.commons.util.JsonUtils;
39+
import com.bytechef.commons.util.MapUtils;
40+
import com.bytechef.evaluator.Evaluator;
41+
import com.bytechef.message.broker.memory.MemoryMessageBroker;
42+
import com.bytechef.message.broker.memory.SyncMessageBroker;
43+
import com.bytechef.tenant.TenantContext;
44+
import com.bytechef.tenant.constant.TenantConstants;
45+
import com.fasterxml.jackson.databind.DeserializationFeature;
46+
import com.fasterxml.jackson.databind.ObjectMapper;
47+
import com.fasterxml.jackson.databind.SerializationFeature;
48+
import com.fasterxml.jackson.databind.json.JsonMapper;
49+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
50+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
51+
import java.time.Duration;
52+
import java.util.List;
53+
import java.util.Map;
54+
import java.util.Objects;
55+
import java.util.concurrent.CountDownLatch;
56+
import java.util.concurrent.TimeUnit;
57+
import java.util.concurrent.atomic.AtomicBoolean;
58+
import org.junit.jupiter.api.BeforeEach;
59+
import org.junit.jupiter.api.Test;
60+
import org.mockito.Mockito;
61+
import org.springframework.core.task.SyncTaskExecutor;
62+
import org.springframework.test.util.ReflectionTestUtils;
63+
64+
/**
65+
* @author Ivica Cardic
66+
*/
67+
class JobSyncExecutorTest {
68+
69+
private static final String TENANT = "t1";
70+
71+
@SuppressFBWarnings("NP")
72+
private MemoryMessageBroker memoryMessageBroker;
73+
@SuppressFBWarnings("NP")
74+
private JobSyncExecutor jobSyncExecutor;
75+
private final JobService jobService = Mockito.mock(JobService.class);
76+
private final TaskExecutionService taskExecutionService = Mockito.mock(TaskExecutionService.class);
77+
78+
@BeforeEach
79+
void beforeEach() {
80+
ObjectMapper objectMapper = JsonMapper.builder()
81+
.addModule(new JavaTimeModule())
82+
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
83+
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
84+
.build();
85+
86+
ConvertUtils.setObjectMapper(objectMapper);
87+
JsonUtils.setObjectMapper(objectMapper);
88+
MapUtils.setObjectMapper(objectMapper);
89+
90+
TenantContext.setCurrentTenantId(TENANT);
91+
92+
memoryMessageBroker = new SyncMessageBroker();
93+
94+
ContextService contextService = Mockito.mock(ContextService.class);
95+
Evaluator evaluator = Mockito.mock(Evaluator.class);
96+
TaskHandlerRegistry taskHandlerRegistry = Mockito.mock(TaskHandlerRegistry.class);
97+
TaskFileStorage taskFileStorage = Mockito.mock(TaskFileStorage.class);
98+
WorkflowService workflowService = Mockito.mock(WorkflowService.class);
99+
100+
jobSyncExecutor = new JobSyncExecutor(
101+
contextService, evaluator, jobService, -1, (type) -> memoryMessageBroker, List.of(), List.of(), List.of(),
102+
List.of(), taskExecutionService, new SyncTaskExecutor(), taskHandlerRegistry, taskFileStorage, 2L,
103+
workflowService);
104+
}
105+
106+
@Test
107+
@SuppressWarnings("PMD.UnusedLocalVariable")
108+
void testNotifiesTaskExecutionCompleteListeners() throws Exception {
109+
long jobId = 101L;
110+
111+
CountDownLatch countDownLatch = new CountDownLatch(1);
112+
AtomicBoolean called = new AtomicBoolean(false);
113+
114+
try (AutoCloseable handle = jobSyncExecutor.addTaskExecutionCompleteListener(jobId, evt -> {
115+
called.set(true);
116+
countDownLatch.countDown();
117+
})) {
118+
TaskExecution taskExecution = new TaskExecution();
119+
120+
taskExecution.setJobId(jobId);
121+
122+
// Ensure retryDelay is parsable by Duration.parse used internally by getRetryDelayMillis
123+
taskExecution.setRetryDelay("1S");
124+
125+
TaskExecutionCompleteEvent taskExecutionCompleteEvent = new TaskExecutionCompleteEvent(taskExecution);
126+
127+
taskExecutionCompleteEvent.putMetadata(TenantConstants.CURRENT_TENANT_ID, TENANT);
128+
129+
memoryMessageBroker.send(taskExecutionCompleteEvent.getRoute(), taskExecutionCompleteEvent);
130+
131+
boolean ok = countDownLatch.await(1, TimeUnit.SECONDS);
132+
133+
assertThat(ok).isTrue();
134+
assertThat(called.get()).isTrue();
135+
}
136+
}
137+
138+
@Test
139+
@SuppressWarnings("PMD.UnusedLocalVariable")
140+
void testNotifiesErrorListeners() throws Exception {
141+
long jobId = 202L;
142+
143+
CountDownLatch countDownLatch = new CountDownLatch(1);
144+
AtomicBoolean called = new AtomicBoolean(false);
145+
146+
try (AutoCloseable handle = jobSyncExecutor.addErrorListener(jobId, evt -> {
147+
called.set(true);
148+
countDownLatch.countDown();
149+
})) {
150+
TaskExecution taskExecution = new TaskExecution();
151+
152+
taskExecution.setJobId(jobId);
153+
taskExecution.setRetryDelay("1S");
154+
155+
TaskExecutionErrorEvent taskExecutionErrorEvent = new TaskExecutionErrorEvent(taskExecution);
156+
157+
taskExecutionErrorEvent.putMetadata(TenantConstants.CURRENT_TENANT_ID, TENANT);
158+
159+
memoryMessageBroker.send(taskExecutionErrorEvent.getRoute(), taskExecutionErrorEvent);
160+
161+
boolean ok = countDownLatch.await(1, TimeUnit.SECONDS);
162+
163+
assertThat(ok).isTrue();
164+
assertThat(called.get()).isTrue();
165+
}
166+
}
167+
168+
@Test
169+
@SuppressWarnings("PMD.UnusedLocalVariable")
170+
void testJobStatusListenerInvokedAndWaitUnblocksOnStopped() throws Exception {
171+
long jobId = 303L;
172+
173+
// Return STARTED initially, so waitForJobCompletion does not short-circuit
174+
Job startedJob = new Job();
175+
176+
startedJob.setId(jobId);
177+
startedJob.setStatus(Job.Status.STARTED);
178+
179+
when(jobService.getJob(jobId)).thenReturn(startedJob);
180+
181+
CountDownLatch statusLatch = new CountDownLatch(1);
182+
183+
try (AutoCloseable handle = jobSyncExecutor.addJobStatusListener(jobId, evt -> {
184+
if (evt.getStatus() == Job.Status.STOPPED) {
185+
statusLatch.countDown();
186+
}
187+
})) {
188+
// Await in a separate thread
189+
CountDownLatch awaitReturnedLatch = new CountDownLatch(1);
190+
191+
Thread thread = new Thread(() -> {
192+
try {
193+
// Ensure TenantContext is set in the waiter thread so latch keys match
194+
TenantContext.setCurrentTenantId(TENANT);
195+
jobSyncExecutor.awaitJob(jobId, false);
196+
} finally {
197+
awaitReturnedLatch.countDown();
198+
}
199+
});
200+
201+
thread.start();
202+
203+
// Wait until the await thread has registered the CountDownLatch so STOPPED can count it down
204+
waitForLatchRegistration(jobSyncExecutor, TENANT + "_" + jobId, Duration.ofMillis(250));
205+
206+
JobStatusApplicationEvent jobStatusApplicationEvent = new JobStatusApplicationEvent(
207+
jobId, Job.Status.STOPPED);
208+
209+
jobStatusApplicationEvent.putMetadata(TenantConstants.CURRENT_TENANT_ID, TENANT);
210+
211+
memoryMessageBroker.send(TaskCoordinatorMessageRoute.APPLICATION_EVENTS, jobStatusApplicationEvent);
212+
213+
// Listener invoked and waiter unblocked
214+
assertThat(statusLatch.await(1, TimeUnit.SECONDS)).isTrue();
215+
216+
// Now when awaitJob fetches the job after latch, return STOPPED
217+
Job stoppedJob = new Job();
218+
219+
stoppedJob.setId(jobId);
220+
stoppedJob.setStatus(Job.Status.STOPPED);
221+
222+
when(jobService.getJob(jobId)).thenReturn(stoppedJob);
223+
224+
assertThat(awaitReturnedLatch.await(1, TimeUnit.SECONDS)).isTrue();
225+
}
226+
}
227+
228+
@Test
229+
@SuppressWarnings("PMD.UnusedLocalVariable")
230+
void testTaskStartedListenerInvokedAndJobIdDerivedFromTaskExecutionId() throws Exception {
231+
long jobId = 404L;
232+
long taskExecutionId = 9001L;
233+
234+
TaskExecution taskExecution = new TaskExecution();
235+
236+
taskExecution.setId(taskExecutionId);
237+
taskExecution.setStatus(TaskExecution.Status.CREATED);
238+
taskExecution.setStartDate(null);
239+
240+
when(taskExecutionService.getTaskExecution(taskExecutionId)).thenReturn(taskExecution);
241+
242+
Job job = new Job();
243+
244+
job.setId(jobId);
245+
job.setStatus(Job.Status.STARTED);
246+
247+
when(jobService.getTaskExecutionJob(taskExecutionId)).thenReturn(job);
248+
when(taskExecutionService.update(any(TaskExecution.class))).thenAnswer(inv -> inv.getArgument(0));
249+
250+
CountDownLatch latch = new CountDownLatch(1);
251+
AtomicBoolean called = new AtomicBoolean(false);
252+
253+
try (AutoCloseable handle = jobSyncExecutor.addTaskStartedListener(jobId, evt -> {
254+
called.set(true);
255+
latch.countDown();
256+
})) {
257+
TaskStartedApplicationEvent taskStartedApplicationEvent = new TaskStartedApplicationEvent(taskExecutionId);
258+
259+
taskStartedApplicationEvent.putMetadata(TenantConstants.CURRENT_TENANT_ID, TENANT);
260+
261+
memoryMessageBroker.send(TaskCoordinatorMessageRoute.APPLICATION_EVENTS, taskStartedApplicationEvent);
262+
263+
boolean ok = latch.await(1, TimeUnit.SECONDS);
264+
265+
assertThat(ok).isTrue();
266+
assertThat(called.get()).isTrue();
267+
}
268+
}
269+
270+
@Test
271+
void testStopJobEventForDeletedJobDoesNotThrowException() {
272+
long jobId = 505L;
273+
274+
when(jobService.setStatusToStopped(jobId))
275+
.thenThrow(new java.util.NoSuchElementException("Unknown job " + jobId));
276+
277+
StopJobEvent stopJobEvent = new StopJobEvent(jobId);
278+
279+
stopJobEvent.putMetadata(TenantConstants.CURRENT_TENANT_ID, TENANT);
280+
281+
// This should not throw an exception because it's caught in handleCoordinatorJobStopEvent
282+
memoryMessageBroker.send(TaskCoordinatorMessageRoute.JOB_STOP_EVENTS, stopJobEvent);
283+
}
284+
285+
private static void waitForLatchRegistration(JobSyncExecutor jobSyncExecutor, String key, Duration timeout)
286+
throws Exception {
287+
long deadline = System.nanoTime() + timeout.toNanos();
288+
289+
@SuppressWarnings("unchecked")
290+
Map<String, ?> latches = (Map<String, ?>) ReflectionTestUtils.getField(jobSyncExecutor, "jobCompletionLatches");
291+
292+
while (System.nanoTime() < deadline) {
293+
if (Objects.requireNonNull(latches)
294+
.containsKey(key)) {
295+
296+
return;
297+
}
298+
299+
Thread.sleep(10);
300+
}
301+
}
302+
}

0 commit comments

Comments
 (0)