Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ allprojects {
version = "0.5.0"

repositories {
mavenLocal()
mavenCentral()
mavenLocal()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
if (osdetector.arch == "aarch_64") {
testImplementation("io.netty:netty-all:4.1.104.Final")
}
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
}

java {
Expand Down
13 changes: 12 additions & 1 deletion db-migrations/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
plugins {
id("java")
id("java-library")
id("sb-ot-demo.java-conventions")
id("io.freefair.lombok")
}

dependencies {
implementation(platform(project(":common-internal-bom")))
implementation(platform(libs.spring.boot.v3.dependencies))

implementation("io.micrometer:micrometer-tracing")
implementation("org.apache.kafka:kafka-clients")
implementation("org.slf4j:slf4j-api")
implementation("org.springframework:spring-jdbc")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
*
* Licensed under the Apache License 2.0
*/

package io.github.mfvanek.db.migrations.common.saver;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.MDC;
import org.springframework.jdbc.core.simple.JdbcClient;

import java.time.Clock;
import java.time.LocalDateTime;
import java.util.UUID;

@Slf4j
@RequiredArgsConstructor
public class DbSaver {

private final String tenantName;
private final Tracer tracer;
private final Clock clock;
private final JdbcClient jdbcClient;

public void processSingleRecord(ConsumerRecord<UUID, String> record) {
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
final Span currentSpan = tracer.currentSpan();
final String traceId = currentSpan != null ? currentSpan.context().traceId() : "";
final String spanId = currentSpan != null ? currentSpan.context().spanId() : "";
log.info("Received record: {} with traceId {} spanId {}", record.value(), traceId, spanId);
jdbcClient.sql("""
insert into otel_demo.storage(message, trace_id, span_id, created_at)
values(:msg, :traceId, :currentSpan, :createdAt);""")
.param("msg", record.value())
.param("traceId", traceId)
.param("currentSpan", spanId)
.param("createdAt", LocalDateTime.now(clock))
.update();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ databaseChangeLog:
file: db/changelog/sql/schema.sql
- include:
file: db/changelog/sql/storage.sql
- include:
file: db/changelog/sql/add_span_column.sql
- include:
file: db/changelog/sql/set_span_and_trace_unique.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
--liquibase formatted sql

--changeset marina.zharinova:2025.08.31:add span column
alter table otel_demo.storage add column span_id text;

--changeset marina.zharinova:2025.08.31:comment on span_id
comment on column otel_demo.storage.span_id is 'SpanId of operation';
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
--liquibase formatted sql

--changeset ivan.vakhrushev:2025.08.31:remove unique from trace_id
alter table otel_demo.storage drop constraint storage_trace_id_key;

--changeset marina.zharinova:2025.08.31:add constraint on trace_id with span_id
alter table otel_demo.storage add constraint trace_span_unique unique(trace_id, span_id);
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ create table if not exists otel_demo.storage
(
id bigint generated always as identity,
message text not null,
trace_id varchar(64) not null unique,
trace_id text not null unique,
created_at timestamptz not null
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2020-2025. Ivan Vakhrushev and others.
* https://github.com/mfvanek/spring-boot-open-telemetry-demo
*
* Licensed under the Apache License 2.0
*/

package io.github.mfvanek.spring.boot3.kotlin.test.config

import io.github.mfvanek.db.migrations.common.saver.DbSaver
import io.micrometer.tracing.Tracer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jdbc.core.simple.JdbcClient
import java.time.Clock

@Configuration(proxyBeanMethods = false)
class DbConfig {

@Bean
fun dbSaver(
@Value("\${app.tenant.name}") tenantName: String,
tracer: Tracer,
clock: Clock,
jdbcClient: JdbcClient
): DbSaver {
return DbSaver(tenantName, tracer, clock, jdbcClient)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@ class TimeController(
private val kafkaSendingService: KafkaSendingService,
private val publicApiService: PublicApiService
) {

// http://localhost:8090/current-time
@GetMapping(path = ["/current-time"])
fun getNow(): LocalDateTime {
logger.trace { "tracer $tracer" }
val traceId = tracer.currentSpan()?.context()?.traceId()
logger.info { "Called method getNow. TraceId = $traceId" }
val nowFromRemote = publicApiService.getZonedTime()
val now = nowFromRemote ?: LocalDateTime.now(clock)
kafkaSendingService.sendNotification("Current time = $now")
val message = "Current time = $now"
kafkaSendingService.sendNotification(message)
.thenRun { logger.info { "Awaiting acknowledgement from Kafka" } }
.get()

kafkaSendingService.sendNotificationToOtherTopic(message)
.thenRun { logger.info { "Awaiting acknowledgement from Kafka with batch" } }
.get()
return now
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,71 @@

package io.github.mfvanek.spring.boot3.kotlin.test.service

import io.github.mfvanek.db.migrations.common.saver.DbSaver
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.oshai.kotlinlogging.withLoggingContext
import io.micrometer.tracing.Tracer
import io.micrometer.tracing.propagation.Propagator
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.beans.factory.annotation.Value
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Service
import java.time.Clock
import java.time.LocalDateTime
import java.nio.charset.StandardCharsets
import java.util.*

private val logger = KotlinLogging.logger {}

internal object KafkaHeadersGetter : Propagator.Getter<ConsumerRecord<UUID, String>> {
override fun get(carrier: ConsumerRecord<UUID, String>, key: String): String? =
carrier.headers()?.lastHeader(key)?.value()?.toString(StandardCharsets.UTF_8)
}

@Service
class KafkaReadingService(
@Value("\${app.tenant.name}") private val tenantName: String,
private val tracer: Tracer,
private val clock: Clock,
private val jdbcTemplate: NamedParameterJdbcTemplate
private val propagator: Propagator,
private val dbSaver: DbSaver
) {
@KafkaListener(topics = ["\${spring.kafka.template.default-topic}"])
fun listen(message: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
withLoggingContext("tenant.name" to tenantName) {
processMessage(message)
fun listen(record: ConsumerRecord<UUID, String>, ack: Acknowledgment) {
dbSaver.processSingleRecord(record)
ack.acknowledge()
}

@SuppressWarnings("IllegalCatch", "PMD.AvoidCatchingThrowable")
@KafkaListener(
id = "\${spring.kafka.consumer.additional-groupId}",
topics = ["\${spring.kafka.template.additional-topic}"],
batch = "true"
)
fun listenAdditional(records: List<ConsumerRecord<UUID, String>>, ack: Acknowledgment) {
val batchSpan = tracer.startScopedSpan("batch-processing")
try {
logger.info {
"Received from Kafka ${records.size} records"
}
records.forEach { record -> restoreContextAndProcessSingleRecordIfNeed(record) }
ack.acknowledge()
} catch (throwable: Throwable) {
batchSpan.error(throwable)
throw throwable
} finally {
batchSpan.end()
}
}

private fun processMessage(message: ConsumerRecord<UUID, String>) {
val currentSpan = tracer.currentSpan()
val traceId = currentSpan?.context()?.traceId().orEmpty()
logger.info { "Received record: ${message.value()} with traceId $traceId" }
jdbcTemplate.update(
"insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);",
mapOf(
"msg" to message.value(),
"traceId" to traceId,
"createdAt" to LocalDateTime.now(clock)
)
)
@SuppressWarnings("IllegalCatch", "PMD.AvoidCatchingThrowable")
private fun restoreContextAndProcessSingleRecordIfNeed(record: ConsumerRecord<UUID, String>) {
val builder = propagator.extract(record, KafkaHeadersGetter)
val spanFromRecord = builder.name("processing-record-from-kafka").start()
try {
tracer.withSpan(spanFromRecord).use {
dbSaver.processSingleRecord(record)
}
} catch (throwable: Throwable) {
spanFromRecord.error(throwable)
throw throwable
} finally {
spanFromRecord.end()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ private val logger = KotlinLogging.logger {}
@Service
class KafkaSendingService(
@Value("\${app.tenant.name}") private val tenantName: String,
private val kafkaTemplate: KafkaTemplate<UUID, String>
private val kafkaTemplate: KafkaTemplate<UUID, String>,
@Value("\${spring.kafka.template.additional-topic}") private val additionalTopic: String,
) {
fun sendNotification(message: String): CompletableFuture<SendResult<UUID, String>> {
withLoggingContext("tenant.name" to tenantName) {
logger.info { "Sending message \"$message\" to Kafka" }
return kafkaTemplate.sendDefault(UUID.randomUUID(), message)
}
}

fun sendNotificationToOtherTopic(message: String): CompletableFuture<SendResult<UUID, String>> {
withLoggingContext("tenant.name" to tenantName) {
logger.info { "Sending message \"$message\" to $additionalTopic of Kafka" }
return kafkaTemplate.send(additionalTopic, UUID.randomUUID(), message)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,20 @@ spring:
template:
default-topic: open.telemetry.sb3.queue
observation-enabled: true # Important!!!
additional-topic: open.telemetry.sb3.queue.additional
producer:
key-serializer: org.apache.kafka.common.serialization.UUIDSerializer
batch-size: 32KB
properties:
linger.ms: 20
batch.size: 10000
listener:
observation-enabled: true # Important!!!
ack-mode: manual_immediate
consumer:
auto-offset-reset: earliest
group-id: ${spring.kafka.template.default-topic}-group
additional-groupId: ${spring.kafka.template.additional-topic}-group
client-id: open.telemetry.client
bootstrap-servers: localhost:9092
security:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,15 @@ class TimeControllerTest : TestBase() {
assertThat(output.all)
.contains("Received record: " + received.value() + " with traceId " + traceId)
.contains("\"tenant.name\":\"ru-a1-private\"")
val messageFromDb = namedParameterJdbcTemplate.queryForObject(
val messagesFromDb = namedParameterJdbcTemplate.queryForList(
"select message from otel_demo.storage where trace_id = :traceId",
mapOf("traceId" to traceId),
String::class.java
)
assertThat(messageFromDb).isEqualTo(received.value())
assertThat(messagesFromDb.size).isEqualTo(2)
messagesFromDb.forEach {
assertThat(it).isEqualTo(received.value())
}
}

@Order(2)
Expand Down Expand Up @@ -179,11 +182,11 @@ class TimeControllerTest : TestBase() {
.await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(Duration.ofMillis(500L))
.until { countRecordsInTable() >= 1L }
.until { countRecordsInTable() >= 2L }
}
}

private fun setUpKafkaConsumer(kafkaProperties: KafkaProperties, consumerRecords: BlockingQueue<ConsumerRecord<UUID, String>>): KafkaMessageListenerContainer<UUID, String> {
fun setUpKafkaConsumer(kafkaProperties: KafkaProperties, consumerRecords: BlockingQueue<ConsumerRecord<UUID, String>>): KafkaMessageListenerContainer<UUID, String> {
val containerProperties = ContainerProperties(kafkaProperties.template.defaultTopic)
val consumerProperties = KafkaTestUtils.consumerProps(KafkaInitializer.getBootstrapSevers(), "test-group", "false")
consumerProperties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_PLAINTEXT"
Expand Down
Loading