Skip to content

Commit 69641fc

Browse files
authored
Merge branch 'main' into mis-enable-failure-store
2 parents 90860d2 + bfab88c commit 69641fc

File tree

60 files changed

+2666
-1764
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+2666
-1764
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ public class ValuesSourceReaderBenchmark {
9292
"double",
9393
"keyword",
9494
"stored_keyword",
95-
"3_stored_keywords" };
95+
"3_stored_keywords",
96+
"keyword_mv" };
9697

9798
private static final int BLOCK_LENGTH = 16 * 1024;
9899
private static final int INDEX_SIZE = 10 * BLOCK_LENGTH;
@@ -332,7 +333,7 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() {
332333
@Param({ "in_order", "shuffled" })
333334
public String layout;
334335

335-
@Param({ "long", "keyword", "stored_keyword" })
336+
@Param({ "long", "keyword", "stored_keyword", "keyword_mv" })
336337
public String name;
337338

338339
private Directory directory;
@@ -398,6 +399,22 @@ public void benchmark() {
398399
}
399400
}
400401
}
402+
case "keyword_mv" -> {
403+
BytesRef scratch = new BytesRef();
404+
BytesRefBlock values = op.getOutput().<BytesRefBlock>getBlock(1);
405+
for (int p = 0; p < values.getPositionCount(); p++) {
406+
int count = values.getValueCount(p);
407+
if (count > 0) {
408+
int first = values.getFirstValueIndex(p);
409+
for (int i = 0; i < count; i++) {
410+
BytesRef r = values.getBytesRef(first + i, scratch);
411+
r.offset++;
412+
r.length--;
413+
sum += Integer.parseInt(r.utf8ToString());
414+
}
415+
}
416+
}
417+
}
401418
}
402419
}
403420
long expected = 0;
@@ -407,6 +424,16 @@ public void benchmark() {
407424
expected += i % 1000;
408425
}
409426
break;
427+
case "keyword_mv":
428+
for (int i = 0; i < INDEX_SIZE; i++) {
429+
int v1 = i % 1000;
430+
expected += v1;
431+
int v2 = i % 500;
432+
if (v1 != v2) {
433+
expected += v2;
434+
}
435+
}
436+
break;
410437
case "3_stored_keywords":
411438
for (int i = 0; i < INDEX_SIZE; i++) {
412439
expected += 3 * (i % 1000);
@@ -461,7 +488,9 @@ private void setupIndex() throws IOException {
461488
new StoredField("double", (double) i),
462489
new KeywordFieldMapper.KeywordField("keyword_1", new BytesRef(c + i % 1000), keywordFieldType),
463490
new KeywordFieldMapper.KeywordField("keyword_2", new BytesRef(c + i % 1000), keywordFieldType),
464-
new KeywordFieldMapper.KeywordField("keyword_3", new BytesRef(c + i % 1000), keywordFieldType)
491+
new KeywordFieldMapper.KeywordField("keyword_3", new BytesRef(c + i % 1000), keywordFieldType),
492+
new KeywordFieldMapper.KeywordField("keyword_mv", new BytesRef(c + i % 1000), keywordFieldType),
493+
new KeywordFieldMapper.KeywordField("keyword_mv", new BytesRef(c + i % 500), keywordFieldType)
465494
)
466495
);
467496
if (i % COMMIT_INTERVAL == 0) {

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,7 @@ static void selfTest() {
9595
try {
9696
for (String groups : ValuesAggregatorBenchmark.class.getField("groups").getAnnotationsByType(Param.class)[0].value()) {
9797
for (String dataType : ValuesAggregatorBenchmark.class.getField("dataType").getAnnotationsByType(Param.class)[0].value()) {
98-
run(Integer.parseInt(groups), dataType, 10, 0);
99-
run(Integer.parseInt(groups), dataType, 10, 1);
98+
run(Integer.parseInt(groups), dataType, 10);
10099
}
101100
}
102101
} catch (NoSuchFieldException e) {
@@ -114,10 +113,7 @@ static void selfTest() {
114113
@Param({ BYTES_REF, INT, LONG })
115114
public String dataType;
116115

117-
@Param({ "0", "1" })
118-
public int numOrdinalMerges;
119-
120-
private static Operator operator(DriverContext driverContext, int groups, String dataType, int numOrdinalMerges) {
116+
private static Operator operator(DriverContext driverContext, int groups, String dataType) {
121117
if (groups == 1) {
122118
return new AggregationOperator(
123119
List.of(supplier(dataType).aggregatorFactory(AggregatorMode.SINGLE, List.of(0)).apply(driverContext)),
@@ -132,20 +128,8 @@ private static Operator operator(DriverContext driverContext, int groups, String
132128
) {
133129
@Override
134130
public Page getOutput() {
135-
mergeOrdinal();
136131
return super.getOutput();
137132
}
138-
139-
// simulate OrdinalsGroupingOperator
140-
void mergeOrdinal() {
141-
var merged = supplier(dataType).groupingAggregatorFactory(AggregatorMode.SINGLE, List.of(1)).apply(driverContext);
142-
for (int i = 0; i < numOrdinalMerges; i++) {
143-
for (int p = 0; p < groups; p++) {
144-
merged.addIntermediateRow(p, aggregators.getFirst(), p);
145-
}
146-
}
147-
aggregators.set(0, merged);
148-
}
149133
};
150134
}
151135

@@ -352,12 +336,12 @@ private static Block groupingBlock(int groups) {
352336

353337
@Benchmark
354338
public void run() {
355-
run(groups, dataType, OP_COUNT, numOrdinalMerges);
339+
run(groups, dataType, OP_COUNT);
356340
}
357341

358-
private static void run(int groups, String dataType, int opCount, int numOrdinalMerges) {
342+
private static void run(int groups, String dataType, int opCount) {
359343
DriverContext driverContext = driverContext();
360-
try (Operator operator = operator(driverContext, groups, dataType, numOrdinalMerges)) {
344+
try (Operator operator = operator(driverContext, groups, dataType)) {
361345
Page page = page(groups, dataType);
362346
for (int i = 0; i < opCount; i++) {
363347
operator.addInput(page.shallowCopy());

docs/changelog/128639.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 128639
2+
summary: Substitue `date_trunc` with `round_to` when the pre-calculated rounding points
3+
are available
4+
area: ES|QL
5+
type: enhancement
6+
issues: []

docs/changelog/131061.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 131061
2+
summary: Speed up reading multivalued keywords
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderIT.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.search.SearchResponse;
2626
import org.elasticsearch.action.support.SubscribableListener;
2727
import org.elasticsearch.action.support.master.AcknowledgedResponse;
28+
import org.elasticsearch.cluster.metadata.ProjectId;
2829
import org.elasticsearch.common.Strings;
2930
import org.elasticsearch.common.settings.MockSecureSettings;
3031
import org.elasticsearch.common.settings.Settings;
@@ -116,7 +117,7 @@ public void testEnterpriseDownloaderTask() throws Exception {
116117
final String sourceField = "ip";
117118
final String targetField = "ip-result";
118119

119-
startEnterpriseGeoIpDownloaderTask();
120+
startEnterpriseGeoIpDownloaderTask(ProjectId.DEFAULT);
120121
configureMaxmindDatabase(MAXMIND_DATABASE_TYPE);
121122
configureIpinfoDatabase(IPINFO_DATABASE_TYPE);
122123
waitAround();
@@ -171,9 +172,10 @@ private void deleteDatabaseConfiguration(String configurationName, ActionListene
171172
);
172173
}
173174

174-
private void startEnterpriseGeoIpDownloaderTask() {
175+
private void startEnterpriseGeoIpDownloaderTask(ProjectId projectId) {
175176
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
176-
persistentTasksService.sendStartRequest(
177+
persistentTasksService.sendProjectStartRequest(
178+
projectId,
177179
ENTERPRISE_GEOIP_DOWNLOADER,
178180
ENTERPRISE_GEOIP_DOWNLOADER,
179181
new EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams(),

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/ConfigDatabases.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.elasticsearch.cluster.metadata.ProjectId;
14-
import org.elasticsearch.core.FixForMultiProject;
14+
import org.elasticsearch.core.NotMultiProjectCapable;
1515
import org.elasticsearch.env.Environment;
1616
import org.elasticsearch.watcher.FileChangesListener;
1717
import org.elasticsearch.watcher.FileWatcher;
@@ -34,6 +34,9 @@
3434
* Keeps track of user provided databases in the ES_HOME/config/ingest-geoip directory.
3535
* This directory is monitored and files updates are picked up and may cause databases being loaded or removed at runtime.
3636
*/
37+
@NotMultiProjectCapable(
38+
description = "Custom databases not available in serverless, we should review this class for MP again after serverless is enabled"
39+
)
3740
final class ConfigDatabases implements Closeable {
3841

3942
private static final Logger logger = LogManager.getLogger(ConfigDatabases.class);
@@ -71,7 +74,7 @@ Map<String, DatabaseReaderLazyLoader> getConfigDatabases() {
7174
return configDatabases;
7275
}
7376

74-
@FixForMultiProject(description = "Replace DEFAULT project")
77+
@NotMultiProjectCapable(description = "Replace DEFAULT project after serverless is enabled")
7578
void updateDatabase(Path file, boolean update) {
7679
String databaseFileName = file.getFileName().toString();
7780
try {
@@ -93,7 +96,7 @@ void updateDatabase(Path file, boolean update) {
9396
}
9497
}
9598

96-
@FixForMultiProject(description = "Replace DEFAULT project")
99+
@NotMultiProjectCapable(description = "Replace DEFAULT project after serverless is enabled")
97100
Map<String, DatabaseReaderLazyLoader> initConfigDatabases() throws IOException {
98101
Map<String, DatabaseReaderLazyLoader> databases = new HashMap<>();
99102

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloader.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
import org.elasticsearch.action.support.PlainActionFuture;
2020
import org.elasticsearch.client.internal.Client;
2121
import org.elasticsearch.cluster.block.ClusterBlockLevel;
22+
import org.elasticsearch.cluster.metadata.ProjectId;
2223
import org.elasticsearch.cluster.service.ClusterService;
2324
import org.elasticsearch.common.CheckedSupplier;
2425
import org.elasticsearch.common.Strings;
2526
import org.elasticsearch.common.hash.MessageDigests;
27+
import org.elasticsearch.core.NotMultiProjectCapable;
2628
import org.elasticsearch.core.TimeValue;
2729
import org.elasticsearch.core.Tuple;
2830
import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -67,6 +69,9 @@
6769
* Downloads are verified against MD5 checksum provided by the server
6870
* Current state of all stored databases is stored in cluster state in persistent task state
6971
*/
72+
@NotMultiProjectCapable(
73+
description = "Enterprise GeoIP not available in serverless, we should review this class for MP again after serverless is enabled"
74+
)
7075
public class EnterpriseGeoIpDownloader extends AllocatedPersistentTask {
7176

7277
private static final Logger logger = LogManager.getLogger(EnterpriseGeoIpDownloader.class);
@@ -142,22 +147,27 @@ void setState(EnterpriseGeoIpTaskState state) {
142147

143148
// visible for testing
144149
void updateDatabases() throws IOException {
150+
@NotMultiProjectCapable(description = "Enterprise GeoIP not available in serverless")
151+
ProjectId projectId = ProjectId.DEFAULT;
145152
var clusterState = clusterService.state();
146-
var geoipIndex = clusterState.getMetadata().getProject().getIndicesLookup().get(EnterpriseGeoIpDownloader.DATABASES_INDEX);
153+
var geoipIndex = clusterState.getMetadata().getProject(projectId).getIndicesLookup().get(EnterpriseGeoIpDownloader.DATABASES_INDEX);
147154
if (geoipIndex != null) {
148155
logger.trace("the geoip index [{}] exists", EnterpriseGeoIpDownloader.DATABASES_INDEX);
149-
if (clusterState.getRoutingTable().index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
156+
if (clusterState.routingTable(projectId).index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
150157
logger.debug("not updating databases because not all primary shards of [{}] index are active yet", DATABASES_INDEX);
151158
return;
152159
}
153-
var blockException = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
160+
var blockException = clusterState.blocks()
161+
.indexBlockedException(projectId, ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
154162
if (blockException != null) {
155163
throw blockException;
156164
}
157165
}
158166

159167
logger.trace("Updating databases");
160-
IngestGeoIpMetadata geoIpMeta = clusterState.metadata().getProject().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
168+
IngestGeoIpMetadata geoIpMeta = clusterState.metadata()
169+
.getProject(projectId)
170+
.custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
161171

162172
// if there are entries in the cs that aren't in the persistent task state,
163173
// then download those (only)

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTaskExecutor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.settings.SecureString;
2424
import org.elasticsearch.common.settings.Setting;
2525
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.core.NotMultiProjectCapable;
2627
import org.elasticsearch.core.TimeValue;
2728
import org.elasticsearch.ingest.EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams;
2829
import org.elasticsearch.ingest.IngestService;
@@ -47,6 +48,9 @@
4748
import static org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor.ENABLED_SETTING;
4849
import static org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor.POLL_INTERVAL_SETTING;
4950

51+
@NotMultiProjectCapable(
52+
description = "Enterprise GeoIP not available in serverless, we should review this class for MP again after serverless is enabled"
53+
)
5054
public class EnterpriseGeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<EnterpriseGeoIpTaskParams>
5155
implements
5256
ClusterStateListener {

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/direct/TransportGetDatabaseConfigurationAction.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313
import org.elasticsearch.action.FailedNodeException;
1414
import org.elasticsearch.action.support.ActionFilters;
1515
import org.elasticsearch.action.support.nodes.TransportNodesAction;
16+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
18+
import org.elasticsearch.cluster.project.ProjectResolver;
1719
import org.elasticsearch.cluster.service.ClusterService;
1820
import org.elasticsearch.common.io.stream.StreamInput;
1921
import org.elasticsearch.common.regex.Regex;
2022
import org.elasticsearch.ingest.geoip.DatabaseNodeService;
23+
import org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor;
2124
import org.elasticsearch.ingest.geoip.GeoIpTaskState;
2225
import org.elasticsearch.ingest.geoip.IngestGeoIpMetadata;
2326
import org.elasticsearch.injection.guice.Inject;
@@ -48,14 +51,16 @@ public class TransportGetDatabaseConfigurationAction extends TransportNodesActio
4851
List<DatabaseConfigurationMetadata>> {
4952

5053
private final DatabaseNodeService databaseNodeService;
54+
private final ProjectResolver projectResolver;
5155

5256
@Inject
5357
public TransportGetDatabaseConfigurationAction(
5458
TransportService transportService,
5559
ClusterService clusterService,
5660
ThreadPool threadPool,
5761
ActionFilters actionFilters,
58-
DatabaseNodeService databaseNodeService
62+
DatabaseNodeService databaseNodeService,
63+
ProjectResolver projectResolver
5964
) {
6065
super(
6166
GetDatabaseConfigurationAction.NAME,
@@ -66,6 +71,7 @@ public TransportGetDatabaseConfigurationAction(
6671
threadPool.executor(ThreadPool.Names.MANAGEMENT)
6772
);
6873
this.databaseNodeService = databaseNodeService;
74+
this.projectResolver = projectResolver;
6975
}
7076

7177
protected List<DatabaseConfigurationMetadata> createActionContext(Task task, GetDatabaseConfigurationAction.Request request) {
@@ -82,25 +88,29 @@ protected List<DatabaseConfigurationMetadata> createActionContext(Task task, Get
8288
"wildcard only supports a single value, please use comma-separated values or a single wildcard value"
8389
);
8490
}
85-
8691
List<DatabaseConfigurationMetadata> results = new ArrayList<>();
87-
PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(
88-
clusterService.state()
89-
);
92+
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
93+
PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.get(projectMetadata);
94+
String geoIpTaskId = GeoIpDownloaderTaskExecutor.getTaskId(projectMetadata.id(), projectResolver.supportsMultipleProjects());
95+
9096
for (String id : ids) {
91-
results.addAll(getWebDatabases(tasksMetadata, id));
92-
results.addAll(getMaxmindDatabases(clusterService, id));
97+
results.addAll(getWebDatabases(geoIpTaskId, tasksMetadata, id));
98+
results.addAll(getMaxmindDatabases(projectMetadata, id));
9399
}
94100
return results;
95101
}
96102

97103
/*
98104
* This returns read-only database information about the databases managed by the standard downloader
99105
*/
100-
private static Collection<DatabaseConfigurationMetadata> getWebDatabases(PersistentTasksCustomMetadata tasksMetadata, String id) {
106+
private static Collection<DatabaseConfigurationMetadata> getWebDatabases(
107+
String geoIpTaskId,
108+
PersistentTasksCustomMetadata tasksMetadata,
109+
String id
110+
) {
101111
List<DatabaseConfigurationMetadata> webDatabases = new ArrayList<>();
102112
if (tasksMetadata != null) {
103-
PersistentTasksCustomMetadata.PersistentTask<?> maybeGeoIpTask = tasksMetadata.getTask("geoip-downloader");
113+
PersistentTasksCustomMetadata.PersistentTask<?> maybeGeoIpTask = tasksMetadata.getTask(geoIpTaskId);
104114
if (maybeGeoIpTask != null) {
105115
GeoIpTaskState geoIpTaskState = (GeoIpTaskState) maybeGeoIpTask.getState();
106116
if (geoIpTaskState != null) {
@@ -137,12 +147,9 @@ private static String getDatabaseNameForFileName(String databaseFileName) {
137147
/*
138148
* This returns information about databases that are downloaded from maxmind.
139149
*/
140-
private static Collection<DatabaseConfigurationMetadata> getMaxmindDatabases(ClusterService clusterService, String id) {
150+
private static Collection<DatabaseConfigurationMetadata> getMaxmindDatabases(ProjectMetadata projectMetadata, String id) {
141151
List<DatabaseConfigurationMetadata> maxmindDatabases = new ArrayList<>();
142-
final IngestGeoIpMetadata geoIpMeta = clusterService.state()
143-
.metadata()
144-
.getProject()
145-
.custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
152+
final IngestGeoIpMetadata geoIpMeta = projectMetadata.custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
146153
if (Regex.isSimpleMatchPattern(id)) {
147154
for (Map.Entry<String, DatabaseConfigurationMetadata> entry : geoIpMeta.getDatabases().entrySet()) {
148155
if (Regex.simpleMatch(id, entry.getKey())) {

muted-tests.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -415,15 +415,9 @@ tests:
415415
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
416416
method: testRowStatsProjectGroupByInt
417417
issue: https://github.com/elastic/elasticsearch/issues/131024
418-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT
419-
method: testStopQueryLocal
420-
issue: https://github.com/elastic/elasticsearch/issues/121672
421418
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
422419
method: test {lookup-join.MvJoinKeyOnFromAfterStats ASYNC}
423420
issue: https://github.com/elastic/elasticsearch/issues/131148
424-
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceDiskSpaceTests
425-
method: testEnqueuedMergeTasksAreUnblockedWhenEstimatedMergeSizeChanges
426-
issue: https://github.com/elastic/elasticsearch/issues/131165
427421
- class: org.elasticsearch.xpack.esql.ccq.MultiClustersIT
428422
method: testLookupJoinAliases
429423
issue: https://github.com/elastic/elasticsearch/issues/131166

0 commit comments

Comments
 (0)