|
9 | 9 | package org.elasticsearch.cluster.routing.allocation.allocator; |
10 | 10 |
|
11 | 11 | import org.elasticsearch.action.ActionListener; |
| 12 | +import org.elasticsearch.action.support.RefCountingRunnable; |
12 | 13 | import org.elasticsearch.action.support.master.AcknowledgedResponse; |
13 | 14 | import org.elasticsearch.common.settings.Settings; |
14 | 15 | import org.elasticsearch.common.util.concurrent.ThreadContext; |
15 | | -import org.elasticsearch.core.Tuple; |
16 | 16 | import org.elasticsearch.test.ESTestCase; |
17 | 17 | import org.elasticsearch.threadpool.TestThreadPool; |
18 | 18 | import org.elasticsearch.threadpool.ThreadPool; |
| 19 | +import org.junit.Assert; |
19 | 20 |
|
| 21 | +import java.util.ArrayList; |
20 | 22 | import java.util.List; |
| 23 | +import java.util.Set; |
21 | 24 | import java.util.concurrent.CountDownLatch; |
22 | 25 | import java.util.concurrent.TimeUnit; |
23 | 26 | import java.util.concurrent.atomic.AtomicBoolean; |
24 | 27 | import java.util.concurrent.atomic.AtomicInteger; |
25 | | -import java.util.concurrent.atomic.AtomicReference; |
26 | 28 |
|
27 | 29 | import static org.hamcrest.Matchers.equalTo; |
28 | 30 |
|
@@ -121,45 +123,60 @@ private static void awaitQuietly(CountDownLatch latch) { |
121 | 123 |
|
122 | 124 | public void testShouldExecuteWithCorrectContext() { |
123 | 125 |
|
124 | | - var context = new ThreadContext(Settings.EMPTY); |
125 | | - var listener = new AllocationActionMultiListener<Integer>(context); |
126 | | - |
127 | | - context.putHeader("header", "root"); |
128 | | - var r1 = new AtomicReference<String>(); |
129 | | - var r2 = new AtomicReference<String>(); |
130 | | - var l1 = listener.delay( |
131 | | - ActionListener.wrap( |
132 | | - response -> r1.set(context.getHeader("header")), |
133 | | - exception -> { throw new AssertionError("Should not fail in test"); } |
134 | | - ) |
135 | | - ); |
136 | | - var l2 = listener.delay( |
137 | | - ActionListener.wrap( |
138 | | - response -> r2.set(context.getHeader("header")), |
139 | | - exception -> { throw new AssertionError("Should not fail in test"); } |
140 | | - ) |
141 | | - ); |
| 126 | + final var requestHeaderName = "header"; |
| 127 | + final var responseHeaderName = "responseHeader"; |
142 | 128 |
|
143 | | - executeInRandomOrder( |
144 | | - context, |
145 | | - List.of( |
146 | | - new Tuple<>("clusterStateUpdate1", () -> l1.onResponse(1)), |
147 | | - new Tuple<>("clusterStateUpdate2", () -> l2.onResponse(2)), |
148 | | - new Tuple<>("reroute", () -> listener.reroute().onResponse(null)) |
149 | | - ) |
150 | | - ); |
| 129 | + final var expectedRequestHeader = randomAlphaOfLength(10); |
| 130 | + final var expectedResponseHeader = randomAlphaOfLength(10); |
151 | 131 |
|
152 | | - assertThat(r1.get(), equalTo("root")); |
153 | | - assertThat(r2.get(), equalTo("root")); |
154 | | - } |
| 132 | + var context = new ThreadContext(Settings.EMPTY); |
| 133 | + var listener = new AllocationActionMultiListener<>(context); |
| 134 | + |
| 135 | + context.putHeader(requestHeaderName, expectedRequestHeader); |
| 136 | + context.addResponseHeader(responseHeaderName, expectedResponseHeader); |
| 137 | + |
| 138 | + var isComplete = new AtomicBoolean(); |
| 139 | + try (var refs = new RefCountingRunnable(() -> assertTrue(isComplete.compareAndSet(false, true)))) { |
| 140 | + |
| 141 | + List<Runnable> actions = new ArrayList<>(); |
| 142 | + |
| 143 | + for (int i = between(0, 5); i > 0; i--) { |
| 144 | + var expectedVal = new Object(); |
| 145 | + var delayedListener = listener.delay( |
| 146 | + ActionListener.releaseAfter(ActionListener.wrap(Assert::fail).delegateFailure((l, val) -> { |
| 147 | + assertSame(expectedVal, val); |
| 148 | + assertEquals(expectedRequestHeader, context.getHeader(requestHeaderName)); |
| 149 | + assertEquals(List.of(expectedResponseHeader), context.getResponseHeaders().get(responseHeaderName)); |
| 150 | + context.addResponseHeader(responseHeaderName, randomAlphaOfLength(10)); |
| 151 | + }), refs.acquire()) |
| 152 | + ); |
| 153 | + actions.add(() -> delayedListener.onResponse(expectedVal)); |
| 154 | + } |
155 | 155 |
|
156 | | - private static void executeInRandomOrder(ThreadContext context, List<Tuple<String, Runnable>> actions) { |
157 | | - for (var action : shuffledList(actions)) { |
158 | | - try (var ignored = context.stashContext()) { |
159 | | - context.putHeader("header", action.v1()); |
160 | | - action.v2().run(); |
| 156 | + final var additionalResponseHeader = randomAlphaOfLength(10); |
| 157 | + context.addResponseHeader(responseHeaderName, additionalResponseHeader); |
| 158 | + |
| 159 | + actions.add(() -> listener.reroute().onResponse(null)); |
| 160 | + |
| 161 | + for (var action : shuffledList(actions)) { |
| 162 | + try (var ignored = context.stashContext()) { |
| 163 | + final var localRequestHeader = randomAlphaOfLength(10); |
| 164 | + final var localResponseHeader = randomAlphaOfLength(10); |
| 165 | + context.putHeader(requestHeaderName, localRequestHeader); |
| 166 | + context.addResponseHeader(responseHeaderName, localResponseHeader); |
| 167 | + action.run(); |
| 168 | + assertEquals(localRequestHeader, context.getHeader(requestHeaderName)); |
| 169 | + assertEquals(List.of(localResponseHeader), context.getResponseHeaders().get(responseHeaderName)); |
| 170 | + } |
161 | 171 | } |
| 172 | + |
| 173 | + assertEquals( |
| 174 | + Set.of(expectedResponseHeader, additionalResponseHeader), |
| 175 | + Set.copyOf(context.getResponseHeaders().get(responseHeaderName)) |
| 176 | + ); |
162 | 177 | } |
| 178 | + |
| 179 | + assertTrue(isComplete.get()); |
163 | 180 | } |
164 | 181 |
|
165 | 182 | private static ThreadContext createEmptyThreadContext() { |
|
0 commit comments