Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<!-- <module name="FinalParameters"/> disabled here. Controlled by PMD.AvoidReassigningParameters -->
<!-- <module name="HiddenField"/> invalid -->
<!-- <module name="HideUtilityClassConstructor"/> invalid -->
<module name="IllegalCatch"/>
<!--<module name="IllegalCatch"/> -->
<module name="IllegalIdentifierName"/>
<module name="IllegalInstantiation"/>
<module name="IllegalThrows"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ databaseChangeLog:
file: db/changelog/sql/schema.sql
- include:
file: db/changelog/sql/storage.sql
- include:
file: db/changelog/sql/add_span_column.sql
- include:
file: db/changelog/sql/set_span_and_trace_unique.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
--liquibase formatted sql

--changeset marina.zharinova:2025.08.31:add span column
alter table otel_demo.storage add column span_id varchar(64);

--changeset marina.zharinova:2025.08.31:comment on span_id
comment on column otel_demo.storage.span_id is 'SpanId of operation';
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
--liquibase formatted sql

--changeset ivan.vakhrushev:2025.08.31:remove unique from trace_id
alter table otel_demo.storage drop constraint storage_trace_id_key;

--changeset marina.zharinova:2025.08.31:add constraint on trace_id with span_id
alter table otel_demo.storage add constraint trace_span_unique unique(trace_id, span_id);
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@ class TimeController(
private val kafkaSendingService: KafkaSendingService,
private val publicApiService: PublicApiService
) {

// http://localhost:8090/current-time
@GetMapping(path = ["/current-time"])
fun getNow(): LocalDateTime {
logger.trace { "tracer $tracer" }
val traceId = tracer.currentSpan()?.context()?.traceId()
logger.info { "Called method getNow. TraceId = $traceId" }
val nowFromRemote = publicApiService.getZonedTime()
val now = nowFromRemote ?: LocalDateTime.now(clock)
kafkaSendingService.sendNotification("Current time = $now")
val message = "Current time = $now"
kafkaSendingService.sendNotification(message)
.thenRun { logger.info { "Awaiting acknowledgement from Kafka" } }
.get()

kafkaSendingService.sendNotificationToOtherTopic(message)
.thenRun { logger.info { "Awaiting acknowledgement from Kafka with batch" } }
.get()
return now
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package io.github.mfvanek.spring.boot3.kotlin.test.service
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import io.micrometer.tracing.Tracer
import io.micrometer.tracing.propagation.Propagator
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.beans.factory.annotation.Value
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
Expand All @@ -27,7 +28,8 @@ class KafkaReadingService(
@Value("\${app.tenant.name}") private val tenantName: String,
private val tracer: Tracer,
private val clock: Clock,
private val jdbcTemplate: NamedParameterJdbcTemplate
private val jdbcTemplate: NamedParameterJdbcTemplate,
private val propagator: Propagator
) {
@KafkaListener(topics = ["\${spring.kafka.template.default-topic}"])
fun listen(message: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
Expand All @@ -40,14 +42,65 @@ class KafkaReadingService(
private fun processMessage(message: ConsumerRecord<UUID, String>) {
val currentSpan = tracer.currentSpan()
val traceId = currentSpan?.context()?.traceId().orEmpty()
val spanId = currentSpan?.context()?.spanId()
logger.info { "Received record: ${message.value()} with traceId $traceId" }
jdbcTemplate.update(
"insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);",
"insert into otel_demo.storage(message, trace_id, span_id, created_at) values(:msg, :traceId, :currentSpan, :createdAt);",
mapOf(
"msg" to message.value(),
"traceId" to traceId,
"currentSpan" to spanId,
"createdAt" to LocalDateTime.now(clock)
)
)
}

@KafkaListener(
id = "\${spring.kafka.consumer.additional-groupId}",
topics = ["\${spring.kafka.template.additional-topic}"],
batch = "true"
)
fun listenAdditional(records: List<ConsumerRecord<UUID, String>>, ack: Acknowledgment) {
val batchSpan = tracer.startScopedSpan("batch-processing")
logger.info { "current span: ${tracer.currentSpan()}" }
try {
logger.info {
"Received from Kafka ${records.size} records"
}
records.forEach { record ->
restoreContextAndProcessSingleRecordIfNeed(record, ack)
}
ack.acknowledge()
} catch (e: Throwable) {
batchSpan.error(e)
throw e
} finally {
batchSpan.end()
}
}

private fun restoreContextAndProcessSingleRecordIfNeed(record: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
val kafkaPropagatorGetter = Propagator.Getter<ConsumerRecord<UUID, String>> { carrier, _ ->
carrier.headers().find { it.key() == "traceparent" }?.value()?.decodeToString()
}
val builder = propagator.extract(record, kafkaPropagatorGetter)
val spanFromRecord = builder.name("processing-record-from-kafka").start()
try {
tracer.withSpan(spanFromRecord).use {
processSingleRecordIfNeed(record, ack)
}
} catch (e: Throwable) {
spanFromRecord.error(e)
throw e
} finally {
spanFromRecord.end()
}
}

private fun processSingleRecordIfNeed(record: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
withLoggingContext("tenant.name" to tenantName) {
processMessage(record)
ack.acknowledge()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ private val logger = KotlinLogging.logger {}
@Service
class KafkaSendingService(
@Value("\${app.tenant.name}") private val tenantName: String,
private val kafkaTemplate: KafkaTemplate<UUID, String>
private val kafkaTemplate: KafkaTemplate<UUID, String>,
@Value("\${spring.kafka.template.additional-topic}") private val additionalTopic: String,
) {
fun sendNotification(message: String): CompletableFuture<SendResult<UUID, String>> {
withLoggingContext("tenant.name" to tenantName) {
logger.info { "Sending message \"$message\" to Kafka" }
return kafkaTemplate.sendDefault(UUID.randomUUID(), message)
}
}

fun sendNotificationToOtherTopic(message: String): CompletableFuture<SendResult<UUID, String>> {
withLoggingContext("tenant.name" to tenantName) {
logger.info { "Sending message \"$message\" to $additionalTopic of Kafka" }
return kafkaTemplate.send(additionalTopic, UUID.randomUUID(), message)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@ spring:
template:
default-topic: open.telemetry.sb3.queue
observation-enabled: true # Important!!!
additional-topic: open.telemetry.sb3.queue.additional
producer:
key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
properties:
linger.ms: 100
batch.size: 10000
listener:
observation-enabled: true # Important!!!
ack-mode: manual_immediate
consumer:
auto-offset-reset: earliest
group-id: ${spring.kafka.template.default-topic}-group
additional-groupId: ${spring.kafka.template.additional-topic}-group
client-id: open.telemetry.client
bootstrap-servers: localhost:9092
security:
Expand All @@ -49,6 +54,9 @@ spring:
sasl:
mechanism: PLAIN
jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="${demo.kafka.opentelemetry.username}" password="${demo.kafka.opentelemetry.password}";
opentelemetry:
additional-topic: open.telemetry.sb3.queue.additional
additional-consumer-groupId: open.telemetry.sb3.queue.additional-group
jdbc:
template:
query-timeout: 1s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,15 @@ class TimeControllerTest : TestBase() {
assertThat(output.all)
.contains("Received record: " + received.value() + " with traceId " + traceId)
.contains("\"tenant.name\":\"ru-a1-private\"")
val messageFromDb = namedParameterJdbcTemplate.queryForObject(
"select message from otel_demo.storage where trace_id = :traceId",
mapOf("traceId" to traceId),
String::class.java
)
assertThat(messageFromDb).isEqualTo(received.value())
val tracesFromDb = namedParameterJdbcTemplate
.query(
"select trace_id from otel_demo.storage where message like :message",
mapOf("message" to received.value())
) { rs, _ ->
rs.getString("trace_id")
}
assertThat(tracesFromDb.size).isEqualTo(2)
assertThat(tracesFromDb.stream().filter { it == traceId }).hasSize(2)
}

@Order(2)
Expand Down Expand Up @@ -179,7 +182,7 @@ class TimeControllerTest : TestBase() {
.await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(Duration.ofMillis(500L))
.until { countRecordsInTable() >= 1L }
.until { countRecordsInTable() >= 2L }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,26 @@ public class TimeController {
private final KafkaSendingService kafkaSendingService;
private final PublicApiService publicApiService;

// http://localhost:8080/current-time
// http://localhost:8081/current-time
@GetMapping(path = "/current-time")
public Mono<LocalDateTime> getNow() {
log.trace("tracer {}", tracer);

final String traceId = Optional.ofNullable(tracer.currentSpan())
.map(Span::context)
.map(TraceContext::traceId)
.orElse(null);
log.info("Called method getNow. TraceId = {}", traceId);

return publicApiService.getZonedTime()
return Mono.just(tracer)
.map(tracer -> {
log.trace("tracer {}", tracer);
return Optional.ofNullable(tracer.currentSpan())
.map(Span::context)
.map(TraceContext::traceId)
.orElse(null);
})
.doOnNext(traceId -> log.info("Called method getNow. TraceId = {}", traceId))
.then(publicApiService.getZonedTime())
.defaultIfEmpty(LocalDateTime.now(clock))
.flatMap(now -> kafkaSendingService.sendNotification("Current time = " + now)
.doOnSuccess(v -> log.info("Awaiting acknowledgement from Kafka"))
.thenReturn(now)
);
)
.flatMap(now -> kafkaSendingService.sendNotificationToOtherTopic("Current time = " + now)
.doOnSuccess(v -> log.info("Awaiting acknowledgement from Kafka with batch"))
.thenReturn(now));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

package io.github.mfvanek.spring.boot3.reactive.service;

import io.micrometer.tracing.ScopedSpan;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -21,6 +23,8 @@

import java.time.Clock;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;

Expand All @@ -34,9 +38,51 @@ public class KafkaReadingService {
private final NamedParameterJdbcTemplate jdbcTemplate;
@Value("${app.tenant.name}")
private String tenantName;
private final Propagator propagator;

@KafkaListener(topics = "${spring.kafka.template.default-topic}")
public void listen(ConsumerRecord<UUID, String> message, Acknowledgment ack) {
processSingleRecordIfNeed(message, ack);
}

@KafkaListener(
id = "${spring.kafka.consumer.additional-groupId}",
topics = "${spring.kafka.template.additional-topic}",
batch = "true"
)
public void listenAdditional(List<ConsumerRecord<UUID, String>> records, Acknowledgment ack) {
final ScopedSpan batchSpan = tracer.startScopedSpan("batch-processing");
log.info("current span: {}", tracer.currentSpan());
try {
log.info(
"Received from Kafka {} records", records.size()
);
records.forEach(record ->
restoreContextAndProcessSingleRecordIfNeed(record, ack));
ack.acknowledge();
} catch (Exception e) {
batchSpan.error(e);
throw e;
} finally {
batchSpan.end();
}
}

private void restoreContextAndProcessSingleRecordIfNeed(ConsumerRecord<UUID, String> record, Acknowledgment ack) {
final Propagator.Getter<ConsumerRecord<UUID, String>> kafkaPropagatorGetter = (carrier, key) -> Arrays.toString(carrier.headers().lastHeader("traceparent").value());
final Span.Builder builder = propagator.extract(record, kafkaPropagatorGetter);
final Span spanFromRecord = builder.name("processing-record-from-kafka").start();
try (Tracer.SpanInScope ignored = tracer.withSpan(spanFromRecord)) {
processSingleRecordIfNeed(record, ack);
} catch (Exception e) {
spanFromRecord.error(e);
throw e;
} finally {
spanFromRecord.end();
}
}

private void processSingleRecordIfNeed(ConsumerRecord<UUID, String> message, Acknowledgment ack) {
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
final Span currentSpan = tracer.currentSpan();
final String traceId = currentSpan != null ? currentSpan.context().traceId() : "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class KafkaSendingService {
private final KafkaTemplate<UUID, String> kafkaTemplate;
@Value("${app.tenant.name}")
private String tenantName;
@Value("${spring.kafka.template.additional-topic}") private String additionalTopic;

public Mono<SendResult<UUID, String>> sendNotification(@Nonnull final String message) {
return Mono.deferContextual(contextView -> {
Expand All @@ -38,4 +39,14 @@ public Mono<SendResult<UUID, String>> sendNotification(@Nonnull final String mes
}
});
}

public Mono<SendResult<UUID, String>> sendNotificationToOtherTopic(@Nonnull final String message) {
return Mono.deferContextual(contextView -> {
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
log.info("Sending message \"{}\" to {} of Kafka", message, additionalTopic);
return Mono.fromFuture(() -> kafkaTemplate.send(additionalTopic, UUID.randomUUID(), message))
.subscribeOn(Schedulers.boundedElastic());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ spring:
template:
default-topic: open.telemetry.sb3.queue
observation-enabled: true # Important!!!
additional-topic: open.telemetry.sb3.queue.additional
producer:
key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
properties:
linger.ms: 100
batch.size: 10000
listener:
observation-enabled: true # Important!!!
ack-mode: manual_immediate
consumer:
auto-offset-reset: earliest
group-id: ${spring.kafka.template.default-topic}-group
additional-groupId: ${spring.kafka.template.additional-topic}-group
client-id: open.telemetry.client
bootstrap-servers: localhost:9092
security:
Expand Down
Loading
Loading