Skip to content

Commit 85a7527

Browse files
committed
feat: add deliveryMode (1: non-persistent, 2: persistent) to AMQP configuration with default of 1 (non-persistent)
1 parent fd811ac commit 85a7527

File tree

3 files changed

+18
-13
lines changed

3 files changed

+18
-13
lines changed

keycloak-webhook-provider-amqp/src/main/kotlin/AmqpWebhookHandler.kt

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class AmqpWebhookHandler : WebhookHandler {
1515
private lateinit var connection: Connection
1616
private lateinit var exchange: String
1717
private lateinit var connectionFactory: ConnectionFactory
18+
private var deliveryMode: Int = 1
1819

1920
companion object {
2021
const val PROVIDER_ID = "webhook-amqp"
@@ -25,23 +26,23 @@ class AmqpWebhookHandler : WebhookHandler {
2526
@JvmStatic
2627
private val logger = LoggerFactory.getLogger(AmqpWebhookHandler::class.java)
2728

28-
@JvmStatic
29-
private fun getMessageProps(className: String): BasicProperties {
30-
val headers: MutableMap<String, Any> = HashMap()
31-
headers["__TypeId__"] = className
32-
return BasicProperties.Builder()
33-
.appId("Keycloak/Kotlin")
34-
.headers(headers)
35-
.contentType(MediaType.APPLICATION_JSON)
36-
.contentEncoding("UTF-8")
37-
.build()
38-
}
39-
4029
@JvmStatic
4130
private fun genRoutingKey(request: WebhookPayload): String =
4231
"KC_CLIENT.${request.realmId}.${request.clientId ?: "xxx"}.${request.userId ?: "xxx"}.${request.type}"
4332
}
4433

34+
private fun getMessageProps(className: String): BasicProperties {
35+
val headers: MutableMap<String, Any> = HashMap()
36+
headers["__TypeId__"] = className
37+
return BasicProperties.Builder()
38+
.appId("Keycloak/Kotlin")
39+
.headers(headers)
40+
.contentType(MediaType.APPLICATION_JSON)
41+
.contentEncoding("UTF-8")
42+
.deliveryMode(this.deliveryMode)
43+
.build()
44+
}
45+
4546
/**
4647
* Ensures that the connection and channel are open.
4748
* If either is closed, it will try to reinitialize them up to 3 times.
@@ -122,6 +123,7 @@ class AmqpWebhookHandler : WebhookHandler {
122123
val amqp = AmqpConfig.fromEnv()
123124

124125
exchange = amqp.exchange
126+
deliveryMode = amqp.deliveryMode
125127

126128
if (this::connection.isInitialized && this::channel.isInitialized && connection.isOpen && channel.isOpen) {
127129
logger.debug("Connection is already open")

keycloak-webhook-provider-amqp/src/main/kotlin/models/AmqpConfig.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ data class AmqpConfig(
1010
val vHost: String?,
1111
val ssl: Boolean,
1212
val exchange: String,
13+
val deliveryMode: Int
1314
) {
1415
companion object {
1516
fun fromEnv(): AmqpConfig = AmqpConfig(
@@ -19,7 +20,8 @@ data class AmqpConfig(
1920
port = amqpPortKey.cff(),
2021
vHost = amqpVHostKey.cf(),
2122
ssl = amqpSsl.bf(),
22-
exchange = amqpExchangeKey.cff()
23+
exchange = amqpExchangeKey.cff(),
24+
deliveryMode = amqpDeliveryMode.cfe(fun() = "1").toInt()
2325
)
2426
}
2527
}

keycloak-webhook-provider-core/src/main/kotlin/helper/Config.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const val amqpPortKey = "WEBHOOK_AMQP_PORT"
1313
const val amqpVHostKey = "WEBHOOK_AMQP_VHOST"
1414
const val amqpExchangeKey = "WEBHOOK_AMQP_EXCHANGE"
1515
const val amqpSsl = "WEBHOOK_AMQP_SSL"
16+
const val amqpDeliveryMode = "WEBHOOK_AMQP_DELIVERY_MODE"
1617

1718
const val syslogProtocol = "WEBHOOK_SYSLOG_PROTOCOL"
1819
const val syslogHostname = "WEBHOOK_SYSLOG_HOSTNAME"

0 commit comments

Comments
 (0)