Skip to content

Commit 1e2b200

Browse files
authored
Ensure clean thread context in MasterService (#114512)
`ThreadContext#stashContext` doesn't guarantee to give a clean thread context, but it's important we don't allow the callers' thread contexts to leak into the cluster state update. This commit captures the desired thread context at startup rather than using `stashContext` when forking the processor.
1 parent 2c1e023 commit 1e2b200

File tree

24 files changed

+99
-24
lines changed

24 files changed

+99
-24
lines changed

docs/changelog/114512.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 114512
2+
summary: Ensure clean thread context in `MasterService`
3+
area: Cluster Coordination
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public class MasterService extends AbstractLifecycleComponent {
108108

109109
protected final ThreadPool threadPool;
110110
private final TaskManager taskManager;
111+
private final ThreadContext.StoredContext clusterStateUpdateContext;
111112

112113
private volatile ExecutorService threadPoolExecutor;
113114
private final AtomicInteger totalQueueSize = new AtomicInteger();
@@ -128,6 +129,7 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
128129

129130
this.threadPool = threadPool;
130131
this.taskManager = taskManager;
132+
this.clusterStateUpdateContext = getClusterStateUpdateContext(threadPool.getThreadContext());
131133

132134
final var queuesByPriorityBuilder = new EnumMap<Priority, PerPriorityQueue>(Priority.class);
133135
for (final var priority : Priority.values()) {
@@ -137,6 +139,15 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
137139
this.unbatchedExecutor = new UnbatchedExecutor();
138140
}
139141

142+
private static ThreadContext.StoredContext getClusterStateUpdateContext(ThreadContext threadContext) {
143+
try (var ignored = threadContext.newStoredContext()) {
144+
// capture the context in which to run all cluster state updates here where we know it to be very clean
145+
assert threadContext.isDefaultContext() : "must only create MasterService in a clean ThreadContext";
146+
threadContext.markAsSystemContext();
147+
return threadContext.newStoredContext();
148+
}
149+
}
150+
140151
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
141152
this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
142153
}
@@ -1324,8 +1335,8 @@ private void forkQueueProcessor() {
13241335

13251336
assert totalQueueSize.get() > 0;
13261337
final var threadContext = threadPool.getThreadContext();
1327-
try (var ignored = threadContext.stashContext()) {
1328-
threadContext.markAsSystemContext();
1338+
try (var ignored = threadContext.newStoredContext()) {
1339+
clusterStateUpdateContext.restore();
13291340
threadPoolExecutor.execute(queuesProcessor);
13301341
}
13311342
}

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,14 @@ static NodeConstruction prepareConstruction(
268268
constructor.loadLoggingDataProviders();
269269
TelemetryProvider telemetryProvider = constructor.createTelemetryProvider(settings);
270270
ThreadPool threadPool = constructor.createThreadPool(settings, telemetryProvider.getMeterRegistry());
271-
SettingsModule settingsModule = constructor.validateSettings(initialEnvironment.settings(), settings, threadPool);
271+
272+
final SettingsModule settingsModule;
273+
try (var ignored = threadPool.getThreadContext().newStoredContext()) {
274+
// If any deprecated settings are in use then we add warnings to the thread context response headers, but we're not
275+
// computing a response here so these headers aren't relevant and eventually just get dropped after possibly leaking into
276+
// places they shouldn't. Best to explicitly drop them now to protect against such leakage.
277+
settingsModule = constructor.validateSettings(initialEnvironment.settings(), settings, threadPool);
278+
}
272279

273280
SearchModule searchModule = constructor.createSearchModule(settingsModule.getSettings(), threadPool, telemetryProvider);
274281
constructor.createClientAndRegistries(settingsModule.getSettings(), threadPool, searchModule);

server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.Strings;
3535
import org.elasticsearch.common.UUIDs;
3636
import org.elasticsearch.common.settings.Settings;
37+
import org.elasticsearch.common.util.concurrent.ThreadContext;
3738
import org.elasticsearch.core.TimeValue;
3839
import org.elasticsearch.index.Index;
3940
import org.elasticsearch.index.IndexVersion;
@@ -63,6 +64,7 @@
6364
import static org.hamcrest.Matchers.lessThanOrEqualTo;
6465
import static org.hamcrest.Matchers.nullValue;
6566
import static org.mockito.Mockito.mock;
67+
import static org.mockito.Mockito.when;
6668

6769
public class MetadataRolloverServiceTests extends ESTestCase {
6870

@@ -833,6 +835,7 @@ public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception {
833835
final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin();
834836

835837
ThreadPool testThreadPool = mock(ThreadPool.class);
838+
when(testThreadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
836839
MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService(
837840
dataStream,
838841
testThreadPool,

server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.settings.ClusterSettings;
2424
import org.elasticsearch.common.settings.Settings;
2525
import org.elasticsearch.common.util.concurrent.EsExecutors;
26+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2627
import org.elasticsearch.index.IndexVersion;
2728
import org.elasticsearch.ingest.FakeProcessor;
2829
import org.elasticsearch.ingest.IngestInfo;
@@ -81,6 +82,7 @@ public void setup() {
8182
threadPool = mock(ThreadPool.class);
8283
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
8384
when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
85+
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
8486

8587
Client client = mock(Client.class);
8688
ingestService = new IngestService(

server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.elasticsearch.indices.EmptySystemIndices;
4747
import org.elasticsearch.injection.guice.ModuleTestCase;
4848
import org.elasticsearch.plugins.ClusterPlugin;
49-
import org.elasticsearch.tasks.TaskManager;
5049
import org.elasticsearch.telemetry.TelemetryProvider;
5150
import org.elasticsearch.test.gateway.TestGatewayAllocator;
5251
import org.elasticsearch.threadpool.TestThreadPool;
@@ -88,8 +87,8 @@ public void setUp() throws Exception {
8887
clusterService = new ClusterService(
8988
Settings.EMPTY,
9089
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
91-
null,
92-
(TaskManager) null
90+
threadPool,
91+
null
9392
);
9493
}
9594

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.settings.ClusterSettings;
2323
import org.elasticsearch.common.settings.IndexScopedSettings;
2424
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2526
import org.elasticsearch.core.TimeValue;
2627
import org.elasticsearch.env.Environment;
2728
import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
@@ -77,6 +78,7 @@
7778
import static org.hamcrest.Matchers.nullValue;
7879
import static org.hamcrest.Matchers.sameInstance;
7980
import static org.mockito.Mockito.mock;
81+
import static org.mockito.Mockito.when;
8082

8183
public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
8284

@@ -2473,6 +2475,7 @@ public void testAddIndexTemplateWithDeprecatedComponentTemplate() throws Excepti
24732475

24742476
private static List<Throwable> putTemplate(NamedXContentRegistry xContentRegistry, PutRequest request) {
24752477
ThreadPool testThreadPool = mock(ThreadPool.class);
2478+
when(testThreadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
24762479
ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
24772480
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(
24782481
Settings.EMPTY,

server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import java.util.ArrayList;
7070
import java.util.Collections;
7171
import java.util.EnumMap;
72+
import java.util.HashMap;
7273
import java.util.List;
7374
import java.util.Map;
7475
import java.util.concurrent.CountDownLatch;
@@ -263,7 +264,15 @@ public void testThreadContext() throws InterruptedException {
263264
final CountDownLatch latch = new CountDownLatch(1);
264265

265266
try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) {
266-
final Map<String, String> expectedHeaders = Collections.singletonMap("test", "test");
267+
268+
final var expectedHeaders = new HashMap<String, String>();
269+
expectedHeaders.put(randomIdentifier(), randomIdentifier());
270+
for (final var copiedHeader : Task.HEADERS_TO_COPY) {
271+
if (randomBoolean()) {
272+
expectedHeaders.put(copiedHeader, randomIdentifier());
273+
}
274+
}
275+
267276
final Map<String, List<String>> expectedResponseHeaders = Collections.singletonMap(
268277
"testResponse",
269278
Collections.singletonList("testResponse")
@@ -1343,7 +1352,6 @@ public void testAcking() {
13431352
.build();
13441353
final var deterministicTaskQueue = new DeterministicTaskQueue();
13451354
final var threadPool = deterministicTaskQueue.getThreadPool();
1346-
threadPool.getThreadContext().markAsSystemContext();
13471355
try (
13481356
var masterService = createMasterService(
13491357
true,
@@ -1352,6 +1360,7 @@ public void testAcking() {
13521360
new StoppableExecutorServiceWrapper(threadPool.generic())
13531361
)
13541362
) {
1363+
threadPool.getThreadContext().markAsSystemContext();
13551364

13561365
final var responseHeaderName = "test-response-header";
13571366

test/framework/src/main/java/org/elasticsearch/test/AbstractBuilderTestCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.common.settings.IndexScopedSettings;
3737
import org.elasticsearch.common.settings.Settings;
3838
import org.elasticsearch.common.settings.SettingsModule;
39+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
3940
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
4041
import org.elasticsearch.core.IOUtils;
4142
import org.elasticsearch.env.Environment;
@@ -81,7 +82,6 @@
8182
import org.elasticsearch.script.ScriptModule;
8283
import org.elasticsearch.script.ScriptService;
8384
import org.elasticsearch.search.SearchModule;
84-
import org.elasticsearch.tasks.TaskManager;
8585
import org.elasticsearch.test.index.IndexVersionUtils;
8686
import org.elasticsearch.threadpool.TestThreadPool;
8787
import org.elasticsearch.transport.RemoteClusterAware;
@@ -432,8 +432,8 @@ private static class ServiceHolder implements Closeable {
432432
ClusterService clusterService = new ClusterService(
433433
Settings.EMPTY,
434434
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
435-
null,
436-
(TaskManager) null
435+
new DeterministicTaskQueue().getThreadPool(),
436+
null
437437
);
438438

439439
client = (Client) Proxy.newProxyInstance(

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.cluster.service.ClusterService;
1313
import org.elasticsearch.common.settings.Settings;
1414
import org.elasticsearch.common.unit.ByteSizeValue;
15+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1516
import org.elasticsearch.core.TimeValue;
1617
import org.elasticsearch.datastreams.DataStreamsPlugin;
1718
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
@@ -159,6 +160,7 @@ protected <T> T blockingCall(Consumer<ActionListener<T>> function) throws Except
159160

160161
protected static ThreadPool mockThreadPool() {
161162
ThreadPool tp = mock(ThreadPool.class);
163+
when(tp.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
162164
ExecutorService executor = mock(ExecutorService.class);
163165
doAnswer(invocationOnMock -> {
164166
((Runnable) invocationOnMock.getArguments()[0]).run();

0 commit comments

Comments
 (0)