Skip to content

Commit ad15083

Browse files
committed
improves logging and api surface
1 parent dbb7c2a commit ad15083

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+319
-244
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ Quafka-Extensions is a companion module designed to extend Quafka with advanced,
9595
Add the following dependency to your `build.gradle.kts`:
9696
```kotlin
9797
dependencies {
98-
implementation("com.trendyol:quafka:0.1.0")
98+
implementation("com.trendyol:quafka:0.1.1")
9999
}
100100
```
101101

@@ -203,7 +203,7 @@ Contributions are welcome! Whether it's:
203203

204204

205205
## License
206-
Quafka is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.
206+
Quafka is licensed under the Apache License. See the [LICENSE](LICENSE) file for details.
207207

208208

209209

build.gradle.kts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ plugins {
1414
group = "com.trendyol"
1515
version = CI.version(project)
1616

17-
allprojects {
18-
extra.set("dokka.outputDirectory", rootDir.resolve("docs"))
19-
}
20-
2117
kover {
2218
reports {
2319
filters {
@@ -29,6 +25,7 @@ kover {
2925
}
3026
}
3127
}
28+
3229
val koverProjects = subprojects.of("lib")
3330
dependencies {
3431
koverProjects.forEach {
@@ -37,6 +34,7 @@ dependencies {
3734
}
3835

3936
subprojects.of("lib", "examples") {
37+
val p = this
4038
apply {
4139
plugin("kotlin")
4240
plugin(
@@ -124,6 +122,7 @@ val publishedProjects = listOf(
124122
)
125123

126124
subprojects.of("lib", filter = { p -> publishedProjects.contains(p.name) }) {
125+
val p = this
127126
println("publishing $name")
128127
apply {
129128
plugin("java")
@@ -146,6 +145,7 @@ subprojects.of("lib", filter = { p -> publishedProjects.contains(p.name) }) {
146145
}
147146
}
148147
}
148+
149149
mavenPublishing {
150150
publishToMavenCentral()
151151
coordinates(groupId = rootProject.group.toString(), artifactId = project.name, version = rootProject.version.toString())

gradle.properties

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ projectUrl=https://github.com/Trendyol/quafka
99
licenceUrl=https://github.com/Trendyol/quafka/blob/master/LICENCE
1010
licence=Apache-2.0 license
1111
snapshot=1.0.0
12-
version=0.1.0
13-
org.jetbrains.dokka.experimental.gradle.pluginMode=V2EnabledWithHelpers
12+
version=0.1.1
13+
org.jetbrains.dokka.experimental.gradle.pluginMode=V2Enabled
14+
org.jetbrains.dokka.experimental.gradle.pluginMode.noWarn=true

gradle/libs.versions.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ slf4j = "2.0.17"
1111
kafka = "3.9.1"
1212
logback-classic= "1.5.18"
1313
spotless = "7.2.1"
14-
dokka = "2.0.0"
14+
dokka = "2.1.0"
1515
kover = "0.9.1"
1616
kotest = "5.9.1"
1717
embedded-kafka = "3.9.1"
@@ -77,3 +77,4 @@ kover = { id = "org.jetbrains.kotlinx.kover", version.ref = "kover" }
7777
testLogger = { id = "com.adarshr.test-logger", version = "4.0.0" }
7878
spotless = { id = "com.diffplug.spotless", version.ref = "spotless" }
7979
maven-publish = { id = "com.vanniktech.maven.publish", version = "0.34.0" }
80+
dokka = { id = "org.jetbrains.dokka", version.ref = "dokka" }

lib/quafka-extensions/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
dependencies {
23
api(project(":lib:quafka"))
34
api(libs.jackson.core)
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package com.trendyol.quafka.extensions.common
22

33
import com.trendyol.quafka.common.QuafkaException
4-
import org.apache.kafka.common.TopicPartition
4+
import com.trendyol.quafka.consumer.TopicPartitionOffset
55

66
class TopicPartitionProcessException(
7-
val topicPartition: TopicPartition,
8-
val offset: Long,
7+
val topicPartitionOffset: TopicPartitionOffset,
98
message: String,
109
cause: Throwable? = null
1110
) : QuafkaException(message, cause)

lib/quafka-extensions/src/main/kotlin/com/trendyol/quafka/extensions/delaying/MessageDelayer.kt

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import com.trendyol.quafka.common.HeaderParsers.asString
77
import com.trendyol.quafka.common.HeaderParsers.key
88
import com.trendyol.quafka.consumer.*
99
import com.trendyol.quafka.extensions.common.toEnum
10-
import com.trendyol.quafka.extensions.delaying.DelayHeaders.withDelay
1110
import com.trendyol.quafka.extensions.delaying.DelayStrategy.*
1211
import com.trendyol.quafka.logging.*
1312
import com.trendyol.quafka.producer.OutgoingMessage
@@ -155,13 +154,15 @@ open class MessageDelayer(
155154
consumerContext: ConsumerContext,
156155
duration: Duration
157156
) {
158-
logger
159-
.atTrace()
160-
.enrichWithConsumerContext(consumerContext)
161-
.log(
162-
"message will be delayed ${duration.inWholeMilliseconds} ms | {}",
163-
message.toString(Level.TRACE)
164-
)
157+
if (logger.isTraceEnabled) {
158+
logger
159+
.atTrace()
160+
.enrichWithConsumerContext(consumerContext)
161+
.log(
162+
"message will be delayed ${duration.inWholeMilliseconds} ms | {}",
163+
message.toString(Level.TRACE)
164+
)
165+
}
165166
delay(duration)
166167
}
167168

lib/quafka-extensions/src/main/kotlin/com/trendyol/quafka/extensions/errorHandling/recoverer/FailedMessageRouter.kt

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -443,13 +443,15 @@ class FailedMessageRouter<TKey, TValue>(
443443
if (nextOverall > strategy.maxTotalRetryAttempts) {
444444
return RetryOutcome.Error
445445
}
446-
logger
447-
.atDebug()
448-
.enrichWithConsumerContext(consumerContext)
449-
.log(
450-
"Backoff delay | $nextOverall = $calculatedDelay | message = {} ",
451-
incomingMessage.toString(Level.DEBUG, addValue = false, addHeaders = true)
452-
)
446+
if (logger.isDebugEnabled) {
447+
logger
448+
.atDebug()
449+
.enrichWithConsumerContext(consumerContext)
450+
.log(
451+
"Backoff delay | $nextOverall = $calculatedDelay | message = {} ",
452+
incomingMessage.toString(Level.DEBUG, addValue = false, addHeaders = true)
453+
)
454+
}
453455

454456
val bucket = strategy.findBucket(calculatedDelay)
455457
if (bucket != null) {
@@ -476,13 +478,15 @@ class FailedMessageRouter<TKey, TValue>(
476478
if (nextOverall > strategy.maxTotalRetryAttempts) {
477479
return RetryOutcome.Error
478480
}
479-
logger
480-
.atDebug()
481-
.enrichWithConsumerContext(consumerContext)
482-
.log(
483-
"Backoff delay | $nextOverall = $calculatedDelay | message = {} ",
484-
incomingMessage.toString(Level.DEBUG, addValue = false, addHeaders = true)
485-
)
481+
if (logger.isDebugEnabled) {
482+
logger
483+
.atDebug()
484+
.enrichWithConsumerContext(consumerContext)
485+
.log(
486+
"Backoff delay | $nextOverall = $calculatedDelay | message = {} ",
487+
incomingMessage.toString(Level.DEBUG, addValue = false, addHeaders = true)
488+
)
489+
}
486490

487491
val bucket = strategy.findBucket(calculatedDelay)
488492
if (bucket != null) {
@@ -495,13 +499,15 @@ class FailedMessageRouter<TKey, TValue>(
495499
retryIdentifier = policyIdentifier
496500
)
497501
} else {
498-
logger
499-
.atDebug()
500-
.enrichWithConsumerContext(consumerContext)
501-
.log(
502-
"No retry bucket for delay | calculated delay = $calculatedDelay | message = {} ",
503-
incomingMessage.toString(Level.DEBUG, addValue = false, addHeaders = true)
504-
)
502+
if (logger.isDebugEnabled) {
503+
logger
504+
.atDebug()
505+
.enrichWithConsumerContext(consumerContext)
506+
.log(
507+
"No retry bucket for delay | calculated delay = $calculatedDelay | message = {} ",
508+
incomingMessage.toString(Level.DEBUG, addValue = false, addHeaders = true)
509+
)
510+
}
505511
RetryOutcome.Error
506512
}
507513
}

lib/quafka-extensions/src/main/kotlin/com/trendyol/quafka/extensions/errorHandling/recoverer/RecoverableMessageExecutor.kt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,15 @@ class RecoverableMessageExecutor<TKey, TValue>(
8181
) {
8282
val identifierInHeader = incomingMessage.headers.getRetryIdentifier()
8383
if (identifierInHeader == identifier) {
84-
logger
85-
.atDebug()
86-
.enrichWithConsumerContext(consumerContext)
87-
.log(
88-
"Skipping in-memory retry for identifier '$identifier', already in non-blocking retry loop. | message = {} ",
89-
incomingMessage.toString(Level.DEBUG, addValue = false, addHeaders = true)
90-
)
84+
if (logger.isDebugEnabled) {
85+
logger
86+
.atDebug()
87+
.enrichWithConsumerContext(consumerContext)
88+
.log(
89+
"Skipping in-memory retry for identifier '$identifier', already in non-blocking retry loop. | message = {} ",
90+
incomingMessage.toString(Level.DEBUG, addValue = false, addHeaders = true)
91+
)
92+
}
9193
throw exception
9294
}
9395

lib/quafka-extensions/src/main/kotlin/com/trendyol/quafka/extensions/serialization/MessageSerde.kt

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package com.trendyol.quafka.extensions.serialization
22

3-
import com.trendyol.quafka.consumer.IncomingMessage
3+
import com.trendyol.quafka.consumer.*
44
import com.trendyol.quafka.extensions.common.TopicPartitionProcessException
5-
import org.apache.kafka.common.TopicPartition
65

76
interface MessageSerde<TKey, TValue> {
87
fun deserializeValue(incomingMessage: IncomingMessage<TKey, TValue>): DeserializationResult
@@ -16,28 +15,25 @@ interface MessageSerde<TKey, TValue> {
1615

1716
sealed class DeserializationResult {
1817
data class Error(
19-
val topicPartition: TopicPartition,
20-
val offset: Long,
18+
val topicPartitionOffset: TopicPartitionOffset,
2119
val message: String,
2220
val cause: Throwable? = null
2321
) : DeserializationResult() {
2422
override fun toString(): String =
25-
"DeserializationError ( topic: ${topicPartition.topic()} | partition: ${topicPartition.partition()} | offset: $offset | message: $message | cause: $cause )"
23+
"DeserializationError ($topicPartitionOffset | message: $message | cause: $cause )"
2624

2725
fun toException(): TopicPartitionProcessException = TopicPartitionProcessException(
28-
topicPartition = topicPartition,
29-
offset = offset,
26+
topicPartitionOffset = topicPartitionOffset,
3027
message = this.toString(),
3128
cause = cause
3229
)
3330
}
3431

3532
data object Null : DeserializationResult() {
3633
fun toError(
37-
topicPartition: TopicPartition,
38-
offset: Long,
34+
topicPartitionOffset: TopicPartitionOffset,
3935
message: String
40-
) = Error(topicPartition, offset, message)
36+
) = Error(topicPartitionOffset, message)
4137
}
4238

4339
data class Deserialized(

0 commit comments

Comments
 (0)