Skip to content

Commit 08d8357

Browse files
Resolve pipelines from template if lazy rollover write (#116031) (#116137)
If datastream rollover on write flag is set in cluster state, resolve pipelines from templates rather than from metadata. This fixes the following bug: when a pipeline reroutes every document to another index, and rollover is called with lazy=true (setting the rollover on write flag), changes to the pipeline do not go into effect, because the lack of writes means the data stream never rolls over and pipelines in metadata are not updated. The fix is to resolve pipelines from templates if the lazy rollover flag is set. To improve efficiency we only resolve pipelines once per index in the bulk request, caching the value, and reusing for other requests to the same index. Fixes: #112781 (cherry picked from commit 6db39d1)
1 parent 72fba73 commit 08d8357

File tree

5 files changed

+525
-11
lines changed

5 files changed

+525
-11
lines changed

docs/changelog/116031.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 116031
2+
summary: Resolve pipelines from template on lazy rollover write
3+
area: Data streams
4+
type: bug
5+
issues:
6+
- 112781

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml

Lines changed: 323 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ teardown:
3636
ingest.delete_pipeline:
3737
id: "pipeline-2"
3838
ignore: 404
39-
39+
- do:
40+
indices.delete_data_stream:
41+
name: "data-stream-*"
42+
expand_wildcards: all
4043
---
4144
"Test first matching router terminates pipeline":
4245
- skip:
@@ -252,3 +255,322 @@ teardown:
252255
- match: { _source.existing-field : true }
253256
- match: { _source.added-in-pipeline-before-reroute : true }
254257
- match: { _source.added-in-pipeline-after-reroute : true }
258+
259+
---
260+
"Test data stream with lazy rollover obtains pipeline from template":
261+
# This test starts with a chain of reroutes, from data-stream-1, to data-stream-2, to data-stream-3
262+
# We then add higher priority templates that remove the reroute processors. Then we show that
263+
# after a lazy rollover on data-stream-2, a document written to data-stream-1 still gets rerouted
264+
# to data-stream-2, but not on to data-stream-3. Finally, a lazy rollover on data-stream-1
265+
# causes the new template to also take effect on data-stream-1, and the last write goes directly
266+
# into data-stream-1. Multiple reroute steps are tested because pipeline resolution uses a
267+
# different code path for initial index and indices after a reroute.
268+
269+
# start with pipeline that reroutes from ds-1 to ds-2
270+
- do:
271+
ingest.put_pipeline:
272+
id: "reroute-1"
273+
body: >
274+
{
275+
"processors": [
276+
{
277+
"reroute" : {"destination": "data-stream-2"}
278+
}
279+
]
280+
}
281+
- match: { acknowledged: true }
282+
283+
# and pipeline that reroutes from ds-2 to ds-3
284+
- do:
285+
ingest.put_pipeline:
286+
id: "reroute-2"
287+
body: >
288+
{
289+
"processors": [
290+
{
291+
"reroute" : {"destination": "data-stream-3"}
292+
}
293+
]
294+
}
295+
- match: { acknowledged: true }
296+
297+
# set pipelines in templates
298+
- do:
299+
indices.put_index_template:
300+
name: template-1
301+
body:
302+
index_patterns: [ "data-stream-1"]
303+
priority: 1
304+
data_stream: { }
305+
template:
306+
settings:
307+
index.default_pipeline: "reroute-1"
308+
- match: { acknowledged: true }
309+
- do:
310+
indices.put_index_template:
311+
name: template-2
312+
body:
313+
index_patterns: [ "data-stream-2"]
314+
priority: 1
315+
data_stream: { }
316+
template:
317+
settings:
318+
index.default_pipeline: "reroute-2"
319+
- match: { acknowledged: true }
320+
- do:
321+
indices.put_index_template:
322+
name: template_3
323+
body:
324+
index_patterns: [ "data-stream-3" ]
325+
priority: 1
326+
data_stream: { }
327+
- match: { acknowledged: true }
328+
329+
- do:
330+
indices.create_data_stream:
331+
name: data-stream-1
332+
- match: { acknowledged: true }
333+
- do:
334+
indices.create_data_stream:
335+
name: data-stream-2
336+
- match: { acknowledged: true }
337+
- do:
338+
indices.create_data_stream:
339+
name: data-stream-3
340+
- match: { acknowledged: true }
341+
342+
# write to ds-1
343+
- do:
344+
index:
345+
index: data-stream-1
346+
body:
347+
'@timestamp': '2020-12-12'
348+
some-field: 1
349+
- do:
350+
indices.refresh:
351+
index: data-stream-3
352+
353+
# document is rerouted to ds-3
354+
- do:
355+
search:
356+
index: data-stream-3
357+
body: { query: { match_all: { } } }
358+
- length: { hits.hits: 1 }
359+
- match: { hits.hits.0._source.some-field: 1 }
360+
361+
# add higher priority templates without reroute processors
362+
- do:
363+
indices.put_index_template:
364+
name: template_4
365+
body:
366+
index_patterns: [ "data-stream-1" ]
367+
priority: 2 # higher priority
368+
data_stream: { }
369+
- match: { acknowledged: true }
370+
- do:
371+
indices.put_index_template:
372+
name: template_5
373+
body:
374+
index_patterns: [ "data-stream-2" ]
375+
priority: 2 # higher priority
376+
data_stream: { }
377+
- match: { acknowledged: true }
378+
379+
# write to ds-1
380+
- do:
381+
index:
382+
index: data-stream-1
383+
body:
384+
'@timestamp': '2020-12-12'
385+
some-field: 2
386+
- do:
387+
indices.refresh:
388+
index: data-stream-3
389+
390+
# still rerouted because ds-1 and ds-2 have not rolled over yet
391+
- do:
392+
search:
393+
index: data-stream-3
394+
body: { query: { match_all: { } } }
395+
- length: { hits.hits: 2 }
396+
397+
# perform lazy rollover on ds-2
398+
- do:
399+
indices.rollover:
400+
alias: data-stream-2
401+
lazy: true
402+
403+
# write to ds-1
404+
- do:
405+
index:
406+
index: data-stream-1
407+
body:
408+
'@timestamp': '2020-12-12'
409+
some-field: 3
410+
- do:
411+
indices.refresh:
412+
index: data-stream-2
413+
414+
# written to ds-2, as rerouted to ds-2, but not on to ds-3
415+
- do:
416+
search:
417+
index: data-stream-2
418+
body: { query: { match_all: { } } }
419+
- length: { hits.hits: 1 }
420+
- match: { hits.hits.0._source.some-field: 3 }
421+
422+
# perform lazy rollover on ds-1
423+
- do:
424+
indices.rollover:
425+
alias: data-stream-1
426+
lazy: true
427+
428+
# write to ds-1
429+
- do:
430+
index:
431+
index: data-stream-1
432+
body:
433+
'@timestamp': '2020-12-12'
434+
some-field: 4
435+
- do:
436+
indices.refresh:
437+
index: data-stream-1
438+
439+
# written to ds-1, as not rerouted to ds-2
440+
- do:
441+
search:
442+
index: data-stream-1
443+
body: { query: { match_all: { } } }
444+
- length: { hits.hits: 1 }
445+
- match: { hits.hits.0._source.some-field: 4 }
446+
447+
---
448+
"Test remove then add reroute processor with and without lazy rollover":
449+
# start with pipeline that reroutes from ds-1 to ds-2
450+
- do:
451+
ingest.put_pipeline:
452+
id: "reroute-1"
453+
body: >
454+
{
455+
"processors": [
456+
{
457+
"reroute" : {"destination": "data-stream-2"}
458+
}
459+
]
460+
}
461+
- match: { acknowledged: true }
462+
463+
# set pipelines in templates
464+
- do:
465+
indices.put_index_template:
466+
name: template-1
467+
body:
468+
index_patterns: [ "data-stream-1"]
469+
priority: 1
470+
data_stream: { }
471+
template:
472+
settings:
473+
index.default_pipeline: "reroute-1"
474+
- match: { acknowledged: true }
475+
- do:
476+
indices.put_index_template:
477+
name: template_2
478+
body:
479+
index_patterns: [ "data-stream-2" ]
480+
priority: 1
481+
data_stream: { }
482+
- match: { acknowledged: true }
483+
484+
- do:
485+
indices.create_data_stream:
486+
name: data-stream-1
487+
- match: { acknowledged: true }
488+
489+
- do:
490+
indices.create_data_stream:
491+
name: data-stream-2
492+
- match: { acknowledged: true }
493+
494+
# write to ds-1
495+
- do:
496+
index:
497+
index: data-stream-1
498+
body:
499+
'@timestamp': '2020-12-12'
500+
some-field: 1
501+
- do:
502+
indices.refresh:
503+
index: data-stream-2
504+
505+
# document is rerouted to ds-2
506+
- do:
507+
search:
508+
index: data-stream-2
509+
body: { query: { match_all: { } } }
510+
- length: { hits.hits: 1 }
511+
512+
# add a higher priority template without a reroute processor
513+
- do:
514+
indices.put_index_template:
515+
name: template_3
516+
body:
517+
index_patterns: [ "data-stream-1" ]
518+
priority: 2 # higher priority
519+
data_stream: { }
520+
- match: { acknowledged: true }
521+
522+
# perform lazy rollover on ds-1
523+
- do:
524+
indices.rollover:
525+
alias: data-stream-1
526+
lazy: true
527+
528+
# write to ds-1
529+
- do:
530+
index:
531+
index: data-stream-1
532+
body:
533+
'@timestamp': '2020-12-12'
534+
some-field: 2
535+
- do:
536+
indices.refresh:
537+
index: data-stream-1
538+
539+
# written to ds-1, as not rerouted to ds-2
540+
- do:
541+
search:
542+
index: data-stream-1
543+
body: { query: { match_all: { } } }
544+
- length: { hits.hits: 1 }
545+
546+
# add another higher priority template with a reroute processor
547+
- do:
548+
indices.put_index_template:
549+
name: template-3
550+
body:
551+
index_patterns: [ "data-stream-1" ]
552+
priority: 3
553+
data_stream: { }
554+
template:
555+
settings:
556+
index.default_pipeline: "reroute-1"
557+
- match: { acknowledged: true }
558+
559+
# don't do a lazy rollover
560+
# write to ds-1
561+
- do:
562+
index:
563+
index: data-stream-1
564+
body:
565+
'@timestamp': '2020-12-12'
566+
some-field: 3
567+
- do:
568+
indices.refresh:
569+
index: data-stream-1
570+
571+
# because no lazy rollover, still no reroute processor
572+
- do:
573+
search:
574+
index: data-stream-1
575+
body: { query: { match_all: { } } }
576+
- length: { hits.hits: 2 }

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.elasticsearch.threadpool.ThreadPool;
3939
import org.elasticsearch.transport.TransportService;
4040

41+
import java.util.HashMap;
42+
import java.util.Map;
4143
import java.util.Objects;
4244
import java.util.concurrent.Executor;
4345
import java.util.concurrent.TimeUnit;
@@ -171,10 +173,18 @@ protected void doRun() {
171173
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener) {
172174
boolean hasIndexRequestsWithPipelines = false;
173175
final Metadata metadata = clusterService.state().getMetadata();
176+
Map<String, IngestService.Pipelines> resolvedPipelineCache = new HashMap<>();
174177
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
175178
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
176179
if (indexRequest != null) {
177-
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
180+
if (indexRequest.isPipelineResolved() == false) {
181+
var pipeline = resolvedPipelineCache.computeIfAbsent(
182+
indexRequest.index(),
183+
// TODO perhaps this should use `threadPool.absoluteTimeInMillis()`, but leaving as is for now.
184+
(index) -> IngestService.resolvePipelines(actionRequest, indexRequest, metadata, System.currentTimeMillis())
185+
);
186+
IngestService.setPipelineOnRequest(indexRequest, pipeline);
187+
}
178188
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
179189
}
180190

0 commit comments

Comments
 (0)