Skip to content

Commit 983b6a4

Browse files
committed
682 Additional fixes
1 parent 9786b77 commit 983b6a4

File tree

4 files changed

+106
-65
lines changed

4 files changed

+106
-65
lines changed

server/libs/core/message/message-broker/message-broker-memory/src/main/java/com/bytechef/message/broker/memory/AsyncMessageBroker.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,13 @@ public void send(MessageRoute messageRoute, Object message) {
6363

6464
List<Receiver> receivers = receiverMap.get(messageRoute);
6565

66-
Assert.isTrue(receivers != null && !receivers.isEmpty(), "no listeners subscribed for: " + messageRoute);
66+
if (receivers == null || receivers.isEmpty()) {
67+
if (logger.isDebugEnabled()) {
68+
logger.debug("No listeners subscribed for: " + messageRoute);
69+
}
70+
71+
return;
72+
}
6773

6874
for (Receiver receiver : Validate.notNull(receivers, "receivers")) {
6975
executor.execute(() -> receiver.receive(

server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -483,8 +483,8 @@ private void handleCoordinatorErrorEvent(
483483
try {
484484
notifyErrorListeners(event);
485485
} catch (Exception exception) {
486-
if (logger.isTraceEnabled()) {
487-
logger.trace(exception.getMessage(), exception);
486+
if (logger.isDebugEnabled()) {
487+
logger.debug(exception.getMessage(), exception);
488488
}
489489
}
490490

@@ -640,8 +640,8 @@ private void notifyTaskExecutionCompleteListeners(
640640
jobId = Validate.notNull(taskExecution.getJobId(), "jobId");
641641
}
642642
} catch (Exception exception) {
643-
if (logger.isTraceEnabled()) {
644-
logger.trace(exception.getMessage(), exception);
643+
if (logger.isDebugEnabled()) {
644+
logger.debug(exception.getMessage(), exception);
645645
}
646646
}
647647

@@ -658,8 +658,8 @@ private void notifyTaskExecutionCompleteListeners(
658658
try {
659659
listener.accept(event);
660660
} catch (Exception exception) {
661-
if (logger.isTraceEnabled()) {
662-
logger.trace(exception.getMessage(), exception);
661+
if (logger.isDebugEnabled()) {
662+
logger.debug(exception.getMessage(), exception);
663663
}
664664
}
665665
}
@@ -676,8 +676,8 @@ private void notifyErrorListeners(ErrorEvent event) {
676676
}
677677
}
678678
} catch (Exception exception) {
679-
if (logger.isTraceEnabled()) {
680-
logger.trace(exception.getMessage(), exception);
679+
if (logger.isDebugEnabled()) {
680+
logger.debug(exception.getMessage(), exception);
681681
}
682682
}
683683

@@ -695,8 +695,8 @@ private void notifyErrorListeners(ErrorEvent event) {
695695
try {
696696
listener.accept(event);
697697
} catch (Exception exception) {
698-
if (logger.isTraceEnabled()) {
699-
logger.trace(exception.getMessage(), exception);
698+
if (logger.isDebugEnabled()) {
699+
logger.debug(exception.getMessage(), exception);
700700
}
701701
}
702702
}
@@ -713,8 +713,8 @@ private void notifyTaskStartedListeners(long jobId, TaskStartedApplicationEvent
713713
try {
714714
listener.accept(event);
715715
} catch (Exception exception) {
716-
if (logger.isTraceEnabled()) {
717-
logger.trace(exception.getMessage(), exception);
716+
if (logger.isDebugEnabled()) {
717+
logger.debug(exception.getMessage(), exception);
718718
}
719719
}
720720
}
@@ -731,8 +731,8 @@ private void notifyJobStatusListeners(long jobId, JobStatusApplicationEvent even
731731
try {
732732
listener.accept(event);
733733
} catch (Exception exception) {
734-
if (logger.isTraceEnabled()) {
735-
logger.trace(exception.getMessage(), exception);
734+
if (logger.isDebugEnabled()) {
735+
logger.debug(exception.getMessage(), exception);
736736
}
737737
}
738738
}

server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-rest/src/main/java/com/bytechef/platform/workflow/test/web/rest/WorkflowTestApiController.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818

1919
import com.bytechef.atlas.coordinator.annotation.ConditionalOnCoordinator;
2020
import com.bytechef.commons.util.EncodingUtils;
21+
import com.bytechef.commons.util.JsonUtils;
2122
import com.bytechef.commons.util.MapUtils;
2223
import com.bytechef.file.storage.domain.FileEntry;
2324
import com.bytechef.platform.file.storage.TempFileStorage;
24-
import com.bytechef.platform.workflow.test.dto.ExecutionErrorEventDTO;
25-
import com.bytechef.platform.workflow.test.dto.JobStatusEventDTO;
26-
import com.bytechef.platform.workflow.test.dto.TaskStatusEventDTO;
2725
import com.bytechef.platform.workflow.test.facade.WorkflowTestFacade;
2826
import com.bytechef.platform.workflow.test.web.rest.model.WorkflowTestExecutionModel;
2927
import com.bytechef.tenant.TenantContext;
@@ -118,25 +116,41 @@ public SseEmitter attachWorkflowTest(@PathVariable Long jobId) {
118116
if (future == null) {
119117
List<SseEmitter.SseEventBuilder> bufferedEvents = pendingEvents.getIfPresent(key);
120118

121-
if (bufferedEvents != null) {
119+
if (bufferedEvents != null && !bufferedEvents.isEmpty()) {
122120
pendingEvents.invalidate(key);
123-
}
124121

125-
if (bufferedEvents != null && !bufferedEvents.isEmpty()) {
126122
try {
127123
for (SseEmitter.SseEventBuilder eventBuilder : bufferedEvents) {
128124
try {
129125
emitter.send(eventBuilder);
130126
} catch (IOException exception) {
131127
if (logger.isTraceEnabled()) {
132-
logger.trace(exception.getMessage(), exception);
128+
logger.trace(
129+
"Failed to send buffered SSE event for job {}: {}", jobId, exception.getMessage(),
130+
exception);
131+
}
132+
133+
try {
134+
emitter.send(createEvent("error", "Failed to deliver buffered events"));
135+
} catch (IOException ioException) {
136+
if (logger.isTraceEnabled()) {
137+
logger.trace(
138+
"Failed to send SSE error event for job {}: {}", jobId,
139+
ioException.getMessage(), ioException);
140+
}
141+
142+
break;
133143
}
134144
}
135145
}
136146
} finally {
137147
emitter.complete();
138148
}
139149
} else {
150+
if (bufferedEvents != null) {
151+
pendingEvents.invalidate(key);
152+
}
153+
140154
try {
141155
emitter.send(createEvent("error", "Not running"));
142156
} catch (Exception exception) {
@@ -185,6 +199,11 @@ public SseEmitter attachWorkflowTest(@PathVariable Long jobId) {
185199
@Override
186200
@PostMapping(value = "/workflow-tests/{jobId}/stop")
187201
public ResponseEntity<Void> stopWorkflowTest(@PathVariable String jobId) {
202+
if (!jobId.matches("\\d+")) {
203+
return ResponseEntity.badRequest()
204+
.build();
205+
}
206+
188207
final String key = TenantCacheKeyUtils.getKey(jobId);
189208

190209
CompletableFuture<WorkflowTestExecutionModel> future = runs.getIfPresent(key);
@@ -291,23 +310,19 @@ private void registerListenersIfAbsent(String key, long jobId) {
291310
List<AutoCloseable> handles = new ArrayList<>();
292311

293312
handles.add(workflowTestFacade.addJobStatusListener(
294-
jobId, (JobStatusEventDTO event) -> sendToEmitter(key, createEvent("job", String.valueOf(event)))));
313+
jobId, (event) -> sendToEmitter(key, createEvent("job", JsonUtils.write(event)))));
295314

296315
handles.add(
297316
workflowTestFacade.addTaskStartedListener(
298-
jobId,
299-
(TaskStatusEventDTO event) -> sendToEmitter(key, createEvent("task", String.valueOf(event)))));
317+
jobId, (event) -> sendToEmitter(key, createEvent("task", JsonUtils.write(event)))));
300318

301319
handles.add(
302320
workflowTestFacade.addTaskExecutionCompleteListener(
303-
jobId,
304-
(TaskStatusEventDTO event) -> sendToEmitter(key, createEvent("task", String.valueOf(event)))));
321+
jobId, (event) -> sendToEmitter(key, createEvent("task", JsonUtils.write(event)))));
305322

306323
handles.add(
307324
workflowTestFacade.addErrorListener(
308-
jobId,
309-
(ExecutionErrorEventDTO event) -> sendToEmitter(
310-
key, createEvent("error", String.valueOf(event)))));
325+
jobId, (event) -> sendToEmitter(key, createEvent("error", JsonUtils.write(event)))));
311326

312327
return handles;
313328
});
@@ -318,9 +333,7 @@ private void completeAndClearEmitter(String key) {
318333

319334
if (emitter != null) {
320335
this.emitter.invalidate(key);
321-
}
322336

323-
if (emitter != null) {
324337
try {
325338
emitter.complete();
326339
} catch (Exception exception) {
@@ -384,9 +397,7 @@ private void registerEmitter(String key, long jobId, SseEmitter emitter) {
384397

385398
if (bufferedEvents != null) {
386399
pendingEvents.invalidate(key);
387-
}
388400

389-
if (bufferedEvents != null) {
390401
for (SseEmitter.SseEventBuilder eventBuilder : bufferedEvents) {
391402
sendToEmitter(key, eventBuilder);
392403
}
@@ -427,9 +438,7 @@ private void unregisterListeners(String key) {
427438

428439
if (handles != null) {
429440
listenerHandles.invalidate(key);
430-
}
431441

432-
if (handles != null) {
433442
for (AutoCloseable handle : handles) {
434443
try {
435444
handle.close();

server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-rest/src/test/java/com/bytechef/platform/workflow/test/web/rest/WorkflowTestApiControllerTest.java

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
3232

3333
import com.bytechef.atlas.execution.domain.Job;
34+
import com.bytechef.commons.util.JsonUtils;
3435
import com.bytechef.platform.file.storage.TempFileStorage;
3536
import com.bytechef.platform.workflow.execution.dto.JobDTO;
3637
import com.bytechef.platform.workflow.test.dto.ExecutionErrorEventDTO;
@@ -39,6 +40,10 @@
3940
import com.bytechef.platform.workflow.test.dto.WorkflowTestExecutionDTO;
4041
import com.bytechef.platform.workflow.test.facade.WorkflowTestFacade;
4142
import com.bytechef.platform.workflow.test.web.rest.model.WorkflowTestExecutionModel;
43+
import com.fasterxml.jackson.databind.DeserializationFeature;
44+
import com.fasterxml.jackson.databind.SerializationFeature;
45+
import com.fasterxml.jackson.databind.json.JsonMapper;
46+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
4247
import com.github.benmanes.caffeine.cache.Cache;
4348
import java.nio.charset.StandardCharsets;
4449
import java.time.Duration;
@@ -52,6 +57,7 @@
5257
import java.util.concurrent.TimeUnit;
5358
import java.util.concurrent.atomic.AtomicReference;
5459
import java.util.function.Consumer;
60+
import org.junit.jupiter.api.BeforeEach;
5561
import org.junit.jupiter.api.Test;
5662
import org.springframework.beans.factory.annotation.Autowired;
5763
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
@@ -85,6 +91,16 @@ class WorkflowTestApiControllerTest {
8591
@Autowired
8692
private WorkflowTestApiController controller;
8793

94+
@BeforeEach
95+
public void beforeEach() {
96+
JsonUtils.setObjectMapper(
97+
JsonMapper.builder()
98+
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
99+
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
100+
.addModules(new JavaTimeModule())
101+
.build());
102+
}
103+
88104
@Test
89105
void testStartStreamEmitsStartAndResult() throws Exception {
90106
long jobId = 123L;
@@ -189,6 +205,13 @@ void testStopAbortsActiveStreamAndInvokesFacade() throws Exception {
189205
verify(workflowTestFacade, times(1)).stopTest(eq(jobId));
190206
}
191207

208+
@Test
209+
void testStopWorkflowTestWithUndefinedJobId() throws Exception {
210+
mockMvc.perform(
211+
post("/internal/workflow-tests/{jobId}/stop", "undefined"))
212+
.andExpect(status().isBadRequest());
213+
}
214+
192215
@Test
193216
void testAttachWhenNotRunningEmitsErrorNotRunning() throws Exception {
194217
// No runs have been started in this test -> attach should report not running
@@ -279,23 +302,23 @@ void testListenerForwardingEmitsJobTaskAndErrorEvents() throws Exception {
279302
Thread.sleep(50);
280303
}
281304

282-
Consumer<JobStatusEventDTO> jobStatusEventDTOConsumer = jobListener.get();
283-
Consumer<TaskStatusEventDTO> taskStatusEventDTOConsumerStarted = taskStartedListener.get();
284-
Consumer<TaskStatusEventDTO> taskStatusEventDTOConsumerCompleted = taskCompletedListener.get();
285-
Consumer<ExecutionErrorEventDTO> executionErrorEventDTOConsumer = errorListener.get();
305+
Consumer<JobStatusEventDTO> jobStatusEventConsumer = jobListener.get();
306+
Consumer<TaskStatusEventDTO> taskStatusEventStartedConsumer = taskStartedListener.get();
307+
Consumer<TaskStatusEventDTO> taskStatusEventCompletedConsumer = taskCompletedListener.get();
308+
Consumer<ExecutionErrorEventDTO> executionErrorEventConsumer = errorListener.get();
286309

287-
assertThat(jobStatusEventDTOConsumer).isNotNull();
288-
assertThat(taskStatusEventDTOConsumerStarted).isNotNull();
289-
assertThat(taskStatusEventDTOConsumerCompleted).isNotNull();
290-
assertThat(executionErrorEventDTOConsumer).isNotNull();
310+
assertThat(jobStatusEventConsumer).isNotNull();
311+
assertThat(taskStatusEventStartedConsumer).isNotNull();
312+
assertThat(taskStatusEventCompletedConsumer).isNotNull();
313+
assertThat(executionErrorEventConsumer).isNotNull();
291314

292315
// Fire synthetic events
293-
jobStatusEventDTOConsumer.accept(new JobStatusEventDTO(jobId, "STARTED", Instant.now()));
294-
taskStatusEventDTOConsumerStarted.accept(
316+
jobStatusEventConsumer.accept(new JobStatusEventDTO(jobId, "STARTED", Instant.now()));
317+
taskStatusEventStartedConsumer.accept(
295318
new TaskStatusEventDTO(jobId, 1L, STARTED, null, null, Instant.now(), null));
296-
taskStatusEventDTOConsumerCompleted.accept(
319+
taskStatusEventCompletedConsumer.accept(
297320
new TaskStatusEventDTO(jobId, 1L, COMPLETED, "t", "type", null, Instant.now()));
298-
executionErrorEventDTOConsumer.accept(new ExecutionErrorEventDTO(jobId, "Oops"));
321+
executionErrorEventConsumer.accept(new ExecutionErrorEventDTO(jobId, "Oops"));
299322

300323
// Allow a brief moment for SSE forwarding to flush before finishing
301324
Duration duration = Duration.ofMillis(50);
@@ -363,21 +386,19 @@ void testPendingEventsBoundedAndFlushedOnAttach() throws Exception {
363386
// Obtain the key from the controller's 'runs' map (wait briefly if needed)
364387
String key = waitForRunKey();
365388

366-
// Remove emitters to force buffering
367-
Object emitters = ReflectionTestUtils.getField(controller, "emitter");
389+
// Remove emitter to force buffering
390+
Object emitter = ReflectionTestUtils.getField(controller, "emitter");
368391

369-
assert emitters != null;
392+
assert emitter != null;
370393

371-
if (emitters instanceof Cache<?, ?> cache) {
394+
if (emitter instanceof Cache<?, ?> cache) {
372395
@SuppressWarnings("unchecked")
373-
Cache<String, CopyOnWriteArrayList<SseEmitter>> emitterCache =
374-
(Cache<String, CopyOnWriteArrayList<SseEmitter>>) cache;
396+
Cache<String, SseEmitter> emitterCache = (Cache<String, SseEmitter>) cache;
375397

376398
emitterCache.invalidate(key);
377-
} else if (emitters instanceof ConcurrentMap<?, ?>) {
399+
} else if (emitter instanceof ConcurrentMap<?, ?>) {
378400
@SuppressWarnings("unchecked")
379-
ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>> emitterMap =
380-
(ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>>) emitters;
401+
ConcurrentMap<String, SseEmitter> emitterMap = (ConcurrentMap<String, SseEmitter>) emitter;
381402

382403
emitterMap.remove(key);
383404
}
@@ -476,14 +497,12 @@ void testPendingEventsClearedOnStop() throws Exception {
476497

477498
if (emitter instanceof Cache<?, ?> cache) {
478499
@SuppressWarnings("unchecked")
479-
Cache<String, CopyOnWriteArrayList<SseEmitter>> emitterCache =
480-
(Cache<String, CopyOnWriteArrayList<SseEmitter>>) cache;
500+
Cache<String, SseEmitter> emitterCache = (Cache<String, SseEmitter>) cache;
481501

482502
emitterCache.invalidate(key);
483503
} else if (emitter instanceof ConcurrentMap<?, ?>) {
484504
@SuppressWarnings("unchecked")
485-
ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>> emitterMap =
486-
(ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>>) emitter;
505+
ConcurrentMap<String, SseEmitter> emitterMap = (ConcurrentMap<String, SseEmitter>) emitter;
487506

488507
emitterMap.remove(key);
489508
}
@@ -546,11 +565,18 @@ void testPendingEventsClearedOnStop() throws Exception {
546565
}
547566

548567
private static String normalizeSse(String body) {
549-
// Normalize potential double-space after 'event:' introduced by different SSE encoders
550-
String normalized = body.replace("event: ", "event: ");
568+
if (body == null || body.isEmpty()) {
569+
return body;
570+
}
551571

552-
// Also collapse any accidental triple spaces just in case
553-
return normalized.replace("event: ", "event: ");
572+
// Normalize whitespace after 'event:' at the beginning of SSE lines, regardless of encoder quirks.
573+
// This collapses any sequence of whitespace after 'event:' to a single space.
574+
// Example:
575+
// event: started
576+
// event:\tcompleted
577+
// becomes:
578+
// event: started
579+
return body.replaceAll("(?m)^(event:)(\\s*)", "$1 ");
554580
}
555581

556582
@Configuration

0 commit comments

Comments
 (0)