Skip to content

Commit ae0a8be

Browse files
committed
682 Additional fixes
1 parent 61747f2 commit ae0a8be

File tree

2 files changed

+14
-22
lines changed

2 files changed

+14
-22
lines changed

server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-rest/src/main/java/com/bytechef/platform/workflow/test/web/rest/WorkflowTestApiController.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,9 @@ public SseEmitter attachWorkflowTest(@PathVariable Long jobId) {
117117
if (future == null) {
118118
List<SseEmitter.SseEventBuilder> bufferedEvents = pendingEvents.getIfPresent(key);
119119

120-
if (bufferedEvents != null) {
120+
if (bufferedEvents != null && !bufferedEvents.isEmpty()) {
121121
pendingEvents.invalidate(key);
122-
}
123122

124-
if (bufferedEvents != null && !bufferedEvents.isEmpty()) {
125123
try {
126124
for (SseEmitter.SseEventBuilder eventBuilder : bufferedEvents) {
127125
try {
@@ -136,6 +134,10 @@ public SseEmitter attachWorkflowTest(@PathVariable Long jobId) {
136134
emitter.complete();
137135
}
138136
} else {
137+
if (bufferedEvents != null) {
138+
pendingEvents.invalidate(key);
139+
}
140+
139141
try {
140142
emitter.send(createEvent("error", "Not running"));
141143
} catch (Exception exception) {
@@ -313,9 +315,7 @@ private void completeAndClearEmitter(String key) {
313315

314316
if (emitter != null) {
315317
this.emitter.invalidate(key);
316-
}
317318

318-
if (emitter != null) {
319319
try {
320320
emitter.complete();
321321
} catch (Exception exception) {
@@ -379,9 +379,7 @@ private void registerEmitter(String key, long jobId, SseEmitter emitter) {
379379

380380
if (bufferedEvents != null) {
381381
pendingEvents.invalidate(key);
382-
}
383382

384-
if (bufferedEvents != null) {
385383
for (SseEmitter.SseEventBuilder eventBuilder : bufferedEvents) {
386384
sendToEmitter(key, eventBuilder);
387385
}
@@ -422,9 +420,7 @@ private void unregisterListeners(String key) {
422420

423421
if (handles != null) {
424422
listenerHandles.invalidate(key);
425-
}
426423

427-
if (handles != null) {
428424
for (AutoCloseable handle : handles) {
429425
try {
430426
handle.close();

server/libs/platform/platform-workflow/platform-workflow-test/platform-workflow-test-rest/src/test/java/com/bytechef/platform/workflow/test/web/rest/WorkflowTestApiControllerTest.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -364,21 +364,19 @@ void testPendingEventsBoundedAndFlushedOnAttach() throws Exception {
364364
// Obtain the key from the controller's 'runs' map (wait briefly if needed)
365365
String key = waitForRunKey();
366366

367-
// Remove emitters to force buffering
368-
Object emitters = ReflectionTestUtils.getField(controller, "emitter");
367+
// Remove emitter to force buffering
368+
Object emitter = ReflectionTestUtils.getField(controller, "emitter");
369369

370-
assert emitters != null;
370+
assert emitter != null;
371371

372-
if (emitters instanceof Cache<?, ?> cache) {
372+
if (emitter instanceof Cache<?, ?> cache) {
373373
@SuppressWarnings("unchecked")
374-
Cache<String, CopyOnWriteArrayList<SseEmitter>> emitterCache =
375-
(Cache<String, CopyOnWriteArrayList<SseEmitter>>) cache;
374+
Cache<String, SseEmitter> emitterCache = (Cache<String, SseEmitter>) cache;
376375

377376
emitterCache.invalidate(key);
378-
} else if (emitters instanceof ConcurrentMap<?, ?>) {
377+
} else if (emitter instanceof ConcurrentMap<?, ?>) {
379378
@SuppressWarnings("unchecked")
380-
ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>> emitterMap =
381-
(ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>>) emitters;
379+
ConcurrentMap<String, SseEmitter> emitterMap = (ConcurrentMap<String, SseEmitter>) emitter;
382380

383381
emitterMap.remove(key);
384382
}
@@ -477,14 +475,12 @@ void testPendingEventsClearedOnStop() throws Exception {
477475

478476
if (emitter instanceof Cache<?, ?> cache) {
479477
@SuppressWarnings("unchecked")
480-
Cache<String, CopyOnWriteArrayList<SseEmitter>> emitterCache =
481-
(Cache<String, CopyOnWriteArrayList<SseEmitter>>) cache;
478+
Cache<String, SseEmitter> emitterCache = (Cache<String, SseEmitter>) cache;
482479

483480
emitterCache.invalidate(key);
484481
} else if (emitter instanceof ConcurrentMap<?, ?>) {
485482
@SuppressWarnings("unchecked")
486-
ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>> emitterMap =
487-
(ConcurrentMap<String, CopyOnWriteArrayList<SseEmitter>>) emitter;
483+
ConcurrentMap<String, SseEmitter> emitterMap = (ConcurrentMap<String, SseEmitter>) emitter;
488484

489485
emitterMap.remove(key);
490486
}

0 commit comments

Comments
 (0)