Skip to content

Commit fa252fa

Browse files
authored
Evaluate random samples for all rerouted indices (#137347)
1 parent 97e4ff3 commit fa252fa

File tree

4 files changed

+201
-24
lines changed

4 files changed

+201
-24
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
---
2+
"Test get sample with multiple reroutes":
3+
- requires:
4+
cluster_features: [ "random_sampling" ]
5+
reason: requires feature 'random_sampling' to get random samples
6+
7+
- do:
8+
ingest.put_pipeline:
9+
id: pipeline1
10+
body: >
11+
{
12+
"processors" : [
13+
{
14+
"set" : {
15+
"field": "message",
16+
"value": "set by pipeline1"
17+
}
18+
},
19+
{
20+
"reroute" : {
21+
"destination": "foo.bar"
22+
}
23+
}
24+
]
25+
}
26+
- match: { acknowledged: true }
27+
28+
- do:
29+
ingest.put_pipeline:
30+
id: pipeline2
31+
body: >
32+
{
33+
"processors" : [
34+
{
35+
"set" : {
36+
"field": "message",
37+
"value": "set by pipeline2"
38+
}
39+
},
40+
{
41+
"reroute" : {
42+
"destination": "foo.bar.baz"
43+
}
44+
}
45+
]
46+
}
47+
- match: { acknowledged: true }
48+
49+
- do:
50+
indices.put_index_template:
51+
name: my-template1
52+
body:
53+
index_patterns: [foo]
54+
template:
55+
settings:
56+
default_pipeline: pipeline1
57+
index.number_of_shards: 1
58+
index.number_of_replicas: 0
59+
mappings:
60+
dynamic: strict
61+
properties:
62+
message:
63+
type: text
64+
data_stream: {}
65+
- match: { acknowledged: true }
66+
67+
- do:
68+
indices.put_index_template:
69+
name: my-template2
70+
body:
71+
index_patterns: [foo.bar]
72+
template:
73+
settings:
74+
default_pipeline: pipeline2
75+
index.number_of_shards: 1
76+
index.number_of_replicas: 0
77+
mappings:
78+
dynamic: strict
79+
properties:
80+
message:
81+
type: text
82+
data_stream: {}
83+
- match: { acknowledged: true }
84+
85+
- do:
86+
indices.put_index_template:
87+
name: my-template3
88+
body:
89+
index_patterns: [foo.bar.baz]
90+
template:
91+
settings:
92+
index.number_of_shards: 1
93+
index.number_of_replicas: 0
94+
mappings:
95+
dynamic: strict
96+
properties:
97+
message:
98+
type: text
99+
data_stream: {}
100+
- match: { acknowledged: true }
101+
102+
- do:
103+
indices.create_data_stream:
104+
name: foo
105+
- is_true: acknowledged
106+
107+
- do:
108+
indices.create_data_stream:
109+
name: foo.bar
110+
- is_true: acknowledged
111+
112+
- do:
113+
indices.create_data_stream:
114+
name: foo.bar.baz
115+
- is_true: acknowledged
116+
117+
- do:
118+
indices.rollover:
119+
alias: foo.bar.baz
120+
wait_for_active_shards: 1
121+
- match: { rolled_over: true }
122+
123+
- do:
124+
indices.put_sample_configuration:
125+
index: foo
126+
body:
127+
rate: 1.0
128+
max_samples: 100
129+
130+
- do:
131+
indices.put_sample_configuration:
132+
index: foo.bar
133+
body:
134+
rate: 1.0
135+
max_samples: 100
136+
137+
- do:
138+
indices.put_sample_configuration:
139+
index: foo.bar.baz
140+
body:
141+
rate: 1.0
142+
max_samples: 100
143+
144+
- do:
145+
bulk:
146+
refresh: true
147+
body:
148+
- '{ "create":{"_index": "foo" } }'
149+
- '{"@timestamp": 123456, "message": "This is the original message"}'
150+
- '{ "create":{"_index": "foo" } }'
151+
- '{"@timestamp": 123456, "message": "This is the original message"}'
152+
- match: { errors: false }
153+
154+
- do:
155+
indices.get_sample:
156+
index: foo
157+
- length: { sample: 2 }
158+
- match: { sample.0.index: "foo" }
159+
- match: { sample.0.source.message: "This is the original message" }
160+
- match: { sample.1.source.message: "This is the original message" }
161+
162+
- do:
163+
indices.get_sample:
164+
index: foo.bar
165+
- length: { sample: 2 }
166+
- match: { sample.0.index: "foo.bar" }
167+
- match: { sample.0.source.message: "This is the original message" }
168+
- match: { sample.1.source.message: "This is the original message" }
169+
170+
- do:
171+
indices.get_sample:
172+
index: foo.bar.baz
173+
- length: { sample: 2 }
174+
- match: { sample.0.index: "foo.bar.baz" }
175+
- match: { sample.0.source.message: "This is the original message" }
176+
- match: { sample.1.source.message: "This is the original message" }
177+
178+
---
179+
teardown:
180+
- requires:
181+
cluster_features: [ "random_sampling" ]
182+
reason: requires feature 'random_sampling' to get random samples
183+
184+
- do:
185+
indices.delete_data_stream:
186+
name: foo*

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,14 +1040,7 @@ public void onFailure(Exception e) {
10401040
}
10411041
);
10421042

