Skip to content

Commit dd37857

Browse files
committed
682 Add tests
1 parent 22a08d0 commit dd37857

File tree

4 files changed

+1171
-0
lines changed

4 files changed

+1171
-0
lines changed

server/libs/platform/platform-coordinator/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,6 @@ dependencies {
1919
implementation(project(":server:libs:platform:platform-notification:platform-notification-api"))
2020
implementation(project(":server:libs:platform:platform-webhook:platform-webhook-api"))
2121
implementation(project(":server:libs:platform:platform-worker"))
22+
23+
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
2224
}
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
/*
2+
* Copyright 2025 ByteChef
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytechef.platform.coordinator.job;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.when;
22+
23+
import com.bytechef.atlas.configuration.service.WorkflowService;
24+
import com.bytechef.atlas.coordinator.event.JobStatusApplicationEvent;
25+
import com.bytechef.atlas.coordinator.event.TaskExecutionCompleteEvent;
26+
import com.bytechef.atlas.coordinator.event.TaskExecutionErrorEvent;
27+
import com.bytechef.atlas.coordinator.event.TaskStartedApplicationEvent;
28+
import com.bytechef.atlas.coordinator.message.route.TaskCoordinatorMessageRoute;
29+
import com.bytechef.atlas.execution.domain.Job;
30+
import com.bytechef.atlas.execution.domain.TaskExecution;
31+
import com.bytechef.atlas.execution.service.ContextService;
32+
import com.bytechef.atlas.execution.service.JobService;
33+
import com.bytechef.atlas.execution.service.TaskExecutionService;
34+
import com.bytechef.atlas.file.storage.TaskFileStorage;
35+
import com.bytechef.atlas.worker.task.handler.TaskHandlerRegistry;
36+
import com.bytechef.commons.util.ConvertUtils;
37+
import com.bytechef.commons.util.JsonUtils;
38+
import com.bytechef.commons.util.MapUtils;
39+
import com.bytechef.evaluator.Evaluator;
40+
import com.bytechef.message.broker.memory.MemoryMessageBroker;
41+
import com.bytechef.message.broker.memory.SyncMessageBroker;
42+
import com.bytechef.tenant.TenantContext;
43+
import com.bytechef.tenant.constant.TenantConstants;
44+
import com.fasterxml.jackson.databind.DeserializationFeature;
45+
import com.fasterxml.jackson.databind.ObjectMapper;
46+
import com.fasterxml.jackson.databind.SerializationFeature;
47+
import com.fasterxml.jackson.databind.json.JsonMapper;
48+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
49+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
50+
import java.time.Duration;
51+
import java.util.List;
52+
import java.util.Map;
53+
import java.util.Objects;
54+
import java.util.concurrent.CountDownLatch;
55+
import java.util.concurrent.TimeUnit;
56+
import java.util.concurrent.atomic.AtomicBoolean;
57+
import org.junit.jupiter.api.BeforeEach;
58+
import org.junit.jupiter.api.Test;
59+
import org.mockito.Mockito;
60+
import org.springframework.core.task.SyncTaskExecutor;
61+
import org.springframework.test.util.ReflectionTestUtils;
62+
63+
/**
64+
* @author Ivica Cardic
65+
*/
66+
class JobSyncExecutorTest {
67+
68+
private static final String TENANT = "t1";
69+
70+
@SuppressFBWarnings("NP")
71+
private MemoryMessageBroker memoryMessageBroker;
72+
@SuppressFBWarnings("NP")
73+
private JobSyncExecutor jobSyncExecutor;
74+
private final JobService jobService = Mockito.mock(JobService.class);;
75+
private final TaskExecutionService taskExecutionService = Mockito.mock(TaskExecutionService.class);
76+
77+
@BeforeEach
78+
void beforeEach() {
79+
ObjectMapper objectMapper = JsonMapper.builder()
80+
.addModule(new JavaTimeModule())
81+
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
82+
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
83+
.build();
84+
85+
ConvertUtils.setObjectMapper(objectMapper);
86+
JsonUtils.setObjectMapper(objectMapper);
87+
MapUtils.setObjectMapper(objectMapper);
88+
89+
TenantContext.setCurrentTenantId(TENANT);
90+
91+
memoryMessageBroker = new SyncMessageBroker();
92+
93+
ContextService contextService = Mockito.mock(ContextService.class);
94+
Evaluator evaluator = Mockito.mock(Evaluator.class);
95+
TaskHandlerRegistry taskHandlerRegistry = Mockito.mock(TaskHandlerRegistry.class);
96+
TaskFileStorage taskFileStorage = Mockito.mock(TaskFileStorage.class);
97+
WorkflowService workflowService = Mockito.mock(WorkflowService.class);
98+
99+
jobSyncExecutor = new JobSyncExecutor(
100+
contextService, evaluator, jobService, -1, (type) -> memoryMessageBroker, List.of(), List.of(), List.of(),
101+
List.of(), taskExecutionService, new SyncTaskExecutor(), taskHandlerRegistry, taskFileStorage, 2L,
102+
workflowService);
103+
}
104+
105+
@Test
106+
@SuppressWarnings("PMD.UnusedLocalVariable")
107+
void testNotifiesTaskExecutionCompleteListeners() throws Exception {
108+
long jobId = 101L;
109+
110+
CountDownLatch countDownLatch = new CountDownLatch(1);
111+
AtomicBoolean called = new AtomicBoolean(false);
112+
113+
try (AutoCloseable handle = jobSyncExecutor.addTaskExecutionCompleteListener(jobId, evt -> {
114+
called.set(true);
115+
countDownLatch.countDown();
116+
})) {
117+
TaskExecution taskExecution = new TaskExecution();
118+
119+
taskExecution.setJobId(jobId);
120+
121+
// Ensure retryDelay is parsable by Duration.parse used internally by getRetryDelayMillis
122+
taskExecution.setRetryDelay("1S");
123+
124+
TaskExecutionCompleteEvent taskExecutionCompleteEvent = new TaskExecutionCompleteEvent(taskExecution);
125+
126+
taskExecutionCompleteEvent.putMetadata(TenantConstants.CURRENT_TENANT_ID, TENANT);
127+
128+
memoryMessageBroker.send(taskExecutionCompleteEvent.getRoute(), taskExecutionCompleteEvent);
129+
130+
boolean ok = countDownLatch.await(1, TimeUnit.SECONDS);
131+
132+
assertThat(ok).isTrue();
133+
assertThat(called.get()).isTrue();
134+
}
135+
}
136+
137+
@Test
138+
@SuppressWarnings("PMD.UnusedLocalVariable")
139+
void testNotifiesErrorListeners() throws Exception {
140+
long jobId = 202L;
141+
142+
CountDownLatch countDownLatch = new CountDownLatch(1);
143+
AtomicBoolean called = new AtomicBoolean(false);
144+
145+
try (AutoCloseable handle = jobSyncExecutor.addErrorListener(jobId, evt -> {
146+
called.set(true);
147+
countDownLatch.countDown();
148+
})) {
149+
TaskExecution taskExecution = new TaskExecution();
150+
151+
taskExecution.setJobId(jobId);
152+
taskExecution.setRetryDelay("1S");
153+
154+
TaskExecutionErrorEvent taskExecutionErrorEvent = new TaskExecutionErrorEvent(taskExecution);
155+
156+
taskExecutionErrorEvent.putMetadata(TenantConstants.CURRENT_TENANT_ID, TENANT);
157+
158+
memoryMessageBroker.send(taskExecutionErrorEvent.getRoute(), taskExecutionErrorEvent);
159+
160+
boolean ok = countDownLatch.await(1, TimeUnit.SECONDS);
161+
162+
assertThat(ok).isTrue();
163+
assertThat(called.get()).isTrue();
164+
}
165+
}
166+
167+
@Test
168+
@SuppressWarnings("PMD.UnusedLocalVariable")
169+
void testJobStatusListenerInvokedAndWaitUnblocksOnStopped() throws Exception {
170+
long jobId = 303L;
171+
172+
// Return STARTED initially, so waitForJobCompletion does not short-circuit
173+
Job startedJob = new Job();
174+
175+
startedJob.setId(jobId);
176+
startedJob.setStatus(Job.Status.STARTED);
177+
178+
when(jobService.getJob(jobId)).thenReturn(startedJob);
179+
180+
CountDownLatch statusLatch = new CountDownLatch(1);
181+
182+
try (AutoCloseable handle = jobSyncExecutor.addJobStatusListener(jobId, evt -> {
183+
if (evt.getStatus() == Job.Status.STOPPED) {
184+
statusLatch.countDown();
185+
}
186+
})) {
187+
// Await in a separate thread
188+
CountDownLatch awaitReturnedLatch = new CountDownLatch(1);
189+
190+
Thread thread = new Thread(() -> {
191+
try {
192+
// Ensure TenantContext is set in the waiter thread so latch keys match
193+
TenantContext.setCurrentTenantId(TENANT);
194+
jobSyncExecutor.awaitJob(jobId, false);
195+
} finally {
196+
awaitReturnedLatch.countDown();
197+
}
198+
});
199+
200+
thread.start();
201+
202+
// Wait until the await thread has registered the CountDownLatch so STOPPED can count it down
203+
waitForLatchRegistration(jobSyncExecutor, TENANT + "_" + jobId, Duration.ofMillis(250));
204+
205+
JobStatusApplicationEvent jobStatusApplicationEvent = new JobStatusApplicationEvent(
206+
jobId, Job.Status.STOPPED);
207+
208+
jobStatusApplicationEvent.putMetadata(TenantConstants.CURRENT_TENANT_ID, TENANT);
209+
210+
memoryMessageBroker.send(TaskCoordinatorMessageRoute.APPLICATION_EVENTS, jobStatusApplicationEvent);
211+
212+
// Listener invoked and waiter unblocked
213+
assertThat(statusLatch.await(1, TimeUnit.SECONDS)).isTrue();
214+
215+
// Now when awaitJob fetches the job after latch, return STOPPED
216+
Job stoppedJob = new Job();
217+
218+
stoppedJob.setId(jobId);
219+
stoppedJob.setStatus(Job.Status.STOPPED);
220+
221+
when(jobService.getJob(jobId)).thenReturn(stoppedJob);
222+
223+
assertThat(awaitReturnedLatch.await(1, TimeUnit.SECONDS)).isTrue();
224+
}
225+
}
226+
227+
@Test
228+
@SuppressWarnings("PMD.UnusedLocalVariable")
229+
void testTaskStartedListenerInvokedAndJobIdDerivedFromTaskExecutionId() throws Exception {
230+
long jobId = 404L;
231+
long taskExecutionId = 9001L;
232+
233+
TaskExecution taskExecution = new TaskExecution();
234+
235+
taskExecution.setId(taskExecutionId);
236+
taskExecution.setStatus(TaskExecution.Status.CREATED);
237+
taskExecution.setStartDate(null);
238+
239+
when(taskExecutionService.getTaskExecution(taskExecutionId)).thenReturn(taskExecution);
240+
241+
Job job = new Job();
242+
243+
job.setId(jobId);
244+
job.setStatus(Job.Status.STARTED);
245+
246+
when(jobService.getTaskExecutionJob(taskExecutionId)).thenReturn(job);
247+
when(taskExecutionService.update(any(TaskExecution.class))).thenAnswer(inv -> inv.getArgument(0));
248+
249+
CountDownLatch latch = new CountDownLatch(1);
250+
AtomicBoolean called = new AtomicBoolean(false);
251+
252+
try (AutoCloseable handle = jobSyncExecutor.addTaskStartedListener(jobId, evt -> {
253+
called.set(true);
254+
latch.countDown();
255+
})) {
256+
TaskStartedApplicationEvent taskStartedApplicationEvent = new TaskStartedApplicationEvent(taskExecutionId);
257+
258+
taskStartedApplicationEvent.putMetadata(TenantConstants.CURRENT_TENANT_ID, TENANT);
259+
260+
memoryMessageBroker.send(TaskCoordinatorMessageRoute.APPLICATION_EVENTS, taskStartedApplicationEvent);
261+
262+
boolean ok = latch.await(1, TimeUnit.SECONDS);
263+
264+
assertThat(ok).isTrue();
265+
assertThat(called.get()).isTrue();
266+
}
267+
}
268+
269+
private static void waitForLatchRegistration(JobSyncExecutor jobSyncExecutor, String key, Duration timeout)
270+
throws Exception {
271+
long deadline = System.nanoTime() + timeout.toNanos();
272+
273+
@SuppressWarnings("unchecked")
274+
Map<String, ?> latches = (Map<String, ?>) ReflectionTestUtils.getField(jobSyncExecutor, "jobCompletionLatches");
275+
276+
while (System.nanoTime() < deadline) {
277+
if (Objects.requireNonNull(latches)
278+
.containsKey(key)) {
279+
280+
return;
281+
}
282+
283+
Thread.sleep(10);
284+
}
285+
}
286+
}

0 commit comments

Comments
 (0)