Skip to content

Commit 72d4156

Browse files
committed
Change AMQP listeners implementations and add @ConditionalOnExpression on them
1 parent dddab09 commit 72d4156

File tree

3 files changed

+49
-18
lines changed

3 files changed

+49
-18
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright (c) Cosmo Tech.
2+
// Licensed under the MIT license.
3+
package com.cosmotech.run.service.amqp
4+
5+
import com.cosmotech.api.events.RunStart
6+
import com.cosmotech.run.config.RabbitMqConfigModel
7+
import com.cosmotech.runner.domain.Runner
8+
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
9+
import org.springframework.context.ApplicationListener
10+
import org.springframework.stereotype.Component
11+
12+
@Component
13+
@ConditionalOnExpression(
14+
"'\${csm.platform.internalResultServices.enabled}' == 'true' " +
15+
"and '\${csm.platform.internalResultServices.eventBus.enabled}' == 'true'")
16+
class AddQueueOnRunStartListener(
17+
private val rabbitMqConfigModel: RabbitMqConfigModel,
18+
private val amqpClientServiceImpl: AmqpClientServiceImpl,
19+
) : ApplicationListener<RunStart> {
20+
21+
override fun onApplicationEvent(event: RunStart) {
22+
val exchange = rabbitMqConfigModel.exchange
23+
amqpClientServiceImpl.addNewQueue(exchange, (event.runnerData as Runner).workspaceId!!)
24+
}
25+
}

run/src/main/kotlin/com/cosmotech/run/service/amqp/AmqpClientServiceImpl.kt

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,7 @@
22
// Licensed under the MIT license.
33
package com.cosmotech.run.service.amqp
44

5-
import com.cosmotech.api.events.RunStart
6-
import com.cosmotech.api.events.WorkspaceDeleted
7-
import com.cosmotech.run.config.RabbitMqConfigModel
85
import com.cosmotech.run.service.RunServiceImpl
9-
import com.cosmotech.runner.domain.Runner
106
import com.google.gson.Gson
117
import com.google.gson.reflect.TypeToken
128
import java.io.BufferedReader
@@ -22,7 +18,6 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin
2218
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer
2319
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry
2420
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
25-
import org.springframework.context.event.EventListener
2621
import org.springframework.stereotype.Service
2722

2823
@Suppress("ConstructorParameterNaming")
@@ -40,24 +35,11 @@ data class ProbeMessage(
4035
class AmqpClientServiceImpl(
4136
private val rabbitAdmin: RabbitAdmin,
4237
private val rabbitListenerEndpointRegistry: RabbitListenerEndpointRegistry,
43-
private val rabbitMqConfigModel: RabbitMqConfigModel,
4438
private val runServiceImpl: RunServiceImpl
4539
) : AmqpClientServiceInterface {
4640

4741
private val logger = LoggerFactory.getLogger(AmqpClientServiceImpl::class.java)
4842

49-
@EventListener(RunStart::class)
50-
fun awakeListener(event: RunStart) {
51-
val exchange = rabbitMqConfigModel.exchange
52-
this.addNewQueue(exchange, (event.runnerData as Runner).workspaceId!!)
53-
}
54-
55-
@EventListener(WorkspaceDeleted::class)
56-
fun removeQueueFromDeletedWorkspace(event: WorkspaceDeleted) {
57-
val exchange = rabbitMqConfigModel.exchange
58-
this.removeQueue(exchange, event.workspaceId)
59-
}
60-
6143
@RabbitListener(
6244
id = "\${csm.platform.internalResultServices.eventBus.default-exchange}",
6345
queues = ["\${csm.platform.internalResultServices.eventBus.default-queue}"],
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (c) Cosmo Tech.
2+
// Licensed under the MIT license.
3+
package com.cosmotech.run.service.amqp
4+
5+
import com.cosmotech.api.events.WorkspaceDeleted
6+
import com.cosmotech.run.config.RabbitMqConfigModel
7+
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
8+
import org.springframework.context.ApplicationListener
9+
import org.springframework.stereotype.Component
10+
11+
@Component
12+
@ConditionalOnExpression(
13+
"'\${csm.platform.internalResultServices.enabled}' == 'true' " +
14+
"and '\${csm.platform.internalResultServices.eventBus.enabled}' == 'true'")
15+
class RemoveQueueOnWorkspaceDeletedListener(
16+
private val rabbitMqConfigModel: RabbitMqConfigModel,
17+
private val amqpClientServiceImpl: AmqpClientServiceImpl
18+
) : ApplicationListener<WorkspaceDeleted> {
19+
20+
override fun onApplicationEvent(event: WorkspaceDeleted) {
21+
val exchange = rabbitMqConfigModel.exchange
22+
amqpClientServiceImpl.removeQueue(exchange, event.workspaceId)
23+
}
24+
}

0 commit comments

Comments
 (0)