Skip to content

Commit fce4531

Browse files
author
m.zharinova
committed
review fixes
1 parent 27e558b commit fce4531

File tree

14 files changed

+45
-157
lines changed

14 files changed

+45
-157
lines changed

buildSrc/src/main/kotlin/sb-ot-demo.java-compile.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ dependencies {
2424
}
2525

2626
java {
27-
sourceCompatibility = JavaVersion.VERSION_21
28-
targetCompatibility = JavaVersion.VERSION_21
27+
sourceCompatibility = JavaVersion.VERSION_17
28+
targetCompatibility = JavaVersion.VERSION_17
2929
withSourcesJar()
3030
}
3131

buildSrc/src/main/kotlin/sb-ot-demo.kotlin-conventions.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ dependencies {
3333
tasks.withType<KotlinCompile> {
3434
compilerOptions {
3535
freeCompilerArgs.add("-Xjsr305=strict")
36-
jvmTarget = JvmTarget.JVM_21
36+
jvmTarget = JvmTarget.JVM_17
3737
}
3838
}
3939

config/checkstyle/checkstyle.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
<!-- <module name="FinalParameters"/> disabled here. Controlled by PMD.AvoidReassigningParameters -->
5757
<!-- <module name="HiddenField"/> invalid -->
5858
<!-- <module name="HideUtilityClassConstructor"/> invalid -->
59-
<!--<module name="IllegalCatch"/> -->
59+
<module name="IllegalCatch"/>
6060
<module name="IllegalIdentifierName"/>
6161
<module name="IllegalInstantiation"/>
6262
<module name="IllegalThrows"/>
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
--liquibase formatted sql
22

33
--changeset marina.zharinova:2025.08.31:add span column
4-
alter table otel_demo.storage add column span_id varchar(64);
4+
alter table otel_demo.storage add column span_id text;
55

66
--changeset marina.zharinova:2025.08.31:comment on span_id
77
comment on column otel_demo.storage.span_id is 'SpanId of operation';

db-migrations/src/main/resources/db/changelog/sql/storage.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ create table if not exists otel_demo.storage
55
(
66
id bigint generated always as identity,
77
message text not null,
8-
trace_id varchar(64) not null unique,
8+
trace_id text not null unique,
99
created_at timestamptz not null
1010
);
1111

spring-boot-3-demo-app-kotlin/src/main/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/service/KafkaReadingService.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import io.github.oshai.kotlinlogging.KotlinLogging
1212
import io.micrometer.tracing.Tracer
1313
import io.micrometer.tracing.propagation.Propagator
1414
import org.apache.kafka.clients.consumer.ConsumerRecord
15+
import org.springframework.dao.DataAccessException
16+
import org.springframework.kafka.KafkaException
1517
import org.springframework.kafka.annotation.KafkaListener
1618
import org.springframework.kafka.support.Acknowledgment
1719
import org.springframework.stereotype.Service
@@ -44,14 +46,13 @@ class KafkaReadingService(
4446
)
4547
fun listenAdditional(records: List<ConsumerRecord<UUID, String>>, ack: Acknowledgment) {
4648
val batchSpan = tracer.startScopedSpan("batch-processing")
47-
logger.info { "current span: ${tracer.currentSpan()}" }
4849
try {
4950
logger.info {
5051
"Received from Kafka ${records.size} records"
5152
}
5253
records.forEach { record -> restoreContextAndProcessSingleRecordIfNeed(record) }
5354
ack.acknowledge()
54-
} catch (e: Throwable) {
55+
} catch (e: KafkaException) {
5556
batchSpan.error(e)
5657
throw e
5758
} finally {
@@ -66,7 +67,7 @@ class KafkaReadingService(
6667
tracer.withSpan(spanFromRecord).use {
6768
dbSaver.processSingleRecord(record)
6869
}
69-
} catch (e: Throwable) {
70+
} catch (e: DataAccessException) {
7071
spanFromRecord.error(e)
7172
throw e
7273
} finally {

spring-boot-3-demo-app-kotlin/src/main/resources/application.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,6 @@ spring:
5454
sasl:
5555
mechanism: PLAIN
5656
jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="${demo.kafka.opentelemetry.username}" password="${demo.kafka.opentelemetry.password}";
57-
opentelemetry:
58-
additional-topic: open.telemetry.sb3.queue.additional
59-
additional-consumer-groupId: open.telemetry.sb3.queue.additional-group
6057
jdbc:
6158
template:
6259
query-timeout: 1s

spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeControllerTest.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,15 @@ class TimeControllerTest : TestBase() {
9191
assertThat(output.all)
9292
.contains("Received record: " + received.value() + " with traceId " + traceId)
9393
.contains("\"tenant.name\":\"ru-a1-private\"")
94-
val tracesFromDb = namedParameterJdbcTemplate
95-
.query(
96-
"select trace_id from otel_demo.storage where message like :message",
97-
mapOf("message" to received.value())
98-
) { rs, _ ->
99-
rs.getString("trace_id")
100-
}
101-
assertThat(tracesFromDb.size).isEqualTo(2)
102-
assertThat(tracesFromDb.stream().filter { it == traceId }).hasSize(2)
94+
val messageFromDb = namedParameterJdbcTemplate.queryForList(
95+
"select message from otel_demo.storage where trace_id = :traceId",
96+
mapOf("traceId" to traceId),
97+
String::class.java
98+
)
99+
messageFromDb.forEach {
100+
assertThat(it).isNotNull()
101+
assertThat(it).isEqualTo(received.value())
102+
}
103103
}
104104

105105
@Order(2)

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeController.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,13 @@ public class TimeController {
3535
// http://localhost:8081/current-time
3636
@GetMapping(path = "/current-time")
3737
public Mono<LocalDateTime> getNow() {
38-
return Mono.just(tracer)
39-
.map(tracer -> {
40-
log.trace("tracer {}", tracer);
41-
return Optional.ofNullable(tracer.currentSpan())
38+
log.trace("tracer {}", tracer);
39+
return Mono.justOrEmpty(
40+
Optional.ofNullable(tracer.currentSpan())
4241
.map(Span::context)
4342
.map(TraceContext::traceId)
44-
.orElse(null);
45-
})
43+
.orElse(null)
44+
)
4645
.doOnNext(traceId -> log.info("Called method getNow. TraceId = {}", traceId))
4746
.then(publicApiService.getZonedTime())
4847
.defaultIfEmpty(LocalDateTime.now(clock))

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaReadingService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import lombok.RequiredArgsConstructor;
1616
import lombok.extern.slf4j.Slf4j;
1717
import org.apache.kafka.clients.consumer.ConsumerRecord;
18+
import org.springframework.dao.DataAccessException;
19+
import org.springframework.kafka.KafkaException;
1820
import org.springframework.kafka.annotation.KafkaListener;
1921
import org.springframework.kafka.support.Acknowledgment;
2022
import org.springframework.stereotype.Service;
@@ -47,14 +49,13 @@ public void listen(ConsumerRecord<UUID, String> record, Acknowledgment ack) {
4749
)
4850
public void listenAdditional(List<ConsumerRecord<UUID, String>> records, Acknowledgment ack) {
4951
final ScopedSpan batchSpan = tracer.startScopedSpan("batch-processing");
50-
log.info("current span: {}", tracer.currentSpan());
5152
try {
5253
log.info(
5354
"Received from Kafka {} records", records.size()
5455
);
5556
records.forEach(this::restoreContextAndProcessSingleRecordIfNeed);
5657
ack.acknowledge();
57-
} catch (Exception e) {
58+
} catch (KafkaException e) {
5859
batchSpan.error(e);
5960
throw e;
6061
} finally {
@@ -67,7 +68,7 @@ private void restoreContextAndProcessSingleRecordIfNeed(ConsumerRecord<UUID, Str
6768
final Span spanFromRecord = builder.name("processing-record-from-kafka").start();
6869
try (Tracer.SpanInScope ignored = tracer.withSpan(spanFromRecord)) {
6970
dbSaver.processSingleRecord(record);
70-
} catch (Exception e) {
71+
} catch (DataAccessException e) {
7172
spanFromRecord.error(e);
7273
throw e;
7374
} finally {

0 commit comments

Comments
 (0)