diff --git a/build.gradle.kts b/build.gradle.kts index f8d9e430..88ae1b97 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -12,8 +12,8 @@ allprojects { version = "0.5.0" repositories { - mavenLocal() mavenCentral() + mavenLocal() } } diff --git a/buildSrc/src/main/kotlin/sb-ot-demo.java-compile.gradle.kts b/buildSrc/src/main/kotlin/sb-ot-demo.java-compile.gradle.kts index 1244b437..2c481f33 100644 --- a/buildSrc/src/main/kotlin/sb-ot-demo.java-compile.gradle.kts +++ b/buildSrc/src/main/kotlin/sb-ot-demo.java-compile.gradle.kts @@ -21,6 +21,7 @@ dependencies { if (osdetector.arch == "aarch_64") { testImplementation("io.netty:netty-all:4.1.104.Final") } + testImplementation("io.opentelemetry:opentelemetry-sdk-testing") } java { diff --git a/db-migrations/build.gradle.kts b/db-migrations/build.gradle.kts index 00651f70..7fc9f66d 100644 --- a/db-migrations/build.gradle.kts +++ b/db-migrations/build.gradle.kts @@ -1,4 +1,15 @@ plugins { - id("java") + id("java-library") id("sb-ot-demo.java-conventions") + id("io.freefair.lombok") +} + +dependencies { + implementation(platform(project(":common-internal-bom"))) + implementation(platform(libs.spring.boot.v3.dependencies)) + + implementation("io.micrometer:micrometer-tracing") + implementation("org.apache.kafka:kafka-clients") + implementation("org.slf4j:slf4j-api") + implementation("org.springframework:spring-jdbc") } diff --git a/db-migrations/src/main/java/io/github/mfvanek/db/migrations/common/saver/DbSaver.java b/db-migrations/src/main/java/io/github/mfvanek/db/migrations/common/saver/DbSaver.java new file mode 100644 index 00000000..959a2eba --- /dev/null +++ b/db-migrations/src/main/java/io/github/mfvanek/db/migrations/common/saver/DbSaver.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2020-2025. Ivan Vakhrushev and others. + * https://github.com/mfvanek/spring-boot-open-telemetry-demo + * + * Licensed under the Apache License 2.0 + */ + +package io.github.mfvanek.db.migrations.common.saver; + +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.MDC; +import org.springframework.jdbc.core.simple.JdbcClient; + +import java.time.Clock; +import java.time.LocalDateTime; +import java.util.UUID; + +@Slf4j +@RequiredArgsConstructor +public class DbSaver { + + private final String tenantName; + private final Tracer tracer; + private final Clock clock; + private final JdbcClient jdbcClient; + + public void processSingleRecord(ConsumerRecord record) { + try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) { + final Span currentSpan = tracer.currentSpan(); + final String traceId = currentSpan != null ? currentSpan.context().traceId() : ""; + final String spanId = currentSpan != null ? currentSpan.context().spanId() : ""; + log.info("Received record: {} with traceId {} spanId {}", record.value(), traceId, spanId); + jdbcClient.sql(""" + insert into otel_demo.storage(message, trace_id, span_id, created_at) + values(:msg, :traceId, :currentSpan, :createdAt);""") + .param("msg", record.value()) + .param("traceId", traceId) + .param("currentSpan", spanId) + .param("createdAt", LocalDateTime.now(clock)) + .update(); + } + } +} diff --git a/db-migrations/src/main/resources/db/changelog/db.changelog-master.yaml b/db-migrations/src/main/resources/db/changelog/db.changelog-master.yaml index 95804e80..cb315ac1 100644 --- a/db-migrations/src/main/resources/db/changelog/db.changelog-master.yaml +++ b/db-migrations/src/main/resources/db/changelog/db.changelog-master.yaml @@ -3,3 +3,7 @@ databaseChangeLog: file: db/changelog/sql/schema.sql - include: file: db/changelog/sql/storage.sql + - include: + file: db/changelog/sql/add_span_column.sql + - include: + file: db/changelog/sql/set_span_and_trace_unique.sql diff --git a/db-migrations/src/main/resources/db/changelog/sql/add_span_column.sql b/db-migrations/src/main/resources/db/changelog/sql/add_span_column.sql new file mode 100644 index 00000000..832900dc --- /dev/null +++ b/db-migrations/src/main/resources/db/changelog/sql/add_span_column.sql @@ -0,0 +1,7 @@ +--liquibase formatted sql + +--changeset marina.zharinova:2025.08.31:add span column +alter table otel_demo.storage add column span_id text; + +--changeset marina.zharinova:2025.08.31:comment on span_id +comment on column otel_demo.storage.span_id is 'SpanId of operation'; diff --git a/db-migrations/src/main/resources/db/changelog/sql/set_span_and_trace_unique.sql b/db-migrations/src/main/resources/db/changelog/sql/set_span_and_trace_unique.sql new file mode 100644 index 00000000..bc99e5d4 --- /dev/null +++ b/db-migrations/src/main/resources/db/changelog/sql/set_span_and_trace_unique.sql @@ -0,0 +1,7 @@ +--liquibase formatted sql + +--changeset ivan.vakhrushev:2025.08.31:remove unique from trace_id +alter table otel_demo.storage drop constraint storage_trace_id_key; + +--changeset marina.zharinova:2025.08.31:add constraint on trace_id with span_id +alter table otel_demo.storage add constraint trace_span_unique unique(trace_id, span_id); diff --git a/db-migrations/src/main/resources/db/changelog/sql/storage.sql b/db-migrations/src/main/resources/db/changelog/sql/storage.sql index 1da0d4c9..feeefc8b 100644 --- a/db-migrations/src/main/resources/db/changelog/sql/storage.sql +++ b/db-migrations/src/main/resources/db/changelog/sql/storage.sql @@ -5,7 +5,7 @@ create table if not exists otel_demo.storage ( id bigint generated always as identity, message text not null, - trace_id varchar(64) not null unique, + trace_id text not null unique, created_at timestamptz not null ); diff --git a/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/config/DbConfig.kt b/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/config/DbConfig.kt new file mode 100644 index 00000000..f7e4d9d9 --- /dev/null +++ b/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/config/DbConfig.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020-2025. Ivan Vakhrushev and others. + * https://github.com/mfvanek/spring-boot-open-telemetry-demo + * + * Licensed under the Apache License 2.0 + */ + +package io.github.mfvanek.spring.boot3.kotlin.test.config + +import io.github.mfvanek.db.migrations.common.saver.DbSaver +import io.micrometer.tracing.Tracer +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.jdbc.core.simple.JdbcClient +import java.time.Clock + +@Configuration(proxyBeanMethods = false) +class DbConfig { + + @Bean + fun dbSaver( + @Value("\${app.tenant.name}") tenantName: String, + tracer: Tracer, + clock: Clock, + jdbcClient: JdbcClient + ): DbSaver { + return DbSaver(tenantName, tracer, clock, jdbcClient) + } +} diff --git a/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeController.kt b/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeController.kt index a23d5cb7..faaecfe6 100644 --- a/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeController.kt +++ b/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeController.kt @@ -25,7 +25,7 @@ class TimeController( private val kafkaSendingService: KafkaSendingService, private val publicApiService: PublicApiService ) { - + // http://localhost:8090/current-time @GetMapping(path = ["/current-time"]) fun getNow(): LocalDateTime { logger.trace { "tracer $tracer" } @@ -33,9 +33,14 @@ class TimeController( logger.info { "Called method getNow. TraceId = $traceId" } val nowFromRemote = publicApiService.getZonedTime() val now = nowFromRemote ?: LocalDateTime.now(clock) - kafkaSendingService.sendNotification("Current time = $now") + val message = "Current time = $now" + kafkaSendingService.sendNotification(message) .thenRun { logger.info { "Awaiting acknowledgement from Kafka" } } .get() + + kafkaSendingService.sendNotificationToOtherTopic(message) + .thenRun { logger.info { "Awaiting acknowledgement from Kafka with batch" } } + .get() return now } } diff --git a/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaReadingService.kt b/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaReadingService.kt index 05c87676..b0faa81d 100644 --- a/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaReadingService.kt +++ b/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaReadingService.kt @@ -7,47 +7,71 @@ package io.github.mfvanek.spring.boot3.kotlin.test.service +import io.github.mfvanek.db.migrations.common.saver.DbSaver import io.github.oshai.kotlinlogging.KotlinLogging -import io.github.oshai.kotlinlogging.withLoggingContext import io.micrometer.tracing.Tracer +import io.micrometer.tracing.propagation.Propagator import org.apache.kafka.clients.consumer.ConsumerRecord -import org.springframework.beans.factory.annotation.Value -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate import org.springframework.kafka.annotation.KafkaListener import org.springframework.kafka.support.Acknowledgment import org.springframework.stereotype.Service -import java.time.Clock -import java.time.LocalDateTime +import java.nio.charset.StandardCharsets import java.util.* private val logger = KotlinLogging.logger {} +internal object KafkaHeadersGetter : Propagator.Getter> { + override fun get(carrier: ConsumerRecord, key: String): String? = + carrier.headers()?.lastHeader(key)?.value()?.toString(StandardCharsets.UTF_8) +} + @Service class KafkaReadingService( - @Value("\${app.tenant.name}") private val tenantName: String, private val tracer: Tracer, - private val clock: Clock, - private val jdbcTemplate: NamedParameterJdbcTemplate + private val propagator: Propagator, + private val dbSaver: DbSaver ) { @KafkaListener(topics = ["\${spring.kafka.template.default-topic}"]) - fun listen(message: ConsumerRecord, ack: Acknowledgment) { - withLoggingContext("tenant.name" to tenantName) { - processMessage(message) + fun listen(record: ConsumerRecord, ack: Acknowledgment) { + dbSaver.processSingleRecord(record) + ack.acknowledge() + } + + @SuppressWarnings("IllegalCatch", "PMD.AvoidCatchingThrowable") + @KafkaListener( + id = "\${spring.kafka.consumer.additional-groupId}", + topics = ["\${spring.kafka.template.additional-topic}"], + batch = "true" + ) + fun listenAdditional(records: List>, ack: Acknowledgment) { + val batchSpan = tracer.startScopedSpan("batch-processing") + try { + logger.info { + "Received from Kafka ${records.size} records" + } + records.forEach { record -> restoreContextAndProcessSingleRecordIfNeed(record) } ack.acknowledge() + } catch (throwable: Throwable) { + batchSpan.error(throwable) + throw throwable + } finally { + batchSpan.end() } } - private fun processMessage(message: ConsumerRecord) { - val currentSpan = tracer.currentSpan() - val traceId = currentSpan?.context()?.traceId().orEmpty() - logger.info { "Received record: ${message.value()} with traceId $traceId" } - jdbcTemplate.update( - "insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);", - mapOf( - "msg" to message.value(), - "traceId" to traceId, - "createdAt" to LocalDateTime.now(clock) - ) - ) + @SuppressWarnings("IllegalCatch", "PMD.AvoidCatchingThrowable") + private fun restoreContextAndProcessSingleRecordIfNeed(record: ConsumerRecord) { + val builder = propagator.extract(record, KafkaHeadersGetter) + val spanFromRecord = builder.name("processing-record-from-kafka").start() + try { + tracer.withSpan(spanFromRecord).use { + dbSaver.processSingleRecord(record) + } + } catch (throwable: Throwable) { + spanFromRecord.error(throwable) + throw throwable + } finally { + spanFromRecord.end() + } } } diff --git a/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaSendingService.kt b/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaSendingService.kt index 7e965252..88f4289b 100644 --- a/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaSendingService.kt +++ b/spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaSendingService.kt @@ -21,7 +21,8 @@ private val logger = KotlinLogging.logger {} @Service class KafkaSendingService( @Value("\${app.tenant.name}") private val tenantName: String, - private val kafkaTemplate: KafkaTemplate + private val kafkaTemplate: KafkaTemplate, + @Value("\${spring.kafka.template.additional-topic}") private val additionalTopic: String, ) { fun sendNotification(message: String): CompletableFuture> { withLoggingContext("tenant.name" to tenantName) { @@ -29,4 +30,11 @@ class KafkaSendingService( return kafkaTemplate.sendDefault(UUID.randomUUID(), message) } } + + fun sendNotificationToOtherTopic(message: String): CompletableFuture> { + withLoggingContext("tenant.name" to tenantName) { + logger.info { "Sending message \"$message\" to $additionalTopic of Kafka" } + return kafkaTemplate.send(additionalTopic, UUID.randomUUID(), message) + } + } } diff --git a/spring-boot-3-demo-app-kotlin/src/main/resources/application.yml b/spring-boot-3-demo-app-kotlin/src/main/resources/application.yml index 5ffd1157..fa629b40 100644 --- a/spring-boot-3-demo-app-kotlin/src/main/resources/application.yml +++ b/spring-boot-3-demo-app-kotlin/src/main/resources/application.yml @@ -33,14 +33,20 @@ spring: template: default-topic: open.telemetry.sb3.queue observation-enabled: true # Important!!! + additional-topic: open.telemetry.sb3.queue.additional producer: key-serializer: org.apache.kafka.common.serialization.UUIDSerializer + batch-size: 32KB + properties: + linger.ms: 20 + batch.size: 10000 listener: observation-enabled: true # Important!!! ack-mode: manual_immediate consumer: auto-offset-reset: earliest group-id: ${spring.kafka.template.default-topic}-group + additional-groupId: ${spring.kafka.template.additional-topic}-group client-id: open.telemetry.client bootstrap-servers: localhost:9092 security: diff --git a/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeControllerTest.kt b/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeControllerTest.kt index 6f0797a6..760be89b 100644 --- a/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeControllerTest.kt +++ b/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeControllerTest.kt @@ -91,12 +91,15 @@ class TimeControllerTest : TestBase() { assertThat(output.all) .contains("Received record: " + received.value() + " with traceId " + traceId) .contains("\"tenant.name\":\"ru-a1-private\"") - val messageFromDb = namedParameterJdbcTemplate.queryForObject( + val messagesFromDb = namedParameterJdbcTemplate.queryForList( "select message from otel_demo.storage where trace_id = :traceId", mapOf("traceId" to traceId), String::class.java ) - assertThat(messageFromDb).isEqualTo(received.value()) + assertThat(messagesFromDb.size).isEqualTo(2) + messagesFromDb.forEach { + assertThat(it).isEqualTo(received.value()) + } } @Order(2) @@ -179,11 +182,11 @@ class TimeControllerTest : TestBase() { .await() .atMost(10, TimeUnit.SECONDS) .pollInterval(Duration.ofMillis(500L)) - .until { countRecordsInTable() >= 1L } + .until { countRecordsInTable() >= 2L } } } -private fun setUpKafkaConsumer(kafkaProperties: KafkaProperties, consumerRecords: BlockingQueue>): KafkaMessageListenerContainer { +fun setUpKafkaConsumer(kafkaProperties: KafkaProperties, consumerRecords: BlockingQueue>): KafkaMessageListenerContainer { val containerProperties = ContainerProperties(kafkaProperties.template.defaultTopic) val consumerProperties = KafkaTestUtils.consumerProps(KafkaInitializer.getBootstrapSevers(), "test-group", "false") consumerProperties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_PLAINTEXT" diff --git a/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaTracingTest.kt b/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaTracingTest.kt new file mode 100644 index 00000000..57bbdb0d --- /dev/null +++ b/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaTracingTest.kt @@ -0,0 +1,120 @@ +/* +* Copyright (c) 2020-2025. Ivan Vakhrushev and others. +* https://github.com/mfvanek/spring-boot-open-telemetry-demo +* +* Licensed under the Apache License 2.0 +*/ + +package io.github.mfvanek.spring.boot3.kotlin.test.service + +import com.fasterxml.jackson.databind.ObjectMapper +import com.github.tomakehurst.wiremock.client.WireMock +import io.github.mfvanek.db.migrations.common.saver.DbSaver +import io.github.mfvanek.spring.boot3.kotlin.test.filters.TraceIdInResponseServletFilter.Companion.TRACE_ID_HEADER_NAME +import io.github.mfvanek.spring.boot3.kotlin.test.service.dto.CurrentTime +import io.github.mfvanek.spring.boot3.kotlin.test.service.dto.ParsedDateTime +import io.github.mfvanek.spring.boot3.kotlin.test.service.dto.toParsedDateTime +import io.github.mfvanek.spring.boot3.kotlin.test.support.JaegerInitializer +import io.github.mfvanek.spring.boot3.kotlin.test.support.KafkaInitializer +import io.github.mfvanek.spring.boot3.kotlin.test.support.PostgresInitializer +import io.github.mfvanek.spring.boot3.kotlin.test.support.SpanExporterConfiguration +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.StatusCode +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter +import io.opentelemetry.sdk.trace.data.StatusData +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.mockito.ArgumentMatchers +import org.mockito.Mockito +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.bean.override.mockito.MockitoBean +import org.springframework.test.web.reactive.server.WebTestClient +import org.springframework.web.util.UriBuilder +import java.time.Clock +import java.time.LocalDateTime +import java.util.* +import java.util.function.Function + +@ActiveProfiles("test") +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ContextConfiguration( + classes = [SpanExporterConfiguration::class], + initializers = [KafkaInitializer::class, JaegerInitializer::class, PostgresInitializer::class] +) +@AutoConfigureWireMock(port = 0) +internal class KafkaTracingTest { + @Autowired + private lateinit var webTestClient: WebTestClient + + @Autowired + private lateinit var objectMapper: ObjectMapper + + @Autowired + private lateinit var clock: Clock + + @Autowired + private lateinit var spanExporter: InMemorySpanExporter + + @MockitoBean + private lateinit var dbSaver: DbSaver + + companion object { + @JvmStatic + @BeforeAll + fun resetTelemetry() { + GlobalOpenTelemetry.resetForTest() + } + } + + @Test + fun closeAllSpansWhenException() { + val testException: Exception = RuntimeException("saving failed") + Mockito.doThrow( + testException + ).`when`(dbSaver).processSingleRecord(ArgumentMatchers.any>()) + stubOkResponse((LocalDateTime.now(clock).minusDays(1)).toParsedDateTime()) + + val result = webTestClient.get() + .uri( + Function { uriBuilder: UriBuilder? -> + uriBuilder!!.path("current-time") + .build() + } + ) + .exchange() + .expectStatus().isOk() + .expectHeader().exists(TRACE_ID_HEADER_NAME) + .expectBody(LocalDateTime::class.java) + .returnResult() + val traceId = result.responseHeaders.getFirst(TRACE_ID_HEADER_NAME) + val finishedSpans = spanExporter.finishedSpanItems + + assertThat(finishedSpans.map { it.traceId }).contains(traceId) + assertThat(finishedSpans.map { it.status }).contains(StatusData.create(StatusCode.ERROR, "saving failed")) + assertThat(finishedSpans.map { it.name }).contains("processing-record-from-kafka") + } + + private fun stubOkResponse(parsedDateTime: ParsedDateTime): String { + val zoneName = TimeZone.getDefault().id + stubOkResponse(zoneName, parsedDateTime) + return zoneName + } + + private fun stubOkResponse(zoneName: String, parsedDateTime: ParsedDateTime) { + val currentTime = CurrentTime(parsedDateTime) + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/$zoneName")) + .willReturn( + WireMock.aResponse() + .withStatus(200) + .withBody(objectMapper.writeValueAsString(currentTime)) + ) + ) + } +} diff --git a/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/PublicApiServiceTest.kt b/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/PublicApiServiceTest.kt index 2dcf060b..2c8942dc 100644 --- a/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/PublicApiServiceTest.kt +++ b/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/PublicApiServiceTest.kt @@ -99,7 +99,7 @@ class PublicApiServiceTest : TestBase() { } @Test - fun throwsJsonProcessingExceptionWithBdResponse(output: CapturedOutput) { + fun throwsJsonProcessingExceptionWithBadResponse(output: CapturedOutput) { stubBadResponse() Observation.createNotStarted("test", observationRegistry).observe { val result = publicApiService.getZonedTime() diff --git a/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/support/SpanExporterConfiguration.kt b/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/support/SpanExporterConfiguration.kt new file mode 100644 index 00000000..3e8ad45a --- /dev/null +++ b/spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/support/SpanExporterConfiguration.kt @@ -0,0 +1,35 @@ +package io.github.mfvanek.spring.boot3.kotlin.test.support + +import io.opentelemetry.sdk.OpenTelemetrySdk +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter +import io.opentelemetry.sdk.trace.SdkTracerProvider +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor +import io.opentelemetry.sdk.trace.export.SpanExporter +import org.springframework.boot.test.context.TestConfiguration +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Primary + +@TestConfiguration +class SpanExporterConfiguration { + @Bean + @Primary + fun spanExporter(): SpanExporter { + return InMemorySpanExporter.create() + } + + @Bean + @Primary + fun tracerProvider(spanExporter: SpanExporter): SdkTracerProvider { + return SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build() + } + + @Bean + @Primary + fun openTelemetrySdk(tracerProvider: SdkTracerProvider): OpenTelemetrySdk { + return OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .buildAndRegisterGlobal() + } +} diff --git a/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/config/DbConfig.java b/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/config/DbConfig.java new file mode 100644 index 00000000..310f2d6a --- /dev/null +++ b/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/config/DbConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2020-2025. Ivan Vakhrushev and others. + * https://github.com/mfvanek/spring-boot-open-telemetry-demo + * + * Licensed under the Apache License 2.0 + */ + +package io.github.mfvanek.spring.boot3.reactive.config; + +import io.github.mfvanek.db.migrations.common.saver.DbSaver; +import io.micrometer.tracing.Tracer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.simple.JdbcClient; + +import java.time.Clock; + +@Configuration(proxyBeanMethods = false) +public class DbConfig { + + @Bean + public DbSaver dbSaver( + @Value("${app.tenant.name}") String tenantName, + Tracer tracer, + Clock clock, + JdbcClient jdbcClient + ) { + return new DbSaver(tenantName, tracer, clock, jdbcClient); + } +} diff --git a/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeController.java b/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeController.java index 8217ac43..518cb747 100644 --- a/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeController.java +++ b/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeController.java @@ -32,22 +32,25 @@ public class TimeController { private final KafkaSendingService kafkaSendingService; private final PublicApiService publicApiService; - // http://localhost:8080/current-time + // http://localhost:8081/current-time @GetMapping(path = "/current-time") public Mono getNow() { log.trace("tracer {}", tracer); - - final String traceId = Optional.ofNullable(tracer.currentSpan()) - .map(Span::context) - .map(TraceContext::traceId) - .orElse(null); - log.info("Called method getNow. TraceId = {}", traceId); - - return publicApiService.getZonedTime() + return Mono.justOrEmpty( + Optional.ofNullable(tracer.currentSpan()) + .map(Span::context) + .map(TraceContext::traceId) + .orElse(null) + ) + .doOnNext(traceId -> log.info("Called method getNow. TraceId = {}", traceId)) + .then(publicApiService.getZonedTime()) .defaultIfEmpty(LocalDateTime.now(clock)) .flatMap(now -> kafkaSendingService.sendNotification("Current time = " + now) .doOnSuccess(v -> log.info("Awaiting acknowledgement from Kafka")) .thenReturn(now) - ); + ) + .flatMap(now -> kafkaSendingService.sendNotificationToOtherTopic("Current time = " + now) + .doOnSuccess(v -> log.info("Awaiting acknowledgement from Kafka with batch")) + .thenReturn(now)); } } diff --git a/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/filters/TraceIdInResponseReactiveFilter.java b/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/filters/TraceIdInResponseReactiveFilter.java index 07b50847..92e54eeb 100644 --- a/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/filters/TraceIdInResponseReactiveFilter.java +++ b/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/filters/TraceIdInResponseReactiveFilter.java @@ -22,8 +22,8 @@ @RequiredArgsConstructor public class TraceIdInResponseReactiveFilter implements WebFilter { + public static final String TRACE_ID_HEADER_NAME = "X-Trace-Id"; private static final Logger LOGGER = LoggerFactory.getLogger(TraceIdInResponseReactiveFilter.class); - private static final String TRACE_ID_HEADER_NAME = "X-Trace-Id"; @Override public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { diff --git a/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaReadingService.java b/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaReadingService.java index 2fddb203..8b3f19ac 100644 --- a/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaReadingService.java +++ b/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaReadingService.java @@ -7,21 +7,20 @@ package io.github.mfvanek.spring.boot3.reactive.service; +import io.github.mfvanek.db.migrations.common.saver.DbSaver; +import io.micrometer.tracing.ScopedSpan; import io.micrometer.tracing.Span; import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.propagation.Propagator; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.slf4j.MDC; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; -import java.time.Clock; -import java.time.LocalDateTime; -import java.util.Map; +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.UUID; @Slf4j @@ -29,26 +28,51 @@ @RequiredArgsConstructor public class KafkaReadingService { + private static final Propagator.Getter> KAFKA_PROPAGATOR_GETTER = (carrier, key) -> new String(carrier.headers().lastHeader(key).value(), StandardCharsets.UTF_8); + private final Tracer tracer; - private final Clock clock; - private final NamedParameterJdbcTemplate jdbcTemplate; - @Value("${app.tenant.name}") - private String tenantName; + private final Propagator propagator; + private final DbSaver dbSaver; @KafkaListener(topics = "${spring.kafka.template.default-topic}") - public void listen(ConsumerRecord message, Acknowledgment ack) { - try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) { - final Span currentSpan = tracer.currentSpan(); - final String traceId = currentSpan != null ? currentSpan.context().traceId() : ""; - log.info("Received record: {} with traceId {}", message.value(), traceId); - jdbcTemplate.update("insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);", - Map.ofEntries( - Map.entry("msg", message.value()), - Map.entry("traceId", traceId), - Map.entry("createdAt", LocalDateTime.now(clock)) - ) + public void listen(ConsumerRecord record, Acknowledgment ack) { + dbSaver.processSingleRecord(record); + ack.acknowledge(); + } + + @SuppressWarnings({"IllegalCatch", "PMD.AvoidCatchingThrowable"}) + @KafkaListener( + id = "${spring.kafka.consumer.additional-groupId}", + topics = "${spring.kafka.template.additional-topic}", + batch = "true" + ) + public void listenAdditional(List> records, Acknowledgment ack) { + final ScopedSpan batchSpan = tracer.startScopedSpan("batch-processing"); + try { + log.info( + "Received from Kafka {} records", records.size() ); + records.forEach(this::restoreContextAndProcessSingleRecordIfNeed); ack.acknowledge(); + } catch (Throwable throwable) { + batchSpan.error(throwable); + throw throwable; + } finally { + batchSpan.end(); + } + } + + @SuppressWarnings({"IllegalCatch", "PMD.AvoidCatchingThrowable"}) + private void restoreContextAndProcessSingleRecordIfNeed(ConsumerRecord record) { + final Span.Builder builder = propagator.extract(record, KAFKA_PROPAGATOR_GETTER); + final Span spanFromRecord = builder.name("processing-record-from-kafka").start(); + try (Tracer.SpanInScope ignored = tracer.withSpan(spanFromRecord)) { + dbSaver.processSingleRecord(record); + } catch (Throwable throwable) { + spanFromRecord.error(throwable); + throw throwable; + } finally { + spanFromRecord.end(); } } } diff --git a/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaSendingService.java b/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaSendingService.java index c254adb2..9f608187 100644 --- a/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaSendingService.java +++ b/spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaSendingService.java @@ -28,6 +28,7 @@ public class KafkaSendingService { private final KafkaTemplate kafkaTemplate; @Value("${app.tenant.name}") private String tenantName; + @Value("${spring.kafka.template.additional-topic}") private String additionalTopic; public Mono> sendNotification(@Nonnull final String message) { return Mono.deferContextual(contextView -> { @@ -38,4 +39,14 @@ public Mono> sendNotification(@Nonnull final String mes } }); } + + public Mono> sendNotificationToOtherTopic(@Nonnull final String message) { + return Mono.deferContextual(contextView -> { + try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) { + log.info("Sending message \"{}\" to {} of Kafka", message, additionalTopic); + return Mono.fromFuture(() -> kafkaTemplate.send(additionalTopic, UUID.randomUUID(), message)) + .subscribeOn(Schedulers.boundedElastic()); + } + }); + } } diff --git a/spring-boot-3-demo-app-reactive/src/main/resources/application.yml b/spring-boot-3-demo-app-reactive/src/main/resources/application.yml index 138f52b1..9bf356c6 100644 --- a/spring-boot-3-demo-app-reactive/src/main/resources/application.yml +++ b/spring-boot-3-demo-app-reactive/src/main/resources/application.yml @@ -26,14 +26,19 @@ spring: template: default-topic: open.telemetry.sb3.queue observation-enabled: true # Important!!! + additional-topic: open.telemetry.sb3.queue.additional producer: key-serializer: org.apache.kafka.common.serialization.UUIDSerializer + batch-size: 32KB + properties: + linger.ms: 20 listener: observation-enabled: true # Important!!! ack-mode: manual_immediate consumer: auto-offset-reset: earliest group-id: ${spring.kafka.template.default-topic}-group + additional-groupId: ${spring.kafka.template.additional-topic}-group client-id: open.telemetry.client bootstrap-servers: localhost:9092 security: diff --git a/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeControllerTest.java b/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeControllerTest.java index b57913df..5e819d6b 100644 --- a/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeControllerTest.java +++ b/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeControllerTest.java @@ -101,10 +101,10 @@ void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) throws Int assertThat(output.getAll()) .contains("Received record: " + received.value() + " with traceId " + traceId) .contains("\"tenant.name\":\"ru-a1-private\""); - final String messageFromDb = namedParameterJdbcTemplate.queryForObject("select message from otel_demo.storage where trace_id = :traceId", + final List messagesFromDb = namedParameterJdbcTemplate.queryForList("select message from otel_demo.storage where trace_id = :traceId", Map.of("traceId", traceId), String.class); - assertThat(messageFromDb) - .isEqualTo(received.value()); + assertThat(messagesFromDb.size()).isEqualTo(2); + messagesFromDb.forEach(it -> assertThat(it).isEqualTo(received.value())); } @Order(2) @@ -167,6 +167,6 @@ private void awaitStoringIntoDatabase() { .await() .atMost(10, TimeUnit.SECONDS) .pollInterval(Duration.ofMillis(500L)) - .until(() -> countRecordsInTable() >= 1L); + .until(() -> countRecordsInTable() >= 2L); } } diff --git a/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaTracingTest.java b/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaTracingTest.java new file mode 100644 index 00000000..2d94947f --- /dev/null +++ b/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaTracingTest.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2020-2025. Ivan Vakhrushev and others. + * https://github.com/mfvanek/spring-boot-open-telemetry-demo + * + * Licensed under the Apache License 2.0 + */ + +package io.github.mfvanek.spring.boot3.reactive.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.mfvanek.db.migrations.common.saver.DbSaver; +import io.github.mfvanek.spring.boot3.reactive.service.dto.CurrentTime; +import io.github.mfvanek.spring.boot3.reactive.service.dto.ParsedDateTime; +import io.github.mfvanek.spring.boot3.reactive.support.JaegerInitializer; +import io.github.mfvanek.spring.boot3.reactive.support.KafkaInitializer; +import io.github.mfvanek.spring.boot3.reactive.support.PostgresInitializer; +import io.github.mfvanek.spring.boot3.reactive.support.SpanExporterConfiguration; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.bean.override.mockito.MockitoBean; +import org.springframework.test.web.reactive.server.EntityExchangeResult; +import org.springframework.test.web.reactive.server.WebTestClient; + +import java.time.Clock; +import java.time.LocalDateTime; +import java.util.List; +import java.util.TimeZone; +import javax.annotation.Nonnull; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; +import static io.github.mfvanek.spring.boot3.reactive.filters.TraceIdInResponseReactiveFilter.TRACE_ID_HEADER_NAME; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; + +@SuppressWarnings("checkstyle:classfanoutcomplexity") +@ActiveProfiles("test") +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ContextConfiguration( + classes = SpanExporterConfiguration.class, + initializers = {KafkaInitializer.class, JaegerInitializer.class, PostgresInitializer.class} +) +@AutoConfigureWireMock(port = 0) +class KafkaTracingTest { + + @Autowired + private WebTestClient webTestClient; + @Autowired + private ObjectMapper objectMapper; + @Autowired + private Clock clock; + @Autowired + private InMemorySpanExporter spanExporter; + @MockitoBean + private DbSaver dbSaver; + + @BeforeAll + static void resetTelemetry() { + GlobalOpenTelemetry.resetForTest(); + } + + @Test + void closeAllSpansWhenException() { + final Exception testException = new RuntimeException("saving failed"); + doThrow(testException).when(dbSaver).processSingleRecord(any()); + stubOkResponse(ParsedDateTime.from(LocalDateTime.now(clock).minusDays(1))); + + final EntityExchangeResult result = webTestClient.get() + .uri(uriBuilder -> uriBuilder.path("current-time") + .build()) + .exchange() + .expectStatus().isOk() + .expectHeader().exists(TRACE_ID_HEADER_NAME) + .expectBody(LocalDateTime.class) + .returnResult(); + final String traceId = result.getResponseHeaders().getFirst(TRACE_ID_HEADER_NAME); + final List finishedSpans = spanExporter.getFinishedSpanItems(); + + assertThat(finishedSpans.stream().map(SpanData::getTraceId)).contains(traceId); + assertThat(finishedSpans.stream().map(SpanData::getStatus)).contains(StatusData.create(StatusCode.ERROR, "saving failed")); + assertThat(finishedSpans.stream().map(SpanData::getName)).contains("processing-record-from-kafka"); + } + + protected void stubOkResponse(@Nonnull final ParsedDateTime parsedDateTime) { + final String zoneName = TimeZone.getDefault().getID(); + stubOkResponse(zoneName, parsedDateTime); + } + + @SneakyThrows + private void stubOkResponse(@Nonnull final String zoneName, @Nonnull final ParsedDateTime parsedDateTime) { + final CurrentTime currentTime = new CurrentTime(parsedDateTime); + stubFor(get(urlPathMatching("/" + zoneName)) + .willReturn(aResponse() + .withStatus(200) + .withBody(objectMapper.writeValueAsString(currentTime)) + )); + } +} diff --git a/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/service/PublicApiServiceTest.java b/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/service/PublicApiServiceTest.java index 51cb25c7..1684c0ef 100644 --- a/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/service/PublicApiServiceTest.java +++ b/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/service/PublicApiServiceTest.java @@ -98,7 +98,7 @@ void emptyResponseWhen500StatusWithStepVerifier() { } @Test - void emptyResponseWhen200StatusWithBadResposeWithStepVerifier(@Nonnull final CapturedOutput output) { + void emptyResponseWhen200StatusWithBadResponseWithStepVerifier(@Nonnull final CapturedOutput output) { stubOkButNotCorrectResponse(); StepVerifier.create(publicApiService.getZonedTime()) diff --git a/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/support/SpanExporterConfiguration.java b/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/support/SpanExporterConfiguration.java new file mode 100644 index 00000000..34202f53 --- /dev/null +++ b/spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/support/SpanExporterConfiguration.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020-2025. Ivan Vakhrushev and others. + * https://github.com/mfvanek/spring-boot-open-telemetry-demo + * + * Licensed under the Apache License 2.0 + */ + +package io.github.mfvanek.spring.boot3.reactive.support; + +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; + +@TestConfiguration +public class SpanExporterConfiguration { + + @Bean + @Primary + public SpanExporter spanExporter() { + return InMemorySpanExporter.create(); + } + + @Bean + @Primary + public SdkTracerProvider tracerProvider(SpanExporter spanExporter) { + return SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + } + + @Bean + @Primary + public OpenTelemetrySdk openTelemetrySdk(SdkTracerProvider tracerProvider) { + return OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .buildAndRegisterGlobal(); + } +} diff --git a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/config/DbConfig.java b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/config/DbConfig.java new file mode 100644 index 00000000..d4a40d9f --- /dev/null +++ b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/config/DbConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2020-2025. Ivan Vakhrushev and others. + * https://github.com/mfvanek/spring-boot-open-telemetry-demo + * + * Licensed under the Apache License 2.0 + */ + +package io.github.mfvanek.spring.boot3.test.config; + +import io.github.mfvanek.db.migrations.common.saver.DbSaver; +import io.micrometer.tracing.Tracer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.simple.JdbcClient; + +import java.time.Clock; + +@Configuration(proxyBeanMethods = false) +public class DbConfig { + + @Bean + public DbSaver dbSaver( + @Value("${app.tenant.name}") String tenantName, + Tracer tracer, + Clock clock, + JdbcClient jdbcClient + ) { + return new DbSaver(tenantName, tracer, clock, jdbcClient); + } +} diff --git a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/controllers/TimeController.java b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/controllers/TimeController.java index 19df50de..b755a3ad 100644 --- a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/controllers/TimeController.java +++ b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/controllers/TimeController.java @@ -47,6 +47,9 @@ public LocalDateTime getNow() { kafkaSendingService.sendNotification("Current time = " + now) .thenRun(() -> log.info("Awaiting acknowledgement from Kafka")) .get(); + kafkaSendingService.sendNotificationToOtherTopic("Current time = " + now) + .thenRun(() -> log.info("Awaiting acknowledgement from Kafka with batch")) + .get(); return now; } } diff --git a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaReadingService.java b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaReadingService.java index 8dadc668..ef3a78c2 100644 --- a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaReadingService.java +++ b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaReadingService.java @@ -7,21 +7,20 @@ package io.github.mfvanek.spring.boot3.test.service; +import io.github.mfvanek.db.migrations.common.saver.DbSaver; +import io.micrometer.tracing.ScopedSpan; import io.micrometer.tracing.Span; import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.propagation.Propagator; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.slf4j.MDC; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; -import java.time.Clock; -import java.time.LocalDateTime; -import java.util.Map; +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.UUID; @Slf4j @@ -29,26 +28,51 @@ @RequiredArgsConstructor public class KafkaReadingService { - @Value("${app.tenant.name}") - private String tenantName; + private static final Propagator.Getter> KAFKA_PROPAGATOR_GETTER = (carrier, key) -> new String(carrier.headers().lastHeader(key).value(), StandardCharsets.UTF_8); + private final Tracer tracer; - private final Clock clock; - private final NamedParameterJdbcTemplate jdbcTemplate; + private final Propagator propagator; + private final DbSaver dbSaver; @KafkaListener(topics = "${spring.kafka.template.default-topic}") - public void listen(ConsumerRecord message, Acknowledgment ack) { - try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) { - final Span currentSpan = tracer.currentSpan(); - final String traceId = currentSpan != null ? currentSpan.context().traceId() : ""; - log.info("Received record: {} with traceId {}", message.value(), traceId); - jdbcTemplate.update("insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);", - Map.ofEntries( - Map.entry("msg", message.value()), - Map.entry("traceId", traceId), - Map.entry("createdAt", LocalDateTime.now(clock)) - ) + public void listen(ConsumerRecord record, Acknowledgment ack) { + dbSaver.processSingleRecord(record); + ack.acknowledge(); + } + + @SuppressWarnings({"IllegalCatch", "PMD.AvoidCatchingThrowable"}) + @KafkaListener( + id = "${spring.kafka.consumer.additional-groupId}", + topics = "${spring.kafka.template.additional-topic}", + batch = "true" + ) + public void listenAdditional(List> records, Acknowledgment ack) { + final ScopedSpan batchSpan = tracer.startScopedSpan("batch-processing"); + try { + log.info( + "Received from Kafka {} records", records.size() ); + records.forEach(this::restoreContextAndProcessSingleRecordIfNeed); ack.acknowledge(); + } catch (Throwable throwable) { + batchSpan.error(throwable); + throw throwable; + } finally { + batchSpan.end(); + } + } + + @SuppressWarnings({"IllegalCatch", "PMD.AvoidCatchingThrowable"}) + private void restoreContextAndProcessSingleRecordIfNeed(ConsumerRecord record) { + final Span.Builder builder = propagator.extract(record, KAFKA_PROPAGATOR_GETTER); + final Span spanFromRecord = builder.name("processing-record-from-kafka").start(); + try (Tracer.SpanInScope ignored = tracer.withSpan(spanFromRecord)) { + dbSaver.processSingleRecord(record); + } catch (Throwable throwable) { + spanFromRecord.error(throwable); + throw throwable; + } finally { + spanFromRecord.end(); } } } diff --git a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaSendingService.java b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaSendingService.java index 5076980b..a7ad5b62 100644 --- a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaSendingService.java +++ b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaSendingService.java @@ -27,6 +27,7 @@ public class KafkaSendingService { @Value("${app.tenant.name}") private String tenantName; private final KafkaTemplate kafkaTemplate; + @Value("${spring.kafka.template.additional-topic}") private String additionalTopic; public CompletableFuture> sendNotification(@Nonnull final String message) { try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) { @@ -34,4 +35,11 @@ public CompletableFuture> sendNotification(@Nonnull fin return kafkaTemplate.sendDefault(UUID.randomUUID(), message); } } + + public CompletableFuture> sendNotificationToOtherTopic(@Nonnull final String message) { + try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) { + log.info("Sending message \"{}\" to \"{}\" of Kafka", message, additionalTopic); + return kafkaTemplate.send(additionalTopic, UUID.randomUUID(), message); + } + } } diff --git a/spring-boot-3-demo-app/src/main/resources/application.yml b/spring-boot-3-demo-app/src/main/resources/application.yml index d94fd72b..49877a5c 100644 --- a/spring-boot-3-demo-app/src/main/resources/application.yml +++ b/spring-boot-3-demo-app/src/main/resources/application.yml @@ -33,14 +33,19 @@ spring: template: default-topic: open.telemetry.sb3.queue observation-enabled: true # Important!!! + additional-topic: open.telemetry.sb3.queue.additional producer: key-serializer: org.apache.kafka.common.serialization.UUIDSerializer + batch-size: 32KB + properties: + linger.ms: 20 listener: observation-enabled: true # Important!!! ack-mode: manual_immediate consumer: auto-offset-reset: earliest group-id: ${spring.kafka.template.default-topic}-group + additional-groupId: ${spring.kafka.template.additional-topic}-group client-id: open.telemetry.client bootstrap-servers: localhost:9092 security: diff --git a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/HomeControllerTest.java b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/HomeControllerTest.java new file mode 100644 index 00000000..d591c3a4 --- /dev/null +++ b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/HomeControllerTest.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020-2025. Ivan Vakhrushev and others. + * https://github.com/mfvanek/spring-boot-open-telemetry-demo + * + * Licensed under the Apache License 2.0 + */ + +package io.github.mfvanek.spring.boot3.test.controllers; + +import io.github.mfvanek.spring.boot3.test.support.TestBase; +import org.junit.jupiter.api.Test; + +import static io.github.mfvanek.spring.boot3.test.filters.TraceIdInResponseServletFilter.TRACE_ID_HEADER_NAME; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +class HomeControllerTest extends TestBase { + + @Test + void homeControllerShouldWork() { + final String result = webTestClient.get() + .uri("/") + .exchange() + .expectStatus().isEqualTo(200) + .expectHeader().exists(TRACE_ID_HEADER_NAME) + .expectBody(String.class) + .returnResult() + .getResponseBody(); + assertThat(result).isEqualTo("Hello!"); + } +} diff --git a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/TimeControllerTest.java b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/TimeControllerTest.java index 374794d4..b26b5ce9 100644 --- a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/TimeControllerTest.java +++ b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/TimeControllerTest.java @@ -48,8 +48,8 @@ @TestInstance(TestInstance.Lifecycle.PER_CLASS) class TimeControllerTest extends TestBase { - private KafkaMessageListenerContainer container; private final BlockingQueue> consumerRecords = new ArrayBlockingQueue<>(4); + private KafkaMessageListenerContainer container; @Autowired private KafkaProperties kafkaProperties; @@ -102,10 +102,10 @@ void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) throws Int assertThat(output.getAll()) .contains("Received record: " + received.value() + " with traceId " + traceId) .contains("\"tenant.name\":\"ru-a1-private\""); - final String messageFromDb = namedParameterJdbcTemplate.queryForObject("select message from otel_demo.storage where trace_id = :traceId", + final List messagesFromDb = namedParameterJdbcTemplate.queryForList("select message from otel_demo.storage where trace_id = :traceId", Map.of("traceId", traceId), String.class); - assertThat(messageFromDb) - .isEqualTo(received.value()); + assertThat(messagesFromDb.size()).isEqualTo(2); + messagesFromDb.forEach(it -> assertThat(it).isEqualTo(received.value())); } @Order(2) @@ -168,6 +168,6 @@ private void awaitStoringIntoDatabase() { .await() .atMost(10, TimeUnit.SECONDS) .pollInterval(Duration.ofMillis(500L)) - .until(() -> countRecordsInTable() >= 1L); + .until(() -> countRecordsInTable() >= 2L); } } diff --git a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/KafkaTracingTest.java b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/KafkaTracingTest.java new file mode 100644 index 00000000..c5355f73 --- /dev/null +++ b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/KafkaTracingTest.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2020-2025. Ivan Vakhrushev and others. + * https://github.com/mfvanek/spring-boot-open-telemetry-demo + * + * Licensed under the Apache License 2.0 + */ + +package io.github.mfvanek.spring.boot3.test.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.mfvanek.db.migrations.common.saver.DbSaver; +import io.github.mfvanek.spring.boot3.test.service.dto.CurrentTime; +import io.github.mfvanek.spring.boot3.test.service.dto.ParsedDateTime; +import io.github.mfvanek.spring.boot3.test.support.JaegerInitializer; +import io.github.mfvanek.spring.boot3.test.support.KafkaInitializer; +import io.github.mfvanek.spring.boot3.test.support.PostgresInitializer; +import io.github.mfvanek.spring.boot3.test.support.SpanExporterConfiguration; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.bean.override.mockito.MockitoBean; +import org.springframework.test.web.reactive.server.EntityExchangeResult; +import org.springframework.test.web.reactive.server.WebTestClient; + +import java.time.Clock; +import java.time.LocalDateTime; +import java.util.List; +import java.util.TimeZone; +import javax.annotation.Nonnull; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; +import static io.github.mfvanek.spring.boot3.test.filters.TraceIdInResponseServletFilter.TRACE_ID_HEADER_NAME; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; + +@SuppressWarnings("checkstyle:classfanoutcomplexity") +@ActiveProfiles("test") +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ContextConfiguration( + classes = SpanExporterConfiguration.class, + initializers = {KafkaInitializer.class, JaegerInitializer.class, PostgresInitializer.class} +) +@AutoConfigureWireMock(port = 0) +class KafkaTracingTest { + + @Autowired + protected WebTestClient webTestClient; + @Autowired + protected ObjectMapper objectMapper; + @Autowired + protected Clock clock; + @Autowired + private InMemorySpanExporter spanExporter; + @MockitoBean + private DbSaver dbSaver; + + @BeforeAll + static void resetTelemetry() { + GlobalOpenTelemetry.resetForTest(); + } + + @Test + void closeAllSpansWhenException() { + final Exception testException = new RuntimeException("saving failed"); + doThrow(testException).when(dbSaver).processSingleRecord(any()); + stubOkResponse(ParsedDateTime.from(LocalDateTime.now(clock).minusDays(1))); + + final EntityExchangeResult result = webTestClient.get() + .uri(uriBuilder -> uriBuilder.path("current-time") + .build()) + .exchange() + .expectStatus().isOk() + .expectHeader().exists(TRACE_ID_HEADER_NAME) + .expectBody(LocalDateTime.class) + .returnResult(); + final String traceId = result.getResponseHeaders().getFirst(TRACE_ID_HEADER_NAME); + final List finishedSpans = spanExporter.getFinishedSpanItems(); + + assertThat(finishedSpans.stream().map(SpanData::getTraceId)).contains(traceId); + assertThat(finishedSpans.stream().map(SpanData::getStatus)).contains(StatusData.create(StatusCode.ERROR, "saving failed")); + assertThat(finishedSpans.stream().map(SpanData::getName)).contains("processing-record-from-kafka"); + } + + protected void stubOkResponse(@Nonnull final ParsedDateTime parsedDateTime) { + final String zoneName = TimeZone.getDefault().getID(); + stubOkResponse(zoneName, parsedDateTime); + } + + @SneakyThrows + private void stubOkResponse(@Nonnull final String zoneName, @Nonnull final ParsedDateTime parsedDateTime) { + final CurrentTime currentTime = new CurrentTime(parsedDateTime); + stubFor(get(urlPathMatching("/" + zoneName)) + .willReturn(aResponse() + .withStatus(200) + .withBody(objectMapper.writeValueAsString(currentTime)) + )); + } +} diff --git a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/PublicApiServiceTest.java b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/PublicApiServiceTest.java index b6df16f9..d5a5b64f 100644 --- a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/PublicApiServiceTest.java +++ b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/PublicApiServiceTest.java @@ -11,6 +11,7 @@ import io.github.mfvanek.spring.boot3.test.support.TestBase; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; +import io.micrometer.tracing.Span; import io.micrometer.tracing.Tracer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -84,4 +85,19 @@ void retriesOnceToGetZonedTime(@Nonnull final CapturedOutput output) { verify(2, getRequestedFor(urlPathMatching("/" + zoneName))); } + + @Test + void throwsJsonProcessingExceptionWithBadResponse(CapturedOutput output) { + final String zoneName = stubBadResponse(); + Observation.createNotStarted("test", observationRegistry).observe(() -> { + final LocalDateTime result = publicApiService.getZonedTime(); + final Span currentSpan = tracer.currentSpan(); + + assert currentSpan != null; + assertThat(result).isNull(); + assertThat(currentSpan.context().traceId()).isNotNull(); + assertThat(output.getAll()).contains("Failed to convert response"); + }); + verify(1, getRequestedFor(urlPathMatching("/" + zoneName))); + } } diff --git a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/KafkaConsumerUtils.java b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/KafkaConsumerUtils.java index a87fada4..f4c896fe 100644 --- a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/KafkaConsumerUtils.java +++ b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/KafkaConsumerUtils.java @@ -31,7 +31,7 @@ public class KafkaConsumerUtils { public KafkaMessageListenerContainer setUpKafkaConsumer( @Nonnull final KafkaProperties kafkaProperties, @Nonnull final BlockingQueue> consumerRecords) { - final var containerProperties = new ContainerProperties(kafkaProperties.getTemplate().getDefaultTopic()); + final var containerProperties = new ContainerProperties(kafkaProperties.getTemplate().getDefaultTopic(), "open.telemetry.sb3.queue.additional"); final Map consumerProperties = KafkaTestUtils.consumerProps(KafkaInitializer.getBootstrapSevers(), "test-group", "false"); consumerProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); consumerProperties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); @@ -41,7 +41,7 @@ public KafkaMessageListenerContainer setUpKafkaConsumer( final var container = new KafkaMessageListenerContainer<>(consumer, containerProperties); container.setupMessageListener((MessageListener) consumerRecords::add); container.start(); - ContainerTestUtils.waitForAssignment(container, 1); + ContainerTestUtils.waitForAssignment(container, 2); return container; } } diff --git a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/SpanExporterConfiguration.java b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/SpanExporterConfiguration.java new file mode 100644 index 00000000..9e173356 --- /dev/null +++ b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/SpanExporterConfiguration.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020-2025. Ivan Vakhrushev and others. + * https://github.com/mfvanek/spring-boot-open-telemetry-demo + * + * Licensed under the Apache License 2.0 + */ + +package io.github.mfvanek.spring.boot3.test.support; + +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; + +@TestConfiguration +public class SpanExporterConfiguration { + + @Bean + @Primary + public SpanExporter spanExporter() { + return InMemorySpanExporter.create(); + } + + @Bean + @Primary + public SdkTracerProvider tracerProvider(SpanExporter spanExporter) { + return SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + } + + @Bean + @Primary + public OpenTelemetrySdk openTelemetrySdk(SdkTracerProvider tracerProvider) { + return OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .buildAndRegisterGlobal(); + } +} diff --git a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/TestBase.java b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/TestBase.java index e5b257da..4b0b649b 100644 --- a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/TestBase.java +++ b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/TestBase.java @@ -92,6 +92,12 @@ protected String stubErrorResponse() { return zoneName; } + protected String stubBadResponse() { + final String zoneName = TimeZone.getDefault().getID(); + stubBadResponse(zoneName); + return zoneName; + } + @SneakyThrows private void stubErrorResponse(@Nonnull final String zoneName, @Nonnull final RuntimeException errorForResponse) { stubFor(get(urlPathMatching("/" + zoneName)) @@ -101,6 +107,18 @@ private void stubErrorResponse(@Nonnull final String zoneName, @Nonnull final Ru )); } + @SneakyThrows + private void stubBadResponse(String zoneName) { + stubFor( + get(urlPathMatching("/" + zoneName)) + .willReturn( + aResponse() + .withStatus(200) + .withBody(objectMapper.writeValueAsString("Bad response")) + ) + ); + } + @TestConfiguration static class CustomClockConfiguration {