Skip to content

Commit 9ea1125

Browse files
Merge branch 'main' into feature/logsdb-ignore-dynamic-beyond-limit
2 parents af335b1 + 291ced7 commit 9ea1125

File tree

168 files changed

+4028
-1663
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

168 files changed

+4028
-1663
lines changed

docs/changelog/114951.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 114951
2+
summary: Expose cluster-state role mappings in APIs
3+
area: Authentication
4+
type: bug
5+
issues: []

docs/changelog/115102.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 115102
2+
summary: Watch Next Run Interval Resets On Shard Move or Node Restart
3+
area: Watcher
4+
type: bug
5+
issues:
6+
- 111433

docs/changelog/115317.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 115317
2+
summary: Revert "Add `ResolvedExpression` wrapper"
3+
area: Indices APIs
4+
type: bug
5+
issues: []

docs/changelog/115359.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 115359
2+
summary: Adding support for simulate ingest mapping adddition for indices with mappings
3+
that do not come from templates
4+
area: Ingest Node
5+
type: enhancement
6+
issues: []

docs/reference/esql/functions/kibana/definition/to_date_nanos.json

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/reference/mapping/types/binary.asciidoc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,16 @@ Synthetic source may sort `binary` values in order of their byte representation.
6868
----
6969
PUT idx
7070
{
71+
"settings": {
72+
"index": {
73+
"mapping": {
74+
"source": {
75+
"mode": "synthetic"
76+
}
77+
}
78+
}
79+
},
7180
"mappings": {
72-
"_source": { "mode": "synthetic" },
7381
"properties": {
7482
"binary": { "type": "binary", "doc_values": true }
7583
}

docs/reference/watcher/how-watcher-works.asciidoc

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,15 +146,18 @@ add, the more distributed the watches can be executed. If you add or remove
146146
replicas, all watches need to be reloaded. If a shard is relocated, the
147147
primary and all replicas of this particular shard will reload.
148148

149-
Because the watches are executed on the node, where the watch shards are, you can create
150-
dedicated watcher nodes by using shard allocation filtering.
149+
Because the watches are executed on the node, where the watch shards are, you
150+
can create dedicated watcher nodes by using shard allocation filtering. To do this
151+
, configure nodes with a dedicated `node.attr.role: watcher` property.
151152

152-
You could configure nodes with a dedicated `node.attr.role: watcher` property and
153-
then configure the `.watches` index like this:
153+
As the `.watches` index is a system index, you can't use the normal `.watcher/_settings`
154+
endpoint to modify its routing allocation. Instead, you can use the following dedicated
155+
endpoint to adjust the allocation of the `.watches` shards to the nodes with the
156+
`watcher` role attribute:
154157

155158
[source,console]
156159
------------------------
157-
PUT .watches/_settings
160+
PUT _watcher/settings
158161
{
159162
"index.routing.allocation.include.role": "watcher"
160163
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@
4343
import org.elasticsearch.threadpool.ThreadPool;
4444
import org.elasticsearch.transport.RemoteTransportException;
4545

46+
import java.util.Collections;
47+
import java.util.HashSet;
4648
import java.util.List;
4749
import java.util.Map;
4850
import java.util.Objects;
4951
import java.util.Set;
5052
import java.util.concurrent.atomic.AtomicBoolean;
5153
import java.util.concurrent.atomic.AtomicReference;
52-
import java.util.stream.Collectors;
5354

5455
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
5556
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
@@ -238,14 +239,11 @@ public void clusterChanged(ClusterChangedEvent event) {
238239
}
239240

240241
static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
241-
if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
242+
if (pipelinesWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
242243
return true;
243244
}
244245

245-
Set<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream()
246-
.map(PipelineConfiguration::getId)
247-
.collect(Collectors.toSet());
248-
246+
final Set<String> checkReferencedPipelines = pipelinesWithGeoIpProcessor(clusterState, false);
249247
if (checkReferencedPipelines.isEmpty()) {
250248
return false;
251249
}
@@ -258,22 +256,24 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
258256
}
259257

260258
/**
261-
* Retrieve list of pipelines that have at least one geoip processor.
259+
* Retrieve the set of pipeline ids that have at least one geoip processor.
262260
* @param clusterState Cluster state.
263261
* @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation
264262
* matching the param.
265-
* @return A list of {@link PipelineConfiguration} matching criteria.
263+
* @return A set of pipeline ids matching criteria.
266264
*/
267265
@SuppressWarnings("unchecked")
268-
private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor(
269-
ClusterState clusterState,
270-
boolean downloadDatabaseOnPipelineCreation
271-
) {
272-
List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
273-
return pipelineDefinitions.stream().filter(pipelineConfig -> {
274-
List<Map<String, Object>> processors = (List<Map<String, Object>>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
275-
return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation);
276-
}).toList();
266+
private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
267+
List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState);
268+
Set<String> ids = new HashSet<>();
269+
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
270+
for (PipelineConfiguration configuration : configurations) {
271+
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
272+
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
273+
ids.add(configuration.getId());
274+
}
275+
}
276+
return Collections.unmodifiableSet(ids);
277277
}
278278