1043-
executePipelines(
1044-
pipelines,
1045-
indexRequest,
1046-
ingestDocument,
1047-
adaptedResolveFailureStore,
1048-
documentListener,
1049-
originalDocumentMetadata
1050-
);
1043+
executePipelines(pipelines, indexRequest, ingestDocument, adaptedResolveFailureStore, documentListener);
10511044
assert actionRequest.index() != null;
10521045

10531046
i++;
@@ -1166,8 +1159,7 @@ private void executePipelines(
11661159
final IndexRequest indexRequest,
11671160
final IngestDocument ingestDocument,
11681161
final Function<String, Boolean> resolveFailureStore,
1169-
final ActionListener<IngestPipelinesExecutionResult> listener,
1170-
final Metadata originalDocumentMetadata
1162+
final ActionListener<IngestPipelinesExecutionResult> listener
11711163
) {
11721164
assert pipelines.hasNext();
11731165
PipelineSlot slot = pipelines.next();
@@ -1353,14 +1345,14 @@ private void executePipelines(
13531345
}
13541346

13551347
if (newPipelines.hasNext()) {
1356-
executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener, originalDocumentMetadata);
1348+
executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener);
13571349
} else {
13581350
/*
13591351
* At this point, all pipelines have been executed, and we are about to overwrite ingestDocument with the results.
13601352
* This is our chance to sample with both the original document and all changes.
13611353
*/
13621354
haveAttemptedSampling.set(true);
1363-
attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata);
1355+
attemptToSampleData(project, indexRequest, ingestDocument);
13641356
updateIndexRequestSource(indexRequest, ingestDocument);
13651357
cacheRawTimestamp(indexRequest, ingestDocument);
13661358
listener.onResponse(IngestPipelinesExecutionResult.SUCCESSFUL_RESULT); // document succeeded!
@@ -1369,7 +1361,7 @@ private void executePipelines(
13691361
} catch (Exception e) {
13701362
if (haveAttemptedSampling.get() == false) {
13711363
// It is possible that an exception happened after we sampled. We do not want to sample the same document twice.
1372-
attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata);
1364+
attemptToSampleData(project, indexRequest, ingestDocument);
13731365
}
13741366
logger.debug(
13751367
() -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()),
@@ -1379,18 +1371,13 @@ private void executePipelines(
13791371
}
13801372
}
13811373

1382-
private void attemptToSampleData(
1383-
ProjectMetadata projectMetadata,
1384-
IndexRequest indexRequest,
1385-
IngestDocument ingestDocument,
1386-
Metadata originalDocumentMetadata
1387-
) {
1374+
private void attemptToSampleData(ProjectMetadata projectMetadata, IndexRequest indexRequest, IngestDocument ingestDocument) {
13881375
if (samplingService != null && samplingService.atLeastOneSampleConfigured(projectMetadata)) {
13891376
/*
13901377
* We need both the original document and the fully updated document for sampling, so we make a copy of the original
13911378
* before overwriting it here. We can discard it after sampling.
13921379
*/
1393-
samplingService.maybeSample(projectMetadata, originalDocumentMetadata.getIndex(), indexRequest, ingestDocument);
1380+
samplingService.maybeSample(projectMetadata, indexRequest, ingestDocument);
13941381

13951382
}
13961383
}

server/src/main/java/org/elasticsearch/ingest/SamplingService.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,13 +195,17 @@ public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexReque
195195
}
196196

197197
/**
198-
* Potentially samples the given indexRequest, depending on the existing sampling configuration.
198+
* Potentially samples the given indexRequest, depending on the existing sampling configuration. The request will be sampled against
199+
* the sampling configurations of all indices it has been rerouted to (if it has been rerouted).
199200
* @param projectMetadata Used to get the sampling configuration
200201
* @param indexRequest The raw request to potentially sample
201202
* @param ingestDocument The IngestDocument used for evaluating any conditionals that are part of the sample configuration
202203
*/
203-
public void maybeSample(ProjectMetadata projectMetadata, String indexName, IndexRequest indexRequest, IngestDocument ingestDocument) {
204-
maybeSample(projectMetadata, indexName, indexRequest, () -> ingestDocument);
204+
public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest, IngestDocument ingestDocument) {
205+
// The index history gives us the initially-requested index, as well as any indices it has been rerouted through
206+
for (String index : ingestDocument.getIndexHistory()) {
207+
maybeSample(projectMetadata, index, indexRequest, () -> ingestDocument);
208+
}
205209
}
206210

207211
private void maybeSample(

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3403,7 +3403,7 @@ public void testSampling() {
34033403
);
34043404
verify(listener, times(1)).onResponse(null);
34053405
// In the case where there is a pipeline, or there is a pipeline failure, there will be an IngestDocument so this verion is called:
3406-
verify(samplingService, times(2)).maybeSample(any(), any(), any(), any());
3406+
verify(samplingService, times(2)).maybeSample(any(), any(), any());
34073407
// When there is no pipeline, we have no IngestDocument, and the maybeSample that does not require an IngestDocument is called:
34083408
verify(samplingService, times(1)).maybeSample(any(), any());
34093409
}

0 commit comments

Comments
 (0)