Skip to content

Commit 13e2899

Browse files
committed
* sse: channel.close() now closes sse connection gracefully
Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
1 parent 7c016e1 commit 13e2899

File tree

6 files changed

+26
-5
lines changed

6 files changed

+26
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* undertow: updated to 2.3.18.Final
77
> due to vulnerability of old versions, has to update to latest despite potential memory consumption is higher
88
* sse: support method PUT/POST with body
9+
* sse: channel.close() now closes sse connection gracefully
910

1011
### 9.1.5 (11/11/2024 - 01/22/2025)
1112

core-ng/src/main/java/core/framework/internal/web/sse/ChannelImpl.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ void send(String data) {
6363
try {
6464
queue.add(Strings.bytes(data));
6565
lastSentTime = System.nanoTime();
66-
exchange.getIoThread().execute(() -> writeListener.handleEvent(sink));
66+
sink.getIoThread().execute(() -> writeListener.handleEvent(sink));
6767
} finally {
6868
long elapsed = watch.elapsed();
6969
ActionLogContext.track("sse", elapsed, 0, data.length());
@@ -79,10 +79,27 @@ public boolean isOpen() {
7979
@Override
8080
public void close() {
8181
LOGGER.debug("close sse connection, channel={}", id);
82+
sink.getIoThread().execute(() -> {
83+
try {
84+
lock.lock();
85+
if (closed) return;
86+
87+
closed = true;
88+
if (queue.isEmpty()) {
89+
exchange.endExchange();
90+
}
91+
} finally {
92+
lock.unlock();
93+
}
94+
});
95+
}
96+
97+
public void shutdown() {
8298
try {
8399
lock.lock();
84100
if (closed) return;
85101

102+
LOGGER.debug("shutdown sse connection, channel={}", id);
86103
closed = true;
87104
queue.clear();
88105
exchange.endExchange();

core-ng/src/main/java/core/framework/internal/web/sse/ServerSentEventCloseHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void exchangeEvent(HttpServerExchange exchange, NextListener next) {
3636
actionLog.correlationIds = refIds;
3737
if (!channel.groups.isEmpty()) actionLog.context("group", channel.groups.toArray());
3838
context.remove(channel);
39-
channel.close();
39+
channel.shutdown();
4040
} catch (Throwable e) {
4141
logManager.logError(e);
4242
} finally {

core-ng/src/main/java/core/framework/internal/web/sse/ServerSentEventHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void shutdown() {
121121
logger.info("close sse connections");
122122
for (ChannelSupport<?> support : supports.values()) {
123123
for (var channel : support.context.all()) {
124-
channel.close();
124+
((ChannelImpl<?>) channel).shutdown();
125125
}
126126
}
127127
}

core-ng/src/main/java/core/framework/web/sse/Channel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ default void send(T event) {
77
send(null, event);
88
}
99

10+
// gracefully close, queue "end exchange" into io thread
1011
void close();
1112

1213
void join(String group);

core-ng/src/test/java/core/framework/internal/web/sse/ChannelImplTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.junit.jupiter.api.BeforeEach;
77
import org.junit.jupiter.api.Test;
88
import org.xnio.XnioIoThread;
9+
import org.xnio.channels.StreamSinkChannel;
910

1011
import java.nio.ByteBuffer;
1112

@@ -18,9 +19,10 @@ class ChannelImplTest {
1819

1920
@BeforeEach
2021
void createServerSentEventChannelImpl() {
22+
StreamSinkChannel sink = mock(StreamSinkChannel.class);
23+
when(sink.getIoThread()).thenReturn(mock(XnioIoThread.class));
2124
ServerConnection connection = mock(ServerConnection.class);
22-
when(connection.getIoThread()).thenReturn(mock(XnioIoThread.class));
23-
channel = new ChannelImpl<>(new HttpServerExchange(connection), null, null, new ServerSentEventBuilder<>(TestEvent.class), null);
25+
channel = new ChannelImpl<>(new HttpServerExchange(connection), sink, null, new ServerSentEventBuilder<>(TestEvent.class), null);
2426
}
2527

2628
@Test

0 commit comments

Comments
 (0)