77
88package io .github .mfvanek .spring .boot2 .test .controllers ;
99
10- import io .github .mfvanek .spring .boot2 .test .service .dto .CurrentTime ;
1110import io .github .mfvanek .spring .boot2 .test .service .dto .ParsedDateTime ;
1211import io .github .mfvanek .spring .boot2 .test .support .KafkaConsumerUtils ;
1312import io .github .mfvanek .spring .boot2 .test .support .TestBase ;
14- import lombok .SneakyThrows ;
1513import org .apache .kafka .clients .consumer .ConsumerRecord ;
1614import org .apache .kafka .common .header .Header ;
1715import org .awaitility .Awaitility ;
3129import java .nio .charset .StandardCharsets ;
3230import java .time .Duration ;
3331import java .time .LocalDateTime ;
34- import java .time .ZoneId ;
3532import java .util .Arrays ;
3633import java .util .List ;
3734import java .util .Map ;
3835import java .util .Objects ;
39- import java .util .TimeZone ;
4036import java .util .UUID ;
4137import java .util .concurrent .BlockingQueue ;
4238import java .util .concurrent .LinkedBlockingQueue ;
4339import java .util .concurrent .TimeUnit ;
4440import javax .annotation .Nonnull ;
4541
46- import static com .github .tomakehurst .wiremock .client .WireMock .aResponse ;
47- import static com .github .tomakehurst .wiremock .client .WireMock .get ;
48- import static com .github .tomakehurst .wiremock .client .WireMock .stubFor ;
49- import static com .github .tomakehurst .wiremock .client .WireMock .urlPathMatching ;
5042import static io .github .mfvanek .spring .boot2 .test .filters .TraceIdInResponseServletFilter .TRACE_ID_HEADER_NAME ;
5143import static org .assertj .core .api .Assertions .assertThat ;
5244
@@ -78,17 +70,10 @@ void cleanUpDatabase() {
7870 jdbcTemplate .execute ("truncate table otel_demo.storage" );
7971 }
8072
81- @ SneakyThrows
8273 @ Test
83- void spanShouldBeReportedInLogs (@ Nonnull final CapturedOutput output ) {
84- final String zoneNames = TimeZone .getDefault ().getID ();
85- final ParsedDateTime parsedDateTime = ParsedDateTime .from (LocalDateTime .now (ZoneId .systemDefault ()).minusDays (1 ));
86- final CurrentTime currentTime = new CurrentTime (parsedDateTime );
87- stubFor (get (urlPathMatching ("/" + zoneNames ))
88- .willReturn (aResponse ()
89- .withStatus (200 )
90- .withBody (objectMapper .writeValueAsString (currentTime ))
91- ));
74+ void spanShouldBeReportedInLogs (@ Nonnull final CapturedOutput output ) throws Exception {
75+ stubOkResponse (ParsedDateTime .from (LocalDateTime .now (clock ).minusDays (1 )));
76+
9277 final EntityExchangeResult <LocalDateTime > result = webTestClient .get ()
9378 .uri (uriBuilder -> uriBuilder .path ("current-time" )
9479 .build ())
@@ -107,27 +92,10 @@ void spanShouldBeReportedInLogs(@Nonnull final CapturedOutput output) {
10792
10893 final ConsumerRecord <UUID , String > received = consumerRecords .poll (10 , TimeUnit .SECONDS );
10994 assertThat (received ).isNotNull ();
110- assertThat (received .value ()).startsWith ("Current time = " );
111- final Header [] headers = received .headers ().toArray ();
112- final List <String > headerNames = Arrays .stream (headers )
113- .map (Header ::key )
114- .toList ();
115- assertThat (headerNames )
116- .hasSize (2 )
117- .containsExactlyInAnyOrder ("traceparent" , "b3" );
118- final List <String > headerValues = Arrays .stream (headers )
119- .map (Header ::value )
120- .map (v -> new String (v , StandardCharsets .UTF_8 ))
121- .toList ();
122- assertThat (headerValues )
123- .hasSameSizeAs (headerNames )
124- .allSatisfy (h -> assertThat (h ).contains (traceId ));
95+ assertThatTraceIdPresentInKafkaHeaders (received , traceId );
96+
97+ awaitStoringIntoDatabase ();
12598
126- Awaitility
127- .await ()
128- .atMost (10 , TimeUnit .SECONDS )
129- .pollInterval (Duration .ofMillis (500L ))
130- .until (() -> countRecordsInTable () >= 1L );
13199 assertThat (output .getAll ())
132100 .contains ("Received record: " + received .value () + " with traceId " + traceId );
133101 final String messageFromDb = namedParameterJdbcTemplate .queryForObject ("select message from otel_demo.storage where trace_id = :traceId" ,
@@ -141,17 +109,10 @@ private long countRecordsInTable() {
141109 return Objects .requireNonNullElse (queryResult , 0L );
142110 }
143111
144- @ SneakyThrows
145112 @ Test
146- void mdcValuesShouldBeReportedInLogs (@ Nonnull final CapturedOutput output ) {
147- final String zoneNames = TimeZone .getDefault ().getID ();
148- final ParsedDateTime parsedDateTime = ParsedDateTime .from (LocalDateTime .now (ZoneId .systemDefault ()).minusDays (1 ));
149- final CurrentTime currentTime = new CurrentTime (parsedDateTime );
150- stubFor (get (urlPathMatching ("/" + zoneNames ))
151- .willReturn (aResponse ()
152- .withStatus (200 )
153- .withBody (objectMapper .writeValueAsString (currentTime ))
154- ));
113+ void mdcValuesShouldBeReportedInLogs (@ Nonnull final CapturedOutput output ) throws Exception {
114+ stubOkResponse (ParsedDateTime .from (LocalDateTime .now (clock ).minusDays (1 )));
115+
155116 webTestClient .get ()
156117 .uri (uriBuilder -> uriBuilder .path ("current-time" )
157118 .build ())
@@ -168,16 +129,9 @@ void mdcValuesShouldBeReportedInLogs(@Nonnull final CapturedOutput output) {
168129 .contains ("\" tenant.name\" :\" ru-a1-private\" " );
169130 }
170131
171- @ SneakyThrows
172132 @ Test
173- void spanAndMdcShouldBeReportedWhenRetry (@ Nonnull final CapturedOutput output ) {
174- final String zoneNames = TimeZone .getDefault ().getID ();
175- final RuntimeException exception = new RuntimeException ("Retries exhausted" );
176- stubFor (get (urlPathMatching ("/" + zoneNames ))
177- .willReturn (aResponse ()
178- .withStatus (500 )
179- .withBody (objectMapper .writeValueAsString (exception ))
180- ));
133+ void spanAndMdcShouldBeReportedWhenRetry (@ Nonnull final CapturedOutput output ) throws Exception {
134+ final String zoneNames = stubErrorResponse ();
181135
182136 final EntityExchangeResult <LocalDateTime > result = webTestClient .get ()
183137 .uri (uriBuilder -> uriBuilder .path ("current-time" )
@@ -195,6 +149,21 @@ void spanAndMdcShouldBeReportedWhenRetry(@Nonnull final CapturedOutput output) {
195149
196150 final ConsumerRecord <UUID , String > received = consumerRecords .poll (10 , TimeUnit .SECONDS );
197151 assertThat (received ).isNotNull ();
152+ assertThatTraceIdPresentInKafkaHeaders (received , traceId );
153+
154+ awaitStoringIntoDatabase ();
155+
156+ assertThat (output .getAll ())
157+ .contains (
158+ "Received record: " + received .value () + " with traceId " + traceId ,
159+ "Retrying request to " ,
160+ "Retries exhausted" ,
161+ "\" instance_timezone\" :\" " + zoneNames + "\" "
162+ );
163+ }
164+
165+ private void assertThatTraceIdPresentInKafkaHeaders (@ Nonnull final ConsumerRecord <UUID , String > received ,
166+ @ Nonnull final String expectedTraceId ) {
198167 assertThat (received .value ()).startsWith ("Current time = " );
199168 final Header [] headers = received .headers ().toArray ();
200169 final List <String > headerNames = Arrays .stream (headers )
@@ -209,19 +178,14 @@ void spanAndMdcShouldBeReportedWhenRetry(@Nonnull final CapturedOutput output) {
209178 .toList ();
210179 assertThat (headerValues )
211180 .hasSameSizeAs (headerNames )
212- .allSatisfy (h -> assertThat (h ).contains (traceId ));
181+ .allSatisfy (h -> assertThat (h ).contains (expectedTraceId ));
182+ }
213183
184+ private void awaitStoringIntoDatabase () {
214185 Awaitility
215186 .await ()
216187 .atMost (10 , TimeUnit .SECONDS )
217188 .pollInterval (Duration .ofMillis (500L ))
218189 .until (() -> countRecordsInTable () >= 1L );
219- assertThat (output .getAll ())
220- .contains (
221- "Received record: " + received .value () + " with traceId " + traceId ,
222- "Retrying request to " ,
223- "Retries exhausted" ,
224- "\" instance_timezone\" :\" " + zoneNames + "\" "
225- );
226190 }
227191}
0 commit comments