Skip to content

Commit a0c5af3

Browse files
authored
Ensure clean thread context in MasterService (#114512) (#114569)
`ThreadContext#stashContext` doesn't guarantee to give a clean thread context, but it's important we don't allow the callers' thread contexts to leak into the cluster state update. This commit captures the desired thread context at startup rather than using `stashContext` when forking the processor.
1 parent 62d0765 commit a0c5af3

File tree

24 files changed

+99
-24
lines changed

24 files changed

+99
-24
lines changed

docs/changelog/114512.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 114512
2+
summary: Ensure clean thread context in `MasterService`
3+
area: Cluster Coordination
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ public class MasterService extends AbstractLifecycleComponent {
109109

110110
protected final ThreadPool threadPool;
111111
private final TaskManager taskManager;
112+
private final ThreadContext.StoredContext clusterStateUpdateContext;
112113

113114
private volatile ExecutorService threadPoolExecutor;
114115
private final AtomicInteger totalQueueSize = new AtomicInteger();
@@ -129,6 +130,7 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
129130

130131
this.threadPool = threadPool;
131132
this.taskManager = taskManager;
133+
this.clusterStateUpdateContext = getClusterStateUpdateContext(threadPool.getThreadContext());
132134

133135
final var queuesByPriorityBuilder = new EnumMap<Priority, PerPriorityQueue>(Priority.class);
134136
for (final var priority : Priority.values()) {
@@ -138,6 +140,15 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
138140
this.unbatchedExecutor = new UnbatchedExecutor();
139141
}
140142

143+
private static ThreadContext.StoredContext getClusterStateUpdateContext(ThreadContext threadContext) {
144+
try (var ignored = threadContext.newStoredContext()) {
145+
// capture the context in which to run all cluster state updates here where we know it to be very clean
146+
assert threadContext.isDefaultContext() : "must only create MasterService in a clean ThreadContext";
147+
threadContext.markAsSystemContext();
148+
return threadContext.newStoredContext();
149+
}
150+
}
151+
141152
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
142153
this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
143154
}
@@ -1330,8 +1341,8 @@ private void forkQueueProcessor() {
13301341

13311342
assert totalQueueSize.get() > 0;
13321343
final var threadContext = threadPool.getThreadContext();
1333-
try (var ignored = threadContext.stashContext()) {
1334-
threadContext.markAsSystemContext();
1344+
try (var ignored = threadContext.newStoredContext()) {
1345+
clusterStateUpdateContext.restore();
13351346
threadPoolExecutor.execute(queuesProcessor);
13361347
}
13371348
}

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,14 @@ static NodeConstruction prepareConstruction(
268268
constructor.loadLoggingDataProviders();
269269
TelemetryProvider telemetryProvider = constructor.createTelemetryProvider(settings);
270270
ThreadPool threadPool = constructor.createThreadPool(settings, telemetryProvider.getMeterRegistry());
271-
SettingsModule settingsModule = constructor.validateSettings(initialEnvironment.settings(), settings, threadPool);
271+
272+
final SettingsModule settingsModule;
273+
try (var ignored = threadPool.getThreadContext().newStoredContext()) {
274+
// If any deprecated settings are in use then we add warnings to the thread context response headers, but we're not
275+
// computing a response here so these headers aren't relevant and eventually just get dropped after possibly leaking into
276+
// places they shouldn't. Best to explicitly drop them now to protect against such leakage.
277+
settingsModule = constructor.validateSettings(initialEnvironment.settings(), settings, threadPool);
278+
}
272279

273280
SearchModule searchModule = constructor.createSearchModule(settingsModule.getSettings(), threadPool, telemetryProvider);
274281
constructor.createClientAndRegistries(settingsModule.getSettings(), threadPool, searchModule);

server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.Strings;
3535
import org.elasticsearch.common.UUIDs;
3636
import org.elasticsearch.common.settings.Settings;
37+
import org.elasticsearch.common.util.concurrent.ThreadContext;
3738
import org.elasticsearch.core.TimeValue;
3839
import org.elasticsearch.index.Index;
3940
import org.elasticsearch.index.IndexVersion;
@@ -63,6 +64,7 @@
6364
import static org.hamcrest.Matchers.lessThanOrEqualTo;
6465
import static org.hamcrest.Matchers.nullValue;
6566
import static org.mockito.Mockito.mock;
67+
import static org.mockito.Mockito.when;
6668

6769
public class MetadataRolloverServiceTests extends ESTestCase {
6870

@@ -833,6 +835,7 @@ public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception {
833835
final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin();
834836

835837
ThreadPool testThreadPool = mock(ThreadPool.class);
838+
when(testThreadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
836839
MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService(
837840
dataStream,
838841
testThreadPool,

server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.settings.ClusterSettings;
2424
import org.elasticsearch.common.settings.Settings;
2525
import org.elasticsearch.common.util.concurrent.EsExecutors;
26+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2627
import org.elasticsearch.index.IndexVersion;
2728
import org.elasticsearch.ingest.FakeProcessor;
2829
import org.elasticsearch.ingest.IngestInfo;
@@ -81,6 +82,7 @@ public void setup() {
8182
threadPool = mock(ThreadPool.class);
8283
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
8384
when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
85+
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
8486

8587
Client client = mock(Client.class);
8688
ingestService = new IngestService(

server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.elasticsearch.indices.EmptySystemIndices;
4747
import org.elasticsearch.injection.guice.ModuleTestCase;
4848
import org.elasticsearch.plugins.ClusterPlugin;
49-
import org.elasticsearch.tasks.TaskManager;
5049
import org.elasticsearch.telemetry.TelemetryProvider;
5150
import org.elasticsearch.test.gateway.TestGatewayAllocator;
5251
import org.elasticsearch.threadpool.TestThreadPool;
@@ -88,8 +87,8 @@ public void setUp() throws Exception {
8887
clusterService = new ClusterService(
8988
Settings.EMPTY,
9089
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
91-
null,
92-
(TaskManager) null
90+
threadPool,
91+
null
9392
);
9493
}
9594

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.settings.ClusterSettings;
2323
import org.elasticsearch.common.settings.IndexScopedSettings;
2424
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2526
import org.elasticsearch.core.TimeValue;
2627
import org.elasticsearch.env.Environment;
2728
import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
@@ -77,6 +78,7 @@
7778
import static org.hamcrest.Matchers.nullValue;
7879
import static org.hamcrest.Matchers.sameInstance;
7980
import static org.mockito.Mockito.mock;
81+
import static org.mockito.Mockito.when;
8082

8183
public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
8284

@@ -2473,6 +2475,7 @@ public void testAddIndexTemplateWithDeprecatedComponentTemplate() throws Excepti
24732475

24742476
private static List<Throwable> putTemplate(NamedXContentRegistry xContentRegistry, PutRequest request) {
24752477
ThreadPool testThreadPool = mock(ThreadPool.class);
2478+
when(testThreadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
24762479
ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
24772480
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(
24782481
Settings.EMPTY,

server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.util.ArrayList;
6969
import java.util.Collections;
7070
import java.util.EnumMap;
71+
import java.util.HashMap;
7172
import java.util.List;
7273
import java.util.Map;
7374
import java.util.concurrent.CountDownLatch;
@@ -262,7 +263,15 @@ public void testThreadContext() throws InterruptedException {
262263
final CountDownLatch latch = new CountDownLatch(1);
263264

264265
try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) {
265-
final Map<String, String> expectedHeaders = Collections.singletonMap("test", "test");
266+
267+
final var expectedHeaders = new HashMap<String, String>();
268+
expectedHeaders.put(randomIdentifier(), randomIdentifier());
269+
for (final var copiedHeader : Task.HEADERS_TO_COPY) {
270+
if (randomBoolean()) {
271+
expectedHeaders.put(copiedHeader, randomIdentifier());
272+
}
273+
}
274+
266275
final Map<String, List<String>> expectedResponseHeaders = Collections.singletonMap(
267276
"testResponse",
268277
Collections.singletonList("testResponse")
@@ -1342,7 +1351,6 @@ public void testAcking() {
13421351
.build();
13431352
final var deterministicTaskQueue = new DeterministicTaskQueue();
13441353
final var threadPool = deterministicTaskQueue.getThreadPool();
1345-
threadPool.getThreadContext().markAsSystemContext();
13461354
try (
13471355
var masterService = createMasterService(
13481356
true,
@@ -1351,6 +1359,7 @@ public void testAcking() {
13511359
new StoppableExecutorServiceWrapper(threadPool.generic())
13521360
)
13531361
) {
1362+
threadPool.getThreadContext().markAsSystemContext();
13541363

13551364
final var responseHeaderName = "test-response-header";
13561365

test/framework/src/main/java/org/elasticsearch/test/AbstractBuilderTestCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.common.settings.IndexScopedSettings;
3737
import org.elasticsearch.common.settings.Settings;
3838
import org.elasticsearch.common.settings.SettingsModule;
39+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
3940
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
4041
import org.elasticsearch.core.IOUtils;
4142
import org.elasticsearch.env.Environment;
@@ -81,7 +82,6 @@
8182
import org.elasticsearch.script.ScriptModule;
8283
import org.elasticsearch.script.ScriptService;
8384
import org.elasticsearch.search.SearchModule;
84-
import org.elasticsearch.tasks.TaskManager;
8585
import org.elasticsearch.test.index.IndexVersionUtils;
8686
import org.elasticsearch.threadpool.TestThreadPool;
8787
import org.elasticsearch.transport.RemoteClusterAware;
@@ -432,8 +432,8 @@ private static class ServiceHolder implements Closeable {
432432
ClusterService clusterService = new ClusterService(
433433
Settings.EMPTY,
434434
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
435-
null,
436-
(TaskManager) null
435+
new DeterministicTaskQueue().getThreadPool(),
436+
null
437437
);
438438

439439
client = (Client) Proxy.newProxyInstance(

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.cluster.service.ClusterService;
1313
import org.elasticsearch.common.settings.Settings;
1414
import org.elasticsearch.common.unit.ByteSizeValue;
15+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1516
import org.elasticsearch.core.TimeValue;
1617
import org.elasticsearch.datastreams.DataStreamsPlugin;
1718
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
@@ -159,6 +160,7 @@ protected <T> T blockingCall(Consumer<ActionListener<T>> function) throws Except
159160

160161
protected static ThreadPool mockThreadPool() {
161162
ThreadPool tp = mock(ThreadPool.class);
163+
when(tp.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
162164
ExecutorService executor = mock(ExecutorService.class);
163165
doAnswer(invocationOnMock -> {
164166
((Runnable) invocationOnMock.getArguments()[0]).run();

0 commit comments

Comments
 (0)