Skip to content

Commit 47da8a2

Browse files
committed
Fix problem with propagator
1 parent 7d35754 commit 47da8a2

File tree

10 files changed

+195
-123
lines changed

10 files changed

+195
-123
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ allprojects {
1212
version = "0.5.0"
1313

1414
repositories {
15-
mavenLocal()
1615
mavenCentral()
16+
mavenLocal()
1717
}
1818
}
1919

db-migrations/build.gradle.kts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,15 @@
11
plugins {
2-
id("java")
2+
id("java-library")
33
id("sb-ot-demo.java-conventions")
4+
id("io.freefair.lombok")
5+
}
6+
7+
dependencies {
8+
implementation(platform(project(":common-internal-bom")))
9+
implementation(platform(libs.spring.boot.v3.dependencies))
10+
11+
implementation("io.micrometer:micrometer-tracing")
12+
implementation("org.apache.kafka:kafka-clients")
13+
implementation("org.slf4j:slf4j-api")
14+
implementation("org.springframework:spring-jdbc")
415
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
3+
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
4+
*
5+
* Licensed under the Apache License 2.0
6+
*/
7+
8+
package io.github.mfvanek.db.migrations.common.saver;
9+
10+
import io.micrometer.tracing.Span;
11+
import io.micrometer.tracing.Tracer;
12+
import lombok.RequiredArgsConstructor;
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.apache.kafka.clients.consumer.ConsumerRecord;
15+
import org.slf4j.MDC;
16+
import org.springframework.jdbc.core.simple.JdbcClient;
17+
18+
import java.time.Clock;
19+
import java.time.LocalDateTime;
20+
import java.util.UUID;
21+
22+
@Slf4j
23+
@RequiredArgsConstructor
24+
public class DbSaver {
25+
26+
private final String tenantName;
27+
private final Tracer tracer;
28+
private final Clock clock;
29+
private final JdbcClient jdbcClient;
30+
31+
public void processSingleRecord(ConsumerRecord<UUID, String> record) {
32+
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
33+
final Span currentSpan = tracer.currentSpan();
34+
final String traceId = currentSpan != null ? currentSpan.context().traceId() : "";
35+
final String spanId = currentSpan != null ? currentSpan.context().spanId() : "";
36+
log.info("Received record: {} with traceId {} spanId {}", record.value(), traceId, spanId);
37+
jdbcClient.sql("""
38+
insert into otel_demo.storage(message, trace_id, span_id, created_at)
39+
values(:msg, :traceId, :currentSpan, :createdAt);""")
40+
.param("msg", record.value())
41+
.param("traceId", traceId)
42+
.param("currentSpan", spanId)
43+
.param("createdAt", LocalDateTime.now(clock))
44+
.update();
45+
}
46+
}
47+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
3+
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
4+
*
5+
* Licensed under the Apache License 2.0
6+
*/
7+
8+
package io.github.mfvanek.spring.boot3.kotlin.test.config
9+
10+
import io.github.mfvanek.db.migrations.common.saver.DbSaver
11+
import io.micrometer.tracing.Tracer
12+
import org.springframework.beans.factory.annotation.Value
13+
import org.springframework.context.annotation.Bean
14+
import org.springframework.context.annotation.Configuration
15+
import org.springframework.jdbc.core.simple.JdbcClient
16+
import java.time.Clock
17+
18+
@Configuration(proxyBeanMethods = false)
19+
class DbConfig {
20+
21+
@Bean
22+
fun dbSaver(
23+
@Value("\${app.tenant.name}") tenantName: String,
24+
tracer: Tracer,
25+
clock: Clock,
26+
jdbcClient: JdbcClient
27+
): DbSaver {
28+
return DbSaver(tenantName, tracer, clock, jdbcClient)
29+
}
30+
}

spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaReadingService.kt

Lines changed: 16 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,52 +7,34 @@
77

88
package io.github.mfvanek.spring.boot3.kotlin.test.service
99

10+
import io.github.mfvanek.db.migrations.common.saver.DbSaver
1011
import io.github.oshai.kotlinlogging.KotlinLogging
11-
import io.github.oshai.kotlinlogging.withLoggingContext
1212
import io.micrometer.tracing.Tracer
1313
import io.micrometer.tracing.propagation.Propagator
1414
import org.apache.kafka.clients.consumer.ConsumerRecord
15-
import org.springframework.beans.factory.annotation.Value
16-
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
1715
import org.springframework.kafka.annotation.KafkaListener
1816
import org.springframework.kafka.support.Acknowledgment
1917
import org.springframework.stereotype.Service
20-
import java.time.Clock
21-
import java.time.LocalDateTime
18+
import java.nio.charset.StandardCharsets
2219
import java.util.*
2320

2421
private val logger = KotlinLogging.logger {}
2522

23+
internal object KafkaHeadersGetter : Propagator.Getter<ConsumerRecord<UUID, String>> {
24+
override fun get(carrier: ConsumerRecord<UUID, String>, key: String): String? =
25+
carrier.headers()?.lastHeader(key)?.value()?.toString(StandardCharsets.UTF_8)
26+
}
27+
2628
@Service
2729
class KafkaReadingService(
28-
@Value("\${app.tenant.name}") private val tenantName: String,
2930
private val tracer: Tracer,
30-
private val clock: Clock,
31-
private val jdbcTemplate: NamedParameterJdbcTemplate,
32-
private val propagator: Propagator
31+
private val propagator: Propagator,
32+
private val dbSaver: DbSaver
3333
) {
3434
@KafkaListener(topics = ["\${spring.kafka.template.default-topic}"])
35-
fun listen(message: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
36-
withLoggingContext("tenant.name" to tenantName) {
37-
processMessage(message)
38-
ack.acknowledge()
39-
}
40-
}
41-
42-
private fun processMessage(message: ConsumerRecord<UUID, String>) {
43-
val currentSpan = tracer.currentSpan()
44-
val traceId = currentSpan?.context()?.traceId().orEmpty()
45-
val spanId = currentSpan?.context()?.spanId()
46-
logger.info { "Received record: ${message.value()} with traceId $traceId" }
47-
jdbcTemplate.update(
48-
"insert into otel_demo.storage(message, trace_id, span_id, created_at) values(:msg, :traceId, :currentSpan, :createdAt);",
49-
mapOf(
50-
"msg" to message.value(),
51-
"traceId" to traceId,
52-
"currentSpan" to spanId,
53-
"createdAt" to LocalDateTime.now(clock)
54-
)
55-
)
35+
fun listen(record: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
36+
dbSaver.processSingleRecord(record)
37+
ack.acknowledge()
5638
}
5739

5840
@KafkaListener(
@@ -67,9 +49,7 @@ class KafkaReadingService(
6749
logger.info {
6850
"Received from Kafka ${records.size} records"
6951
}
70-
records.forEach { record ->
71-
restoreContextAndProcessSingleRecordIfNeed(record, ack)
72-
}
52+
records.forEach { record -> restoreContextAndProcessSingleRecordIfNeed(record) }
7353
ack.acknowledge()
7454
} catch (e: Throwable) {
7555
batchSpan.error(e)
@@ -79,15 +59,12 @@ class KafkaReadingService(
7959
}
8060
}
8161

82-
private fun restoreContextAndProcessSingleRecordIfNeed(record: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
83-
val kafkaPropagatorGetter = Propagator.Getter<ConsumerRecord<UUID, String>> { carrier, _ ->
84-
carrier.headers().find { it.key() == "traceparent" }?.value()?.decodeToString()
85-
}
86-
val builder = propagator.extract(record, kafkaPropagatorGetter)
62+
private fun restoreContextAndProcessSingleRecordIfNeed(record: ConsumerRecord<UUID, String>) {
63+
val builder = propagator.extract(record, KafkaHeadersGetter)
8764
val spanFromRecord = builder.name("processing-record-from-kafka").start()
8865
try {
8966
tracer.withSpan(spanFromRecord).use {
90-
processSingleRecordIfNeed(record, ack)
67+
dbSaver.processSingleRecord(record)
9168
}
9269
} catch (e: Throwable) {
9370
spanFromRecord.error(e)
@@ -96,11 +73,4 @@ class KafkaReadingService(
9673
spanFromRecord.end()
9774
}
9875
}
99-
100-
private fun processSingleRecordIfNeed(record: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
101-
withLoggingContext("tenant.name" to tenantName) {
102-
processMessage(record)
103-
ack.acknowledge()
104-
}
105-
}
10676
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
3+
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
4+
*
5+
* Licensed under the Apache License 2.0
6+
*/
7+
8+
package io.github.mfvanek.spring.boot3.reactive.config;
9+
10+
import io.github.mfvanek.db.migrations.common.saver.DbSaver;
11+
import io.micrometer.tracing.Tracer;
12+
import org.springframework.beans.factory.annotation.Value;
13+
import org.springframework.context.annotation.Bean;
14+
import org.springframework.context.annotation.Configuration;
15+
import org.springframework.jdbc.core.simple.JdbcClient;
16+
17+
import java.time.Clock;
18+
19+
@Configuration(proxyBeanMethods = false)
20+
public class DbConfig {
21+
22+
@Bean
23+
public DbSaver dbSaver(
24+
@Value("${app.tenant.name}") String tenantName,
25+
Tracer tracer,
26+
Clock clock,
27+
JdbcClient jdbcClient
28+
) {
29+
return new DbSaver(tenantName, tracer, clock, jdbcClient);
30+
}
31+
}

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaReadingService.java

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,42 +7,37 @@
77

88
package io.github.mfvanek.spring.boot3.reactive.service;
99

10+
import io.github.mfvanek.db.migrations.common.saver.DbSaver;
1011
import io.micrometer.tracing.ScopedSpan;
1112
import io.micrometer.tracing.Span;
1213
import io.micrometer.tracing.Tracer;
1314
import io.micrometer.tracing.propagation.Propagator;
1415
import lombok.RequiredArgsConstructor;
1516
import lombok.extern.slf4j.Slf4j;
1617
import org.apache.kafka.clients.consumer.ConsumerRecord;
17-
import org.slf4j.MDC;
18-
import org.springframework.beans.factory.annotation.Value;
19-
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
2018
import org.springframework.kafka.annotation.KafkaListener;
2119
import org.springframework.kafka.support.Acknowledgment;
2220
import org.springframework.stereotype.Service;
2321

24-
import java.time.Clock;
25-
import java.time.LocalDateTime;
26-
import java.util.Arrays;
22+
import java.nio.charset.StandardCharsets;
2723
import java.util.List;
28-
import java.util.Map;
2924
import java.util.UUID;
3025

3126
@Slf4j
3227
@Service
3328
@RequiredArgsConstructor
3429
public class KafkaReadingService {
3530

31+
private static final Propagator.Getter<ConsumerRecord<UUID, String>> KAFKA_PROPAGATOR_GETTER = (carrier, key) -> new String(carrier.headers().lastHeader(key).value(), StandardCharsets.UTF_8);
32+
3633
private final Tracer tracer;
37-
private final Clock clock;
38-
private final NamedParameterJdbcTemplate jdbcTemplate;
39-
@Value("${app.tenant.name}")
40-
private String tenantName;
4134
private final Propagator propagator;
35+
private final DbSaver dbSaver;
4236

4337
@KafkaListener(topics = "${spring.kafka.template.default-topic}")
44-
public void listen(ConsumerRecord<UUID, String> message, Acknowledgment ack) {
45-
processSingleRecordIfNeed(message, ack);
38+
public void listen(ConsumerRecord<UUID, String> record, Acknowledgment ack) {
39+
dbSaver.processSingleRecord(record);
40+
ack.acknowledge();
4641
}
4742

4843
@KafkaListener(
@@ -57,8 +52,7 @@ public void listenAdditional(List<ConsumerRecord<UUID, String>> records, Acknowl
5752
log.info(
5853
"Received from Kafka {} records", records.size()
5954
);
60-
records.forEach(record ->
61-
restoreContextAndProcessSingleRecordIfNeed(record, ack));
55+
records.forEach(this::restoreContextAndProcessSingleRecordIfNeed);
6256
ack.acknowledge();
6357
} catch (Exception e) {
6458
batchSpan.error(e);
@@ -68,33 +62,16 @@ public void listenAdditional(List<ConsumerRecord<UUID, String>> records, Acknowl
6862
}
6963
}
7064

71-
private void restoreContextAndProcessSingleRecordIfNeed(ConsumerRecord<UUID, String> record, Acknowledgment ack) {
72-
final Propagator.Getter<ConsumerRecord<UUID, String>> kafkaPropagatorGetter = (carrier, key) -> Arrays.toString(carrier.headers().lastHeader("traceparent").value());
73-
final Span.Builder builder = propagator.extract(record, kafkaPropagatorGetter);
65+
private void restoreContextAndProcessSingleRecordIfNeed(ConsumerRecord<UUID, String> record) {
66+
final Span.Builder builder = propagator.extract(record, KAFKA_PROPAGATOR_GETTER);
7467
final Span spanFromRecord = builder.name("processing-record-from-kafka").start();
7568
try (Tracer.SpanInScope ignored = tracer.withSpan(spanFromRecord)) {
76-
processSingleRecordIfNeed(record, ack);
69+
dbSaver.processSingleRecord(record);
7770
} catch (Exception e) {
7871
spanFromRecord.error(e);
7972
throw e;
8073
} finally {
8174
spanFromRecord.end();
8275
}
8376
}
84-
85-
private void processSingleRecordIfNeed(ConsumerRecord<UUID, String> message, Acknowledgment ack) {
86-
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
87-
final Span currentSpan = tracer.currentSpan();
88-
final String traceId = currentSpan != null ? currentSpan.context().traceId() : "";
89-
log.info("Received record: {} with traceId {}", message.value(), traceId);
90-
jdbcTemplate.update("insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);",
91-
Map.ofEntries(
92-
Map.entry("msg", message.value()),
93-
Map.entry("traceId", traceId),
94-
Map.entry("createdAt", LocalDateTime.now(clock))
95-
)
96-
);
97-
ack.acknowledge();
98-
}
99-
}
10077
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
3+
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
4+
*
5+
* Licensed under the Apache License 2.0
6+
*/
7+
8+
package io.github.mfvanek.spring.boot3.test.config;
9+
10+
import io.github.mfvanek.db.migrations.common.saver.DbSaver;
11+
import io.micrometer.tracing.Tracer;
12+
import org.springframework.beans.factory.annotation.Value;
13+
import org.springframework.context.annotation.Bean;
14+
import org.springframework.context.annotation.Configuration;
15+
import org.springframework.jdbc.core.simple.JdbcClient;
16+
17+
import java.time.Clock;
18+
19+
@Configuration(proxyBeanMethods = false)
20+
public class DbConfig {
21+
22+
@Bean
23+
public DbSaver dbSaver(
24+
@Value("${app.tenant.name}") String tenantName,
25+
Tracer tracer,
26+
Clock clock,
27+
JdbcClient jdbcClient
28+
) {
29+
return new DbSaver(tenantName, tracer, clock, jdbcClient);
30+
}
31+
}

0 commit comments

Comments
 (0)