diff --git a/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/KafkaReadingService.java b/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/KafkaReadingService.java index 9d0088d9..71fee011 100644 --- a/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/KafkaReadingService.java +++ b/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/KafkaReadingService.java @@ -10,6 +10,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.MDC; +import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.sleuth.Span; import org.springframework.cloud.sleuth.Tracer; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; @@ -27,22 +29,26 @@ @RequiredArgsConstructor public class KafkaReadingService { + @Value("${app.tenant.name}") + private String tenantName; private final Tracer tracer; private final Clock clock; private final NamedParameterJdbcTemplate jdbcTemplate; @KafkaListener(topics = "${spring.kafka.template.default-topic}") public void listen(ConsumerRecord message, Acknowledgment ack) { - final Span currentSpan = tracer.currentSpan(); - final String traceId = currentSpan != null ? currentSpan.context().traceId() : ""; - log.info("Received record: {} with traceId {}", message.value(), traceId); - jdbcTemplate.update("insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);", - Map.ofEntries( - Map.entry("msg", message.value()), - Map.entry("traceId", traceId), - Map.entry("createdAt", LocalDateTime.now(clock)) - ) - ); + try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) { + final Span currentSpan = tracer.currentSpan(); + final String traceId = currentSpan != null ? currentSpan.context().traceId() : ""; + log.info("Received record: {} with traceId {}", message.value(), traceId); + jdbcTemplate.update("insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);", + Map.ofEntries( + Map.entry("msg", message.value()), + Map.entry("traceId", traceId), + Map.entry("createdAt", LocalDateTime.now(clock)) + ) + ); + } ack.acknowledge(); } } diff --git a/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/KafkaSendingService.java b/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/KafkaSendingService.java index 5cc93186..4e246fec 100644 --- a/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/KafkaSendingService.java +++ b/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/KafkaSendingService.java @@ -9,6 +9,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; @@ -22,10 +24,14 @@ @RequiredArgsConstructor public class KafkaSendingService { + @Value("${app.tenant.name}") + private String tenantName; private final KafkaTemplate kafkaTemplate; public CompletableFuture> sendNotification(@Nonnull final String message) { - log.info("Sending message \"{}\" to Kafka", message); - return kafkaTemplate.sendDefault(UUID.randomUUID(), message).completable(); + try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) { + log.info("Sending message \"{}\" to Kafka", message); + return kafkaTemplate.sendDefault(UUID.randomUUID(), message).completable(); + } } } diff --git a/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/PublicApiService.java b/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/PublicApiService.java index dc5d672c..1ab18f67 100644 --- a/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/PublicApiService.java +++ b/spring-boot-2-demo-app/src/main/java/io/github/mfvanek/spring/boot2/test/service/PublicApiService.java @@ -13,6 +13,7 @@ import io.github.mfvanek.spring.boot2.test.service.dto.ParsedDateTime; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; import org.springframework.retry.ExhaustedRetryException; @@ -58,8 +59,12 @@ private CurrentTime getZonedTimeFromWorldTimeApi() throws JsonProcessingExceptio .retrieve() .bodyToMono(String.class) .retryWhen(Retry.fixedDelay(retries, Duration.ofSeconds(2)) - .doBeforeRetry(retrySignal -> log.info("Retrying request to {}, attempt {}/{} due to error:", - webClient.options().uri(String.join("", zoneNames)), retries, retrySignal.totalRetries() + 1, retrySignal.failure())) + .doBeforeRetry(retrySignal -> { + try (MDC.MDCCloseable ignored = MDC.putCloseable("instance_timezone", zoneNames)) { + log.info("Retrying request to {}, attempt {}/{} due to error:", + webClient.options().uri(String.join("", zoneNames)), retrySignal.totalRetries() + 1, retries, retrySignal.failure()); + } + }) .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> { log.error("Request to {} failed after {} attempts.", webClient.options().uri(String.join("", zoneNames)), retrySignal.totalRetries() + 1); return new ExhaustedRetryException("Retries exhausted", retrySignal.failure()); diff --git a/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/controllers/TimeControllerTest.java b/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/controllers/TimeControllerTest.java index 4d9f9838..8b8dca18 100644 --- a/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/controllers/TimeControllerTest.java +++ b/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/controllers/TimeControllerTest.java @@ -7,17 +7,16 @@ package io.github.mfvanek.spring.boot2.test.controllers; -import io.github.mfvanek.spring.boot2.test.service.dto.CurrentTime; import io.github.mfvanek.spring.boot2.test.service.dto.ParsedDateTime; import io.github.mfvanek.spring.boot2.test.support.KafkaConsumerUtils; import io.github.mfvanek.spring.boot2.test.support.TestBase; -import lombok.SneakyThrows; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; @@ -31,22 +30,16 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.get; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; import static io.github.mfvanek.spring.boot2.test.filters.TraceIdInResponseServletFilter.TRACE_ID_HEADER_NAME; import static org.assertj.core.api.Assertions.assertThat; @@ -78,17 +71,10 @@ void cleanUpDatabase() { jdbcTemplate.execute("truncate table otel_demo.storage"); } - @SneakyThrows @Test - void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) { - final String zoneNames = TimeZone.getDefault().getID(); - final ParsedDateTime parsedDateTime = ParsedDateTime.from(LocalDateTime.now(ZoneId.systemDefault()).minusDays(1)); - final CurrentTime currentTime = new CurrentTime(parsedDateTime); - stubFor(get(urlPathMatching("/" + zoneNames)) - .willReturn(aResponse() - .withStatus(200) - .withBody(objectMapper.writeValueAsString(currentTime)) - )); + void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) throws Exception { + stubOkResponse(ParsedDateTime.from(LocalDateTime.now(clock).minusDays(1))); + final EntityExchangeResult result = webTestClient.get() .uri(uriBuilder -> uriBuilder.path("current-time") .build()) @@ -107,6 +93,79 @@ void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) { final ConsumerRecord received = consumerRecords.poll(10, TimeUnit.SECONDS); assertThat(received).isNotNull(); + assertThatTraceIdPresentInKafkaHeaders(received, traceId); + + awaitStoringIntoDatabase(); + + assertThat(output.getAll()) + .contains("Received record: " + received.value() + " with traceId " + traceId); + final String messageFromDb = namedParameterJdbcTemplate.queryForObject("select message from otel_demo.storage where trace_id = :traceId", + Map.of("traceId", traceId), String.class); + assertThat(messageFromDb) + .isEqualTo(received.value()); + } + + private long countRecordsInTable() { + final Long queryResult = jdbcTemplate.queryForObject("select count(*) from otel_demo.storage", Long.class); + return Objects.requireNonNullElse(queryResult, 0L); + } + + @Disabled + @Test + void mdcValuesShouldBeReportedInLogs(@Nonnull final CapturedOutput output) throws Exception { + stubOkResponse(ParsedDateTime.from(LocalDateTime.now(clock).minusDays(1))); + + webTestClient.get() + .uri(uriBuilder -> uriBuilder.path("current-time") + .build()) + .exchange() + .expectStatus().isOk() + .expectHeader().exists(TRACE_ID_HEADER_NAME) + .expectBody(LocalDateTime.class) + .returnResult(); + + final ConsumerRecord received = consumerRecords.poll(10, TimeUnit.SECONDS); + assertThat(received).isNotNull(); + + assertThat(output.getAll()) + .contains("\"tenant.name\":\"ru-a1-private\""); + } + + @Test + void spanAndMdcShouldBeReportedWhenRetry(@Nonnull final CapturedOutput output) throws Exception { + stubErrorResponse(); + //final String zoneNames = stubErrorResponse(); + + final EntityExchangeResult result = webTestClient.get() + .uri(uriBuilder -> uriBuilder.path("current-time") + .build()) + .exchange() + .expectStatus().isOk() + .expectHeader().exists(TRACE_ID_HEADER_NAME) + .expectBody(LocalDateTime.class) + .returnResult(); + final String traceId = result.getResponseHeaders().getFirst(TRACE_ID_HEADER_NAME); + assertThat(traceId).isNotBlank(); + assertThat(output.getAll()) + .contains("Called method getNow. TraceId = " + traceId) + .contains("Awaiting acknowledgement from Kafka"); + + final ConsumerRecord received = consumerRecords.poll(10, TimeUnit.SECONDS); + assertThat(received).isNotNull(); + assertThatTraceIdPresentInKafkaHeaders(received, traceId); + + awaitStoringIntoDatabase(); + + assertThat(output.getAll()) + .contains( + "Received record: " + received.value() + " with traceId " + traceId, + "Retrying request to ", + "Retries exhausted"//, "\"instance_timezone\":\"" + zoneNames + "\"" + ); + } + + private void assertThatTraceIdPresentInKafkaHeaders(@Nonnull final ConsumerRecord received, + @Nonnull final String expectedTraceId) { assertThat(received.value()).startsWith("Current time = "); final Header[] headers = received.headers().toArray(); final List headerNames = Arrays.stream(headers) @@ -121,23 +180,14 @@ void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) { .toList(); assertThat(headerValues) .hasSameSizeAs(headerNames) - .allSatisfy(h -> assertThat(h).contains(traceId)); + .allSatisfy(h -> assertThat(h).contains(expectedTraceId)); + } + private void awaitStoringIntoDatabase() { Awaitility .await() .atMost(10, TimeUnit.SECONDS) .pollInterval(Duration.ofMillis(500L)) .until(() -> countRecordsInTable() >= 1L); - assertThat(output.getAll()) - .contains("Received record: " + received.value() + " with traceId " + traceId); - final String messageFromDb = namedParameterJdbcTemplate.queryForObject("select message from otel_demo.storage where trace_id = :traceId", - Map.of("traceId", traceId), String.class); - assertThat(messageFromDb) - .isEqualTo(received.value()); - } - - private long countRecordsInTable() { - final Long queryResult = jdbcTemplate.queryForObject("select count(*) from otel_demo.storage", Long.class); - return Objects.requireNonNullElse(queryResult, 0L); } } diff --git a/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/service/PublicApiServiceTest.java b/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/service/PublicApiServiceTest.java index 36eb95a1..0f5be8c6 100644 --- a/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/service/PublicApiServiceTest.java +++ b/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/service/PublicApiServiceTest.java @@ -7,8 +7,6 @@ package io.github.mfvanek.spring.boot2.test.service; -import com.fasterxml.jackson.core.JsonProcessingException; -import io.github.mfvanek.spring.boot2.test.service.dto.CurrentTime; import io.github.mfvanek.spring.boot2.test.service.dto.ParsedDateTime; import io.github.mfvanek.spring.boot2.test.support.TestBase; import org.junit.jupiter.api.Test; @@ -18,15 +16,10 @@ import org.springframework.boot.test.system.OutputCaptureExtension; import java.time.LocalDateTime; -import java.time.ZoneId; import java.time.temporal.ChronoUnit; -import java.util.TimeZone; import javax.annotation.Nonnull; -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; import static com.github.tomakehurst.wiremock.client.WireMock.verify; import static org.assertj.core.api.Assertions.assertThat; @@ -38,16 +31,9 @@ class PublicApiServiceTest extends TestBase { private PublicApiService publicApiService; @Test - void getZonedTimeSuccessfully(@Nonnull final CapturedOutput output) throws JsonProcessingException { - final String zoneNames = TimeZone.getDefault().getID(); - final LocalDateTime localDateTimeNow = LocalDateTime.now(ZoneId.systemDefault()); - final ParsedDateTime parsedDateTime = ParsedDateTime.from(localDateTimeNow); - final CurrentTime currentTime = new CurrentTime(parsedDateTime); - stubFor(get(urlPathMatching("/" + zoneNames)) - .willReturn(aResponse() - .withStatus(200) - .withBody(objectMapper.writeValueAsString(currentTime)) - )); + void getZonedTimeSuccessfully(@Nonnull final CapturedOutput output) { + final LocalDateTime localDateTimeNow = LocalDateTime.now(clock); + final String zoneNames = stubOkResponse(ParsedDateTime.from(localDateTimeNow)); final LocalDateTime result = publicApiService.getZonedTime(); verify(getRequestedFor(urlPathMatching("/" + zoneNames))); @@ -55,27 +41,26 @@ void getZonedTimeSuccessfully(@Nonnull final CapturedOutput output) throws JsonP assertThat(result).isNotNull(); assertThat(result.truncatedTo(ChronoUnit.MINUTES)) .isEqualTo(localDateTimeNow.truncatedTo(ChronoUnit.MINUTES)); - assertThat(output).doesNotContain( - "Retrying request to ", - "Retries exhausted", - "Failed to convert response "); + assertThat(output.getAll()) + .contains("Request received:") + .doesNotContain( + "Retrying request to ", + "Retries exhausted", + "Failed to convert response ", + "timezone"); } @Test - void retriesThreeTimesToGetZonedTime(@Nonnull final CapturedOutput output) throws JsonProcessingException { - final String zoneNames = TimeZone.getDefault().getID(); - final RuntimeException exception = new RuntimeException("Retries exhausted"); - stubFor(get(urlPathMatching("/" + zoneNames)) - .willReturn(aResponse() - .withStatus(500) - .withBody(objectMapper.writeValueAsString(exception)) - )); + void retriesOnceToGetZonedTime(@Nonnull final CapturedOutput output) { + final String zoneNames = stubErrorResponse(); final LocalDateTime result = publicApiService.getZonedTime(); verify(2, getRequestedFor(urlPathMatching("/" + zoneNames))); assertThat(result).isNull(); - assertThat(output).contains("Retrying request to ", "Retries exhausted"); - assertThat(output).doesNotContain("Failed to convert response "); + assertThat(output.getAll()) + .contains("Retrying request to ", "Retries exhausted") + //.contains("Retrying request to ", "Retries exhausted", "\"instance_timezone\":\"" + zoneNames + "\"") + .doesNotContain("Failed to convert response "); } } diff --git a/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/support/TestBase.java b/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/support/TestBase.java index 2036fa19..3c1b9200 100644 --- a/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/support/TestBase.java +++ b/spring-boot-2-demo-app/src/test/java/io/github/mfvanek/spring/boot2/test/support/TestBase.java @@ -8,6 +8,11 @@ package io.github.mfvanek.spring.boot2.test.support; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.tomakehurst.wiremock.client.WireMock; +import io.github.mfvanek.spring.boot2.test.service.dto.CurrentTime; +import io.github.mfvanek.spring.boot2.test.service.dto.ParsedDateTime; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock; @@ -18,6 +23,13 @@ import org.springframework.test.web.reactive.server.WebTestClient; import java.time.Clock; +import java.util.TimeZone; +import javax.annotation.Nonnull; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @ContextConfiguration(initializers = {KafkaInitializer.class, JaegerInitializer.class, PostgresInitializer.class}) @@ -35,4 +47,43 @@ public abstract class TestBase { protected ObjectMapper objectMapper; @Autowired protected Clock clock; + + @BeforeEach + void resetExternalMocks() { + WireMock.resetAllRequests(); + } + + @Nonnull + protected String stubOkResponse(@Nonnull final ParsedDateTime parsedDateTime) { + final String zoneNames = TimeZone.getDefault().getID(); + stubOkResponse(zoneNames, parsedDateTime); + return zoneNames; + } + + @SneakyThrows + private void stubOkResponse(@Nonnull final String zoneNames, @Nonnull final ParsedDateTime parsedDateTime) { + final CurrentTime currentTime = new CurrentTime(parsedDateTime); + stubFor(get(urlPathMatching("/" + zoneNames)) + .willReturn(aResponse() + .withStatus(200) + .withBody(objectMapper.writeValueAsString(currentTime)) + )); + } + + @Nonnull + protected String stubErrorResponse() { + final String zoneNames = TimeZone.getDefault().getID(); + final RuntimeException exception = new RuntimeException("Retries exhausted"); + stubErrorResponse(zoneNames, exception); + return zoneNames; + } + + @SneakyThrows + private void stubErrorResponse(@Nonnull final String zoneNames, @Nonnull final RuntimeException errorForResponse) { + stubFor(get(urlPathMatching("/" + zoneNames)) + .willReturn(aResponse() + .withStatus(500) + .withBody(objectMapper.writeValueAsString(errorForResponse)) + )); + } } diff --git a/spring-boot-2-demo-app/src/test/resources/application-test.yml b/spring-boot-2-demo-app/src/test/resources/application-test.yml index e2dffedc..91d85ab9 100644 --- a/spring-boot-2-demo-app/src/test/resources/application-test.yml +++ b/spring-boot-2-demo-app/src/test/resources/application-test.yml @@ -1,3 +1,13 @@ app: external-base-url: "http://localhost:${wiremock.server.port}/" retries: 1 + +#to change default appender in tests use: +#logging: +# appender.name: CONSOLE + +logging: + level: + org.testcontainers: INFO # In order to troubleshoot issues with Testcontainers, increase the logging level to DEBUG + com.github.dockerjava: WARN + com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire: OFF diff --git a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/config/WebClientConfig.java b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/config/WebClientConfig.java index a4c568d9..5842175c 100644 --- a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/config/WebClientConfig.java +++ b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/config/WebClientConfig.java @@ -12,14 +12,16 @@ import org.springframework.context.annotation.Configuration; import org.springframework.web.reactive.function.client.WebClient; -@Configuration +@Configuration(proxyBeanMethods = false) public class WebClientConfig { @Value("${app.external-base-url}") private String external; @Bean - public WebClient webClient() { - return WebClient.builder().baseUrl(external).build(); + public WebClient webClient(WebClient.Builder builder) { + return builder + .baseUrl(external) + .build(); } } diff --git a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaReadingService.java b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaReadingService.java index 730366ff..8dadc668 100644 --- a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaReadingService.java +++ b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaReadingService.java @@ -12,6 +12,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.MDC; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; @@ -27,22 +29,26 @@ @RequiredArgsConstructor public class KafkaReadingService { + @Value("${app.tenant.name}") + private String tenantName; private final Tracer tracer; private final Clock clock; private final NamedParameterJdbcTemplate jdbcTemplate; @KafkaListener(topics = "${spring.kafka.template.default-topic}") public void listen(ConsumerRecord message, Acknowledgment ack) { - final Span currentSpan = tracer.currentSpan(); - final String traceId = currentSpan != null ? currentSpan.context().traceId() : ""; - log.info("Received record: {} with traceId {}", message.value(), traceId); - jdbcTemplate.update("insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);", - Map.ofEntries( - Map.entry("msg", message.value()), - Map.entry("traceId", traceId), - Map.entry("createdAt", LocalDateTime.now(clock)) - ) - ); - ack.acknowledge(); + try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) { + final Span currentSpan = tracer.currentSpan(); + final String traceId = currentSpan != null ? currentSpan.context().traceId() : ""; + log.info("Received record: {} with traceId {}", message.value(), traceId); + jdbcTemplate.update("insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);", + Map.ofEntries( + Map.entry("msg", message.value()), + Map.entry("traceId", traceId), + Map.entry("createdAt", LocalDateTime.now(clock)) + ) + ); + ack.acknowledge(); + } } } diff --git a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaSendingService.java b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaSendingService.java index 50b7ffb9..5076980b 100644 --- a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaSendingService.java +++ b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/KafkaSendingService.java @@ -9,6 +9,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; @@ -22,10 +24,14 @@ @RequiredArgsConstructor public class KafkaSendingService { + @Value("${app.tenant.name}") + private String tenantName; private final KafkaTemplate kafkaTemplate; public CompletableFuture> sendNotification(@Nonnull final String message) { - log.info("Sending message \"{}\" to Kafka", message); - return kafkaTemplate.sendDefault(UUID.randomUUID(), message); + try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) { + log.info("Sending message \"{}\" to Kafka", message); + return kafkaTemplate.sendDefault(UUID.randomUUID(), message); + } } } diff --git a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/PublicApiService.java b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/PublicApiService.java index 306d3815..680e3657 100644 --- a/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/PublicApiService.java +++ b/spring-boot-3-demo-app/src/main/java/io/github/mfvanek/spring/boot3/test/service/PublicApiService.java @@ -13,6 +13,7 @@ import io.github.mfvanek.spring.boot3.test.service.dto.ParsedDateTime; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; import org.springframework.retry.ExhaustedRetryException; @@ -58,8 +59,12 @@ private CurrentTime getZonedTimeFromWorldTimeApi() throws JsonProcessingExceptio .retrieve() .bodyToMono(String.class) .retryWhen(Retry.fixedDelay(retries, Duration.ofSeconds(2)) - .doBeforeRetry(retrySignal -> log.info("Retrying request to {}, attempt {}/{} due to error:", - webClient.options().uri(String.join("", zoneNames)), retries, retrySignal.totalRetries() + 1, retrySignal.failure())) + .doBeforeRetry(retrySignal -> { + try (MDC.MDCCloseable ignored = MDC.putCloseable("instance_timezone", zoneNames)) { + log.info("Retrying request to {}, attempt {}/{} due to error:", + webClient.options().uri(String.join("", zoneNames)), retrySignal.totalRetries() + 1, retries, retrySignal.failure()); + } + }) .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> { log.error("Request to {} failed after {} attempts.", webClient.options().uri(String.join("", zoneNames)), retrySignal.totalRetries() + 1); return new ExhaustedRetryException("Retries exhausted", retrySignal.failure()); diff --git a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/TimeControllerTest.java b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/TimeControllerTest.java index ada2be21..c43534bd 100644 --- a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/TimeControllerTest.java +++ b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/controllers/TimeControllerTest.java @@ -7,17 +7,16 @@ package io.github.mfvanek.spring.boot3.test.controllers; -import io.github.mfvanek.spring.boot3.test.service.dto.CurrentTime; import io.github.mfvanek.spring.boot3.test.service.dto.ParsedDateTime; import io.github.mfvanek.spring.boot3.test.support.KafkaConsumerUtils; import io.github.mfvanek.spring.boot3.test.support.TestBase; -import lombok.SneakyThrows; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; @@ -31,22 +30,16 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.get; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; import static io.github.mfvanek.spring.boot3.test.filters.TraceIdInResponseServletFilter.TRACE_ID_HEADER_NAME; import static org.assertj.core.api.Assertions.assertThat; @@ -78,17 +71,10 @@ void cleanUpDatabase() { jdbcTemplate.execute("truncate table otel_demo.storage"); } - @SneakyThrows @Test - void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) { - final String zoneNames = TimeZone.getDefault().getID(); - final ParsedDateTime parsedDateTime = ParsedDateTime.from(LocalDateTime.now(ZoneId.systemDefault()).minusDays(1)); - final CurrentTime currentTime = new CurrentTime(parsedDateTime); - stubFor(get(urlPathMatching("/" + zoneNames)) - .willReturn(aResponse() - .withStatus(200) - .withBody(objectMapper.writeValueAsString(currentTime)) - )); + void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) throws Exception { + stubOkResponse(ParsedDateTime.from(LocalDateTime.now(clock).minusDays(1))); + final EntityExchangeResult result = webTestClient.get() .uri(uriBuilder -> uriBuilder.path("current-time") .build()) @@ -107,6 +93,78 @@ void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) { final ConsumerRecord received = consumerRecords.poll(10, TimeUnit.SECONDS); assertThat(received).isNotNull(); + assertThatTraceIdPresentInKafkaHeaders(received, traceId); + + awaitStoringIntoDatabase(); + + assertThat(output.getAll()) + .contains("Received record: " + received.value() + " with traceId " + traceId); + final String messageFromDb = namedParameterJdbcTemplate.queryForObject("select message from otel_demo.storage where trace_id = :traceId", + Map.of("traceId", traceId), String.class); + assertThat(messageFromDb) + .isEqualTo(received.value()); + } + + private long countRecordsInTable() { + final Long queryResult = jdbcTemplate.queryForObject("select count(*) from otel_demo.storage", Long.class); + return Objects.requireNonNullElse(queryResult, 0L); + } + + @Disabled + @Test + void mdcValuesShouldBeReportedInLogs(@Nonnull final CapturedOutput output) throws Exception { + stubOkResponse(ParsedDateTime.from(LocalDateTime.now(clock).minusDays(1))); + + webTestClient.get() + .uri(uriBuilder -> uriBuilder.path("current-time") + .build()) + .exchange() + .expectStatus().isOk() + .expectHeader().exists(TRACE_ID_HEADER_NAME) + .expectBody(LocalDateTime.class) + .returnResult(); + final ConsumerRecord received = consumerRecords.poll(10, TimeUnit.SECONDS); + assertThat(received).isNotNull(); + + assertThat(output.getAll()) + .contains("\"tenant.name\":\"ru-a1-private\""); + } + + @Test + void mdcValuesShouldBeReportedWhenRetry(@Nonnull final CapturedOutput output) throws Exception { + stubErrorResponse(); + //final String zoneNames = stubErrorResponse(); + + final EntityExchangeResult result = webTestClient.get() + .uri(uriBuilder -> uriBuilder.path("current-time") + .build()) + .exchange() + .expectStatus().isOk() + .expectHeader().exists(TRACE_ID_HEADER_NAME) + .expectBody(LocalDateTime.class) + .returnResult(); + final String traceId = result.getResponseHeaders().getFirst(TRACE_ID_HEADER_NAME); + assertThat(traceId).isNotBlank(); + assertThat(output.getAll()) + .contains("Called method getNow. TraceId = " + traceId) + .contains("Awaiting acknowledgement from Kafka"); + + final ConsumerRecord received = consumerRecords.poll(10, TimeUnit.SECONDS); + assertThat(received).isNotNull(); + assertThatTraceIdPresentInKafkaHeaders(received, traceId); + + awaitStoringIntoDatabase(); + + assertThat(output.getAll()) + .contains( + "Received record: " + received.value() + " with traceId " + traceId, + "Retrying request to ", + "Retries exhausted"//, "\"instance_timezone\":\"" + zoneNames + "\"" + ); + } + + private void assertThatTraceIdPresentInKafkaHeaders(@Nonnull final ConsumerRecord received, + @Nonnull final String expectedTraceId) { assertThat(received.value()).startsWith("Current time = "); final Header[] headers = received.headers().toArray(); final List headerNames = Arrays.stream(headers) @@ -121,23 +179,14 @@ void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) { .toList(); assertThat(headerValues) .hasSameSizeAs(headerNames) - .allSatisfy(h -> assertThat(h).contains(traceId)); + .allSatisfy(h -> assertThat(h).contains(expectedTraceId)); + } + private void awaitStoringIntoDatabase() { Awaitility .await() .atMost(10, TimeUnit.SECONDS) .pollInterval(Duration.ofMillis(500L)) .until(() -> countRecordsInTable() >= 1L); - assertThat(output.getAll()) - .contains("Received record: " + received.value() + " with traceId " + traceId); - final String messageFromDb = namedParameterJdbcTemplate.queryForObject("select message from otel_demo.storage where trace_id = :traceId", - Map.of("traceId", traceId), String.class); - assertThat(messageFromDb) - .isEqualTo(received.value()); - } - - private long countRecordsInTable() { - final Long queryResult = jdbcTemplate.queryForObject("select count(*) from otel_demo.storage", Long.class); - return Objects.requireNonNullElse(queryResult, 0L); } } diff --git a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/PublicApiServiceTest.java b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/PublicApiServiceTest.java index fb8f6981..0176b7a3 100644 --- a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/PublicApiServiceTest.java +++ b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/service/PublicApiServiceTest.java @@ -7,8 +7,6 @@ package io.github.mfvanek.spring.boot3.test.service; -import com.fasterxml.jackson.core.JsonProcessingException; -import io.github.mfvanek.spring.boot3.test.service.dto.CurrentTime; import io.github.mfvanek.spring.boot3.test.service.dto.ParsedDateTime; import io.github.mfvanek.spring.boot3.test.support.TestBase; import org.junit.jupiter.api.Test; @@ -18,15 +16,10 @@ import org.springframework.boot.test.system.OutputCaptureExtension; import java.time.LocalDateTime; -import java.time.ZoneId; import java.time.temporal.ChronoUnit; -import java.util.TimeZone; import javax.annotation.Nonnull; -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; import static com.github.tomakehurst.wiremock.client.WireMock.verify; import static org.assertj.core.api.Assertions.assertThat; @@ -38,16 +31,9 @@ class PublicApiServiceTest extends TestBase { private PublicApiService publicApiService; @Test - void printTimeZoneSuccessfully(@Nonnull final CapturedOutput output) throws JsonProcessingException { - final String zoneNames = TimeZone.getDefault().getID(); - final LocalDateTime localDateTimeNow = LocalDateTime.now(ZoneId.systemDefault()); - final ParsedDateTime parsedDateTime = ParsedDateTime.from(localDateTimeNow); - final CurrentTime currentTime = new CurrentTime(parsedDateTime); - stubFor(get(urlPathMatching("/" + zoneNames)) - .willReturn(aResponse() - .withStatus(200) - .withBody(objectMapper.writeValueAsString(currentTime)) - )); + void printTimeZoneSuccessfully(@Nonnull final CapturedOutput output) { + final LocalDateTime localDateTimeNow = LocalDateTime.now(clock); + final String zoneNames = stubOkResponse(ParsedDateTime.from(localDateTimeNow)); final LocalDateTime result = publicApiService.getZonedTime(); verify(getRequestedFor(urlPathMatching("/" + zoneNames))); @@ -55,27 +41,26 @@ void printTimeZoneSuccessfully(@Nonnull final CapturedOutput output) throws Json assertThat(result).isNotNull(); assertThat(result.truncatedTo(ChronoUnit.MINUTES)) .isEqualTo(localDateTimeNow.truncatedTo(ChronoUnit.MINUTES)); - assertThat(output).doesNotContain( - "Retrying request to ", - "Retries exhausted", - "Failed to convert response "); + assertThat(output.getAll()) + .contains("Request received:") + .doesNotContain( + "Retrying request to ", + "Retries exhausted", + "Failed to convert response ", + "timezone"); } @Test - void retriesThreeTimesToGetZonedTime(@Nonnull final CapturedOutput output) throws JsonProcessingException { - final String zoneNames = TimeZone.getDefault().getID(); - final RuntimeException exception = new RuntimeException("Retries exhausted"); - stubFor(get(urlPathMatching("/" + zoneNames)) - .willReturn(aResponse() - .withStatus(500) - .withBody(objectMapper.writeValueAsString(exception)) - )); + void retriesOnceToGetZonedTime(@Nonnull final CapturedOutput output) { + final String zoneNames = stubErrorResponse(); final LocalDateTime result = publicApiService.getZonedTime(); verify(2, getRequestedFor(urlPathMatching("/" + zoneNames))); assertThat(result).isNull(); - assertThat(output).contains("Retrying request to ", "Retries exhausted"); - assertThat(output).doesNotContain("Failed to convert response "); + assertThat(output.getAll()) + .contains("Retrying request to ", "Retries exhausted") + //.contains("Retrying request to ", "Retries exhausted", "\"instance_timezone\":\"" + zoneNames + "\"") + .doesNotContain("Failed to convert response "); } } diff --git a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/TestBase.java b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/TestBase.java index 3920fd6f..d3e1027e 100644 --- a/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/TestBase.java +++ b/spring-boot-3-demo-app/src/test/java/io/github/mfvanek/spring/boot3/test/support/TestBase.java @@ -8,6 +8,11 @@ package io.github.mfvanek.spring.boot3.test.support; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.tomakehurst.wiremock.client.WireMock; +import io.github.mfvanek.spring.boot3.test.service.dto.CurrentTime; +import io.github.mfvanek.spring.boot3.test.service.dto.ParsedDateTime; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability; import org.springframework.boot.test.context.SpringBootTest; @@ -19,6 +24,13 @@ import org.springframework.test.web.reactive.server.WebTestClient; import java.time.Clock; +import java.util.TimeZone; +import javax.annotation.Nonnull; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; @AutoConfigureObservability @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @@ -37,4 +49,43 @@ public abstract class TestBase { protected ObjectMapper objectMapper; @Autowired protected Clock clock; + + @BeforeEach + void resetExternalMocks() { + WireMock.resetAllRequests(); + } + + @Nonnull + protected String stubOkResponse(@Nonnull final ParsedDateTime parsedDateTime) { + final String zoneNames = TimeZone.getDefault().getID(); + stubOkResponse(zoneNames, parsedDateTime); + return zoneNames; + } + + @SneakyThrows + private void stubOkResponse(@Nonnull final String zoneNames, @Nonnull final ParsedDateTime parsedDateTime) { + final CurrentTime currentTime = new CurrentTime(parsedDateTime); + stubFor(get(urlPathMatching("/" + zoneNames)) + .willReturn(aResponse() + .withStatus(200) + .withBody(objectMapper.writeValueAsString(currentTime)) + )); + } + + @Nonnull + protected String stubErrorResponse() { + final String zoneNames = TimeZone.getDefault().getID(); + final RuntimeException exception = new RuntimeException("Retries exhausted"); + stubErrorResponse(zoneNames, exception); + return zoneNames; + } + + @SneakyThrows + private void stubErrorResponse(@Nonnull final String zoneNames, @Nonnull final RuntimeException errorForResponse) { + stubFor(get(urlPathMatching("/" + zoneNames)) + .willReturn(aResponse() + .withStatus(500) + .withBody(objectMapper.writeValueAsString(errorForResponse)) + )); + } } diff --git a/spring-boot-3-demo-app/src/test/resources/application-test.yml b/spring-boot-3-demo-app/src/test/resources/application-test.yml index e2dffedc..91d85ab9 100644 --- a/spring-boot-3-demo-app/src/test/resources/application-test.yml +++ b/spring-boot-3-demo-app/src/test/resources/application-test.yml @@ -1,3 +1,13 @@ app: external-base-url: "http://localhost:${wiremock.server.port}/" retries: 1 + +#to change default appender in tests use: +#logging: +# appender.name: CONSOLE + +logging: + level: + org.testcontainers: INFO # In order to troubleshoot issues with Testcontainers, increase the logging level to DEBUG + com.github.dockerjava: WARN + com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire: OFF