|
5 | 5 | import static java.util.stream.Collectors.groupingBy;
|
6 | 6 | import static java.util.stream.Collectors.mapping;
|
7 | 7 | import static java.util.stream.Collectors.toList;
|
| 8 | +import static org.awaitility.Awaitility.await; |
8 | 9 | import static org.hamcrest.MatcherAssert.assertThat;
|
9 | 10 | import static org.hamcrest.Matchers.hasItem;
|
10 | 11 | import static org.junit.jupiter.api.Assertions.assertEquals;
|
11 | 12 | import static org.junit.jupiter.api.Assertions.assertTrue;
|
12 | 13 |
|
| 14 | +import java.util.Arrays; |
13 | 15 | import java.util.List;
|
14 | 16 | import java.util.Map;
|
15 | 17 | import java.util.UUID;
|
16 | 18 | import java.util.concurrent.CountDownLatch;
|
17 |
| -import java.util.concurrent.atomic.AtomicReference; |
18 |
| -import java.util.function.Consumer; |
| 19 | +import java.util.concurrent.TimeUnit; |
19 | 20 | import java.util.stream.IntStream;
|
20 | 21 | import java.util.stream.Stream;
|
21 | 22 |
|
|
25 | 26 | import org.jboss.logging.MDC;
|
26 | 27 | import org.jboss.shrinkwrap.api.ShrinkWrap;
|
27 | 28 | import org.jboss.shrinkwrap.api.spec.JavaArchive;
|
28 |
| -import org.junit.jupiter.api.Test; |
| 29 | +import org.junit.jupiter.api.Assertions; |
| 30 | +import org.junit.jupiter.api.RepeatedTest; |
29 | 31 | import org.junit.jupiter.api.extension.RegisterExtension;
|
30 | 32 |
|
| 33 | +import io.quarkus.bootstrap.logging.InitialConfigurator; |
31 | 34 | import io.quarkus.test.QuarkusUnitTest;
|
32 |
| -import io.vertx.core.AsyncResult; |
33 |
| -import io.vertx.core.CompositeFuture; |
34 | 35 | import io.vertx.core.Future;
|
35 |
| -import io.vertx.core.Handler; |
36 | 36 | import io.vertx.core.Vertx;
|
37 | 37 | import io.vertx.core.buffer.Buffer;
|
38 | 38 | import io.vertx.ext.web.client.HttpRequest;
|
@@ -61,102 +61,102 @@ public class VertxMDCTest {
|
61 | 61 |
|
62 | 62 | @Inject
|
63 | 63 | InMemoryLogHandler inMemoryLogHandler;
|
64 |
| - |
| 64 | + @Inject |
| 65 | + InMemoryLogHandlerProducer producer; |
65 | 66 | @Inject
|
66 | 67 | VerticleDeployer verticleDeployer;
|
67 | 68 |
|
68 |
| - static final CountDownLatch countDownLatch = new CountDownLatch(1); |
69 |
| - static final AtomicReference<Throwable> errorDuringExecution = new AtomicReference<>(); |
70 |
| - |
71 |
| - @Test |
| 69 | + @RepeatedTest(10) |
72 | 70 | void mdc() throws Throwable {
|
73 |
| - List<String> requestIds = IntStream.range(0, 10) |
| 71 | + |
| 72 | + InMemoryLogHandler.reset(); |
| 73 | + await().until(() -> producer.isInitialized()); |
| 74 | + await().until(InitialConfigurator.DELAYED_HANDLER::isActivated); |
| 75 | + await().until(() -> { |
| 76 | + return Arrays.stream(InitialConfigurator.DELAYED_HANDLER.getHandlers()).anyMatch(h -> h == inMemoryLogHandler); |
| 77 | + }); |
| 78 | + ; |
| 79 | + |
| 80 | + List<String> requestIds = IntStream.range(0, 1) |
74 | 81 | .mapToObj(i -> UUID.randomUUID().toString())
|
75 | 82 | .collect(toList());
|
76 | 83 |
|
77 |
| - sendRequests(requestIds, onSuccess(v -> { |
78 |
| - try { |
79 |
| - Map<String, List<String>> allMessagesById = inMemoryLogHandler.logRecords() |
80 |
| - .stream() |
81 |
| - .map(line -> line.split(" ### ")) |
82 |
| - .peek(split -> assertEquals(split[0], split[2])) |
83 |
| - .collect(groupingBy(split -> split[0], |
84 |
| - mapping(split -> split[1], toList()))); |
85 |
| - |
86 |
| - assertEquals(requestIds.size(), allMessagesById.size()); |
87 |
| - assertTrue(requestIds.containsAll(allMessagesById.keySet())); |
88 |
| - |
89 |
| - List<String> expected = Stream.<String> builder() |
90 |
| - .add("Received HTTP request") |
91 |
| - .add("Timer fired") |
92 |
| - .add("Blocking task executed") |
93 |
| - .add("Received Web Client response") |
94 |
| - .build() |
95 |
| - .collect(toList()); |
96 |
| - |
97 |
| - for (List<String> messages : allMessagesById.values()) { |
98 |
| - assertEquals(expected, messages); |
99 |
| - } |
100 |
| - } catch (Throwable t) { |
101 |
| - errorDuringExecution.set(t); |
102 |
| - } finally { |
103 |
| - countDownLatch.countDown(); |
104 |
| - } |
105 |
| - })); |
| 84 | + CountDownLatch done = new CountDownLatch(1); |
| 85 | + sendRequests(requestIds, done); |
| 86 | + |
| 87 | + Assertions.assertTrue(done.await(20, TimeUnit.SECONDS)); |
| 88 | + |
| 89 | + await().untilAsserted(() -> { |
| 90 | + Map<String, List<String>> allMessagesById = inMemoryLogHandler.logRecords() |
| 91 | + .stream() |
| 92 | + .map(line -> line.split(" ### ")) |
| 93 | + .peek(split -> assertEquals(split[0], split[2])) |
| 94 | + .collect(groupingBy(split -> split[0], |
| 95 | + mapping(split -> split[1], toList()))); |
106 | 96 |
|
107 |
| - countDownLatch.await(); |
| 97 | + assertEquals(requestIds.size(), allMessagesById.size()); |
| 98 | + assertTrue(requestIds.containsAll(allMessagesById.keySet())); |
108 | 99 |
|
109 |
| - Throwable throwable = errorDuringExecution.get(); |
110 |
| - if (throwable != null) { |
111 |
| - throw throwable; |
112 |
| - } |
| 100 | + List<String> expected = Stream.<String> builder() |
| 101 | + .add("Received HTTP request") |
| 102 | + .add("Timer fired") |
| 103 | + .add("Blocking task executed") |
| 104 | + .add("Received Web Client response") |
| 105 | + .build() |
| 106 | + .collect(toList()); |
| 107 | + |
| 108 | + for (List<String> messages : allMessagesById.values()) { |
| 109 | + assertEquals(expected, messages); |
| 110 | + } |
| 111 | + }); |
113 | 112 | }
|
114 | 113 |
|
115 |
| - @Test |
| 114 | + @RepeatedTest(10) |
116 | 115 | public void mdcNonVertxThreadTest() {
|
| 116 | + InMemoryLogHandler.reset(); |
| 117 | + await().until(() -> producer.isInitialized()); |
| 118 | + await().until(InitialConfigurator.DELAYED_HANDLER::isActivated); |
| 119 | + await().until(() -> { |
| 120 | + return Arrays.stream(InitialConfigurator.DELAYED_HANDLER.getHandlers()).anyMatch(h -> h == inMemoryLogHandler); |
| 121 | + }); |
| 122 | + ; |
| 123 | + |
117 | 124 | String mdcValue = "Test MDC value";
|
118 | 125 | MDC.put("requestId", mdcValue);
|
119 |
| - LOGGER.info("Test 1"); |
120 | 126 |
|
121 |
| - assertThat(inMemoryLogHandler.logRecords(), |
122 |
| - hasItem(mdcValue + " ### Test 1")); |
| 127 | + await().untilAsserted(() -> { |
| 128 | + LOGGER.info("Test 1"); |
| 129 | + assertThat(inMemoryLogHandler.logRecords(), hasItem(mdcValue + " ### Test 1")); |
| 130 | + }); |
123 | 131 |
|
124 | 132 | MDC.remove("requestId");
|
125 |
| - LOGGER.info("Test 2"); |
126 | 133 |
|
127 |
| - assertThat(inMemoryLogHandler.logRecords(), |
128 |
| - hasItem(" ### Test 2")); |
| 134 | + await().untilAsserted(() -> { |
| 135 | + LOGGER.info("Test 2"); |
| 136 | + assertThat(inMemoryLogHandler.logRecords(), hasItem(" ### Test 2")); |
| 137 | + }); |
129 | 138 |
|
130 |
| - mdcValue = "New test MDC value"; |
131 |
| - MDC.put("requestId", mdcValue); |
132 |
| - LOGGER.info("Test 3"); |
| 139 | + String mdcValue2 = "New test MDC value"; |
| 140 | + MDC.put("requestId", mdcValue2); |
133 | 141 |
|
134 |
| - assertThat(inMemoryLogHandler.logRecords(), |
135 |
| - hasItem(mdcValue + " ### Test 3")); |
136 |
| - } |
137 |
| - |
138 |
| - protected <T> Handler<AsyncResult<T>> onSuccess(Consumer<T> consumer) { |
139 |
| - return result -> { |
140 |
| - if (result.failed()) { |
141 |
| - errorDuringExecution.set(result.cause()); |
142 |
| - countDownLatch.countDown(); |
143 |
| - } else { |
144 |
| - consumer.accept(result.result()); |
145 |
| - } |
146 |
| - }; |
| 142 | + await().untilAsserted(() -> { |
| 143 | + LOGGER.info("Test 3"); |
| 144 | + assertThat(inMemoryLogHandler.logRecords(), hasItem(mdcValue2 + " ### Test 3")); |
| 145 | + }); |
147 | 146 | }
|
148 | 147 |
|
149 |
| - @SuppressWarnings({ "rawtypes" }) |
150 |
| - private void sendRequests(List<String> ids, Handler<AsyncResult<Void>> handler) { |
| 148 | + private void sendRequests(List<String> ids, CountDownLatch done) { |
151 | 149 | WebClient webClient = WebClient.create(vertx, new WebClientOptions().setDefaultPort(VERTICLE_PORT));
|
152 | 150 |
|
153 | 151 | HttpRequest<Buffer> request = webClient.get("/")
|
154 | 152 | .expect(ResponsePredicate.SC_OK);
|
155 | 153 |
|
156 |
| - List<Future> futures = ids.stream() |
| 154 | + List<? extends Future<?>> futures = ids.stream() |
157 | 155 | .map(id -> request.putHeader(REQUEST_ID_HEADER, id).send())
|
158 | 156 | .collect(toList());
|
159 | 157 |
|
160 |
| - CompositeFuture.all(futures).<Void> mapEmpty().onComplete(handler); |
| 158 | + Future.all(futures).mapEmpty().onComplete(x -> { |
| 159 | + done.countDown(); |
| 160 | + }); |
161 | 161 | }
|
162 | 162 | }
|
0 commit comments