Skip to content

Commit 393f391

Browse files
committed
682 Additional fixes
1 parent b80fc99 commit 393f391

File tree

3 files changed

+71
-64
lines changed

3 files changed

+71
-64
lines changed

server/libs/platform/platform-coordinator/src/main/java/com/bytechef/platform/coordinator/job/JobSyncExecutor.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -483,8 +483,8 @@ private void handleCoordinatorErrorEvent(
483483
try {
484484
notifyErrorListeners(event);
485485
} catch (Exception exception) {
486-
if (logger.isTraceEnabled()) {
487-
logger.trace(exception.getMessage(), exception);
486+
if (logger.isDebugEnabled()) {
487+
logger.debug(exception.getMessage(), exception);
488488
}
489489
}
490490

@@ -640,8 +640,8 @@ private void notifyTaskExecutionCompleteListeners(
640640
jobId = Validate.notNull(taskExecution.getJobId(), "jobId");
641641
}
642642
} catch (Exception exception) {
643-
if (logger.isTraceEnabled()) {
644-
logger.trace(exception.getMessage(), exception);
643+
if (logger.isDebugEnabled()) {
644+
logger.debug(exception.getMessage(), exception);
645645
}
646646
}
647647

@@ -658,8 +658,8 @@ private void notifyTaskExecutionCompleteListeners(
658658
try {
659659
listener.accept(event);
660660
} catch (Exception exception) {
661-
if (logger.isTraceEnabled()) {
662-
logger.trace(exception.getMessage(), exception);
661+
if (logger.isDebugEnabled()) {
662+
logger.debug(exception.getMessage(), exception);
663663
}
664664
}
665665
}
@@ -676,8 +676,8 @@ private void notifyErrorListeners(ErrorEvent event) {
676676
}
677677
}
678678
} catch (Exception exception) {
679-
if (logger.isTraceEnabled()) {
680-
logger.trace(exception.getMessage(), exception);
679+
if (logger.isDebugEnabled()) {
680+
logger.debug(exception.getMessage(), exception);
681681
}
682682
}
683683

@@ -695,8 +695,8 @@ private void notifyErrorListeners(ErrorEvent event) {
695695
try {
696696
listener.accept(event);
697697
} catch (Exception exception) {
698-
if (logger.isTraceEnabled()) {
699-
logger.trace(exception.getMessage(), exception);
698+
if (logger.isDebugEnabled()) {
699+
logger.debug(exception.getMessage(), exception);
700700
}
701701
}
702702
}
@@ -713,8 +713,8 @@ private void notifyTaskStartedListeners(long jobId, TaskStartedApplicationEvent
713713
try {
714714
listener.accept(event);
715715
} catch (Exception exception) {
716-
if (logger.isTraceEnabled()) {
717-
logger.trace(exception.getMessage(), exception);
716+
if (logger.isDebugEnabled()) {
717+
logger.debug(exception.getMessage(), exception);
718718
}
719719
}
720720
}
@@ -731,8 +731,8 @@ private void notifyJobStatusListeners(long jobId, JobStatusApplicationEvent even
731731
try {
732732
listener.accept(event);
733733
} catch (Exception exception) {
734-
if (logger.isTraceEnabled()) {
735-
logger.trace(exception.getMessage(), exception);
734+
if (logger.isDebugEnabled()) {
735+
logger.debug(exception.getMessage(), exception);
736736
}
737737
}
738738
}

server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-rest/src/main/java/com/bytechef/platform/workflow/test/web/rest/WorkflowTestApiController.java

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818

1919
import com.bytechef.atlas.coordinator.annotation.ConditionalOnCoordinator;
2020
import com.bytechef.commons.util.EncodingUtils;
21+
import com.bytechef.commons.util.JsonUtils;
2122
import com.bytechef.commons.util.MapUtils;
2223
import com.bytechef.file.storage.domain.FileEntry;
2324
import com.bytechef.platform.file.storage.TempFileStorage;
24-
import com.bytechef.platform.workflow.test.dto.ExecutionErrorEventDTO;
25-
import com.bytechef.platform.workflow.test.dto.JobStatusEventDTO;
26-
import com.bytechef.platform.workflow.test.dto.TaskStatusEventDTO;
2725
import com.bytechef.platform.workflow.test.facade.WorkflowTestFacade;
2826
import com.bytechef.platform.workflow.test.web.rest.model.WorkflowTestExecutionModel;
2927
import com.bytechef.tenant.TenantContext;
@@ -118,25 +116,41 @@ public SseEmitter attachWorkflowTest(@PathVariable Long jobId) {
118116
if (future == null) {
119117
List<SseEmitter.SseEventBuilder> bufferedEvents = pendingEvents.getIfPresent(key);
120118

121-
if (bufferedEvents != null) {
119+
if (bufferedEvents != null && !bufferedEvents.isEmpty()) {
122120
pendingEvents.invalidate(key);
123-
}
124121

125-
if (bufferedEvents != null && !bufferedEvents.isEmpty()) {
126122
try {
127123
for (SseEmitter.SseEventBuilder eventBuilder : bufferedEvents) {
128124
try {
129125
emitter.send(eventBuilder);
130126
} catch (IOException exception) {
131127
if (logger.isTraceEnabled()) {
132-
logger.trace(exception.getMessage(), exception);
128+
logger.trace(
129+
"Failed to send buffered SSE event for job {}: {}", jobId, exception.getMessage(),
130+
exception);
131+
}
132+
133+
try {
134+
emitter.send(createEvent("error", "Failed to deliver buffered events"));
135+
} catch (IOException ioException) {
136+
if (logger.isTraceEnabled()) {
137+
logger.trace(
138+
"Failed to send SSE error event for job {}: {}", jobId,
139+
ioException.getMessage(), ioException);
140+
}
141+
142+
break;
133143
}
134144
}
135145
}
136146
} finally {
137147
emitter.complete();
138148
}
139149
} else {
150+
if (bufferedEvents != null) {
151+
pendingEvents.invalidate(key);
152+
}
153+
140154
try {
141155
emitter.send(createEvent("error", "Not running"));
142156
} catch (Exception exception) {
@@ -291,23 +305,19 @@ private void registerListenersIfAbsent(String key, long jobId) {
291305
List<AutoCloseable> handles = new ArrayList<>();
292306

293307
handles.add(workflowTestFacade.addJobStatusListener(
294-
jobId, (JobStatusEventDTO event) -> sendToEmitter(key, createEvent("job", String.valueOf(event)))));
308+
jobId, (event) -> sendToEmitter(key, createEvent("job", JsonUtils.write(event)))));
295309

296310
handles.add(
297311
workflowTestFacade.addTaskStartedListener(
298-
jobId,
299-
(TaskStatusEventDTO event) -> sendToEmitter(key, createEvent("task", String.valueOf(event)))));
312+
jobId, (event) -> sendToEmitter(key, createEvent("task", JsonUtils.write(event)))));
300313

301314
handles.add(
302315
workflowTestFacade.addTaskExecutionCompleteListener(
303-
jobId,
304-
(TaskStatusEventDTO event) -> sendToEmitter(key, createEvent("task", String.valueOf(event)))));
316+
jobId, (event) -> sendToEmitter(key, createEvent("task", JsonUtils.write(event)))));
305317

306318
handles.add(
307319
workflowTestFacade.addErrorListener(
308-
jobId,
309-
(ExecutionErrorEventDTO event) -> sendToEmitter(
310-
key, createEvent("error", String.valueOf(event)))));
320+
jobId, (event) -> sendToEmitter(key, createEvent("error", JsonUtils.write(event)))));
311321

312322
return handles;
313323
});
@@ -318,9 +328,7 @@ private void completeAndClearEmitter(String key) {
318328

319329
if (emitter != null) {
320330
this.emitter.invalidate(key);
321-
}
322331

323-
if (emitter != null) {
324332
try {
325333
emitter.complete();
326334
} catch (Exception exception) {
@@ -384,9 +392,7 @@ private void registerEmitter(String key, long jobId, SseEmitter emitter) {
384392

385393
if (bufferedEvents != null) {
386394
pendingEvents.invalidate(key);
387-
}
388395

389-
if (bufferedEvents != null) {
390396
for (SseEmitter.SseEventBuilder eventBuilder : bufferedEvents) {
391397
sendToEmitter(key, eventBuilder);
392398
}
@@ -427,9 +433,7 @@ private void unregisterListeners(String key) {
427433

428434
if (handles != null) {
429435
listenerHandles.invalidate(key);
430-
}
431436

432-
if (handles != null) {
433437
for (AutoCloseable handle : handles) {
434438
try {
435439
handle.close();

server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-rest/src/test/java/com/bytechef/platform/workflow/test/web/rest/WorkflowTestApiControllerTest.java

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -279,23 +279,23 @@ void testListenerForwardingEmitsJobTaskAndErrorEvents() throws Exception {
279279
Thread.sleep(50);
280280
}
281281

282-
Consumer<JobStatusEventDTO> jobStatusEventDTOConsumer = jobListener.get();
283-
Consumer<TaskStatusEventDTO> taskStatusEventDTOConsumerStarted = taskStartedListener.get();
284-
Consumer<TaskStatusEventDTO> taskStatusEventDTOConsumerCompleted = taskCompletedListener.get();
285-
Consumer<ExecutionErrorEventDTO> executionErrorEventDTOConsumer = errorListener.get();
282+
Consumer<JobStatusEventDTO> jobStatusEventConsumer = jobListener.get();
283+
Consumer<TaskStatusEventDTO> taskStatusEventStartedConsumer = taskStartedListener.get();
284+
Consumer<TaskStatusEventDTO> taskStatusEventCompletedConsumer = taskCompletedListener.get();
285+
Consumer<ExecutionErrorEventDTO> executionErrorEventConsumer = errorListener.get();
286286

287-
assertThat(jobStatusEventDTOConsumer).isNotNull();
288-
assertThat(taskStatusEventDTOConsumerStarted).isNotNull();
289-
assertThat(taskStatusEventDTOConsumerCompleted).isNotNull();
290-
assertThat(executionErrorEventDTOConsumer).isNotNull();
287+
assertThat(jobStatusEventConsumer).isNotNull();
288+
assertThat(taskStatusEventStartedConsumer).isNotNull();
289+
assertThat(taskStatusEventCompletedConsumer).isNotNull();
290+
assertThat(executionErrorEventConsumer).isNotNull();
291291

292292
// Fire synthetic events
293-
jobStatusEventDTOConsumer.accept(new JobStatusEventDTO(jobId, "STARTED", Instant.now()));
294-
taskStatusEventDTOConsumerStarted.accept(
293+
jobStatusEventConsumer.accept(new JobStatusEventDTO(jobId, "STARTED", Instant.now()));
294+
taskStatusEventStartedConsumer.accept(
295295
new TaskStatusEventDTO(jobId, 1L, STARTED, null, null, Instant.now(), null));
296-
taskStatusEventDTOConsumerCompleted.accept(
296+
taskStatusEventCompletedConsumer.accept(
297297
new TaskStatusEventDTO(jobId, 1L, COMPLETED, "t", "type", null, Instant.now()));
298-
executionErrorEventDTOConsumer.accept(new ExecutionErrorEventDTO(jobId, "Oops"));
298+
executionErrorEventConsumer.accept(new ExecutionErrorEventDTO(jobId, "Oops"));
299299

300300
// Allow a brief moment for SSE forwarding to flush before finishing
301301
Duration duration = Duration.ofMillis(50);
@@ -363,21 +363,19 @@ void testPendingEventsBoundedAndFlushedOnAttach() throws Exception {
363363
// Obtain the key from the controller's 'runs' map (wait briefly if needed)
364364
String key = waitForRunKey();
365365

366-
// Remove emitters to force buffering
367-
Object emitters = ReflectionTestUtils.getField(controller, "emitter");
366+
// Remove emitter to force buffering
367+
Object emitter = ReflectionTestUtils.getField(controller, "emitter");
368368

369-
assert emitters != null;
369+
assert emitter != null;
370370

371-
if (emitters instanceof Cache<?, ?> cache) {
371+
if (emitter instanceof Cache<?, ?> cache) {
372372
@SuppressWarnings("unchecked")
373-
Cache<String, CopyOnWriteArrayList<SseEmitter>> emitterCache =
374-
(Cache<String, CopyOnWriteArrayList<SseEmitter>>) cache;
373+
Cache<String, SseEmitter> emitterCache = (Cache<String, SseEmitter>) cache;
375374

376375
emitterCache.invalidate(key);
377-
} else if (emitters instanceof ConcurrentMap<?, ?>) {
376+
} else if (emitter instanceof ConcurrentMap<?, ?>) {
378377
@SuppressWarnings("unchecked")
379-
ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>> emitterMap =
380-
(ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>>) emitters;
378+
ConcurrentMap<String, SseEmitter> emitterMap = (ConcurrentMap<String, SseEmitter>) emitter;
381379

382380
emitterMap.remove(key);
383381
}
@@ -476,14 +474,12 @@ void testPendingEventsClearedOnStop() throws Exception {
476474

477475
if (emitter instanceof Cache<?, ?> cache) {
478476
@SuppressWarnings("unchecked")
479-
Cache<String, CopyOnWriteArrayList<SseEmitter>> emitterCache =
480-
(Cache<String, CopyOnWriteArrayList<SseEmitter>>) cache;
477+
Cache<String, SseEmitter> emitterCache = (Cache<String, SseEmitter>) cache;
481478

482479
emitterCache.invalidate(key);
483480
} else if (emitter instanceof ConcurrentMap<?, ?>) {
484481
@SuppressWarnings("unchecked")
485-
ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>> emitterMap =
486-
(ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>>) emitter;
482+
ConcurrentMap<String, SseEmitter> emitterMap = (ConcurrentMap<String, SseEmitter>) emitter;
487483

488484
emitterMap.remove(key);
489485
}
@@ -546,11 +542,18 @@ void testPendingEventsClearedOnStop() throws Exception {
546542
}
547543

548544
private static String normalizeSse(String body) {
549-
// Normalize potential double-space after 'event:' introduced by different SSE encoders
550-
String normalized = body.replace("event: ", "event: ");
545+
if (body == null || body.isEmpty()) {
546+
return body;
547+
}
551548

552-
// Also collapse any accidental triple spaces just in case
553-
return normalized.replace("event: ", "event: ");
549+
// Normalize whitespace after 'event:' at the beginning of SSE lines, regardless of encoder quirks.
550+
// This collapses any sequence of whitespace after 'event:' to a single space.
551+
// Example:
552+
// event: started
553+
// event:\tcompleted
554+
// becomes:
555+
// event: started
556+
return body.replaceAll("(?m)^(event:)(\\s*)", "$1 ");
554557
}
555558

556559
@Configuration

0 commit comments

Comments
 (0)