Skip to content

Commit 479cefa

Browse files
minor improvements to the websocket service
1 parent 7c2e952 commit 479cefa

File tree

2 files changed

+40
-14
lines changed

2 files changed

+40
-14
lines changed

src/main/kotlin/com/softeno/template/app/user/notification/Service.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,7 @@ class CoroutineUserUpdateEmitter(
128128

129129
fun getFlow(): Flow<ServerSentEvent<String>> = flow {
130130
log.info("[coroutine] New SSE client subscribed")
131-
132131
val heartbeatFlow = createHeartbeatFlow()
133-
val eventFlow = events
134132

135133
merge(heartbeatFlow, events)
136134
.collect { event -> emit(event) }

src/main/kotlin/com/softeno/template/sample/websocket/WebSocket.kt

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAd
1616
import reactor.core.publisher.Flux
1717
import reactor.core.publisher.Sinks
1818
import reactor.core.publisher.Sinks.Many
19+
import java.util.concurrent.ConcurrentHashMap
20+
import kotlin.jvm.java
1921

2022

2123
@Configuration
@@ -73,29 +75,41 @@ class WebSocketConfig(private val reactiveMessageService: ReactiveMessageService
7375
log.info("ws: [chat] disconnect chat session: $sessionId with sig: ${sig.name}")
7476
session.close()
7577
reactiveMessageService.remove(sessionId)
76-
}
77-
.doOnNext { wsMessage ->
78-
val message = objectMapper.readValue(wsMessage.payloadAsText, Message::class.java)
79-
log.info("ws: [chat] rx: $message")
80-
reactiveMessageService.send(message, message.to)
78+
}.doOnNext { wsMessage ->
79+
try {
80+
val message = objectMapper.readValue(wsMessage.payloadAsText, Message::class.java)
81+
log.info("ws: [chat] rx: $message")
82+
reactiveMessageService.send(message, message.to)
83+
} catch (e: Exception) {
84+
log.error("ws: [chat] failed to parse message: ${wsMessage.payloadAsText}", e)
85+
// optionally send error message back to client
86+
}
87+
}.doOnError { error ->
88+
log.error("ws: [chat] error in session: $sessionId", error)
89+
// handle the error
90+
}.onErrorContinue { error, _ ->
91+
log.warn("ws: [chat] continuing after error: ${error.message}")
8192
}
8293

8394
session.send(messages).and(reading)
8495
}
8596
}
8697
}
8798

88-
data class Message(val from: String, val to: String, val content: String)
89-
90-
fun Message.toJson(objectMapper: ObjectMapper): String = objectMapper.writeValueAsString(this)
91-
9299
@Service
93100
class ReactiveMessageService(
94101
private val objectMapper: ObjectMapper
95102
) {
96-
private val sinks: MutableMap<String, Many<String>> = mutableMapOf()
103+
private val log = LogFactory.getLog(javaClass)
104+
105+
private val sinks: MutableMap<String, Many<String>> = ConcurrentHashMap()
97106

98107
fun send(next: Message, session: String): Message {
108+
if (!sinks.containsKey(session)) {
109+
log.warn("ws: [chat] attempting to send to non-existent session: $session")
110+
return next
111+
}
112+
99113
val payload = next.toJson(objectMapper)
100114
getSink(session).emitNext(payload, Sinks.EmitFailureHandler.FAIL_FAST)
101115
return next
@@ -121,6 +135,20 @@ class ReactiveMessageService(
121135
return sinks[session]!!
122136
}
123137

124-
fun remove(session: String): Many<String>? = sinks.remove(session)
138+
fun remove(session: String): Many<String>? {
139+
return sinks.remove(session)?.also { sink ->
140+
sink.tryEmitComplete()
141+
}
142+
}
143+
144+
}
125145

126-
}
146+
data class Message(val from: String, val to: String, val content: String) {
147+
init {
148+
require(from.isNotBlank()) { "Message 'from' cannot be blank" }
149+
require(to.isNotBlank()) { "Message 'to' cannot be blank" }
150+
require(content.isNotBlank()) { "Message 'content' cannot be blank" }
151+
}
152+
}
153+
154+
fun Message.toJson(objectMapper: ObjectMapper): String = objectMapper.writeValueAsString(this)

0 commit comments

Comments
 (0)