Skip to content

Commit 0b89a24

Browse files
committed
Merge remote-tracking branch 'elastic/main' into partial-results-ccs
2 parents d3be47f + cffbccb commit 0b89a24

File tree

32 files changed

+425
-155
lines changed

32 files changed

+425
-155
lines changed

.buildkite/scripts/dra-update-staging.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ for BRANCH in "${BRANCHES[@]}"; do
3737

3838
if [[ "$SHOULD_TRIGGER" == "true" ]]; then
3939
if [[ "$BRANCH" == "9.0" ]]; then
40-
export VERSION_QUALIFIER="beta1"
40+
export VERSION_QUALIFIER="rc1"
4141
fi
4242
echo "Triggering DRA staging workflow for $BRANCH"
4343
cat << EOF | buildkite-agent pipeline upload

.buildkite/scripts/dra-workflow.trigger.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ source .buildkite/scripts/branches.sh
88

99
for BRANCH in "${BRANCHES[@]}"; do
1010
if [[ "$BRANCH" == "9.0" ]]; then
11-
export VERSION_QUALIFIER="beta1"
11+
export VERSION_QUALIFIER="rc1"
1212
fi
1313

1414
INTAKE_PIPELINE_SLUG="elasticsearch-intake"

docs/changelog/122409.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 122409
2+
summary: Allow setting the `type` in the reroute processor
3+
area: Ingest Node
4+
type: enhancement
5+
issues:
6+
- 121553

docs/internal/DistributedArchitectureGuide.md

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -229,19 +229,45 @@ works in parallel with the storage engine.)
229229

230230
# Allocation
231231

232-
(AllocationService runs on the master node)
233-
234-
(Discuss different deciders that limit allocation. Sketch / list the different deciders that we have.)
235-
236-
### APIs for Balancing Operations
237-
238-
(Significant internal APIs for balancing a cluster)
239-
240-
### Heuristics for Allocation
241-
242-
### Cluster Reroute Command
243-
244-
(How does this command behave with the desired auto balancer.)
232+
### Core Components
233+
234+
The `DesiredBalanceShardsAllocator` is what runs shard allocation decisions. It leverages the `DesiredBalanceComputer` to produce
235+
`DesiredBalance` instances for the cluster based on the latest cluster changes (add/remove nodes, create/remove indices, load, etc.). Then
236+
the `DesiredBalanceReconciler` is invoked to choose the next steps to take to move the cluster from the current shard allocation to the
237+
latest computed `DesiredBalance` shard allocation. The `DesiredBalanceReconciler` will apply changes to a copy of the `RoutingNodes`, which
238+
is then published in a cluster state update that will reach the data nodes to start the individual shard recovery/deletion/move work.
239+
240+
The `DesiredBalanceReconciler` is throttled by cluster settings, like the max number of concurrent shard moves and recoveries per cluster
241+
and node: this is why the `DesiredBalanceReconciler` will make, and publish via cluster state updates, incremental changes to the cluster
242+
shard allocation. The `DesiredBalanceShardsAllocator` is the endpoint for reroute requests, which may trigger immediate requests to the
243+
`DesiredBalanceReconciler`, but asynchronous requests to the `DesiredBalanceComputer` via the `ContinuousComputation` component. Cluster
244+
state changes that affect shard balancing (for example index deletion) all call some reroute method interface that reaches the
245+
`DesiredBalanceShardsAllocator` to run reconciliation and queue a request for the `DesiredBalancerComputer`, leading to desired balance
246+
computation and reconciliation actions. Asynchronous completion of a new `DesiredBalance` will also invoke a reconciliation action, as will
247+
cluster state updates completing shard moves/recoveries (unthrottling the next shard move/recovery).
248+
249+
The `ContinuousComputation` saves the latest desired balance computation request, which holds the cluster information at the time of that
250+
request, and a thread that runs the `DesiredBalanceComputer`. The `ContinuousComputation` thread takes the latest request, with the
251+
associated cluster information, feeds it into the `DesiredBalanceComputer` and publishes a `DesiredBalance` back to the
252+
`DesiredBalanceShardsAllocator` to use for reconciliation actions. Sometimes the `ContinuousComputation` thread's desired balance
253+
computation will be signalled to exit early and publish the initial `DesiredBalance` improvements it has made, when newer rebalancing
254+
requests (due to cluster state changes) have arrived, or in order to begin recovery of unassigned shards as quickly as possible.
255+
256+
### Rebalancing Process
257+
258+
There are different priorities in shard allocation, reflected in which moves the `DesiredBalancerReconciler` selects to do first given that
259+
it can only move, recover, or remove a limited number of shards at once. The first priority is assigning unassigned shards, primaries being
260+
more important than replicas. The second is to move shards that violate any rule (such as node resource limits) as defined by an
261+
`AllocationDecider`. The `AllocationDeciders` holds a group of `AllocationDecider` implementations that place hard constraints on shard
262+
allocation. There is a decider, `DiskThresholdDecider`, that manages disk memory usage thresholds, such that further shards may not be
263+
allowed assignment to a node, or shards may be required to move off because they grew to exceed the disk space; or another,
264+
`FilterAllocationDecider`, that excludes a configurable list of indices from certain nodes; or `MaxRetryAllocationDecider` that will not
265+
attempt to recover a shard on a certain node after so many failed retries. The third priority is to rebalance shards to even out the
266+
relative weight of shards on each node: the intention is to avoid, or ease, future hot-spotting on data nodes due to too many shards being
267+
placed on the same data node. Node shard weight is based on a sum of factors: disk memory usage, projected shard write load, total number
268+
of shards, and an incentive to distribute shards within the same index across different nodes. See the `WeightFunction` and
269+
`NodeAllocationStatsAndWeightsCalculator` classes for more details on the weight calculations that support the `DesiredBalanceComputer`
270+
decisions.
245271