279279
/**
@@ -283,7 +283,15 @@ private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProces
283283
* @return true if a geoip processor is found in the processor list.
284284
*/
285285
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
286-
return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation));
286+
if (processors != null) {
287+
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
288+
for (Map<String, Object> processor : processors) {
289+
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
290+
return true;
291+
}
292+
}
293+
}
294+
return false;
287295
}
288296

289297
/**
@@ -317,7 +325,7 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
317325
}
318326

319327
/**
320-
* Check if a processor config is has an on_failure clause containing at least a geoip processor.
328+
* Check if a processor config has an on_failure clause containing at least a geoip processor.
321329
* @param processor Processor config.
322330
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
323331
* @return true if a geoip processor is found in the processor list.
@@ -327,16 +335,17 @@ private static boolean isProcessorWithOnFailureGeoIpProcessor(
327335
Map<String, Object> processor,
328336
boolean downloadDatabaseOnPipelineCreation
329337
) {
330-
return processor != null
331-
&& processor.values()
332-
.stream()
333-
.anyMatch(
334-
value -> value instanceof Map
335-
&& hasAtLeastOneGeoipProcessor(
336-
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
337-
downloadDatabaseOnPipelineCreation
338-
)
339-
);
338+
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
339+
for (Object value : processor.values()) {
340+
if (value instanceof Map
341+
&& hasAtLeastOneGeoipProcessor(
342+
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
343+
downloadDatabaseOnPipelineCreation
344+
)) {
345+
return true;
346+
}
347+
}
348+
return false;
340349
}
341350

342351
/**

modules/mapper-extras/src/main/java/org/elasticsearch/index/mapper/extras/MatchOnlyTextFieldMapper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,7 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
364364
SourceValueFetcher fetcher = SourceValueFetcher.toString(blContext.sourcePaths(name()));
365365
// MatchOnlyText never has norms, so we have to use the field names field
366366
BlockSourceReader.LeafIteratorLookup lookup = BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name());
367-
var sourceMode = blContext.indexSettings().getIndexMappingSourceMode();
368-
return new BlockSourceReader.BytesRefsBlockLoader(fetcher, lookup, sourceMode);
367+
return new BlockSourceReader.BytesRefsBlockLoader(fetcher, lookup);
369368
}
370369

371370
@Override

modules/mapper-extras/src/main/java/org/elasticsearch/index/mapper/extras/ScaledFloatFieldMapper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,7 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
319319
BlockSourceReader.LeafIteratorLookup lookup = isStored() || isIndexed()
320320
? BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name())
321321
: BlockSourceReader.lookupMatchingAll();
322-
var sourceMode = blContext.indexSettings().getIndexMappingSourceMode();
323-
return new BlockSourceReader.DoublesBlockLoader(valueFetcher, lookup, sourceMode);
322+
return new BlockSourceReader.DoublesBlockLoader(valueFetcher, lookup);
324323
}
325324

326325
@Override

0 commit comments

Comments
 (0)