Skip to content

Commit 6061fc4

Browse files
author
m.zharinova
committed
done with Mono.fromCallable()
1 parent 8f8e4ed commit 6061fc4

File tree

13 files changed

+80
-70
lines changed

13 files changed

+80
-70
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
kotlin version: 2.0.21
2+
error message: The daemon has terminated unexpectedly on startup attempt #1 with error code: 0. The daemon process output:
3+
1. Kotlin compile daemon is ready
4+

settings.gradle.kts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
plugins {
2-
id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0"
3-
}
41
rootProject.name = "spring-boot-open-telemetry-demo"
52

63
include("spring-boot-3-demo-app")

spring-boot-3-demo-app-kotlin/src/test/kotlin/io/github/mfvanek/spring/boot3/kotlin/test/controllers/TimeControllerTest.kt

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,6 @@ import io.github.mfvanek.spring.boot3.kotlin.test.filters.TraceIdInResponseServl
44
import io.github.mfvanek.spring.boot3.kotlin.test.service.dto.toParsedDateTime
55
import io.github.mfvanek.spring.boot3.kotlin.test.support.KafkaInitializer
66
import io.github.mfvanek.spring.boot3.kotlin.test.support.TestBase
7-
import java.nio.charset.StandardCharsets
8-
import java.time.Duration
9-
import java.time.LocalDateTime
10-
import java.time.temporal.ChronoUnit
11-
import java.util.Locale
12-
import java.util.UUID
13-
import java.util.concurrent.BlockingQueue
14-
import java.util.concurrent.LinkedBlockingQueue
15-
import java.util.concurrent.TimeUnit
167
import org.apache.kafka.clients.CommonClientConfigs
178
import org.apache.kafka.clients.consumer.ConsumerConfig
189
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -38,12 +29,20 @@ import org.springframework.kafka.listener.MessageListener
3829
import org.springframework.kafka.test.utils.ContainerTestUtils
3930
import org.springframework.kafka.test.utils.KafkaTestUtils
4031
import org.testcontainers.shaded.org.awaitility.Awaitility
32+
import java.nio.charset.StandardCharsets
33+
import java.time.Duration
34+
import java.time.LocalDateTime
35+
import java.time.temporal.ChronoUnit
36+
import java.util.*
37+
import java.util.concurrent.ArrayBlockingQueue
38+
import java.util.concurrent.BlockingQueue
39+
import java.util.concurrent.TimeUnit
4140

