Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
Expand All @@ -27,22 +29,26 @@
@RequiredArgsConstructor
public class KafkaReadingService {

@Value("${app.tenant.name}")
private String tenantName;
private final Tracer tracer;
private final Clock clock;
private final NamedParameterJdbcTemplate jdbcTemplate;

@KafkaListener(topics = "${spring.kafka.template.default-topic}")
public void listen(ConsumerRecord<UUID, String> message, Acknowledgment ack) {
final Span currentSpan = tracer.currentSpan();
final String traceId = currentSpan != null ? currentSpan.context().traceId() : "";
log.info("Received record: {} with traceId {}", message.value(), traceId);
jdbcTemplate.update("insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);",
Map.ofEntries(
Map.entry("msg", message.value()),
Map.entry("traceId", traceId),
Map.entry("createdAt", LocalDateTime.now(clock))
)
);
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
final Span currentSpan = tracer.currentSpan();
final String traceId = currentSpan != null ? currentSpan.context().traceId() : "";
log.info("Received record: {} with traceId {}", message.value(), traceId);
jdbcTemplate.update("insert into otel_demo.storage(message, trace_id, created_at) values(:msg, :traceId, :createdAt);",
Map.ofEntries(
Map.entry("msg", message.value()),
Map.entry("traceId", traceId),
Map.entry("createdAt", LocalDateTime.now(clock))
)
);
}
ack.acknowledge();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
Expand All @@ -22,10 +24,14 @@
@RequiredArgsConstructor
public class KafkaSendingService {

@Value("${app.tenant.name}")
private String tenantName;
private final KafkaTemplate<UUID, String> kafkaTemplate;

public CompletableFuture<SendResult<UUID, String>> sendNotification(@Nonnull final String message) {
log.info("Sending message \"{}\" to Kafka", message);
return kafkaTemplate.sendDefault(UUID.randomUUID(), message).completable();
try (MDC.MDCCloseable ignored = MDC.putCloseable("tenant.name", tenantName)) {
log.info("Sending message \"{}\" to Kafka", message);
return kafkaTemplate.sendDefault(UUID.randomUUID(), message).completable();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.github.mfvanek.spring.boot2.test.service.dto.ParsedDateTime;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.retry.ExhaustedRetryException;
Expand Down Expand Up @@ -58,8 +59,12 @@ private CurrentTime getZonedTimeFromWorldTimeApi() throws JsonProcessingExceptio
.retrieve()
.bodyToMono(String.class)
.retryWhen(Retry.fixedDelay(retries, Duration.ofSeconds(2))
.doBeforeRetry(retrySignal -> log.info("Retrying request to {}, attempt {}/{} due to error:",
webClient.options().uri(String.join("", zoneNames)), retries, retrySignal.totalRetries() + 1, retrySignal.failure()))
.doBeforeRetry(retrySignal -> {
try (MDC.MDCCloseable ignored = MDC.putCloseable("instance_timezone", zoneNames)) {
log.info("Retrying request to {}, attempt {}/{} due to error:",
webClient.options().uri(String.join("", zoneNames)), retrySignal.totalRetries() + 1, retries, retrySignal.failure());
}
})
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
log.error("Request to {} failed after {} attempts.", webClient.options().uri(String.join("", zoneNames)), retrySignal.totalRetries() + 1);
return new ExhaustedRetryException("Retries exhausted", retrySignal.failure());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@

package io.github.mfvanek.spring.boot2.test.controllers;

import io.github.mfvanek.spring.boot2.test.service.dto.CurrentTime;
import io.github.mfvanek.spring.boot2.test.service.dto.ParsedDateTime;
import io.github.mfvanek.spring.boot2.test.support.KafkaConsumerUtils;
import io.github.mfvanek.spring.boot2.test.support.TestBase;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -31,22 +30,16 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
import static io.github.mfvanek.spring.boot2.test.filters.TraceIdInResponseServletFilter.TRACE_ID_HEADER_NAME;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -78,17 +71,10 @@ void cleanUpDatabase() {
jdbcTemplate.execute("truncate table otel_demo.storage");
}

@SneakyThrows
@Test
void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) {
final String zoneNames = TimeZone.getDefault().getID();
final ParsedDateTime parsedDateTime = ParsedDateTime.from(LocalDateTime.now(ZoneId.systemDefault()).minusDays(1));
final CurrentTime currentTime = new CurrentTime(parsedDateTime);
stubFor(get(urlPathMatching("/" + zoneNames))
.willReturn(aResponse()
.withStatus(200)
.withBody(objectMapper.writeValueAsString(currentTime))
));
void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) throws Exception {
stubOkResponse(ParsedDateTime.from(LocalDateTime.now(clock).minusDays(1)));

final EntityExchangeResult<LocalDateTime> result = webTestClient.get()
.uri(uriBuilder -> uriBuilder.path("current-time")
.build())
Expand All @@ -107,6 +93,79 @@ void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) {

final ConsumerRecord<UUID, String> received = consumerRecords.poll(10, TimeUnit.SECONDS);
assertThat(received).isNotNull();
assertThatTraceIdPresentInKafkaHeaders(received, traceId);

awaitStoringIntoDatabase();

assertThat(output.getAll())
.contains("Received record: " + received.value() + " with traceId " + traceId);
final String messageFromDb = namedParameterJdbcTemplate.queryForObject("select message from otel_demo.storage where trace_id = :traceId",
Map.of("traceId", traceId), String.class);
assertThat(messageFromDb)
.isEqualTo(received.value());
}

private long countRecordsInTable() {
final Long queryResult = jdbcTemplate.queryForObject("select count(*) from otel_demo.storage", Long.class);
return Objects.requireNonNullElse(queryResult, 0L);
}

@Disabled
@Test
void mdcValuesShouldBeReportedInLogs(@Nonnull final CapturedOutput output) throws Exception {
stubOkResponse(ParsedDateTime.from(LocalDateTime.now(clock).minusDays(1)));

webTestClient.get()
.uri(uriBuilder -> uriBuilder.path("current-time")
.build())
.exchange()
.expectStatus().isOk()
.expectHeader().exists(TRACE_ID_HEADER_NAME)
.expectBody(LocalDateTime.class)
.returnResult();

final ConsumerRecord<UUID, String> received = consumerRecords.poll(10, TimeUnit.SECONDS);
assertThat(received).isNotNull();

assertThat(output.getAll())
.contains("\"tenant.name\":\"ru-a1-private\"");
}

@Test
void spanAndMdcShouldBeReportedWhenRetry(@Nonnull final CapturedOutput output) throws Exception {
stubErrorResponse();
//final String zoneNames = stubErrorResponse();

final EntityExchangeResult<LocalDateTime> result = webTestClient.get()
.uri(uriBuilder -> uriBuilder.path("current-time")
.build())
.exchange()
.expectStatus().isOk()
.expectHeader().exists(TRACE_ID_HEADER_NAME)
.expectBody(LocalDateTime.class)
.returnResult();
final String traceId = result.getResponseHeaders().getFirst(TRACE_ID_HEADER_NAME);
assertThat(traceId).isNotBlank();
assertThat(output.getAll())
.contains("Called method getNow. TraceId = " + traceId)
.contains("Awaiting acknowledgement from Kafka");

final ConsumerRecord<UUID, String> received = consumerRecords.poll(10, TimeUnit.SECONDS);
assertThat(received).isNotNull();
assertThatTraceIdPresentInKafkaHeaders(received, traceId);

awaitStoringIntoDatabase();

assertThat(output.getAll())
.contains(
"Received record: " + received.value() + " with traceId " + traceId,
"Retrying request to ",
"Retries exhausted"//, "\"instance_timezone\":\"" + zoneNames + "\""
);
}

private void assertThatTraceIdPresentInKafkaHeaders(@Nonnull final ConsumerRecord<UUID, String> received,
@Nonnull final String expectedTraceId) {
assertThat(received.value()).startsWith("Current time = ");
final Header[] headers = received.headers().toArray();
final List<String> headerNames = Arrays.stream(headers)
Expand All @@ -121,23 +180,14 @@ void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) {
.toList();
assertThat(headerValues)
.hasSameSizeAs(headerNames)
.allSatisfy(h -> assertThat(h).contains(traceId));
.allSatisfy(h -> assertThat(h).contains(expectedTraceId));
}

private void awaitStoringIntoDatabase() {
Awaitility
.await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(Duration.ofMillis(500L))
.until(() -> countRecordsInTable() >= 1L);
assertThat(output.getAll())
.contains("Received record: " + received.value() + " with traceId " + traceId);
final String messageFromDb = namedParameterJdbcTemplate.queryForObject("select message from otel_demo.storage where trace_id = :traceId",
Map.of("traceId", traceId), String.class);
assertThat(messageFromDb)
.isEqualTo(received.value());
}

private long countRecordsInTable() {
final Long queryResult = jdbcTemplate.queryForObject("select count(*) from otel_demo.storage", Long.class);
return Objects.requireNonNullElse(queryResult, 0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

package io.github.mfvanek.spring.boot2.test.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.github.mfvanek.spring.boot2.test.service.dto.CurrentTime;
import io.github.mfvanek.spring.boot2.test.service.dto.ParsedDateTime;
import io.github.mfvanek.spring.boot2.test.support.TestBase;
import org.junit.jupiter.api.Test;
Expand All @@ -18,15 +16,10 @@
import org.springframework.boot.test.system.OutputCaptureExtension;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.TimeZone;
import javax.annotation.Nonnull;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -38,44 +31,36 @@ class PublicApiServiceTest extends TestBase {
private PublicApiService publicApiService;

@Test
void getZonedTimeSuccessfully(@Nonnull final CapturedOutput output) throws JsonProcessingException {
final String zoneNames = TimeZone.getDefault().getID();
final LocalDateTime localDateTimeNow = LocalDateTime.now(ZoneId.systemDefault());
final ParsedDateTime parsedDateTime = ParsedDateTime.from(localDateTimeNow);
final CurrentTime currentTime = new CurrentTime(parsedDateTime);
stubFor(get(urlPathMatching("/" + zoneNames))
.willReturn(aResponse()
.withStatus(200)
.withBody(objectMapper.writeValueAsString(currentTime))
));
void getZonedTimeSuccessfully(@Nonnull final CapturedOutput output) {
final LocalDateTime localDateTimeNow = LocalDateTime.now(clock);
final String zoneNames = stubOkResponse(ParsedDateTime.from(localDateTimeNow));

final LocalDateTime result = publicApiService.getZonedTime();
verify(getRequestedFor(urlPathMatching("/" + zoneNames)));

assertThat(result).isNotNull();
assertThat(result.truncatedTo(ChronoUnit.MINUTES))
.isEqualTo(localDateTimeNow.truncatedTo(ChronoUnit.MINUTES));
assertThat(output).doesNotContain(
"Retrying request to ",
"Retries exhausted",
"Failed to convert response ");
assertThat(output.getAll())
.contains("Request received:")
.doesNotContain(
"Retrying request to ",
"Retries exhausted",
"Failed to convert response ",
"timezone");
}

@Test
void retriesThreeTimesToGetZonedTime(@Nonnull final CapturedOutput output) throws JsonProcessingException {
final String zoneNames = TimeZone.getDefault().getID();
final RuntimeException exception = new RuntimeException("Retries exhausted");
stubFor(get(urlPathMatching("/" + zoneNames))
.willReturn(aResponse()
.withStatus(500)
.withBody(objectMapper.writeValueAsString(exception))
));
void retriesOnceToGetZonedTime(@Nonnull final CapturedOutput output) {
final String zoneNames = stubErrorResponse();

final LocalDateTime result = publicApiService.getZonedTime();
verify(2, getRequestedFor(urlPathMatching("/" + zoneNames)));

assertThat(result).isNull();
assertThat(output).contains("Retrying request to ", "Retries exhausted");
assertThat(output).doesNotContain("Failed to convert response ");
assertThat(output.getAll())
.contains("Retrying request to ", "Retries exhausted")
//.contains("Retrying request to ", "Retries exhausted", "\"instance_timezone\":\"" + zoneNames + "\"")
.doesNotContain("Failed to convert response ");
}
}
Loading