246272
# Autoscaling
247273

docs/reference/ingest/processors/reroute.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ Otherwise, the document will be rejected with a security exception which looks l
4545
|======
4646
| Name | Required | Default | Description
4747
| `destination` | no | - | A static value for the target. Can't be set when the `dataset` or `namespace` option is set.
48+
| `type` | no | `{{data_stream.type}}` a| Field references or a static value for the type part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `logs` and `metrics`.
49+
50+
Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<type>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
4851
| `dataset` | no | `{{data_stream.dataset}}` a| Field references or a static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`.
4952

5053
Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<dataset>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.

libs/entitlement/src/main/java/org/elasticsearch/entitlement/bootstrap/EntitlementBootstrap.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.nio.file.Path;
2929
import java.util.Map;
3030
import java.util.function.Function;
31+
import java.util.stream.Stream;
3132

3233
import static java.util.Objects.requireNonNull;
3334

@@ -36,19 +37,24 @@ public class EntitlementBootstrap {
3637
public record BootstrapArgs(
3738
Map<String, Policy> pluginPolicies,
3839
Function<Class<?>, String> pluginResolver,
40+
Function<String, String> settingResolver,
41+
Function<String, Stream<String>> settingGlobResolver,
3942
Path[] dataDirs,
4043
Path configDir,
41-
Path tempDir,
42-
Path logsDir
44+
Path logsDir,
45+
Path tempDir
4346
) {
4447
public BootstrapArgs {
4548
requireNonNull(pluginPolicies);
4649
requireNonNull(pluginResolver);
50+
requireNonNull(settingResolver);
51+
requireNonNull(settingGlobResolver);
4752
requireNonNull(dataDirs);
4853
if (dataDirs.length == 0) {
4954
throw new IllegalArgumentException("must provide at least one data directory");
5055
}
5156
requireNonNull(configDir);
57+
requireNonNull(logsDir);
5258
requireNonNull(tempDir);
5359
}
5460
}
@@ -73,16 +79,27 @@ public static BootstrapArgs bootstrapArgs() {
7379
public static void bootstrap(
7480
Map<String, Policy> pluginPolicies,
7581
Function<Class<?>, String> pluginResolver,
82+
Function<String, String> settingResolver,
83+
Function<String, Stream<String>> settingGlobResolver,
7684
Path[] dataDirs,
7785
Path configDir,
78-
Path tempDir,
79-
Path logsDir
86+
Path logsDir,
87+
Path tempDir
8088
) {
8189
logger.debug("Loading entitlement agent");
8290
if (EntitlementBootstrap.bootstrapArgs != null) {
8391
throw new IllegalStateException("plugin data is already set");
8492
}
85-
EntitlementBootstrap.bootstrapArgs = new BootstrapArgs(pluginPolicies, pluginResolver, dataDirs, configDir, tempDir, logsDir);
93+
EntitlementBootstrap.bootstrapArgs = new BootstrapArgs(
94+
pluginPolicies,
95+
pluginResolver,
96+
settingResolver,
97+
settingGlobResolver,
98+
dataDirs,
99+
configDir,
100+
logsDir,
101+
tempDir
102+
);
86103
exportInitializationToAgent();
87104
loadAgent(findAgentJar());
88105
selfTest();

libs/entitlement/src/main/java/org/elasticsearch/entitlement/initialization/EntitlementInitialization.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,14 @@ private static Class<?>[] findClassesToRetransform(Class<?>[] loadedClasses, Set
135135
private static PolicyManager createPolicyManager() {
136136
EntitlementBootstrap.BootstrapArgs bootstrapArgs = EntitlementBootstrap.bootstrapArgs();
137137
Map<String, Policy> pluginPolicies = bootstrapArgs.pluginPolicies();
138-
var pathLookup = new PathLookup(getUserHome(), bootstrapArgs.configDir(), bootstrapArgs.dataDirs(), bootstrapArgs.tempDir());
139-
Path logsDir = EntitlementBootstrap.bootstrapArgs().logsDir();
138+
var pathLookup = new PathLookup(
139+
getUserHome(),
140+
bootstrapArgs.configDir(),
141+
bootstrapArgs.dataDirs(),
142+
bootstrapArgs.tempDir(),
143+
bootstrapArgs.settingResolver(),
144+
bootstrapArgs.settingGlobResolver()
145+
);
140146

141147
List<Scope> serverScopes = new ArrayList<>();
142148
Collections.addAll(

libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/FileAccessTree.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ private FileAccessTree(FilesEntitlement filesEntitlement, PathLookup pathLookup)
4242
}
4343

4444
// everything has access to the temp dir
45-
readPaths.add(pathLookup.tempDir().toString());
46-
writePaths.add(pathLookup.tempDir().toString());
45+
String tempDir = normalizePath(pathLookup.tempDir());
46+
readPaths.add(tempDir);
47+
writePaths.add(tempDir);
4748

4849
readPaths.sort(String::compareTo);
4950
writePaths.sort(String::compareTo);

libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/PathLookup.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,14 @@
1010
package org.elasticsearch.entitlement.runtime.policy;
1111

1212
import java.nio.file.Path;
13+
import java.util.function.Function;
14+
import java.util.stream.Stream;
1315

14-
public record PathLookup(Path homeDir, Path configDir, Path[] dataDirs, Path tempDir) {}
16+
public record PathLookup(
17+
Path homeDir,
18+
Path configDir,
19+
Path[] dataDirs,
20+
Path tempDir,
21+
Function<String, String> settingResolver,
22+
Function<String, Stream<String>> settingGlobResolver
23+
) {}

libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/entitlements/FilesEntitlement.java

Lines changed: 87 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import java.nio.file.Path;
1717
import java.util.ArrayList;
18-
import java.util.Arrays;
1918
import java.util.HashMap;
2019
import java.util.List;
2120
import java.util.Map;
@@ -53,33 +52,81 @@ static FileData ofPath(Path path, Mode mode) {
5352
static FileData ofRelativePath(Path relativePath, BaseDir baseDir, Mode mode) {
5453
return new RelativePathFileData(relativePath, baseDir, mode);
5554
}
56-
}
5755

58-
private record AbsolutePathFileData(Path path, Mode mode) implements FileData {
59-
@Override
60-
public Stream<Path> resolvePaths(PathLookup pathLookup) {
61-
return Stream.of(path);
56+
static FileData ofPathSetting(String setting, Mode mode) {
57+
return new PathSettingFileData(setting, mode);
58+
}
59+
60+
static FileData ofRelativePathSetting(String setting, BaseDir baseDir, Mode mode) {
61+
return new RelativePathSettingFileData(setting, baseDir, mode);
6262
}
6363
}
6464

65-
private record RelativePathFileData(Path relativePath, BaseDir baseDir, Mode mode) implements FileData {
65+
private sealed interface RelativeFileData extends FileData {
66+
BaseDir baseDir();
67+
68+
Stream<Path> resolveRelativePaths(PathLookup pathLookup);
6669

6770
@Override
68-
public Stream<Path> resolvePaths(PathLookup pathLookup) {
71+
default Stream<Path> resolvePaths(PathLookup pathLookup) {
6972
Objects.requireNonNull(pathLookup);
70-
switch (baseDir) {
73+
var relativePaths = resolveRelativePaths(pathLookup);
74+
switch (baseDir()) {
7175
case CONFIG:
72-
return Stream.of(pathLookup.configDir().resolve(relativePath));
76+
return relativePaths.map(relativePath -> pathLookup.configDir().resolve(relativePath));
7377
case DATA:
74-
return Arrays.stream(pathLookup.dataDirs()).map(d -> d.resolve(relativePath));
78+
// multiple data dirs are a pain...we need the combination of relative paths and data dirs
79+
List<Path> paths = new ArrayList<>();
80+
for (var relativePath : relativePaths.toList()) {
81+
for (var dataDir : pathLookup.dataDirs()) {
82+
paths.add(dataDir.resolve(relativePath));
83+
}
84+
}
85+
return paths.stream();
7586
case HOME:
76-
return Stream.of(pathLookup.homeDir().resolve(relativePath));
87+
return relativePaths.map(relativePath -> pathLookup.homeDir().resolve(relativePath));
7788
default:
7889
throw new IllegalArgumentException();
7990
}
8091
}
8192
}
8293

94+
private record AbsolutePathFileData(Path path, Mode mode) implements FileData {
95+
@Override
96+
public Stream<Path> resolvePaths(PathLookup pathLookup) {
97+
return Stream.of(path);
98+
}
99+
}
100+
101+
private record RelativePathFileData(Path relativePath, BaseDir baseDir, Mode mode) implements FileData, RelativeFileData {
102+
@Override
103+
public Stream<Path> resolveRelativePaths(PathLookup pathLookup) {
104+
return Stream.of(relativePath);
105+
}
106+
}
107+
108+
private record PathSettingFileData(String setting, Mode mode) implements FileData {
109+
@Override
110+
public Stream<Path> resolvePaths(PathLookup pathLookup) {
111+
return resolvePathSettings(pathLookup, setting);
112+
}
113+
}
114+
115+
private record RelativePathSettingFileData(String setting, BaseDir baseDir, Mode mode) implements FileData, RelativeFileData {
116+
@Override
117+
public Stream<Path> resolveRelativePaths(PathLookup pathLookup) {
118+
return resolvePathSettings(pathLookup, setting);
119+
}
120+
}
121+
122+
private static Stream<Path> resolvePathSettings(PathLookup pathLookup, String setting) {
123+
if (setting.contains("*")) {
124+
return pathLookup.settingGlobResolver().apply(setting).map(Path::of);
125+
}
126+
String path = pathLookup.settingResolver().apply(setting);
127+
return path == null ? Stream.of() : Stream.of(Path.of(path));
128+
}
129+
83130
private static Mode parseMode(String mode) {
84131
if (mode.equals("read")) {
85132
return Mode.READ;
@@ -113,37 +160,56 @@ public static FilesEntitlement build(List<Object> paths) {
113160
String pathAsString = file.remove("path");
114161
String relativePathAsString = file.remove("relative_path");
115162
String relativeTo = file.remove("relative_to");
116-
String mode = file.remove("mode");
163+
String pathSetting = file.remove("path_setting");
164+
String relativePathSetting = file.remove("relative_path_setting");
165+
String modeAsString = file.remove("mode");
117166

118167
if (file.isEmpty() == false) {
119168
throw new PolicyValidationException("unknown key(s) [" + file + "] in a listed file for files entitlement");
120169
}
121-
if (mode == null) {
170+
int foundKeys = (pathAsString != null ? 1 : 0) + (relativePathAsString != null ? 1 : 0) + (pathSetting != null ? 1 : 0)
171+
+ (relativePathSetting != null ? 1 : 0);
172+
if (foundKeys != 1) {
173+
throw new PolicyValidationException(
174+
"a files entitlement entry must contain one of " + "[path, relative_path, path_setting, relative_path_setting]"
175+
);
176+
}
177+
178+
if (modeAsString == null) {
122179
throw new PolicyValidationException("files entitlement must contain 'mode' for every listed file");
123180
}
124-
if (pathAsString != null && relativePathAsString != null) {
125-
throw new PolicyValidationException("a files entitlement entry cannot contain both 'path' and 'relative_path'");
181+
Mode mode = parseMode(modeAsString);
182+
183+
BaseDir baseDir = null;
184+
if (relativeTo != null) {
185+
baseDir = parseBaseDir(relativeTo);
126186
}
127187

128188
if (relativePathAsString != null) {
129-
if (relativeTo == null) {
189+
if (baseDir == null) {
130190
throw new PolicyValidationException("files entitlement with a 'relative_path' must specify 'relative_to'");
131191
}
132-
final BaseDir baseDir = parseBaseDir(relativeTo);
133192

134193
Path relativePath = Path.of(relativePathAsString);
135194
if (relativePath.isAbsolute()) {
136195
throw new PolicyValidationException("'relative_path' [" + relativePathAsString + "] must be relative");
137196
}
138-
filesData.add(FileData.ofRelativePath(relativePath, baseDir, parseMode(mode)));
197+
filesData.add(FileData.ofRelativePath(relativePath, baseDir, mode));
139198
} else if (pathAsString != null) {
140199
Path path = Path.of(pathAsString);
141200
if (path.isAbsolute() == false) {
142201
throw new PolicyValidationException("'path' [" + pathAsString + "] must be absolute");
143202
}
144-
filesData.add(FileData.ofPath(path, parseMode(mode)));
203+
filesData.add(FileData.ofPath(path, mode));
204+
} else if (pathSetting != null) {
205+
filesData.add(FileData.ofPathSetting(pathSetting, mode));
206+
} else if (relativePathSetting != null) {
207+
if (baseDir == null) {
208+
throw new PolicyValidationException("files entitlement with a 'relative_path_setting' must specify 'relative_to'");
209+
}
210+
filesData.add(FileData.ofRelativePathSetting(relativePathSetting, baseDir, mode));
145211
} else {
146-
throw new PolicyValidationException("files entitlement must contain either 'path' or 'relative_path' for every entry");
212+
throw new AssertionError("File entry validation error");
147213
}
148214
}
149215
return new FilesEntitlement(filesData);

0 commit comments

Comments
 (0)