Skip to content

Commit 6ebee66

Browse files
[8.x] Resolve pipelines from template if lazy rollover write (elastic#116031) (elastic#116132)
* Resolve pipelines from template if lazy rollover write (elastic#116031) If datastream rollover on write flag is set in cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover on write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value, and reusing for other requests to the same index. Fixes: elastic#112781 * Remute tests block merge * Remute tests block merge
1 parent 06d003b commit 6ebee66

File tree

6 files changed

+535
-15
lines changed

6 files changed

+535
-15
lines changed

docs/changelog/116031.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 116031
2+
summary: Resolve pipelines from template on lazy rollover write
3+
area: Data streams
4+
type: bug
5+
issues:
6+
- 112781

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml

Lines changed: 323 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ teardown:
3636
ingest.delete_pipeline:
3737
id: "pipeline-2"
3838
ignore: 404
39-
39+
- do:
40+
indices.delete_data_stream:
41+
name: "data-stream-*"
42+
expand_wildcards: all
4043
---
4144
"Test first matching router terminates pipeline":
4245
- skip:
@@ -252,3 +255,322 @@ teardown:
252255
- match: { _source.existing-field : true }
253256
- match: { _source.added-in-pipeline-before-reroute : true }
254257
- match: { _source.added-in-pipeline-after-reroute : true }
258+
259+
---
260+
"Test data stream with lazy rollover obtains pipeline from template":
261+
# This test starts with chain of reroutes, from data-stream-1, to data-stream-2, to data-stream-3
262+
# We then add higher priority templates that remove the reroute processors. Then we show that
263+
# after a lazy rollover on data-stream-2, a document written to data-stream-1 still gets rerouted
264+
# to data-steam-2, but not on to data-stream-3. Finally, after a lazy rollover on data-stream-1
265+
# causes the new template to also take effect on data-stream-1, and the last write goes directly
266+
# into data-stream-1. Multiple reroute steps are tested because pipeline resolution using a
267+
# different code path for initial index and indices after a reroute.
268+
269+
# start with pipeline that reroutes from ds-1 to ds-2
270+
- do:
271+
ingest.put_pipeline:
272+
id: "reroute-1"
273+
body: >
274+
{
275+
"processors": [
276+
{
277+
"reroute" : {"destination": "data-stream-2"}
278+
}
279+
]
280+
}
281+
- match: { acknowledged: true }
282+
283+
# and pipeline that reroutes from ds-2 to ds-3
284+
- do:
285+
ingest.put_pipeline:
286+
id: "reroute-2"
287+
body: >
288+
{
289+
"processors": [
290+
{
291+
"reroute" : {"destination": "data-stream-3"}
292+
}
293+
]
294+
}
295+
- match: { acknowledged: true }
296+
297+
# set pipelines in templates
298+
- do:
299+
indices.put_index_template:
300+
name: template-1
301+
body:
302+
index_patterns: [ "data-stream-1"]
303+
priority: 1
304+
data_stream: { }
305+
template:
306+
settings:
307+
index.default_pipeline: "reroute-1"
308+
- match: { acknowledged: true }
309+
- do:
310+
indices.put_index_template:
311+
name: template-2
312+
body:
313+
index_patterns: [ "data-stream-2"]
314+
priority: 1
315+
data_stream: { }
316+
template:
317+
settings:
318+
index.default_pipeline: "reroute-2"
319+
- match: { acknowledged: true }
320+
- do:
321+
indices.put_index_template:
322+
name: template_3
323+
body:
324+
index_patterns: [ "data-stream-3" ]
325+
priority: 1
326+
data_stream: { }
327+
- match: { acknowledged: true }
328+
329+
- do:
330+
indices.create_data_stream:
331+
name: data-stream-1
332+
- match: { acknowledged: true }
333+
- do:
334+
indices.create_data_stream:
335+
name: data-stream-2
336+
- match: { acknowledged: true }
337+
- do:
338+
indices.create_data_stream:
339+
name: data-stream-3
340+
- match: { acknowledged: true }
341+
342+
# write to ds-1
343+
- do:
344+
index:
345+
index: data-stream-1
346+
body:
347+
'@timestamp': '2020-12-12'
348+
some-field: 1
349+
- do:
350+
indices.refresh:
351+
index: data-stream-3
352+
353+
# document is rerouted to ds-3
354+
- do:
355+
search:
356+
index: data-stream-3
357+
body: { query: { match_all: { } } }
358+
- length: { hits.hits: 1 }
359+
- match: { hits.hits.0._source.some-field: 1 }
360+
361+
# add higher priority templates without reroute processors
362+
- do:
363+
indices.put_index_template:
364+
name: template_4
365+
body:
366+
index_patterns: [ "data-stream-1" ]
367+
priority: 2 # higher priority
368+
data_stream: { }
369+
- match: { acknowledged: true }
370+
- do:
371+
indices.put_index_template:
372+
name: template_5
373+
body:
374+
index_patterns: [ "data-stream-2" ]
375+
priority: 2 # higher priority
376+
data_stream: { }
377+
- match: { acknowledged: true }
378+
379+
# write to ds-1
380+
- do:
381+
index:
382+
index: data-stream-1
383+
body:
384+
'@timestamp': '2020-12-12'
385+
some-field: 2
386+
- do:
387+
indices.refresh:
388+
index: data-stream-3
389+
390+
# still rerouted because ds-1 and ds-2 rolled over
391+
- do:
392+
search:
393+
index: data-stream-3
394+
body: { query: { match_all: { } } }
395+
- length: { hits.hits: 2 }
396+
397+
# perform lazy rollover on ds-2
398+
- do:
399+
indices.rollover:
400+
alias: data-stream-2
401+
lazy: true
402+
403+
# write to ds-1
404+
- do:
405+
index:
406+
index: data-stream-1
407+
body:
408+
'@timestamp': '2020-12-12'
409+
some-field: 3
410+
- do:
411+
indices.refresh:
412+
index: data-stream-2
413+
414+
# written to ds-2, as rerouted to ds-2, but not on to ds-3
415+
- do:
416+
search:
417+
index: data-stream-2
418+
body: { query: { match_all: { } } }
419+
- length: { hits.hits: 1 }
420+
- match: { hits.hits.0._source.some-field: 3 }
421+
422+
# perform lazy rollover on 1
423+
- do:
424+
indices.rollover:
425+
alias: data-stream-1
426+
lazy: true
427+
428+
# write to ds-1
429+
- do:
430+
index:
431+
index: data-stream-1
432+
body:
433+
'@timestamp': '2020-12-12'
434+
some-field: 4
435+
- do:
436+
indices.refresh:
437+
index: data-stream-1
438+
439+
# written to ds-1, as not rerouted to ds-2
440+
- do:
441+
search:
442+
index: data-stream-1
443+
body: { query: { match_all: { } } }
444+
- length: { hits.hits: 1 }
445+
- match: { hits.hits.0._source.some-field: 4 }
446+
447+
---
448+
"Test remove then add reroute processor with and without lazy rollover":
449+
# start with pipeline that reroutes from ds-1 to ds-2
450+
- do:
451+
ingest.put_pipeline:
452+
id: "reroute-1"
453+
body: >
454+
{
455+
"processors": [
456+
{
457+
"reroute" : {"destination": "data-stream-2"}
458+
}
459+
]
460+
}
461+
- match: { acknowledged: true }
462+
463+
# set pipelines in templates
464+
- do:
465+
indices.put_index_template:
466+
name: template-1
467+
body:
468+
index_patterns: [ "data-stream-1"]
469+
priority: 1
470+
data_stream: { }
471+
template:
472+
settings:
473+
index.default_pipeline: "reroute-1"
474+
- match: { acknowledged: true }
475+
- do:
476+
indices.put_index_template:
477+
name: template_2
478+
body:
479+
index_patterns: [ "data-stream-2" ]
480+
priority: 1
481+
data_stream: { }
482+
- match: { acknowledged: true }
483+
484+
- do:
485+
indices.create_data_stream:
486+
name: data-stream-1
487+
- match: { acknowledged: true }
488+
489+
- do:
490+
indices.create_data_stream:
491+
name: data-stream-2
492+
- match: { acknowledged: true }
493+
494+
# write to ds-1
495+
- do:
496+
index:
497+
index: data-stream-1
498+
body:
499+
'@timestamp': '2020-12-12'
500+
some-field: 1
501+
- do:
502+
indices.refresh:
503+
index: data-stream-2
504+
505+
# document is rerouted to ds-2
506+
- do:
507+
search:
508+
index: data-stream-2
509+
body: { query: { match_all: { } } }
510+
- length: { hits.hits: 1 }
511+
512+
# add higher priority templates without reroute processors
513+
- do:
514+
indices.put_index_template:
515+
name: template_3
516+
body:
517+
index_patterns: [ "data-stream-1" ]
518+
priority: 2 # higher priority
519+
data_stream: { }
520+
- match: { acknowledged: true }
521+
522+
# perform lazy rollover on ds-2
523+
- do:
524+
indices.rollover:
525+
alias: data-stream-1
526+
lazy: true
527+
528+
# write to ds-1
529+
- do:
530+
index:
531+
index: data-stream-1
532+
body:
533+
'@timestamp': '2020-12-12'
534+
some-field: 2
535+
- do:
536+
indices.refresh:
537+
index: data-stream-1
538+
539+
# written to ds-1, as not rerouted to ds-2
540+
- do:
541+
search:
542+
index: data-stream-1
543+
body: { query: { match_all: { } } }
544+
- length: { hits.hits: 1 }
545+
546+
# add another higher priority templates with reroute processors
547+
- do:
548+
indices.put_index_template:
549+
name: template-3
550+
body:
551+
index_patterns: [ "data-stream-1" ]
552+
priority: 3
553+
data_stream: { }
554+
template:
555+
settings:
556+
index.default_pipeline: "reroute-1"
557+
- match: { acknowledged: true }
558+
559+
# don't do a lazy rollover
560+
# write to ds-1
561+
- do:
562+
index:
563+
index: data-stream-1
564+
body:
565+
'@timestamp': '2020-12-12'
566+
some-field: 3
567+
- do:
568+
indices.refresh:
569+
index: data-stream-1
570+
571+
# because no lazy rollover, still no reroute processor
572+
- do:
573+
search:
574+
index: data-stream-1
575+
body: { query: { match_all: { } } }
576+
- length: { hits.hits: 2 }

muted-tests.yml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,18 @@ tests:
293293
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
294294
method: test {yaml=logsdb/10_settings/logsdb with default ignore dynamic beyond limit and non-default sorting}
295295
issue: https://github.com/elastic/elasticsearch/issues/116095
296-
296+
- class: org.elasticsearch.xpack.ml.integration.DatafeedJobsRestIT
297+
issue: https://github.com/elastic/elasticsearch/issues/111319
298+
- class: org.elasticsearch.upgrades.FullClusterRestartIT
299+
method: testSnapshotRestore {cluster=OLD}
300+
issue: https://github.com/elastic/elasticsearch/issues/111777
301+
- class: org.elasticsearch.xpack.restart.CoreFullClusterRestartIT
302+
method: testSnapshotRestore {cluster=OLD}
303+
issue: https://github.com/elastic/elasticsearch/issues/111775
304+
- class: org.elasticsearch.xpack.restart.CoreFullClusterRestartIT
305+
method: testSnapshotRestore {cluster=UPGRADED}
306+
issue: https://github.com/elastic/elasticsearch/issues/111799
307+
297308
# Examples:
298309
#
299310
# Mute a single test case in a YAML test suite:

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,18 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
228228
metadata = clusterService.state().getMetadata();
229229
}
230230

231+
Map<String, IngestService.Pipelines> resolvedPipelineCache = new HashMap<>();
231232
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
232233
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
233234
if (indexRequest != null) {
234-
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
235+
if (indexRequest.isPipelineResolved() == false) {
236+
var pipeline = resolvedPipelineCache.computeIfAbsent(
237+
indexRequest.index(),
238+
// TODO perhaps this should use `threadPool.absoluteTimeInMillis()`, but leaving as is for now.
239+
(index) -> IngestService.resolvePipelines(actionRequest, indexRequest, metadata, System.currentTimeMillis())
240+
);
241+
IngestService.setPipelineOnRequest(indexRequest, pipeline);
242+
}
235243
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
236244
}
237245

0 commit comments

Comments
 (0)