Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions .github/workflows/pull-request-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ jobs:
steps:
- name: Checkout sources
uses: actions/checkout@v4

- name: Setup Gradle
uses: gradle/actions/setup-gradle@v3

- name: Run Java Testing
run: |
./gradlew ktor-server-rabbitmq-java:allTests
- name: Run all tests and generate coverage reports
run: ./gradlew allTests koverXmlReport
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
72 changes: 61 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
![Deployment Status](https://github.com/DamirDenis-Tudor/ktor-server-rabbitmq/actions/workflows/deployment.yml/badge.svg) ![Pull Request Checks](https://github.com/DamirDenis-Tudor/ktor-server-rabbitmq/actions/workflows/pull-request-checks.yml/badge.svg)

### Overview
- This plugin provides access to major core functionalities of the `com.rabbitmq:amqp-client` library.

- This plugin provides access to major core functionalities of the `com.rabbitmq:amqp-client` and
`dev.kourier:amqp-client` libraries.

### Features

Expand All @@ -20,20 +22,59 @@

### Table of Contents

1. [Installation](#installation)
2. [Queue Binding Example](#queue-binding-example)
3. [Producer Example](#producer-example)
4. [Consumer Example](#consumer-example)
5. [Advanced Consumer Example](#consumer-example-with-coroutinepollsize)
6. [Library Calls Example](#library-calls-example)
7. [Custom Coroutine Scope Example](#custom-coroutine-scope-example)
8. [Serialization Fallback Example](#serialization-fallback-example)
9. [Dead Letter Queue Example](#dead-letter-queue-example)
10. [Logging](#logging)
1. [Choose a distribution](#choose-a-distribution)
2. [Installation](#installation)
3. [Queue Binding Example](#queue-binding-example)
4. [Producer Example](#producer-example)
5. [Consumer Example](#consumer-example)
6. [Advanced Consumer Example](#consumer-example-with-coroutinepollsize)
7. [Library Calls Example](#library-calls-example)
8. [Custom Coroutine Scope Example](#custom-coroutine-scope-example)
9. [Serialization Fallback Example](#serialization-fallback-example)
10. [Dead Letter Queue Example](#dead-letter-queue-example)
11. [Logging](#logging)

## Choose a distribution

This library is available in multiple distributions. Choose the one that best fits your needs:

### Default Distribution

This distribution is an alias to the Java Client distribution for the JVM platform, and to the Kourier Client
distribution for Kotlin Native platforms.

```kotlin
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq:<version>")
}
```

### Java Client Distribution

This distribution uses the [official RabbitMQ Java client library](https://github.com/rabbitmq/rabbitmq-java-client)
(`com.rabbitmq:amqp-client`) under the hood, and is available only for the JVM platform.

```kotlin
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq-java:<version>")
}
```

### Kourier (Pure Kotlin & Kotlin Multiplatform) Client Distribution

This distribution uses the [pure Kotlin Kourier client library](https://github.com/guimauvedigital/kourier)
(`dev.kourier:amqp-client`) under the hood, and is available for both JVM and Kotlin Native platforms.

```kotlin
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq-kourier:<version>")
}
```

## Usage

### Installation

```kotlin
install(RabbitMQ) {
uri = "amqp://<user>:<password>@<address>:<port>"
Expand All @@ -46,6 +87,7 @@ install(RabbitMQ) {
```

### Queue binding example

```kotlin
rabbitmq {
queueBind {
Expand All @@ -65,6 +107,7 @@ rabbitmq {
```

### Producer example

```kotlin
rabbitmq {
repeat(10) {
Expand All @@ -78,6 +121,7 @@ rabbitmq {
```

### Consumer Example

```kotlin
rabbitmq {
basicConsume {
Expand All @@ -91,6 +135,7 @@ rabbitmq {
```

### Consumer Example with coroutinePollSize

```kotlin
rabbitmq {
connection(id = "consume") {
Expand All @@ -109,6 +154,7 @@ rabbitmq {
```

### Consumer Example with coroutinePollSize

```kotlin
rabbitmq {
connection(id = "consume") {
Expand All @@ -127,6 +173,7 @@ rabbitmq {
```

### Library Calls Example

```kotlin
rabbitmq {
libChannel(id = 2) {
Expand Down Expand Up @@ -170,7 +217,9 @@ rabbitmq {
}
}
```

### Custom Coroutine Scope Example

```kotlin
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
println("ExceptionHandler got $throwable")
Expand Down Expand Up @@ -354,5 +403,6 @@ fun Application.module() {
- In order to set a logging level to this library add this line in `logback.xml` file:

```xml

<logger name="io.github.damir.denis.tudor.ktor.server.rabbitmq" level="DEBUG"/>
```
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
plugins {
alias(libs.plugins.multiplatform) apply false
alias(libs.plugins.maven) apply false
alias(libs.plugins.jreleaser)
}

Expand Down
26 changes: 25 additions & 1 deletion ktor-server-rabbitmq-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,39 @@ mavenPublishing {
}

kotlin {
// Tiers are in accordance with <https://kotlinlang.org/docs/native-target-support.html>
// Tier 1
macosX64()
macosArm64()
iosSimulatorArm64()
iosX64()

// Tier 2
linuxX64()
linuxArm64()
watchosSimulatorArm64()
watchosX64()
watchosArm32()
watchosArm64()
tvosSimulatorArm64()
tvosX64()
tvosArm64()
iosArm64()

// Tier 3
mingwX64()
watchosDeviceArm64()

// jvm & js
jvmToolchain(17)
jvm()

applyDefaultHierarchyTemplate()
sourceSets {
val commonMain by getting {
dependencies {
api(kotlin("reflect"))
api(libs.ktor.server.core)
api(libs.ktor.server.netty)
api(libs.kotlinx.coroutines)
api(libs.kotlinx.serialization.json)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class BasicConsumeBuilder(
val connectionManager: ConnectionManager,
private val channel: Channel,
) {
val defaultLogger = KtorSimpleLogger(this.javaClass.name)
val defaultLogger = KtorSimpleLogger(this::class.qualifiedName!!)

var noLocal: Boolean by Delegator()
var exclusive: Boolean by Delegator()
Expand Down Expand Up @@ -75,10 +75,10 @@ class BasicConsumeBuilder(
receiverChannel.consumeAsFlow().collect { (consumerTag, delivery) ->
runCatching {
when (T::class) {
String::class -> String(delivery.body) as T
String::class -> delivery.body.decodeToString() as T
ByteArray::class -> delivery.body as T

else -> Json.decodeFromString<T>(String(delivery.body))
else -> Json.decodeFromString<T>(delivery.body.decodeToString())
}
}.onFailure { error ->
defaultLogger.error(error)
Expand All @@ -100,10 +100,10 @@ class BasicConsumeBuilder(
receiverChannel.consumeAsFlow().collect { (consumerTag, delivery) ->
runCatching {
when (T::class) {
String::class -> String(delivery.body) as T
String::class -> delivery.body.decodeToString() as T
ByteArray::class -> delivery.body as T

else -> Json.decodeFromString<T>(String(delivery.body))
else -> Json.decodeFromString<T>(delivery.body.decodeToString())
}
}.onFailure { error ->
defaultLogger.error(error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import io.github.damir.denis.tudor.ktor.server.rabbitmq.delegator.StateRegistry.
import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.RabbitDslMarker
import io.github.damir.denis.tudor.ktor.server.rabbitmq.model.Channel
import io.github.damir.denis.tudor.ktor.server.rabbitmq.model.Properties
import io.ktor.utils.io.charsets.*
import io.ktor.utils.io.core.*
import kotlinx.serialization.json.Json

@RabbitDslMarker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,135 @@ package io.github.damir.denis.tudor.ktor.server.rabbitmq.connection

import io.github.damir.denis.tudor.ktor.server.rabbitmq.model.Channel
import io.github.damir.denis.tudor.ktor.server.rabbitmq.model.Connection
import io.ktor.util.collections.*
import io.ktor.util.logging.*
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

abstract class ConnectionManager {

abstract val dispatcher: ExecutorCoroutineDispatcher
protected val logger = KtorSimpleLogger(this::class.simpleName ?: "ConnectionManager")

abstract val dispatcher: CoroutineDispatcher
abstract val coroutineScope: CoroutineScope
abstract val configuration: ConnectionConfig

abstract fun getConnectionId(connection: Connection): String
protected val channelCache = ConcurrentMap<String, Channel>()
protected val connectionCache = ConcurrentMap<String, Connection>()

protected val connectionMutex = Mutex()
protected val channelMutex = Mutex()

/**
* Retries a block of code a specified number of times with delays between attempts.
*
* @param block The block of code to execute. If it succeeds, the result is returned.
* @return The result of the block if it succeeds.
* @throws IllegalStateException if the block fails after the maximum number of retries.
*/
abstract suspend fun <T> retry(block: suspend () -> T): T

/**
* Retrieves the ID of a connection from the cache.
*
* @param connection The RabbitMQ connection to identify.
* @return The associated ID or the default connection name if not found.
*/
fun getConnectionId(connection: Connection): String =
connectionCache.entries.find { it.value == connection }?.key ?: configuration.defaultConnectionName

/**
* Retrieves or creates a RabbitMQ connection by its ID.
*
* If the connection is not found in the cache or is closed, a new connection is created.
*
* @param id the ID of the connection to retrieve. Defaults to the default connection name.
* @return the RabbitMQ connection.
*/
abstract suspend fun getConnection(id: String = configuration.defaultConnectionName): Connection
abstract suspend fun closeConnection(connectionId: String)

abstract suspend fun getChannel(
/**
* Closes and removes a RabbitMQ connection by its ID.
*
* @param connectionId the ID of the connection to close.
*/
suspend fun closeConnection(connectionId: String) = connectionMutex.withLock {
connectionCache[connectionId]?.close()
connectionCache.remove(connectionId)

logger.debug("Connection with id: <$connectionId>, closed")
}

/**
* Generates a unique key for identifying a channel within the channel cache.
*
* @param connectionId the ID of the connection.
* @param channelId the ID of the channel.
* @return a unique string key for the channel.
*/
protected fun getChannelKey(connectionId: String, channelId: Int): String =
"$connectionId-channel-$channelId"

/**
* Retrieves or creates a RabbitMQ channel by its ID.
*
* If the channel is not found in the cache or is closed, a new one is created.
*
* @param channelId the ID of the channel to retrieve. Defaults to 1.
* @param connectionId the ID of the connection to use. Defaults to the default connection name.
* @return the RabbitMQ channel.
*/
suspend fun getChannel(
channelId: Int = 1,
connectionId: String = configuration.defaultConnectionName,
): Channel
): Channel = channelMutex.withLock {
retry {
val id = getChannelKey(connectionId, channelId)

if (channelCache.containsKey(id)) logger.debug("Channel with id: <$id> will be taken from cache.")

val channel = channelCache.getOrPut(id) {
logger.debug("Creating new channel with id <$channelId> for connection with id <$connectionId>.")
getConnection(connectionId).createChannel()
?: error("Could not allocate this channel id <$channelId>. ")
}

if (!channel.isOpen) {
channelCache.remove(id)
error("Channel <$channelId> is not open. ${channel.closeReason}")
}

return@retry channel
}
}

/**
* Closes and removes a RabbitMQ channel by its ID.
*
* @param channelId the ID of the channel to close.
* @param connectionId the ID of the associated connection.
*/
suspend fun closeChannel(
channelId: Int = 1,
connectionId: String = configuration.defaultConnectionName,
) = channelMutex.withLock {
val id = getChannelKey(connectionId, channelId)

channelCache[id]?.close()
channelCache.remove(id)

abstract suspend fun closeChannel(channelId: Int = 1, connectionId: String = configuration.defaultConnectionName)
logger.debug("Channel with id: <$channelId> for connection with id <$connectionId>, closed")
}

abstract suspend fun close()
/**
* Closes all active RabbitMQ connections.
*
* This method iterates through all connections in the connection cache and closes each one.
*/
suspend fun close() = connectionMutex.withLock {
connectionCache.values.forEach { connection -> connection.close() }
}

}
Loading
Loading