Skip to content

Commit 438291e

Browse files
committed
Fix reactive chain
1 parent 6bca264 commit 438291e

File tree

5 files changed

+17
-47
lines changed

5 files changed

+17
-47
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,5 @@ build/
88
*.iml
99
*.ipr
1010
out/
11+
12+
.kotlin

.kotlin/errors/errors-1753873052588.log

Lines changed: 0 additions & 4 deletions
This file was deleted.

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeController.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,18 @@ public class TimeController {
3636
@GetMapping(path = "/current-time")
3737
public Mono<LocalDateTime> getNow() {
3838
log.trace("tracer {}", tracer);
39+
3940
final String traceId = Optional.ofNullable(tracer.currentSpan())
4041
.map(Span::context)
4142
.map(TraceContext::traceId)
4243
.orElse(null);
4344
log.info("Called method getNow. TraceId = {}", traceId);
44-
final Mono<LocalDateTime> response = publicApiService.getZonedTime()
45-
.switchIfEmpty(Mono.just(LocalDateTime.now(clock)));
46-
response
47-
.subscribe(it -> {
48-
assert traceId != null;
49-
kafkaSendingService.sendNotification("Current time = " + it, traceId);
50-
log.info("Awaiting acknowledgement from Kafka");
51-
});
52-
return response;
45+
46+
return publicApiService.getZonedTime()
47+
.defaultIfEmpty(LocalDateTime.now(clock))
48+
.flatMap(now -> kafkaSendingService.sendNotification("Current time = " + now)
49+
.doOnSuccess(v -> log.info("Awaiting acknowledgement from Kafka"))
50+
.thenReturn(now)
51+
);
5352
}
5453
}

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaSendingService.java

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,13 @@
99

1010
import lombok.RequiredArgsConstructor;
1111
import lombok.extern.slf4j.Slf4j;
12-
import org.apache.kafka.clients.producer.ProducerRecord;
13-
import org.apache.kafka.common.header.Header;
14-
import org.apache.kafka.common.header.internals.RecordHeader;
15-
import org.apache.kafka.common.header.internals.RecordHeaders;
1612
import org.slf4j.MDC;
1713
import org.springframework.beans.factory.annotation.Value;
1814
import org.springframework.kafka.core.KafkaTemplate;
15+
import org.springframework.kafka.support.SendResult;
1916
import org.springframework.stereotype.Service;
2017
import reactor.core.publisher.Mono;
2118

22-
import java.nio.charset.StandardCharsets;
2319
import java.util.UUID;
2420
import javax.annotation.Nonnull;
2521

@@ -31,37 +27,13 @@ public class KafkaSendingService {
3127
private final KafkaTemplate<UUID, String> kafkaTemplate;
3228
@Value("${app.tenant.name}")
3329
private String tenantName;
34-
//private final Propagator propagator;
35-
//private final Tracer tracer;
3630

37-
public void sendNotification(@Nonnull final String message, @Nonnull final String traceId) {
38-
Mono.fromFuture(() -> {
31+
public Mono<SendResult<UUID, String>> sendNotification(@Nonnull final String message) {
32+
return Mono.deferContextual(contextView -> {
3933
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
4034
log.info("Sending message \"{}\" to Kafka", message);
41-
// это тоже удаляет
42-
//propagator.inject(
43-
// Objects.requireNonNull(tracer.currentSpan()).context(),
44-
// kafkaTemplate.getProducerFactory().getConfigurationProperties(),
45-
// (stringObjectMap, key, value) -> stringObjectMap.put("traceparent", traceId)
46-
//);
47-
// это удаляет все
48-
// kafkaTemplate.getProducerFactory().getConfigurationProperties().remove("traceparent");
49-
// kafkaTemplate.getProducerFactory().getConfigurationProperties().remove("b3");
50-
// kafkaTemplate.getProducerFactory().getConfigurationProperties().putIfAbsent("traceparent", traceId);
51-
// kafkaTemplate.getProducerFactory().getConfigurationProperties().putIfAbsent("b3", traceId);
52-
// так можно добавить хедер с другим названием, но существующие не заменяются
53-
final ProducerRecord<UUID, String> producerRecord = new ProducerRecord<>(
54-
kafkaTemplate.getDefaultTopic(),
55-
kafkaTemplate.partitionsFor(kafkaTemplate.getDefaultTopic()).get(0).partition(),
56-
UUID.randomUUID(),
57-
message,
58-
new RecordHeaders(new Header[]{
59-
new RecordHeader("traceparent", traceId.getBytes(StandardCharsets.UTF_8)),
60-
new RecordHeader("b3", traceId.getBytes(StandardCharsets.UTF_8))})
61-
);
62-
return kafkaTemplate.send(producerRecord);
63-
//return kafkaTemplate.sendDefault(UUID.randomUUID(), message);
35+
return Mono.fromFuture(() -> kafkaTemplate.sendDefault(UUID.randomUUID(), message));
6436
}
65-
}).doOnError(e -> log.info("error ", e)).subscribe();
37+
});
6638
}
6739
}

spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/ApplicationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.springframework.beans.factory.annotation.Autowired;
2020
import org.springframework.context.ApplicationContext;
2121
import org.springframework.dao.DataAccessResourceFailureException;
22+
import org.springframework.dao.QueryTimeoutException;
2223

2324
import java.util.Locale;
2425

@@ -65,7 +66,7 @@ void jdbcQueryTimeoutFromProperties() {
6566
@DisplayName("Throws exception when query exceeds timeout")
6667
void exceptionWithLongQuery() {
6768
assertThatThrownBy(() -> jdbcTemplate.execute("select pg_sleep(1.1);"))
68-
.isInstanceOf(DataAccessResourceFailureException.class)
69+
.isInstanceOf(QueryTimeoutException.class)
6970
.hasMessageContaining("ERROR: canceling statement due to user request");
7071
}
7172

0 commit comments

Comments
 (0)