
Commit 4f7fb4b

Merge remote-tracking branch 'origin/main' into upgrade_netty_4.1.126.Final

2 parents: c905bda + c05c61d

8 files changed: +119 -60 lines

docs/changelog/134198.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+pr: 134198
+summary: Improve `ShardLockObtainFailedException` message
+area: Store
+type: enhancement
+issues: []

muted-tests.yml

Lines changed: 3 additions & 12 deletions

@@ -507,30 +507,21 @@ tests:
 - class: org.elasticsearch.xpack.ml.integration.ClassificationIT
   method: testWithCustomFeatureProcessors
   issue: https://github.com/elastic/elasticsearch/issues/134001
-- class: org.elasticsearch.cluster.ClusterInfoServiceIT
-  method: testMaxQueueLatenciesInClusterInfo
-  issue: https://github.com/elastic/elasticsearch/issues/134088
 - class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
   method: test {csv-spec:fork.ForkBeforeStats}
   issue: https://github.com/elastic/elasticsearch/issues/134100
 - class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
   method: test {csv-spec:spatial.ConvertFromStringParseError}
   issue: https://github.com/elastic/elasticsearch/issues/134104
-- class: org.elasticsearch.xpack.writeloadforecaster.WriteLoadForecasterIT
-  method: testWriteLoadForecastIsOverriddenBySetting
-  issue: https://github.com/elastic/elasticsearch/issues/133455
-- class: org.elasticsearch.xpack.writeloadforecaster.WriteLoadForecasterIT
-  method: testWriteLoadForecastDoesNotGetPopulatedWithInvalidLicense
-  issue: https://github.com/elastic/elasticsearch/issues/134124
 - class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
   method: test {csv-spec:fork.ForkWithMixOfCommands}
   issue: https://github.com/elastic/elasticsearch/issues/134135
 - class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
   method: test {csv-spec:inlinestats.MultiIndexInlinestatsOfMultiTypedField}
   issue: https://github.com/elastic/elasticsearch/issues/133973
-- class: org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT
-  method: test {p0=search/510_range_query_out_of_bounds/Test range query for float field with out of bounds lower limit}
-  issue: https://github.com/elastic/elasticsearch/issues/134184
+- class: org.elasticsearch.health.HealthPeriodicLoggerTests
+  method: testOutputModeNoLogging
+  issue: https://github.com/elastic/elasticsearch/issues/134200
 
 # Examples:
 #

server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java

Lines changed: 23 additions & 26 deletions

@@ -27,11 +27,10 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.shard.GlobalCheckpointListeners;
+import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
@@ -45,6 +44,7 @@
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RequestHandlerRegistry;
 import org.elasticsearch.transport.TransportService;
 import org.hamcrest.Matchers;
 
@@ -57,7 +57,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.emptySet;
@@ -431,7 +430,27 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
         final int numShards = randomIntBetween(1, 5);
         createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build());
         ensureGreen(indexName);
-        final var indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next();
+
+        // Global checkpoint sync actions are asynchronous. We cannot really tell exactly when they are completely off the
+        // thread pool. To avoid busy waiting, we redirect them to the generic thread pool so that we have precise control
+        // over the write thread pool for assertions.
+        final MockTransportService mockTransportService = MockTransportService.getInstance(dataNodeName);
+        final var originalRegistry = mockTransportService.transport()
+            .getRequestHandlers()
+            .getHandler(GlobalCheckpointSyncAction.ACTION_NAME + "[p]");
+        mockTransportService.transport()
+            .getRequestHandlers()
+            .forceRegister(
+                new RequestHandlerRegistry<>(
+                    GlobalCheckpointSyncAction.ACTION_NAME + "[p]",
+                    in -> null, // no need to deserialize the request since it's local
+                    mockTransportService.getTaskManager(),
+                    originalRegistry.getHandler(),
+                    mockTransportService.getThreadPool().executor(ThreadPool.Names.GENERIC),
+                    true,
+                    true
+                )
+            );
 
         // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads.
         var barrier = blockDataNodeIndexing(dataNodeName);
@@ -496,28 +515,6 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception {
         }
         Arrays.stream(threadsToJoin).forEach(thread -> assertFalse(thread.isAlive()));
 
-        // Wait for async post replication actions to complete
-        final var checkpointsSyncLatch = new CountDownLatch(numShards);
-        for (int i = 0; i < numShards; ++i) {
-            final var indexShard = indexService.getShard(i);
-            final long expectedGlobalCheckpoint = indexShard.seqNoStats().getGlobalCheckpoint();
-            logger.info("--> shard [{}] waiting for global checkpoint {}", i, expectedGlobalCheckpoint);
-            indexShard.addGlobalCheckpointListener(expectedGlobalCheckpoint, new GlobalCheckpointListeners.GlobalCheckpointListener() {
-                @Override
-                public Executor executor() {
-                    return EsExecutors.DIRECT_EXECUTOR_SERVICE;
-                }
-
-                @Override
-                public void accept(long globalCheckpoint, Exception e) {
-                    assertNull(e); // should have no error
-                    logger.info("--> shard [{}] global checkpoint updated to {}", indexShard.shardId().id(), globalCheckpoint);
-                    checkpointsSyncLatch.countDown();
-                }
-            }, TimeValue.THIRTY_SECONDS);
-        }
-        safeAwait(checkpointsSyncLatch);
-
         assertThat(
             "Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor,
             trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),
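
As a rough, plain-JDK analogy of the pattern described in the new comment above (a hypothetical ExecutorRedirectSketch class; this is not the Elasticsearch transport or RequestHandlerRegistry machinery), the idea is to move asynchronous follow-up work onto a second pool so the pool under test can be inspected deterministically instead of busy-waited on:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecutorRedirectSketch {
    public static void main(String[] args) throws Exception {
        // The pool the test asserts on (analogue of the write thread pool)...
        ThreadPoolExecutor write = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        // ...and a second pool that absorbs asynchronous follow-up work (analogue of GENERIC).
        ExecutorService generic = Executors.newCachedThreadPool();

        // Redirect the follow-up work to the second pool instead of the pool under test,
        // so we can wait for it explicitly rather than polling the write pool.
        Future<?> followUp = generic.submit(() -> { /* background sync work would run here */ });
        followUp.get();

        // Now the write pool's queue can be inspected deterministically.
        System.out.println("write queue size: " + write.getQueue().size()); // prints 0

        write.shutdown();
        generic.shutdown();
    }
}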

server/src/main/java/org/elasticsearch/env/NodeEnvironment.java

Lines changed: 6 additions & 2 deletions

@@ -1009,12 +1009,16 @@ void acquire(long timeoutInMillis, final String details) throws ShardLockObtainFailedException {
                     setDetails(details);
                 } else {
                     final Tuple<Long, String> lockDetails = this.lockDetails; // single volatile read
+                    final var stateAge = TimeValue.timeValueNanos(System.nanoTime() - lockDetails.v1());
                     final var message = format(
-                        "obtaining shard lock for [%s] timed out after [%dms], lock already held for [%s] with age [%dms]",
+                        """
+                            obtaining shard lock for [%s] timed out after [%dms]; \
+                            this shard lock is still held by a different instance of the shard and has been in state [%s] for [%s/%dms]""",
                         details,
                         timeoutInMillis,
                         lockDetails.v2(),
-                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lockDetails.v1())
+                        stateAge,
+                        stateAge.millis()
                     );
                     maybeLogThreadDump(shardId, message);
                     throw new ShardLockObtainFailedException(shardId, message);
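
To make the reworded message concrete, here is a minimal sketch using plain String.format and TimeUnit. The lock details, timeout, and ages are made-up values, and the human-readable age is a stand-in for what TimeValue#toString would produce; it is not the production code path.

import java.util.concurrent.TimeUnit;

public class ShardLockMessageSketch {
    public static void main(String[] args) {
        long heldNanos = TimeUnit.SECONDS.toNanos(83);            // hypothetical lock age
        long heldMillis = TimeUnit.NANOSECONDS.toMillis(heldNanos);
        String humanReadableAge = "1.3m";                         // stand-in for TimeValue#toString

        // Old wording: a single millisecond age, no hint about who still holds the lock.
        String oldMessage = String.format(
            "obtaining shard lock for [%s] timed out after [%dms], lock already held for [%s] with age [%dms]",
            "starting shard", 5000, "closing shard", heldMillis
        );

        // New wording: says another instance of the shard still holds the lock and reports
        // the age in both human-readable and millisecond form.
        String newMessage = String.format(
            "obtaining shard lock for [%s] timed out after [%dms]; "
                + "this shard lock is still held by a different instance of the shard "
                + "and has been in state [%s] for [%s/%dms]",
            "starting shard", 5000, "closing shard", humanReadableAge, heldMillis
        );

        System.out.println(oldMessage);
        System.out.println(newMessage);
    }
}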

server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java

Lines changed: 12 additions & 8 deletions

@@ -70,6 +70,7 @@
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.matchesPattern;
 import static org.hamcrest.Matchers.startsWith;
 
 @LuceneTestCase.SuppressFileSystems("ExtrasFS") // TODO: fix test to allow extras
@@ -134,25 +135,28 @@ public void testShardLock() throws Exception {
 
         try (var mockLog = MockLog.capture(NodeEnvironment.class); var lock = env.shardLock(new ShardId(index, 0), "1")) {
             mockLog.addExpectation(
-                new MockLog.SeenEventExpectation(
-                    "hot threads logging",
-                    NODE_ENVIRONMENT_LOGGER_NAME,
-                    Level.DEBUG,
-                    "hot threads while failing to obtain shard lock for [foo][0]: obtaining shard lock for [2] timed out after *"
-                )
+                new MockLog.SeenEventExpectation("hot threads logging", NODE_ENVIRONMENT_LOGGER_NAME, Level.DEBUG, """
+                    hot threads while failing to obtain shard lock for [foo][0]: obtaining shard lock for [2] timed out after [*ms]; \
+                    this shard lock is still held by a different instance of the shard and has been in state [1] for [*/*ms]*""")
             );
             mockLog.addExpectation(
                 new MockLog.UnseenEventExpectation(
                     "second attempt should be suppressed due to throttling",
                     NODE_ENVIRONMENT_LOGGER_NAME,
                     Level.DEBUG,
-                    "hot threads while failing to obtain shard lock for [foo][0]: obtaining shard lock for [3] timed out after *"
+                    "*obtaining shard lock for [3] timed out*"
                 )
             );
 
             assertEquals(new ShardId(index, 0), lock.getShardId());
 
-            expectThrows(ShardLockObtainFailedException.class, () -> env.shardLock(new ShardId(index, 0), "2"));
+            assertThat(
+                expectThrows(ShardLockObtainFailedException.class, () -> env.shardLock(new ShardId(index, 0), "2")).getMessage(),
+                matchesPattern("""
+                    \\[foo]\\[0]: obtaining shard lock for \\[2] timed out after \\[0ms]; \
+                    this shard lock is still held by a different instance of the shard \
+                    and has been in state \\[1] for \\[.*/[0-9]+ms]""")
+            );
 
             for (Path path : env.indexPaths(index)) {
                 Files.createDirectories(path.resolve("0"));
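
For reference, the regular expression in the new assertion can be exercised against a sample of the reworded message with plain java.util.regex. The sample below uses illustrative lock details and age values, not output captured from the actual test:

import java.util.regex.Pattern;

public class ShardLockPatternSketch {
    public static void main(String[] args) {
        // Sample message in the new format; the [human/millis] age is a made-up value.
        String sample = "[foo][0]: obtaining shard lock for [2] timed out after [0ms]; "
            + "this shard lock is still held by a different instance of the shard "
            + "and has been in state [1] for [1.5s/1500ms]";

        // Same regular expression the updated test passes to matchesPattern(...).
        Pattern expected = Pattern.compile(
            "\\[foo]\\[0]: obtaining shard lock for \\[2] timed out after \\[0ms]; "
                + "this shard lock is still held by a different instance of the shard "
                + "and has been in state \\[1] for \\[.*/[0-9]+ms]"
        );

        System.out.println(expected.matcher(sample).matches()); // expected to print: true
    }
}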

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEnrichBasedCrossClusterTestCase.java

Lines changed: 22 additions & 12 deletions

@@ -87,6 +87,7 @@ protected Settings nodeSettings() {
     }
 
     static final EnrichPolicy hostPolicy = new EnrichPolicy("match", null, List.of("hosts"), "ip", List.of("ip", "os"));
+    static final EnrichPolicy hostPolicyLocal = new EnrichPolicy("match", null, List.of("hosts_local"), "ip", List.of("ip", "os"));
     static final EnrichPolicy vendorPolicy = new EnrichPolicy("match", null, List.of("vendors"), "os", List.of("os", "vendor"));
 
     @Before
@@ -115,18 +116,23 @@ public void setupHostsEnrich() {
             "Windows"
         );
         for (String cluster : allClusters()) {
-            Client client = client(cluster);
-            client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get();
-            for (Map.Entry<String, String> h : allHosts.entrySet()) {
-                client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get();
-            }
-            client.admin().indices().prepareRefresh("hosts").get();
-            client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy))
-                .actionGet();
-            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts"))
-                .actionGet();
-            assertAcked(client.admin().indices().prepareDelete("hosts"));
+            initHostsPolicy(client(cluster), "hosts", hostPolicy, allHosts);
+        }
+        // create policy on coordinator only
+        initHostsPolicy(client(), "hosts_local", hostPolicyLocal, allHosts);
+    }
+
+    private static void initHostsPolicy(Client client, String indexName, EnrichPolicy policy, Map<String, String> allHosts) {
+        client.admin().indices().prepareCreate(indexName).setMapping("ip", "type=ip", "os", "type=keyword").get();
+        for (Map.Entry<String, String> h : allHosts.entrySet()) {
+            client.prepareIndex(indexName).setSource("ip", h.getKey(), "os", h.getValue()).get();
         }
+        client.admin().indices().prepareRefresh(indexName).get();
+        client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, indexName, policy))
+            .actionGet();
+        client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, indexName))
+            .actionGet();
+        assertAcked(client.admin().indices().prepareDelete(indexName));
     }
 
     @Before
@@ -201,7 +207,7 @@ record Event(long timestamp, String user, String host) {
     public void wipeEnrichPolicies() {
         for (String cluster : allClusters()) {
             cluster(cluster).wipe(Set.of());
-            for (String policy : List.of("hosts", "vendors")) {
+            for (String policy : List.of("hosts", "hosts_local", "vendors")) {
                 if (tolerateErrorsWhenWipingEnrichPolicies()) {
                     try {
                         client(cluster).execute(
@@ -226,6 +232,10 @@ static String enrichHosts(Enrich.Mode mode) {
         return EsqlTestUtils.randomEnrichCommand("hosts", mode, hostPolicy.getMatchField(), hostPolicy.getEnrichFields());
     }
 
+    static String enrichHostsLocal(Enrich.Mode mode) {
+        return EsqlTestUtils.randomEnrichCommand("hosts_local", mode, hostPolicyLocal.getMatchField(), hostPolicyLocal.getEnrichFields());
+    }
+
     static String enrichVendors(Enrich.Mode mode) {
         return EsqlTestUtils.randomEnrichCommand("vendors", mode, vendorPolicy.getMatchField(), vendorPolicy.getEnrichFields());
     }

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java

Lines changed: 46 additions & 0 deletions

@@ -110,6 +110,52 @@ public void testWithHostsPolicy() {
         }
     }
 
+    public void testFromRemotesWithCoordPolicy() {
+
+        String query = "FROM *:events | eval ip= TO_STR(host) | "
+            + enrichHostsLocal(Enrich.Mode.COORDINATOR)
+            + " | stats c = COUNT(*) by os | SORT os";
+        try (EsqlQueryResponse resp = runQuery(query, null)) {
+            List<List<Object>> rows = getValuesList(resp);
+            assertThat(
+                rows,
+                equalTo(
+                    List.of(
+                        List.of(1L, "Android"),
+                        List.of(2L, "Linux"),
+                        List.of(4L, "MacOS"),
+                        List.of(3L, "Windows"),
+                        List.of(1L, "iOS"),
+                        Arrays.asList(2L, (String) null)
+                    )
+                )
+            );
+            assertTrue(resp.getExecutionInfo().isCrossClusterSearch());
+        }
+
+        query = "FROM *:events | eval ip= TO_STR(host) | stats by ip | "
+            + enrichHostsLocal(Enrich.Mode.COORDINATOR)
+            + " | stats c = COUNT(*) by os | SORT os";
+        try (EsqlQueryResponse resp = runQuery(query, null)) {
+            List<List<Object>> rows = getValuesList(resp);
+            assertThat(
+                rows,
+                equalTo(
+                    List.of(
+                        List.of(1L, "Android"),
+                        List.of(2L, "Linux"),
+                        List.of(2L, "MacOS"),
+                        List.of(2L, "Windows"),
+                        List.of(1L, "iOS"),
+                        Arrays.asList(2L, (String) null)
+                    )
+                )
+            );
+            assertTrue(resp.getExecutionInfo().isCrossClusterSearch());
+        }
+
+    }
+
     public void testEnrichHostsAggThenEnrichVendorCoordinator() {
         Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
         Boolean requestIncludeMeta = includeCCSMetadata.v1();

x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java

Lines changed: 2 additions & 0 deletions

@@ -145,6 +145,8 @@ private void setUpDataStreamWriteDocsAndRollover(String dataStreamName) throws Exception {
     private void setUpDataStreamWriteDocsAndRollover(String dataStreamName, Settings extraIndexTemplateSettings) throws Exception {
         final int numberOfShards = randomIntBetween(1, 5);
         final int numberOfReplicas = randomIntBetween(0, 1);
+        internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1);
+
         final Settings indexSettings = Settings.builder()
             .put(extraIndexTemplateSettings)
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
