Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.github.damir.denis.tudor.ktor.server.rabbitmq.delegator.StateRegistry.
import io.github.damir.denis.tudor.ktor.server.rabbitmq.delegator.StateRegistry.verify
import io.github.damir.denis.tudor.ktor.server.rabbitmq.dsl.RabbitDslMarker
import io.github.damir.denis.tudor.ktor.server.rabbitmq.rabbitMQ
import io.ktor.util.logging.*
import io.ktor.utils.io.*
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
Expand All @@ -24,6 +25,8 @@ class BasicConsumeBuilder(
val connectionManager: ConnectionManager,
private val channel: Channel,
) {
val defaultLogger = KtorSimpleLogger(this.javaClass.name)

var noLocal: Boolean by Delegator()
var exclusive: Boolean by Delegator()
var arguments: Map<String, Any> by Delegator()
Expand All @@ -40,7 +43,15 @@ class BasicConsumeBuilder(
var coroutinePollSize: Int = 1

@InternalAPI
var receiverChannel = kotlinx.coroutines.channels.Channel<Pair<Long, String>>(
var receiverChannel = kotlinx.coroutines.channels.Channel<Pair<Long, ByteArray>>(
connectionManager.configuration.consumerChannelCoroutineSize
)

@InternalAPI
var failureCallbackDefined = false

@InternalAPI
var receiverFailChannel = kotlinx.coroutines.channels.Channel<Pair<Long, ByteArray>>(
connectionManager.configuration.consumerChannelCoroutineSize
)

Expand All @@ -50,7 +61,7 @@ class BasicConsumeBuilder(
arguments = emptyMap()
deliverCallback = DeliverCallback { _, delivery ->
receiverChannel.trySendBlocking(
delivery.envelope.deliveryTag to delivery.body.toString(Charsets.UTF_8)
delivery.envelope.deliveryTag to delivery.body
)
}
cancelCallback = CancelCallback { }
Expand All @@ -61,13 +72,37 @@ class BasicConsumeBuilder(
inline fun <reified T> deliverCallback(crossinline callback: suspend (tag: Long, message: T) -> Unit) {
repeat(coroutinePollSize) {
connectionManager.coroutineScope.launch(dispatcher) {
receiverChannel.consumeAsFlow().collect { (deliveryTag, message) ->
callback(deliveryTag, Json.decodeFromString<T>(message))
receiverChannel.consumeAsFlow().collect { (deliveryTag, messageBytes) ->
runCatching {
val message: T = when (T::class) {
String::class -> String(messageBytes) as T
ByteArray::class -> messageBytes as T

else -> Json.decodeFromString<T>(String(messageBytes))
}

callback(deliveryTag, message)
}.onFailure { error ->
defaultLogger.error(error)
if (failureCallbackDefined){
receiverFailChannel.trySendBlocking(deliveryTag to messageBytes)
}
}
}
}
}
}

@RabbitDslMarker
fun deliverFailureCallback(callback: suspend (tag: Long, message: ByteArray) -> Unit) {
failureCallbackDefined = true
connectionManager.coroutineScope.launch(dispatcher) {
receiverFailChannel.consumeAsFlow().collect { (deliveryTag, messageBytes) ->
callback(deliveryTag, messageBytes)
}
}
}

@RabbitDslMarker
fun cancelCallback(callback: (tag: String) -> Unit) {
cancelCallback = CancelCallback { consumerTag ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ object StateRegistry {

currentRef::class.memberProperties.map {
val state = states.get()[currentRef.javaClass.name to it.name]
val initialized = (state is State.Initialized)
val value = if (initialized) state.value else "Uninitialized"
val value = if ((state is State.Initialized)) state.value else "Uninitialized"
logger.get().error("<{}> <{}>, value: <{}>", ref.get()?.javaClass?.simpleName, it.name, value)
}
}
Expand Down
Loading
Loading