Skip to content

Commit 41ea32f

Browse files
committed
Merge remote-tracking branch 'elastic/main' into timeseries-tests
2 parents 05fb7c4 + debb95a commit 41ea32f

File tree

8 files changed

+140
-28
lines changed

8 files changed

+140
-28
lines changed

benchmarks/src/test/java/org/elasticsearch/benchmark/vector/JDKVectorInt7uBenchmarkTests.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111

1212
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
1313

14+
import org.apache.lucene.util.Constants;
1415
import org.elasticsearch.test.ESTestCase;
16+
import org.junit.BeforeClass;
1517
import org.openjdk.jmh.annotations.Param;
1618

1719
import java.util.Arrays;
@@ -25,6 +27,15 @@ public JDKVectorInt7uBenchmarkTests(int size) {
2527
this.size = size;
2628
}
2729

30+
@BeforeClass
31+
public static void skipWindows() {
32+
assumeFalse("doesn't work on windows yet", Constants.WINDOWS);
33+
}
34+
35+
static boolean supportsHeapSegments() {
36+
return Runtime.version().feature() >= 22;
37+
}
38+
2839
public void testDotProduct() {
2940
for (int i = 0; i < 100; i++) {
3041
var bench = new JDKVectorInt7uBenchmark();
@@ -33,8 +44,10 @@ public void testDotProduct() {
3344
try {
3445
float expected = dotProductScalar(bench.byteArrayA, bench.byteArrayB);
3546
assertEquals(expected, bench.dotProductLucene(), delta);
36-
assertEquals(expected, bench.dotProductNativeWithHeapSeg(), delta);
3747
assertEquals(expected, bench.dotProductNativeWithNativeSeg(), delta);
48+
if (supportsHeapSegments()) {
49+
assertEquals(expected, bench.dotProductNativeWithHeapSeg(), delta);
50+
}
3851
} finally {
3952
bench.teardown();
4053
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2222
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2323
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
24+
import org.elasticsearch.cluster.metadata.ProjectId;
2425
import org.elasticsearch.cluster.project.ProjectResolver;
2526
import org.elasticsearch.cluster.service.ClusterService;
2627
import org.elasticsearch.common.Strings;
@@ -81,8 +82,9 @@ protected void masterOperation(
8182
ClusterState state,
8283
ActionListener<UpdateDataStreamMappingsAction.Response> listener
8384
) throws Exception {
85+
ProjectId projectId = projectResolver.getProjectId();
8486
List<String> dataStreamNames = indexNameExpressionResolver.dataStreamNames(
85-
state.projectState(projectResolver.getProjectId()).metadata(),
87+
state.metadata().getProject(projectId),
8688
IndicesOptions.DEFAULT,
8789
request.indices()
8890
);
@@ -98,6 +100,7 @@ protected void masterOperation(
98100
countDownListener.onResponse(null);
99101
for (String dataStreamName : dataStreamNames) {
100102
updateSingleDataStream(
103+
projectId,
101104
dataStreamName,
102105
request.getMappings(),
103106
request.masterNodeTimeout(),
@@ -123,6 +126,7 @@ protected void masterOperation(
123126
}
124127

125128
private void updateSingleDataStream(
129+
ProjectId projectId,
126130
String dataStreamName,
127131
CompressedXContent mappingsOverrides,
128132
TimeValue masterNodeTimeout,
@@ -144,7 +148,7 @@ private void updateSingleDataStream(
144148
return;
145149
}
146150
metadataDataStreamsService.updateMappings(
147-
projectResolver.getProjectId(),
151+
projectId,
148152
masterNodeTimeout,
149153
ackTimeout,
150154
dataStreamName,
@@ -159,9 +163,7 @@ private void updateSingleDataStream(
159163
true,
160164
null,
161165
mappingsOverrides,
162-
dataStream.getEffectiveMappings(
163-
clusterService.state().projectState(projectResolver.getProjectId()).metadata()
164-
)
166+
dataStream.getEffectiveMappings(clusterService.state().metadata().getProject(projectId))
165167
)
166168
);
167169
} catch (IOException e) {

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/direct/TransportPutDatabaseConfigurationActionTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
import java.util.HashMap;
1919
import java.util.Map;
2020

21+
import static org.elasticsearch.cluster.ClusterName.DEFAULT;
22+
2123
public class TransportPutDatabaseConfigurationActionTests extends ESTestCase {
2224

2325
public void testValidatePrerequisites() {
2426
ProjectId projectId = randomProjectIdOrDefault();
2527
// Test that we reject two configurations with the same database name but different ids:
2628
String name = randomAlphaOfLengthBetween(1, 50);
2729
IngestGeoIpMetadata ingestGeoIpMetadata = randomIngestGeoIpMetadata(name);
28-
ClusterState state = ClusterState.builder(ClusterState.EMPTY_STATE)
30+
ClusterState state = ClusterState.builder(DEFAULT)
2931
.putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(IngestGeoIpMetadata.TYPE, ingestGeoIpMetadata).build())
3032
.build();
3133
DatabaseConfiguration databaseConfiguration = randomDatabaseConfiguration(randomIdentifier(), name);
@@ -40,7 +42,11 @@ public void testValidatePrerequisites() {
4042
TransportPutDatabaseConfigurationAction.validatePrerequisites(projectId, databaseConfigurationForDifferentName, state);
4143

4244
// Test that we do not reject a configuration if none already exists:
43-
TransportPutDatabaseConfigurationAction.validatePrerequisites(projectId, databaseConfiguration, ClusterState.EMPTY_STATE);
45+
TransportPutDatabaseConfigurationAction.validatePrerequisites(
46+
projectId,
47+
databaseConfiguration,
48+
ClusterState.builder(DEFAULT).putProjectMetadata(ProjectMetadata.builder(projectId)).build()
49+
);
4450

4551
// Test that we do not reject a configuration if one with the same database name AND id already exists:
4652
DatabaseConfiguration databaseConfigurationSameNameSameId = ingestGeoIpMetadata.getDatabases()

muted-tests.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -566,12 +566,15 @@ tests:
566566
issue: https://github.com/elastic/elasticsearch/issues/130145
567567
- class: org.elasticsearch.xpack.logsdb.patternedtext.PatternedTextFieldMapperTests
568568
issue: https://github.com/elastic/elasticsearch/issues/130162
569-
- class: org.elasticsearch.ingest.geoip.direct.TransportPutDatabaseConfigurationActionTests
570-
method: testValidatePrerequisites
571-
issue: https://github.com/elastic/elasticsearch/issues/130178
572569
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceDiskSpaceTests
573570
method: testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution
574571
issue: https://github.com/elastic/elasticsearch/issues/130205
572+
- class: org.elasticsearch.index.codec.vectors.cluster.KMeansLocalTests
573+
method: testKMeansNeighbors
574+
issue: https://github.com/elastic/elasticsearch/issues/130258
575+
- class: org.elasticsearch.compute.operator.topn.TopNOperatorTests
576+
method: testSimpleWithCranky
577+
issue: https://github.com/elastic/elasticsearch/issues/130215
575578

576579
# Examples:
577580
#

x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1555,7 +1555,6 @@ setup:
15551555
body:
15561556
settings:
15571557
number_of_shards: 1
1558-
number_of_replicas: 0
15591558
index:
15601559
mode: time_series
15611560
routing_path: [ k8s.pod.name ]
@@ -1639,7 +1638,6 @@ setup:
16391638
body:
16401639
settings:
16411640
number_of_shards: 1
1642-
number_of_replicas: 0
16431641
index:
16441642
mode: time_series
16451643
routing_path: [ k8s.pod.name, k8s.pod.empty ]

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
9898
new DownsampleShardPersistentTaskExecutor(
9999
client,
100100
DownsampleShardTask.TASK_NAME,
101+
clusterService.getSettings(),
101102
threadPool.executor(DOWNSAMPLE_TASK_THREAD_POOL_NAME)
102103
)
103104
);

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2727
import org.elasticsearch.cluster.routing.ShardRouting;
2828
import org.elasticsearch.common.io.stream.StreamOutput;
29+
import org.elasticsearch.common.settings.Settings;
2930
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3031
import org.elasticsearch.common.util.concurrent.EsExecutors;
3132
import org.elasticsearch.index.IndexNotFoundException;
@@ -57,10 +58,12 @@
5758
public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecutor<DownsampleShardTaskParams> {
5859
private static final Logger LOGGER = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class);
5960
private final Client client;
61+
private final boolean isStateless;
6062

61-
public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, final Executor executor) {
63+
public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, Settings settings, final Executor executor) {
6264
super(taskName, executor);
6365
this.client = Objects.requireNonNull(client);
66+
this.isStateless = DiscoveryNode.isStateless(settings);
6467
}
6568

6669
@Override
@@ -142,23 +145,38 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
142145
return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task");
143146
}
144147

145-
final ShardRouting shardRouting = indexShardRouting.primaryShard();
146-
if (shardRouting.started() == false) {
147-
return NO_NODE_FOUND;
148-
}
149-
150-
return candidateNodes.stream()
151-
.filter(candidateNode -> candidateNode.getId().equals(shardRouting.currentNodeId()))
148+
// We find the nodes that hold the eligible shards.
149+
// If the current node of such a shard is a candidate node, then we assign the task there.
150+
// This code is inefficient, but we are relying on the laziness of the intermediate operations
151+
// and the assumption that the first shard we examine has high chances of being assigned to a candidate node.
152+
return indexShardRouting.activeShards()
153+
.stream()
154+
.filter(this::isEligible)
155+
.map(ShardRouting::currentNodeId)
156+
.filter(nodeId -> isCandidateNode(candidateNodes, nodeId))
152157
.findAny()
153-
.map(
154-
node -> new PersistentTasksCustomMetadata.Assignment(
155-
node.getId(),
156-
"downsampling using node holding shard [" + shardId + "]"
157-
)
158-
)
158+
.map(nodeId -> new PersistentTasksCustomMetadata.Assignment(nodeId, "downsampling using node holding shard [" + shardId + "]"))
159159
.orElse(NO_NODE_FOUND);
160160
}
161161

162+
/**
163+
* Only shards that can be searched can be used as the source of a downsampling task.
164+
* In stateless deployment, this means that shards that CANNOT be promoted to primary can be used.
165+
* For simplicity, in non-stateless deployments we use the primary shard.
166+
*/
167+
private boolean isEligible(ShardRouting shardRouting) {
168+
return shardRouting.started() && (isStateless ? shardRouting.isPromotableToPrimary() == false : shardRouting.primary());
169+
}
170+
171+
private boolean isCandidateNode(Collection<DiscoveryNode> candidateNodes, String nodeId) {
172+
for (DiscoveryNode candidateNode : candidateNodes) {
173+
if (candidateNode.getId().equals(nodeId)) {
174+
return true;
175+
}
176+
}
177+
return false;
178+
}
179+
162180
@Override
163181
public Executor getExecutor() {
164182
// The delegate action forks to the a downsample thread:

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
import org.elasticsearch.cluster.node.DiscoveryNodes;
1919
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2020
import org.elasticsearch.cluster.routing.RoutingTable;
21+
import org.elasticsearch.cluster.routing.ShardRouting;
2122
import org.elasticsearch.common.Strings;
2223
import org.elasticsearch.common.UUIDs;
24+
import org.elasticsearch.common.settings.Settings;
2325
import org.elasticsearch.core.Tuple;
2426
import org.elasticsearch.index.Index;
2527
import org.elasticsearch.index.shard.ShardId;
@@ -35,9 +37,11 @@
3537
import java.util.Set;
3638
import java.util.concurrent.Executor;
3739

40+
import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
3841
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
3942
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
4043
import static org.hamcrest.Matchers.equalTo;
44+
import static org.hamcrest.Matchers.nullValue;
4145
import static org.mockito.Mockito.mock;
4246

4347
public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase {
@@ -57,7 +61,12 @@ public void setup() {
5761
"metrics-app1",
5862
List.of(new Tuple<>(start, end))
5963
);
60-
executor = new DownsampleShardPersistentTaskExecutor(mock(Client.class), DownsampleShardTask.TASK_NAME, mock(Executor.class));
64+
executor = new DownsampleShardPersistentTaskExecutor(
65+
mock(Client.class),
66+
DownsampleShardTask.TASK_NAME,
67+
Settings.EMPTY,
68+
mock(Executor.class)
69+
);
6170
}
6271

6372
public void testGetAssignment() {
@@ -124,7 +133,69 @@ public void testGetAssignmentMissingIndex() {
124133
assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task"));
125134
}
126135

136+
public void testGetStatelessAssignment() {
137+
executor = new DownsampleShardPersistentTaskExecutor(
138+
mock(Client.class),
139+
DownsampleShardTask.TASK_NAME,
140+
Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, "true").build(),
141+
mock(Executor.class)
142+
);
143+
var backingIndex = initialClusterState.metadata().getProject(projectId).dataStreams().get("metrics-app1").getWriteIndex();
144+
var searchNode = newNode(Set.of(DiscoveryNodeRole.SEARCH_ROLE));
145+
var indexNode = newNode(Set.of(DiscoveryNodeRole.INDEX_ROLE));
146+
var shardId = new ShardId(backingIndex, 0);
147+
var clusterState = ClusterState.builder(initialClusterState)
148+
.nodes(new DiscoveryNodes.Builder().add(indexNode).add(searchNode).build())
149+
.putRoutingTable(
150+
projectId,
151+
RoutingTable.builder()
152+
.add(
153+
IndexRoutingTable.builder(backingIndex)
154+
.addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build())
155+
)
156+
.build()
157+
)
158+
.build();
159+
160+
var params = new DownsampleShardTaskParams(
161+
new DownsampleConfig(new DateHistogramInterval("1h")),
162+
shardId.getIndexName(),
163+
1,
164+
1,
165+
shardId,
166+
Strings.EMPTY_ARRAY,
167+
Strings.EMPTY_ARRAY,
168+
Strings.EMPTY_ARRAY
169+
);
170+
var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState);
171+
assertThat(result.getExecutorNode(), nullValue());
172+
173+
// Assign a copy of the shard to a search node
174+
clusterState = ClusterState.builder(clusterState)
175+
.putRoutingTable(
176+
projectId,
177+
RoutingTable.builder()
178+
.add(
179+
IndexRoutingTable.builder(backingIndex)
180+
.addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build())
181+
.addShard(
182+
shardRoutingBuilder(shardId, searchNode.getId(), false, STARTED).withRecoverySource(null)
183+
.withRole(ShardRouting.Role.SEARCH_ONLY)
184+
.build()
185+
)
186+
)
187+
.build()
188+
)
189+
.build();
190+
result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState);
191+
assertThat(result.getExecutorNode(), equalTo(searchNode.getId()));
192+
}
193+
127194
private static DiscoveryNode newNode() {
195+
return newNode(DiscoveryNodeRole.roles());
196+
}
197+
198+
private static DiscoveryNode newNode(Set<DiscoveryNodeRole> nodes) {
128199
return DiscoveryNodeUtils.create(
129200
"node_" + UUIDs.randomBase64UUID(random()),
130201
buildNewFakeTransportAddress(),

0 commit comments

Comments
 (0)