Skip to content

Commit bacffa9

Browse files
author
m.zharinova
committed
3 versions to change headers but not working as expected
1 parent 6061fc4 commit bacffa9

File tree

8 files changed

+58
-35
lines changed

8 files changed

+58
-35
lines changed

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/config/OpenTelemetryConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import io.opentelemetry.sdk.common.export.RetryPolicy;
1111
import org.springframework.beans.factory.annotation.Value;
12-
import org.springframework.boot.actuate.autoconfigure.tracing.otlp.OtlpGrpcSpanExporterBuilderCustomizer;
12+
import org.springframework.boot.actuate.autoconfigure.tracing.otlp.OtlpHttpSpanExporterBuilderCustomizer;
1313
import org.springframework.boot.actuate.autoconfigure.tracing.otlp.OtlpTracingAutoConfiguration;
1414
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
1515
import org.springframework.context.annotation.Bean;
@@ -20,7 +20,7 @@
2020
class OpenTelemetryConfig {
2121

2222
@Bean
23-
OtlpGrpcSpanExporterBuilderCustomizer otelJaegerGrpcSpanExporterBuilderCustomizer(
23+
OtlpHttpSpanExporterBuilderCustomizer otelJaegerHttpSpanExporterBuilderCustomizer(
2424
@Value("${management.otlp.tracing.retry.max-attempts:2}") int maxAttempts
2525
) {
2626
return builder -> builder.setRetryPolicy(

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeController.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,14 @@ public Mono<LocalDateTime> getNow() {
4141
.map(TraceContext::traceId)
4242
.orElse(null);
4343
log.info("Called method getNow. TraceId = {}", traceId);
44-
return publicApiService.getZonedTime()
45-
.switchIfEmpty(Mono.just(LocalDateTime.now(clock)))
46-
.doOnNext(this::sendWithKafka);
47-
}
48-
49-
private void sendWithKafka(LocalDateTime localDateTime) {
50-
kafkaSendingService.sendNotification("Current time = " + localDateTime)
51-
.doOnNext(result -> log.info("Awaiting acknowledgement from Kafka"))
52-
.doOnError(e -> log.info("error ", e))
53-
.subscribe();
44+
final Mono<LocalDateTime> response = publicApiService.getZonedTime()
45+
.switchIfEmpty(Mono.just(LocalDateTime.now(clock)));
46+
response
47+
.subscribe(it -> {
48+
assert traceId != null;
49+
kafkaSendingService.sendNotification("Current time = " + it, traceId);
50+
log.info("Awaiting acknowledgement from Kafka");
51+
});
52+
return response;
5453
}
5554
}

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaReadingService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,3 @@ public void listen(ConsumerRecord<UUID, String> message, Acknowledgment ack) {
5252
}
5353
}
5454
}
55-

spring-boot-3-demo-app-reactive/src/main/java/io/github/mfvanek/spring/boot3/reactive/service/KafkaSendingService.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,17 @@
99

1010
import lombok.RequiredArgsConstructor;
1111
import lombok.extern.slf4j.Slf4j;
12+
import org.apache.kafka.clients.producer.ProducerRecord;
13+
import org.apache.kafka.common.header.Header;
14+
import org.apache.kafka.common.header.internals.RecordHeader;
15+
import org.apache.kafka.common.header.internals.RecordHeaders;
1216
import org.slf4j.MDC;
1317
import org.springframework.beans.factory.annotation.Value;
1418
import org.springframework.kafka.core.KafkaTemplate;
15-
import org.springframework.kafka.support.SendResult;
1619
import org.springframework.stereotype.Service;
1720
import reactor.core.publisher.Mono;
1821

22+
import java.nio.charset.StandardCharsets;
1923
import java.util.UUID;
2024
import javax.annotation.Nonnull;
2125

@@ -27,13 +31,37 @@ public class KafkaSendingService {
2731
private final KafkaTemplate<UUID, String> kafkaTemplate;
2832
@Value("${app.tenant.name}")
2933
private String tenantName;
34+
//private final Propagator propagator;
35+
//private final Tracer tracer;
3036

31-
public Mono<SendResult<UUID, String>> sendNotification(@Nonnull final String message) {
32-
return Mono.fromCallable(() -> {
37+
public void sendNotification(@Nonnull final String message, @Nonnull final String traceId) {
38+
Mono.fromFuture(() -> {
3339
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
3440
log.info("Sending message \"{}\" to Kafka", message);
35-
return kafkaTemplate.sendDefault(UUID.randomUUID(), message).get();
41+
// это тоже удаляет
42+
//propagator.inject(
43+
// Objects.requireNonNull(tracer.currentSpan()).context(),
44+
// kafkaTemplate.getProducerFactory().getConfigurationProperties(),
45+
// (stringObjectMap, key, value) -> stringObjectMap.put("traceparent", traceId)
46+
//);
47+
// это удаляет все
48+
// kafkaTemplate.getProducerFactory().getConfigurationProperties().remove("traceparent");
49+
// kafkaTemplate.getProducerFactory().getConfigurationProperties().remove("b3");
50+
// kafkaTemplate.getProducerFactory().getConfigurationProperties().putIfAbsent("traceparent", traceId);
51+
// kafkaTemplate.getProducerFactory().getConfigurationProperties().putIfAbsent("b3", traceId);
52+
// так можно добавить хедер с другим названием, но существующие не заменяются
53+
final ProducerRecord<UUID, String> producerRecord = new ProducerRecord<>(
54+
kafkaTemplate.getDefaultTopic(),
55+
kafkaTemplate.partitionsFor(kafkaTemplate.getDefaultTopic()).get(0).partition(),
56+
UUID.randomUUID(),
57+
message,
58+
new RecordHeaders(new Header[]{
59+
new RecordHeader("traceparent", traceId.getBytes(StandardCharsets.UTF_8)),
60+
new RecordHeader("b3", traceId.getBytes(StandardCharsets.UTF_8))})
61+
);
62+
return kafkaTemplate.send(producerRecord);
63+
//return kafkaTemplate.sendDefault(UUID.randomUUID(), message);
3664
}
37-
});
65+
}).doOnError(e -> log.info("error ", e)).subscribe();
3866
}
3967
}

spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/ApplicationTests.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import io.micrometer.tracing.Span;
1414
import io.micrometer.tracing.Tracer;
1515
import io.micrometer.tracing.otel.bridge.OtelTracer;
16-
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
16+
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
1717
import org.junit.jupiter.api.DisplayName;
1818
import org.junit.jupiter.api.Test;
1919
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,18 +43,16 @@ void contextLoads() {
4343
.isInstanceOf(OtelTracer.class)
4444
.satisfies(t -> assertThat(t.currentSpan())
4545
.isNotEqualTo(Span.NOOP));
46-
assertThat(applicationContext.getBean("otelJaegerGrpcSpanExporter"))
46+
assertThat(applicationContext.getBean("otlpHttpSpanExporter"))
4747
.isNotNull()
48-
.isInstanceOf(OtlpGrpcSpanExporter.class)
48+
.isInstanceOf(OtlpHttpSpanExporter.class)
4949
.satisfies(e -> assertThat(e.toString())
5050
.contains(String.format(Locale.ROOT, """
51-
OtlpGrpcSpanExporter{exporterName=otlp, type=span, endpoint=http://localhost:%d, \
52-
endpointPath=/opentelemetry.proto.collector.trace.v1.TraceService/Export, \
53-
timeoutNanos=5000000000, connectTimeoutNanos=10000000000, compressorEncoding=null, \
54-
headers=Headers{User-Agent=OBFUSCATED}, \
55-
retryPolicy=RetryPolicy{maxAttempts=5, initialBackoff=PT1S, maxBackoff=PT5S, backoffMultiplier=1.5, \
56-
retryExceptionPredicate=null},""", JaegerInitializer.getFirstMappedPort()))
57-
);
51+
OtlpHttpSpanExporter{exporterName=otlp, type=span, endpoint=http://localhost:%d, \
52+
timeoutNanos=5000000000, proxyOptions=null, compressorEncoding=null, connectTimeoutNanos=10000000000, \
53+
exportAsJson=false, headers=Headers{User-Agent=OBFUSCATED}, retryPolicy=RetryPolicy{maxAttempts=2, \
54+
initialBackoff=PT1S, maxBackoff=PT5S, backoffMultiplier=1.5, retryExceptionPredicate=null},""",
55+
JaegerInitializer.getFirstMappedPort())));
5856
}
5957

6058
@Test

spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/controllers/HomeControllerTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,4 @@ void controllerResponseShouldHaveTraceIdHeader() {
2727
assertThat(result)
2828
.isEqualTo("Hello!");
2929
}
30-
3130
}

spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/controllers/TimeControllerTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.nio.charset.StandardCharsets;
3131
import java.time.Duration;
3232
import java.time.LocalDateTime;
33+
import java.time.temporal.ChronoUnit;
3334
import java.util.Arrays;
3435
import java.util.List;
3536
import java.util.Locale;
@@ -42,6 +43,7 @@
4243
import javax.annotation.Nonnull;
4344

4445
import static org.assertj.core.api.Assertions.assertThat;
46+
import static org.assertj.core.api.Assertions.within;
4547

4648
@ExtendWith(OutputCaptureExtension.class)
4749
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@@ -85,11 +87,10 @@ void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) throws Int
8587
.returnResult();
8688
final String traceId = result.getResponseHeaders().getFirst(TRACE_ID_HEADER_NAME);
8789
assertThat(traceId).isNotBlank();
88-
assertThat(result.getResponseBody())
89-
.isBefore(LocalDateTime.now(clock));
90+
assertThat(result.getResponseBody()).isCloseTo(LocalDateTime.now(clock).minusDays(1), within(1, ChronoUnit.MINUTES));
9091
assertThat(output.getAll())
91-
.contains("Called method getNow. TraceId = " + traceId)
92-
.contains("Awaiting acknowledgement from Kafka");
92+
.contains("Called method getNow. TraceId = " + traceId);
93+
//.contains("Awaiting acknowledgement from Kafka");
9394

9495
final ConsumerRecord<UUID, String> received = consumerRecords.poll(10, TimeUnit.SECONDS);
9596
assertThat(received).isNotNull();
@@ -123,7 +124,7 @@ void spanAndMdcShouldBeReportedWhenRetry(@Nonnull final CapturedOutput output) {
123124
final String traceId = result.getResponseHeaders().getFirst(TRACE_ID_HEADER_NAME);
124125
assertThat(traceId)
125126
.isEqualTo("38c19768104ab8ae64fabbeed65bbbdf");
126-
127+
assertThat(result.getResponseBody()).isCloseTo(LocalDateTime.now(clock), within(1, ChronoUnit.MINUTES));
127128
assertThat(output.getAll())
128129
.containsPattern(String.format(Locale.ROOT,
129130
".*\"message\":\"Retrying request to '/%1$s', attempt 1/1 due to error:\"," +

spring-boot-3-demo-app-reactive/src/test/java/io/github/mfvanek/spring/boot3/reactive/support/TestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,4 @@ private void stubOkButNotCorrectResponse(@Nonnull final String zoneName) {
105105
.withBody(objectMapper.writeValueAsString("bad response"))
106106
));
107107
}
108-
109108
}

0 commit comments

Comments
 (0)