A non-blocking Kafka client library for Kotlin with first-class coroutine support
Features โข Getting Started โข Examples โข Documentation โข Contributing
Quafka is a non-blocking Kafka client library built on top of the Apache Kafka Java client, designed for Kotlin with first-class coroutine support. It separates polling from processing, preserves per-partition ordering, and provides flexible single and batch message handling, backpressure control, and a fluent, type-safe configuration DSL.
Quafka also ships with an extensions module that adds higher-level capabilities like retry orchestration, delayed processing, and a middleware pipeline for consumer logic.
-
๐ Non-Blocking & Coroutine-First
- Fully supports Kotlin coroutines, enabling highly efficient, asynchronous operations
- Ensures your Kafka client remains responsive and scalable under heavy loads without thread management complexity
-
๐ Separated Polling and Processing Model
- Decoupled threading model where polling and processing happen independently
- Prevents consumer group rebalancing due to slow processing
- Ensures consistent throughput even for long-running tasks
-
โก Worker per Topic and Partition
- Dedicated workers for each topic-partition pair
- Sequential processing within a partition for order preservation
- Maximum parallelism across partitions for optimal resource utilization
-
๐ฆ Batch and Single Message Handling
- Seamless support for both batch and individual message processing
- Flexible callbacks to choose the most efficient approach
- Reduced boilerplate code
-
๐๏ธ Backpressure Management
- Advanced backpressure controls that dynamically adjust message fetching
- Prevents memory overload and ensures graceful degradation
- Configurable using message count, timeouts, or both
-
๐ง Fluent and Safe Configuration
- Developer-friendly configuration DSL
- Type-safe defaults and validations
- Step-by-step guidance for setup
-
๐ค Batch Publishing
- Efficiently publish multiple messages in a single operation
- Reduced network overhead for improved performance
- Ideal for high-throughput scenarios
-
๐ก๏ธ Error Handling
- Robust error handling with automatic retries
- Error callbacks and configurable failure strategies
- Graceful recovery from unexpected issues
| Feature | Quafka | Apache Kafka Java Client |
|---|---|---|
| Coroutine Support | โ Native coroutine APIs | โ Blocking/callback-based |
| Boilerplate | โ Minimal with builders | โ Verbose configuration |
| Poll/Process Separation | โ Built-in decoupling | โ Manual implementation |
| Backpressure | โ Simple configuration | โ Manual implementation |
| Retry Mechanism | โ Built-in with extensions | โ Custom implementation needed |
| Feature | Quafka | Spring Kafka |
|---|---|---|
| API Style | โ Lightweight Kotlin API | |
| Threading Model | โ Coroutine-based | |
| Control | โ Explicit per-topic DSL | |
| Framework Coupling | โ None required | โ Requires Spring |
| Extensions | โ Opt-in middleware |
Choose Quafka when you want a coroutine-first, lean, and highly controllable Kafka client with strong defaults and optional high-level extensions.
Add the following dependency to your build.gradle.kts:
dependencies {
implementation("com.trendyol:quafka:0.2.0")
}For extensions (retry, pipelines, etc.):
dependencies {
implementation("com.trendyol:quafka-extensions:0.2.0")
}The fastest way to explore Quafka is through our interactive console application:
cd examples/console
export Q_SERVERS="localhost:9092"
./gradlew runThe console app provides a menu-driven interface to try all producer and consumer patterns. Perfect for learning and testing!
See the Console Demo README for details.
import com.trendyol.quafka.consumer.configuration.*
import org.apache.kafka.common.serialization.StringDeserializer
val props = mapOf(
"bootstrap.servers" to "localhost:9092",
"group.id" to "example-group"
)
val consumer = QuafkaConsumerBuilder<String, String>(props)
.withDeserializer(StringDeserializer(), StringDeserializer())
.subscribe("example-topic") {
withSingleMessageHandler { message, context ->
println("Received: ${message.value}")
message.ack()
}
}
.build()
consumer.start()import com.trendyol.quafka.producer.*
import com.trendyol.quafka.producer.configuration.QuafkaProducerBuilder
import org.apache.kafka.common.serialization.StringSerializer
val props = mapOf("bootstrap.servers" to "localhost:9092")
val producer = QuafkaProducerBuilder<String?, String?>(props)
.withSerializer(StringSerializer(), StringSerializer())
.build()
val result = producer.send(
OutgoingMessage.create(
topic = "test-topic",
key = "key",
value = "Hello, Quafka!"
)
)- Single Consumer Examples - Comprehensive guide for single message consumers
- Basic Single Consumer - Simple message processing
- Consumer with Backpressure - Rate limiting and flow control
- Pipeline Consumer - Middleware-based processing
- Retryable Consumer - Advanced error handling and retry
- Custom Middleware - Creating reusable middleware
- Batch Consumer Examples - Comprehensive guide for batch message consumers
- Basic Batch Consumer - Simple batch processing
- Pipelined Batch Consumer - Middleware for batches
- Advanced Pipelined Batch Consumer - Complex workflows
- Batch with Single Message Pipeline - Hybrid approach
- Advanced Batch with Single Message Pipeline - Context sharing
- Parallel Batch Processing - Concurrent processing
- Flexible Batch Processing - Configurable modes
- Concurrent with Attributes - Parallel + shared state
- Custom Batch Middleware - Reusable components
- Producer Examples - Comprehensive guide for producers
- Basic Producer - Simple message publishing
- Batch Producer - Bulk message operations
- JSON Producer with Extensions - Type-safe JSON serialization
- Producer with Error Handling - Robust error handling and DLQ
- Producer with Custom Headers - Metadata and tracing
- Best Practices - Production-ready patterns
import kotlin.time.Duration.Companion.minutes
val consumer = QuafkaConsumerBuilder<String, String>(props)
.withDeserializer(StringDeserializer(), StringDeserializer())
.subscribe("example-topic") {
withSingleMessageHandler { msg, ctx ->
// Slow processing work
processMessage(msg)
msg.ack()
}.withBackpressure(
backpressureBufferSize = 10_000,
backpressureReleaseTimeout = 1.minutes
)
}
.build()val messages = (1..100).map { index ->
OutgoingMessage.create(
topic = "test-topic",
key = "key-$index",
value = "value-$index"
)
}
val results: Collection<DeliveryResult> = producer.sendAll(messages)Quafka-Extensions is a companion module that provides advanced, high-level features:
- Built-in support for JSON and common formats
- Extensible design for custom serialization strategies
- Type-safe serialization leveraging Kotlin's type system
- Schedule message processing with fine-grained delay control
- Fixed and dynamic delay strategies
- Perfect for delayed retries and scheduled workflows
- In-Memory Retry: Lightweight retry for transient failures
- Multi-Topic Retry: Forward to dedicated retry topics with configurable backoff
- Flexible Policies: Exponential backoff, max attempts, custom retry logic
Middleware-style architecture for composable message processing:
import com.trendyol.quafka.extensions.consumer.single.pipelines.PipelineMessageHandler.Companion.usePipelineMessageHandler
val consumer = QuafkaConsumerBuilder<String, String>(props)
.withDeserializer(StringDeserializer(), StringDeserializer())
.subscribe("example-topic") {
usePipelineMessageHandler {
// Logging middleware
use { envelope, next ->
logger.info("Processing: ${envelope.message.offset}")
next()
}
// Validation middleware
use { envelope, next ->
if (isValid(envelope.message.value)) {
next()
} else {
logger.warn("Invalid message")
}
}
// Business logic
use { envelope, next ->
processMessage(envelope.message.value)
envelope.message.ack()
next()
}
}
}
.build()Available Pipeline Steps:
- TracingPipelineStep: Distributed tracing integration
- LoggingPipelineStep: Detailed logging for each stage
- ErrorHandlerPipelineStep: Centralized error management
- DeserializationPipelineStep: Type-safe deserialization
- Custom Middleware: Create your own reusable steps
- Fundamentals - Core concepts and architecture
- Message Processing - How messages are processed internally
- Single Consumer Examples - Detailed single message patterns
- Batch Consumer Examples - Detailed batch processing patterns
- Producer Examples - Producer patterns and best practices
- Pipeline Architecture - Batch pipeline middleware guide
- Console Demo Application - Interactive CLI for testing all Quafka patterns
- Producer examples (Basic, Batch, JSON, Headers)
- Single consumer examples (Basic, Backpressure, Pipeline, Retry)
- Batch consumer examples (8 different patterns)
- Easy-to-use menu-driven interface
Warning
While Quafka is production-ready and extensively used at Trendyol, the API is not yet fully stabilized. Breaking changes may occur in minor releases, but migration guides will always be provided.
Note
Report any issue or bug in the GitHub repository.
Contributions are welcome! We appreciate:
- ๐ Bug reports - Help us identify and fix issues
- ๐ก Feature requests - Share your ideas for improvements
- ๐ Documentation improvements - Help make our docs better
- ๐ Code contributions - Submit pull requests
Please read our Contributing Guidelines before submitting a pull request.
Quafka is licensed under the Apache License 2.0. See the LICENSE file for details.
Made with โค๏ธ by Trendyol
