Skip to content

Commit 7c2e952

Browse files
add coroutine sse and clean up
1 parent 4bfd075 commit 7c2e952

File tree

23 files changed

+392
-356
lines changed

23 files changed

+392
-356
lines changed

http/server-events.http

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,3 @@
22
GET {{host}}/coroutine/currency-rate/current
33
Accept: text/event-stream
44
Authorization: Bearer {{oauthToken}}
5-
6-
### Get server event stream of user notification
7-
GET {{host}}/coroutine/notification/user
8-
Accept: text/event-stream
9-
Authorization: Bearer {{oauthToken}}

http/update.http

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
###
22
GET {{host}}/update/user
33
Content-Type: text/event-stream
4-
#Authorization: Bearer {{oauthToken}}
4+
5+
###
6+
GET {{host}}/update/user/reactive
7+
Content-Type: text/event-stream

src/main/kotlin/com/softeno/template/SoftenoReactiveMongoApp.kt

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,6 @@
11
package com.softeno.template
22

33
import com.softeno.template.playground.CoroutinePlayground
4-
import com.softeno.template.sample.http.internal.serverevents.UserNotificationService
5-
import kotlinx.coroutines.CoroutineScope
6-
import kotlinx.coroutines.SupervisorJob
7-
import kotlinx.coroutines.delay
8-
import kotlinx.coroutines.flow.launchIn
9-
import kotlinx.coroutines.flow.onEach
10-
import kotlinx.coroutines.runBlocking
114
import org.slf4j.LoggerFactory
125
import org.springframework.boot.autoconfigure.SpringBootApplication
136
import org.springframework.boot.context.event.ApplicationReadyEvent
@@ -32,22 +25,12 @@ fun main(args: Array<String>) {
3225

3326
@Component
3427
@Profile(value = ["!integration"])
35-
class SpringApplicationReadyEventListener(private val userNotificationService: UserNotificationService) {
28+
class SpringApplicationReadyEventListener {
3629
private val logger = LoggerFactory.getLogger(this::class.java)
3730

3831
@EventListener(ApplicationReadyEvent::class)
3932
fun onApplicationReady() {
40-
logger.info(">> Application Ready")
41-
42-
runBlocking {
43-
val subscribingScope = CoroutineScope(SupervisorJob())
44-
userNotificationService.stream()
45-
.onEach { logger.info(">> UserNotificationService: $it") }
46-
.launchIn(subscribingScope)
47-
48-
logger.info(">> UserNotificationService: Subscribed to stream")
49-
while (true) { delay(Long.MAX_VALUE) }
50-
}
33+
logger.info("Application Ready")
5134
}
5235
}
5336

src/main/kotlin/com/softeno/template/app/common/PrincipalHandler.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import java.security.Principal
1010
interface PrincipalHandler {
1111
suspend fun showPrincipal(log: Log, monoPrincipal: Mono<Principal>){
1212
val principal = monoPrincipal.awaitSingleOrNull()
13-
log.info("principal: $principal, name: ${principal?.name}")
13+
log.debug("principal: $principal, name: ${principal?.name}")
1414
val authentication = ReactiveSecurityContextHolder.getContext().map { it.authentication }.awaitSingleOrNull()
1515
if (authentication != null) {
1616
val token = (authentication as JwtAuthenticationToken).token

src/main/kotlin/com/softeno/template/app/event/AppEvent.kt

Lines changed: 0 additions & 54 deletions
This file was deleted.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.softeno.template.app.event
2+
3+
import com.softeno.template.app.kafka.KafkaMessage
4+
import com.softeno.template.app.kafka.ReactiveKafkaSampleProducer
5+
import kotlinx.coroutines.DelicateCoroutinesApi
6+
import org.apache.commons.logging.LogFactory
7+
import org.slf4j.MDC
8+
import org.springframework.context.ApplicationEvent
9+
import org.springframework.context.ApplicationListener
10+
import org.springframework.stereotype.Component
11+
12+
data class UserAction(val source: String, val traceId: String? = null, val spanId: String? = null) : ApplicationEvent(source)
13+
14+
@Component
15+
class SampleApplicationEventPublisher(
16+
17+
private val reactiveKafkaProducer: ReactiveKafkaSampleProducer,
18+
) : ApplicationListener<UserAction> {
19+
private val log = LogFactory.getLog(javaClass)
20+
21+
@OptIn(DelicateCoroutinesApi::class)
22+
override fun onApplicationEvent(event: UserAction) {
23+
// note: propagate traceId and spanId in MDC context
24+
if (!event.spanId.isNullOrBlank() && !event.traceId.isNullOrBlank()) {
25+
MDC.put("traceId", event.traceId)
26+
MDC.put("spanId", event.spanId)
27+
}
28+
log.debug("[app event handler]: Received event: $event")
29+
reactiveKafkaProducer.send(event.toKafkaMessage())
30+
}
31+
32+
}
33+
34+
fun UserAction.toKafkaMessage() = KafkaMessage(content = this.source, traceId = this.traceId, spanId = this.spanId)

src/main/kotlin/com/softeno/template/app/kafka/KafkaKeycloakController.kt

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package com.softeno.template.app.kafka
22

3+
import com.fasterxml.jackson.annotation.JsonCreator
4+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
35
import com.fasterxml.jackson.databind.JsonNode
46
import com.fasterxml.jackson.databind.ObjectMapper
5-
import com.softeno.template.app.kafka.dto.KeycloakUserEvent
67
import org.apache.commons.logging.LogFactory
78
import org.apache.kafka.clients.consumer.ConsumerRecord
89
import org.springframework.beans.factory.annotation.Qualifier
@@ -38,4 +39,31 @@ class ReactiveKafkaKeycloakController(
3839
log.info("[kafka]: keycloak consumer starts")
3940
consumeKafkaMessage().subscribe()
4041
}
41-
}
42+
}
43+
44+
enum class KeycloakEventType {
45+
LOGIN,
46+
LOGOUT,
47+
REGISTER;
48+
49+
companion object {
50+
@JsonCreator
51+
fun byString(input: String): KeycloakEventType? {
52+
return KeycloakEventType.entries.firstOrNull { it.name.equals(input, true) }
53+
}
54+
}
55+
}
56+
57+
@JsonIgnoreProperties(ignoreUnknown = true)
58+
data class KeycloakUserEvent(
59+
val id: String,
60+
val time: Long,
61+
val type: KeycloakEventType,
62+
val realmId: String,
63+
val clientId: String?,
64+
val userId: String,
65+
val sessionId: String?,
66+
val ipAddress: String,
67+
val error: String?,
68+
val details: Map<String, String>
69+
)

src/main/kotlin/com/softeno/template/sample/kafka/KafkaSampleHandler.kt renamed to src/main/kotlin/com/softeno/template/app/kafka/KafkaSampleHandler.kt

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1-
package com.softeno.template.sample.kafka
1+
package com.softeno.template.app.kafka
22

33

4+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
45
import com.fasterxml.jackson.databind.JsonNode
56
import com.fasterxml.jackson.databind.ObjectMapper
67
import com.softeno.template.app.kafka.config.KafkaApplicationProperties
7-
import com.softeno.template.app.kafka.dto.KafkaMessage
8-
import io.micrometer.observation.ObservationRegistry
8+
import com.softeno.template.app.user.notification.CoroutineUserUpdateEmitter
9+
import com.softeno.template.app.user.notification.ReactiveUserUpdateEmitter
10+
import com.softeno.template.sample.websocket.Message
11+
import com.softeno.template.sample.websocket.ReactiveMessageService
912
import io.micrometer.tracing.Span
1013
import io.micrometer.tracing.Tracer
14+
import kotlinx.coroutines.DelicateCoroutinesApi
1115
import org.apache.commons.lang3.RandomUtils
1216
import org.apache.commons.logging.LogFactory
1317
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -18,20 +22,25 @@ import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
1822
import org.springframework.stereotype.Controller
1923
import org.springframework.stereotype.Service
2024
import reactor.core.publisher.Flux
21-
import reactor.kafka.receiver.ReceiverOptions
2225
import reactor.kafka.sender.SenderResult
2326

27+
@JsonIgnoreProperties(ignoreUnknown = true)
28+
data class KafkaMessage(val content: String, val traceId: String? = null, val spanId: String? = null)
29+
30+
fun KafkaMessage.toMessage() = Message(from = "SYSTEM", to = "ALL", content = this.content)
2431

2532
@Controller
2633
class ReactiveKafkaSampleController(
2734
@Qualifier(value = "kafkaSampleConsumerTemplate") private val reactiveKafkaConsumerTemplate: ReactiveKafkaConsumerTemplate<String, JsonNode>,
2835
private val objectMapper: ObjectMapper,
2936
private val tracer: Tracer,
30-
private val observationRegistry: ObservationRegistry,
31-
@Qualifier(value = "kafkaSampleOptions") private val kafkaReceiverOptions: ReceiverOptions<String, JsonNode>
37+
private val reactiveMessageService: ReactiveMessageService,
38+
private val reactiveUserUpdateEmitter: ReactiveUserUpdateEmitter,
39+
private val userUpdateEmitter: CoroutineUserUpdateEmitter,
3240
) : CommandLineRunner {
3341
private val log = LogFactory.getLog(javaClass)
3442

43+
@OptIn(DelicateCoroutinesApi::class)
3544
private fun consumeKafkaMessage(): Flux<JsonNode> {
3645
return reactiveKafkaConsumerTemplate
3746
.receiveAutoAck()
@@ -56,7 +65,9 @@ class ReactiveKafkaSampleController(
5665
val span = tracer.nextSpan().name("kafka-consumer")
5766
tracer.withSpan(span.start()).use {
5867
log.info("[kafka] rx sample: $kafkaMessage")
59-
// additional processing ...
68+
reactiveMessageService.broadcast(kafkaMessage.toMessage())
69+
reactiveUserUpdateEmitter.broadcast(kafkaMessage.toMessage())
70+
userUpdateEmitter.broadcast(kafkaMessage.toMessage())
6071
}
6172
span.end()
6273
}

src/main/kotlin/com/softeno/template/app/kafka/config/KafkaConfig.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.softeno.template.app.kafka.config
22

33
import com.fasterxml.jackson.databind.JsonNode
4-
import com.softeno.template.app.kafka.dto.KafkaMessage
4+
import com.softeno.template.app.kafka.KafkaMessage
55
import io.micrometer.observation.ObservationRegistry
66
import org.springframework.beans.factory.annotation.Qualifier
77
import org.springframework.boot.autoconfigure.kafka.KafkaProperties

src/main/kotlin/com/softeno/template/app/kafka/dto/dto.kt

Lines changed: 0 additions & 34 deletions
This file was deleted.

0 commit comments

Comments
 (0)