4241
@ExtendWith(OutputCaptureExtension::class)
4342
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
4443
class TimeControllerTest : TestBase() {
4544
private lateinit var container: KafkaMessageListenerContainer<UUID, String>
46-
private val consumerRecords = LinkedBlockingQueue<ConsumerRecord<UUID, String>>()
45+
private val consumerRecords = ArrayBlockingQueue<ConsumerRecord<UUID, String>>(4)
4746

4847
@Autowired
4948
private lateinit var kafkaProperties: KafkaProperties

spring-boot-3-demo-app-reactive/build.gradle.kts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ dependencies {
2727
implementation("org.springframework.boot:spring-boot-starter-jdbc")
2828
implementation("org.postgresql:postgresql")
2929
implementation("com.zaxxer:HikariCP")
30-
implementation(project(":db-migrations")) {
31-
exclude(group = "io.gitlab.arturbosch.detekt")
32-
}
30+
implementation(project(":db-migrations"))
3331
implementation("org.liquibase:liquibase-core")
3432
implementation("com.github.blagerweij:liquibase-sessionlock")
3533
implementation(libs.datasource.micrometer)
@@ -43,8 +41,9 @@ dependencies {
4341
testImplementation("org.awaitility:awaitility")
4442
testImplementation("io.github.mfvanek:pg-index-health-test-starter")
4543
testImplementation("org.springframework.cloud:spring-cloud-starter-contract-stub-runner")
46-
testImplementation("io.projectreactor:reactor-test:3.8.0-M3")
44+
testImplementation("io.projectreactor:reactor-test")
4745
}
46+
4847
tasks {
4948
jacocoTestCoverageVerification {
5049
dependsOn(jacocoTestReport)

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/config/OpenTelemetryConfig.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,26 @@
77

88
package io.github.mfvanek.spring.boot3.reactive.config;
99

10-
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
11-
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder;
10+
import io.opentelemetry.sdk.common.export.RetryPolicy;
11+
import org.springframework.beans.factory.annotation.Value;
12+
import org.springframework.boot.actuate.autoconfigure.tracing.otlp.OtlpGrpcSpanExporterBuilderCustomizer;
1213
import org.springframework.boot.actuate.autoconfigure.tracing.otlp.OtlpTracingAutoConfiguration;
13-
import org.springframework.boot.actuate.autoconfigure.tracing.otlp.OtlpTracingProperties;
1414
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
15-
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
1615
import org.springframework.context.annotation.Bean;
1716
import org.springframework.context.annotation.Configuration;
1817

19-
import java.util.Locale;
20-
import javax.annotation.Nonnull;
21-
2218
@AutoConfigureBefore(OtlpTracingAutoConfiguration.class)
2319
@Configuration(proxyBeanMethods = false)
2420
class OpenTelemetryConfig {
2521

2622
@Bean
27-
@ConditionalOnMissingBean(OtlpGrpcSpanExporter.class)
28-
OtlpGrpcSpanExporter otelJaegerGrpcSpanExporter(@Nonnull final OtlpTracingProperties otlpProperties) {
29-
final OtlpGrpcSpanExporterBuilder builder = OtlpGrpcSpanExporter.builder()
30-
.setEndpoint(otlpProperties.getEndpoint())
31-
.setTimeout(otlpProperties.getTimeout())
32-
.setConnectTimeout(otlpProperties.getConnectTimeout())
33-
.setCompression(String.valueOf(otlpProperties.getCompression()).toLowerCase(Locale.ROOT));
34-
otlpProperties.getHeaders().forEach(builder::addHeader);
35-
return builder.build();
23+
OtlpGrpcSpanExporterBuilderCustomizer otelJaegerGrpcSpanExporterBuilderCustomizer(
24+
@Value("${management.otlp.tracing.retry.max-attempts:2}") int maxAttempts
25+
) {
26+
return builder -> builder.setRetryPolicy(
27+
RetryPolicy.builder()
28+
.setMaxAttempts(maxAttempts)
29+
.build()
30+
);
3631
}
3732
}

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeController.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.time.Clock;
2222
import java.time.LocalDateTime;
2323
import java.util.Optional;
24-
import java.util.concurrent.ExecutionException;
2524

2625
@Slf4j
2726
@RestController
@@ -44,16 +43,13 @@ public Mono<LocalDateTime> getNow() {
4443
log.info("Called method getNow. TraceId = {}", traceId);
4544
return publicApiService.getZonedTime()
4645
.switchIfEmpty(Mono.just(LocalDateTime.now(clock)))
47-
.doOnSuccess(this::sendWithKafka);
46+
.doOnNext(this::sendWithKafka);
4847
}
4948

5049
private void sendWithKafka(LocalDateTime localDateTime) {
51-
try {
52-
kafkaSendingService.sendNotification("Current time = " + localDateTime)
53-
.thenRun(() -> log.info("Awaiting acknowledgement from Kafka"))
54-
.get();
55-
} catch (InterruptedException | ExecutionException e) {
56-
log.info("error ", e);
57-
}
50+
kafkaSendingService.sendNotification("Current time = " + localDateTime)
51+
.doOnNext(result -> log.info("Awaiting acknowledgement from Kafka"))
52+
.doOnError(e -> log.info("error ", e))
53+
.subscribe();
5854
}
5955
}

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaSendingService.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
import org.springframework.kafka.core.KafkaTemplate;
1515
import org.springframework.kafka.support.SendResult;
1616
import org.springframework.stereotype.Service;
17+
import reactor.core.publisher.Mono;
1718

1819
import java.util.UUID;
19-
import java.util.concurrent.CompletableFuture;
2020
import javax.annotation.Nonnull;
2121

2222
@Slf4j
@@ -28,10 +28,12 @@ public class KafkaSendingService {
2828
@Value("${app.tenant.name}")
2929
private String tenantName;
3030

31-
public CompletableFuture<SendResult<UUID, String>> sendNotification(@Nonnull final String message) {
32-
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
33-
log.info("Sending message \"{}\" to Kafka", message);
34-
return kafkaTemplate.sendDefault(UUID.randomUUID(), message);
35-
}
31+
public Mono<SendResult<UUID, String>> sendNotification(@Nonnull final String message) {
32+
return Mono.fromCallable(() -> {
33+
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
34+
log.info("Sending message \"{}\" to Kafka", message);
35+
return kafkaTemplate.sendDefault(UUID.randomUUID(), message).get();
36+
}
37+
});
3638
}
3739
}

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/PublicApiService.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77

88
package io.github.mfvanek.spring.boot3.reactive.service;
99

10-
import com.fasterxml.jackson.core.JsonProcessingException;
1110
import com.fasterxml.jackson.databind.ObjectMapper;
1211
import io.github.mfvanek.spring.boot3.reactive.service.dto.CurrentTime;
1312
import lombok.RequiredArgsConstructor;
13+
import lombok.SneakyThrows;
1414
import lombok.extern.slf4j.Slf4j;
1515
import org.slf4j.MDC;
1616
import org.springframework.beans.factory.annotation.Value;
@@ -53,14 +53,9 @@ private Mono<CurrentTime> getZonedTimeFromWorldTimeApi() {
5353
.retrieve()
5454
.bodyToMono(String.class)
5555
.retryWhen(prepareRetry(zoneName))
56-
.flatMap(string -> {
57-
try {
58-
return Mono.just(mapper.readValue(string, CurrentTime.class));
59-
} catch (JsonProcessingException e) {
60-
log.info("error from webclient ", e);
61-
}
62-
return Mono.empty();
63-
});
56+
.flatMap(this::convert)
57+
.onErrorComplete()
58+
.flatMap(Mono::justOrEmpty);
6459
}
6560

6661
private Retry prepareRetry(final String zoneName) {
@@ -77,4 +72,8 @@ private Retry prepareRetry(final String zoneName) {
7772
});
7873
}
7974

75+
@SneakyThrows
76+
private Mono<? extends CurrentTime> convert(String string) {
77+
return Mono.just(mapper.readValue(string, CurrentTime.class));
78+
}
8079
}

spring-boot-3-demo-app-reactive/src/main/resources/application.yml

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,8 @@ app:
44
tenant.name: ru-a1-private
55

66
server:
7-
port: 8080
7+
port: 8081
88
# See also https://docs.spring.io/spring-boot/docs/2.7.9/reference/html/application-properties.html#appendix.application-properties.server
9-
tomcat:
10-
accept-count: 10
11-
max-connections: 400
12-
threads:
13-
max: 10
14-
min-spare: 5 # actuator port uses the same configuration
15-
shutdown: graceful
169

1710
demo:
1811
kafka:
@@ -57,7 +50,7 @@ spring:
5750

5851
management:
5952
server:
60-
port: 8085
53+
port: 8087
6154
endpoints:
6255
web:
6356
exposure.include: '*'

spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeControllerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@
3636
import java.util.Map;
3737
import java.util.Objects;
3838
import java.util.UUID;
39+
import java.util.concurrent.ArrayBlockingQueue;
3940
import java.util.concurrent.BlockingQueue;
40-
import java.util.concurrent.LinkedBlockingQueue;
4141
import java.util.concurrent.TimeUnit;
4242
import javax.annotation.Nonnull;
4343

@@ -47,7 +47,7 @@
4747
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
4848
class TimeControllerTest extends TestBase {
4949

50-
private final BlockingQueue<ConsumerRecord<UUID, String>> consumerRecords = new LinkedBlockingQueue<>();
50+
private final BlockingQueue<ConsumerRecord<UUID, String>> consumerRecords = new ArrayBlockingQueue<>(4);
5151
private KafkaMessageListenerContainer<UUID, String> container;
5252
@Autowired
5353
private KafkaProperties kafkaProperties;

0 commit comments

Comments
 (0)