Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ allprojects {
version = "0.5.0"

repositories {
mavenLocal()
mavenCentral()
mavenLocal()
}
}

Expand Down
2 changes: 1 addition & 1 deletion config/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<!-- <module name="FinalParameters"/> disabled here. Controlled by PMD.AvoidReassigningParameters -->
<!-- <module name="HiddenField"/> invalid -->
<!-- <module name="HideUtilityClassConstructor"/> invalid -->
<module name="IllegalCatch"/>
<!--<module name="IllegalCatch"/> -->
<module name="IllegalIdentifierName"/>
<module name="IllegalInstantiation"/>
<module name="IllegalThrows"/>
Expand Down
13 changes: 12 additions & 1 deletion db-migrations/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
plugins {
id("java")
id("java-library")
id("sb-ot-demo.java-conventions")
id("io.freefair.lombok")
}

dependencies {
implementation(platform(project(":common-internal-bom")))
implementation(platform(libs.spring.boot.v3.dependencies))

implementation("io.micrometer:micrometer-tracing")
implementation("org.apache.kafka:kafka-clients")
implementation("org.slf4j:slf4j-api")
implementation("org.springframework:spring-jdbc")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
*
* Licensed under the Apache License 2.0
*/

package io.github.mfvanek.db.migrations.common.saver;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.MDC;
import org.springframework.jdbc.core.simple.JdbcClient;

import java.time.Clock;
import java.time.LocalDateTime;
import java.util.UUID;

@Slf4j
@RequiredArgsConstructor
public class DbSaver {

private final String tenantName;
private final Tracer tracer;
private final Clock clock;
private final JdbcClient jdbcClient;

public void processSingleRecord(ConsumerRecord<UUID, String> record) {
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
final Span currentSpan = tracer.currentSpan();
final String traceId = currentSpan != null ? currentSpan.context().traceId() : "";
final String spanId = currentSpan != null ? currentSpan.context().spanId() : "";
log.info("Received record: {} with traceId {} spanId {}", record.value(), traceId, spanId);
jdbcClient.sql("""
insert into otel_demo.storage(message, trace_id, span_id, created_at)
values(:msg, :traceId, :currentSpan, :createdAt);""")
.param("msg", record.value())
.param("traceId", traceId)
.param("currentSpan", spanId)
.param("createdAt", LocalDateTime.now(clock))
.update();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ databaseChangeLog:
file: db/changelog/sql/schema.sql
- include:
file: db/changelog/sql/storage.sql
- include:
file: db/changelog/sql/add_span_column.sql
- include:
file: db/changelog/sql/set_span_and_trace_unique.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
--liquibase formatted sql

--changeset marina.zharinova:2025.08.31:add span column
alter table otel_demo.storage add column span_id varchar(64);

--changeset marina.zharinova:2025.08.31:comment on span_id
comment on column otel_demo.storage.span_id is 'SpanId of operation';
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
--liquibase formatted sql

--changeset ivan.vakhrushev:2025.08.31:remove unique from trace_id
alter table otel_demo.storage drop constraint storage_trace_id_key;

--changeset marina.zharinova:2025.08.31:add constraint on trace_id with span_id
alter table otel_demo.storage add constraint trace_span_unique unique(trace_id, span_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
*
* Licensed under the Apache License 2.0
*/

package io.github.mfvanek.spring.boot3.kotlin.test.config

import io.github.mfvanek.db.migrations.common.saver.DbSaver
import io.micrometer.tracing.Tracer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jdbc.core.simple.JdbcClient
import java.time.Clock

@Configuration(proxyBeanMethods = false)
class DbConfig {

@Bean
fun dbSaver(
@Value("\${app.tenant.name}") tenantName: String,
tracer: Tracer,
clock: Clock,
jdbcClient: JdbcClient
): DbSaver {
return DbSaver(tenantName, tracer, clock, jdbcClient)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@ class TimeController(
private val kafkaSendingService: KafkaSendingService,
private val publicApiService: PublicApiService
) {

// http://localhost:8090/current-time
@GetMapping(path = ["/current-time"])
fun getNow(): LocalDateTime {
logger.trace { "tracer $tracer" }
val traceId = tracer.currentSpan()?.context()?.traceId()
logger.info { "Called method getNow. TraceId = $traceId" }
val nowFromRemote = publicApiService.getZonedTime()
val now = nowFromRemote ?: LocalDateTime.now(clock)
kafkaSendingService.sendNotification("Current time = $now")
val message = "Current time = $now"
kafkaSendingService.sendNotification(message)
.thenRun { logger.info { "Awaiting acknowledgement from Kafka" } }
.get()

kafkaSendingService.sendNotificationToOtherTopic(message)
.thenRun { logger.info { "Awaiting acknowledgement from Kafka with batch" } }
.get()
return now
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,70 @@

package io.github.mfvanek.spring.boot3.kotlin.test.service

import io.github.mfvanek.db.migrations.common.saver.DbSaver
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import io.micrometer.tracing.Tracer
import io.micrometer.tracing.propagation.Propagator
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.beans.factory.annotation.Value
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Service
import java.time.Clock
import java.time.LocalDateTime
import java.nio.charset.StandardCharsets
import java.util.*

private val logger = KotlinLogging.logger {}

internal object KafkaHeadersGetter : Propagator.Getter<ConsumerRecord<UUID, String>> {
override fun get(carrier: ConsumerRecord<UUID, String>, key: String): String? =
carrier.headers()?.lastHeader(key)?.value()?.toString(StandardCharsets.UTF_8)
}

@Service
class KafkaReadingService(
@Value("\${app.tenant.name}") private val tenantName: String,
private val tracer: Tracer,
private val clock: Clock,
private val jdbcTemplate: NamedParameterJdbcTemplate
private val propagator: Propagator,
private val dbSaver: DbSaver
) {
@KafkaListener(topics = ["\${spring.kafka.template.default-topic}"])
fun listen(message: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
withLoggingContext("tenant.name" to tenantName) {
processMessage(message)
fun listen(record: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
dbSaver.processSingleRecord(record)
ack.acknowledge()
}

@KafkaListener(
id = "\${spring.kafka.consumer.additional-groupId}",
topics = ["\${spring.kafka.template.additional-topic}"],
batch = "true"
)
fun listenAdditional(records: List<ConsumerRecord<UUID, String>>, ack: Acknowledgment) {
val batchSpan = tracer.startScopedSpan("batch-processing")
logger.info { "current span: ${tracer.currentSpan()}" }
try {
logger.info {
"Received from Kafka ${records.size} records"
}
records.forEach { record -> restoreContextAndProcessSingleRecordIfNeed(record) }
ack.acknowledge()
} catch (e: Throwable) {
batchSpan.error(e)
throw e
} finally {
batchSpan.end()
}
}

private fun processMessage(message: ConsumerRecord<UUID, String>) {
val currentSpan = tracer.currentSpan()
val traceId = currentSpan?.context()?.traceId().orEmpty()
logger.info { "Received record: ${message.value()} with traceId $traceId" }
jdbcTemplate.update(
"insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);",
mapOf(
"msg" to message.value(),
"traceId" to traceId,
"createdAt" to LocalDateTime.now(clock)
)
)
private fun restoreContextAndProcessSingleRecordIfNeed(record: ConsumerRecord<UUID, String>) {
val builder = propagator.extract(record, KafkaHeadersGetter)
val spanFromRecord = builder.name("processing-record-from-kafka").start()
try {
tracer.withSpan(spanFromRecord).use {
dbSaver.processSingleRecord(record)
}
} catch (e: Throwable) {
spanFromRecord.error(e)
throw e
} finally {
spanFromRecord.end()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ private val logger = KotlinLogging.logger {}
@Service
class KafkaSendingService(
@Value("\${app.tenant.name}") private val tenantName: String,
private val kafkaTemplate: KafkaTemplate<UUID, String>
private val kafkaTemplate: KafkaTemplate<UUID, String>,
@Value("\${spring.kafka.template.additional-topic}") private val additionalTopic: String,
) {
fun sendNotification(message: String): CompletableFuture<SendResult<UUID, String>> {
withLoggingContext("tenant.name" to tenantName) {
logger.info { "Sending message \"$message\" to Kafka" }
return kafkaTemplate.sendDefault(UUID.randomUUID(), message)
}
}

fun sendNotificationToOtherTopic(message: String): CompletableFuture<SendResult<UUID, String>> {
withLoggingContext("tenant.name" to tenantName) {
logger.info { "Sending message \"$message\" to $additionalTopic of Kafka" }
return kafkaTemplate.send(additionalTopic, UUID.randomUUID(), message)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@ spring:
template:
default-topic: open.telemetry.sb3.queue
observation-enabled: true # Important!!!
additional-topic: open.telemetry.sb3.queue.additional
producer:
key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
properties:
linger.ms: 100
batch.size: 10000
listener:
observation-enabled: true # Important!!!
ack-mode: manual_immediate
consumer:
auto-offset-reset: earliest
group-id: ${spring.kafka.template.default-topic}-group
additional-groupId: ${spring.kafka.template.additional-topic}-group
client-id: open.telemetry.client
bootstrap-servers: localhost:9092
security:
Expand All @@ -49,6 +54,9 @@ spring:
sasl:
mechanism: PLAIN
jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="${demo.kafka.opentelemetry.username}" password="${demo.kafka.opentelemetry.password}";
opentelemetry:
additional-topic: open.telemetry.sb3.queue.additional
additional-consumer-groupId: open.telemetry.sb3.queue.additional-group
jdbc:
template:
query-timeout: 1s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,15 @@ class TimeControllerTest : TestBase() {
assertThat(output.all)
.contains("Received record: " + received.value() + " with traceId " + traceId)
.contains("\"tenant.name\":\"ru-a1-private\"")
val messageFromDb = namedParameterJdbcTemplate.queryForObject(
"select message from otel_demo.storage where trace_id = :traceId",
mapOf("traceId" to traceId),
String::class.java
)
assertThat(messageFromDb).isEqualTo(received.value())
val tracesFromDb = namedParameterJdbcTemplate
.query(
"select trace_id from otel_demo.storage where message like :message",
mapOf("message" to received.value())
) { rs, _ ->
rs.getString("trace_id")
}
assertThat(tracesFromDb.size).isEqualTo(2)
assertThat(tracesFromDb.stream().filter { it == traceId }).hasSize(2)
}

@Order(2)
Expand Down Expand Up @@ -179,7 +182,7 @@ class TimeControllerTest : TestBase() {
.await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(Duration.ofMillis(500L))
.until { countRecordsInTable() >= 1L }
.until { countRecordsInTable() >= 2L }
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
*
* Licensed under the Apache License 2.0
*/

package io.github.mfvanek.spring.boot3.reactive.config;

import io.github.mfvanek.db.migrations.common.saver.DbSaver;
import io.micrometer.tracing.Tracer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.simple.JdbcClient;

import java.time.Clock;

@Configuration(proxyBeanMethods = false)
public class DbConfig {

@Bean
public DbSaver dbSaver(
@Value("${app.tenant.name}") String tenantName,
Tracer tracer,
Clock clock,
JdbcClient jdbcClient
) {
return new DbSaver(tenantName, tracer, clock, jdbcClient);
}
}
Loading
Loading