Skip to content

Commit 15482ce

Browse files
authored
[apm-data] Apply lazy rollover on index template creation (#116219) (#116241)
* Apply lazy rollover on index template creation We should trigger a lazy rollover of existing data streams regardless of whether the index template is being created or updated. This ensures that the apm-data plugin will roll over data streams that were previously using the Fleet integration package. * Update docs/changelog/116219.yaml * Update docs/changelog/116219.yaml * Add YAML REST test for template reinstallation * Code review suggestion #116219 (comment) * Remove wait_for_events from setup This doesn't guarantee the templates are set up, it only increases the chances; and we disable the plugin at the start of the test anyway.
1 parent 633e7d8 commit 15482ce

File tree

8 files changed

+146
-51
lines changed

8 files changed

+146
-51
lines changed

docs/changelog/116219.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 116219
2+
summary: "[apm-data] Apply lazy rollover on index template creation"
3+
area: Data streams
4+
type: bug
5+
issues:
6+
- 116230

x-pack/plugin/apm-data/src/yamlRestTest/resources/rest-api-spec/test/10_apm.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,23 @@ setup:
5353
- contains: {index_templates: {name: logs-apm.app@template}}
5454
- contains: {index_templates: {name: logs-apm.error@template}}
5555

56+
---
57+
"Test template reinstallation":
58+
- skip:
59+
reason: contains is a newly added assertion
60+
features: contains
61+
- do:
62+
indices.delete_index_template:
63+
name: traces-apm@template
64+
- do:
65+
cluster.health:
66+
wait_for_events: languid
67+
- do:
68+
indices.get_index_template:
69+
name: traces-apm@template
70+
- length: {index_templates: 1}
71+
- contains: {index_templates: {name: traces-apm@template}}
72+
5673
---
5774
"Test traces-apm-* data stream indexing":
5875
- skip:
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
---
2+
setup:
3+
- do:
4+
indices.put_index_template:
5+
name: traces-low-prio
6+
body:
7+
data_stream: {}
8+
index_patterns: ["traces-*"]
9+
priority: 1
10+
11+
---
12+
"Test data stream rollover on template installation":
13+
- skip:
14+
awaits_fix: "https://github.com/elastic/elasticsearch/issues/102360"
15+
16+
# Disable the apm-data plugin and delete the traces-apm@template index
17+
# template so traces-low-prio takes effect.
18+
- do:
19+
cluster.put_settings:
20+
body:
21+
transient:
22+
xpack.apm_data.registry.enabled: false
23+
- do:
24+
indices.delete_index_template:
25+
name: traces-apm@template
26+
- do:
27+
indices.create_data_stream:
28+
name: traces-apm-testing
29+
- do:
30+
indices.get_data_stream:
31+
name: traces-apm-testing
32+
- match: {data_streams.0.template: traces-low-prio}
33+
34+
# Re-enable the apm-data plugin, after which the traces-apm@template
35+
# index template should be recreated and trigger a lazy rollover on
36+
# the traces-apm-testing data stream.
37+
- do:
38+
cluster.put_settings:
39+
body:
40+
transient:
41+
xpack.apm_data.registry.enabled: true
42+
- do:
43+
cluster.health:
44+
wait_for_events: languid
45+
- do:
46+
indices.get_data_stream:
47+
name: traces-apm-testing
48+
- length: {data_streams: 1}
49+
- match: {data_streams.0.template: traces-apm@template}
50+
- match: {data_streams.0.rollover_on_write: true}
51+

x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/template/RolloverEnabledTestTemplateRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected Map<String, ComposableIndexTemplate> getComposableTemplateConfigs() {
5353
}
5454

5555
@Override
56-
protected boolean applyRolloverAfterTemplateV2Upgrade() {
56+
protected boolean applyRolloverAfterTemplateV2Update() {
5757
return true;
5858
}
5959

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java

Lines changed: 65 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
400400
}
401401
} else if (Objects.isNull(currentTemplate)) {
402402
logger.debug("adding composable template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
403-
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck, false);
403+
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck);
404404
} else if (Objects.isNull(currentTemplate.version()) || newTemplate.getValue().version() > currentTemplate.version()) {
405405
// IndexTemplateConfig now enforces templates contain a `version` property, so if the template doesn't have one we can
406406
// safely assume it's an old version of the template.
@@ -411,7 +411,7 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
411411
currentTemplate.version(),
412412
newTemplate.getValue().version()
413413
);
414-
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck, true);
414+
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck);
415415
} else {
416416
creationCheck.set(false);
417417
logger.trace(
@@ -433,11 +433,11 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
433433

434434
/**
435435
* Returns true if the cluster state contains all of the component templates needed by the composable template. If this registry
436-
* requires automatic rollover after index template upgrades (see {@link #applyRolloverAfterTemplateV2Upgrade()}), this method also
436+
* requires automatic rollover after index template upgrades (see {@link #applyRolloverAfterTemplateV2Update()}), this method also
437437
* verifies that the installed components templates are of the right version.
438438
*/
439439
private boolean componentTemplatesInstalled(ClusterState state, ComposableIndexTemplate indexTemplate) {
440-
if (applyRolloverAfterTemplateV2Upgrade() == false) {
440+
if (applyRolloverAfterTemplateV2Update() == false) {
441441
// component templates and index templates can be updated independently, we only need to know that the required component
442442
// templates are available
443443
return state.metadata().componentTemplates().keySet().containsAll(indexTemplate.getRequiredComponentTemplates());
@@ -533,8 +533,7 @@ private void putComposableTemplate(
533533
ClusterState state,
534534
final String templateName,
535535
final ComposableIndexTemplate indexTemplate,
536-
final AtomicBoolean creationCheck,
537-
final boolean isUpgrade
536+
final AtomicBoolean creationCheck
538537
) {
539538
final Executor executor = threadPool.generic();
540539
executor.execute(() -> {
@@ -549,8 +548,8 @@ private void putComposableTemplate(
549548
@Override
550549
public void onResponse(AcknowledgedResponse response) {
551550
if (response.isAcknowledged()) {
552-
if (isUpgrade && applyRolloverAfterTemplateV2Upgrade()) {
553-
invokeRollover(state, templateName, indexTemplate, creationCheck);
551+
if (applyRolloverAfterTemplateV2Update()) {
552+
invokeRollover(state, templateName, indexTemplate, () -> creationCheck.set((false)));
554553
} else {
555554
creationCheck.set(false);
556555
}
@@ -763,12 +762,13 @@ public void onFailure(Exception e) {
763762

764763
/**
765764
* Allows registries to opt-in for automatic rollover of "relevant" data streams immediately after a composable index template gets
766-
* upgraded. If set to {@code true}, then every time a composable index template is being upgraded, all data streams of which name
767-
* matches this template's index patterns AND of all matching templates the upgraded one has the highest priority, will be rolled over.
765+
* updated, including its initial installation. If set to {@code true}, then every time a composable index template is being updated,
766+
* all data streams of which name matches this template's index patterns AND of all matching templates the upgraded one has the highest
767+
* priority, will be rolled over.
768768
*
769769
* @return {@code true} if this registry wants to apply automatic rollovers after template V2 upgrades
770770
*/
771-
protected boolean applyRolloverAfterTemplateV2Upgrade() {
771+
protected boolean applyRolloverAfterTemplateV2Update() {
772772
return false;
773773
}
774774

@@ -782,50 +782,56 @@ protected void onPutPipelineFailure(String pipelineId, Exception e) {
782782
logger.error(() -> format("error adding ingest pipeline template [%s] for [%s]", pipelineId, getOrigin()), e);
783783
}
784784

785+
/**
786+
* invokeRollover rolls over any data streams matching the index template,
787+
* and then invokes runAfter.
788+
*/
785789
private void invokeRollover(
786790
final ClusterState state,
787791
final String templateName,
788792
final ComposableIndexTemplate indexTemplate,
789-
final AtomicBoolean creationCheck
793+
final Runnable runAfter
790794
) {
791795
final Executor executor = threadPool.generic();
792796
executor.execute(() -> {
793797
List<String> rolloverTargets = findRolloverTargetDataStreams(state, templateName, indexTemplate);
794-
if (rolloverTargets.isEmpty() == false) {
795-
GroupedActionListener<RolloverResponse> groupedActionListener = new GroupedActionListener<>(
796-
rolloverTargets.size(),
797-
new ActionListener<>() {
798-
@Override
799-
public void onResponse(Collection<RolloverResponse> rolloverResponses) {
800-
creationCheck.set(false);
801-
onRolloversBulkResponse(rolloverResponses);
802-
}
798+
if (rolloverTargets.isEmpty()) {
799+
runAfter.run();
800+
return;
801+
}
802+
GroupedActionListener<RolloverResponse> groupedActionListener = new GroupedActionListener<>(
803+
rolloverTargets.size(),
804+
new ActionListener<>() {
805+
@Override
806+
public void onResponse(Collection<RolloverResponse> rolloverResponses) {
807+
runAfter.run();
808+
onRolloversBulkResponse(rolloverResponses);
809+
}
803810

804-
@Override
805-
public void onFailure(Exception e) {
806-
creationCheck.set(false);
807-
onRolloverFailure(e);
808-
}
811+
@Override
812+
public void onFailure(Exception e) {
813+
runAfter.run();
814+
onRolloverFailure(e);
809815
}
810-
);
811-
for (String rolloverTarget : rolloverTargets) {
812-
logger.info(
813-
"rolling over data stream [{}] lazily as a followup to the upgrade of the [{}] index template [{}]",
814-
rolloverTarget,
815-
getOrigin(),
816-
templateName
817-
);
818-
RolloverRequest request = new RolloverRequest(rolloverTarget, null);
819-
request.lazy(true);
820-
request.masterNodeTimeout(TimeValue.MAX_VALUE);
821-
executeAsyncWithOrigin(
822-
client.threadPool().getThreadContext(),
823-
getOrigin(),
824-
request,
825-
groupedActionListener,
826-
(req, listener) -> client.execute(RolloverAction.INSTANCE, req, listener)
827-
);
828816
}
817+
);
818+
for (String rolloverTarget : rolloverTargets) {
819+
logger.info(
820+
"rolling over data stream [{}] lazily as a followup to the upgrade of the [{}] index template [{}]",
821+
rolloverTarget,
822+
getOrigin(),
823+
templateName
824+
);
825+
RolloverRequest request = new RolloverRequest(rolloverTarget, null);
826+
request.lazy(true);
827+
request.masterNodeTimeout(TimeValue.MAX_VALUE);
828+
executeAsyncWithOrigin(
829+
client.threadPool().getThreadContext(),
830+
getOrigin(),
831+
request,
832+
groupedActionListener,
833+
(req, listener) -> client.execute(RolloverAction.INSTANCE, req, listener)
834+
);
829835
}
830836
});
831837
}
@@ -865,7 +871,21 @@ static List<String> findRolloverTargetDataStreams(ClusterState state, String tem
865871
.stream()
866872
// Limit to checking data streams that match any of the index template's index patterns
867873
.filter(ds -> indexTemplate.indexPatterns().stream().anyMatch(pattern -> Regex.simpleMatch(pattern, ds.getName())))
868-
.filter(ds -> templateName.equals(MetadataIndexTemplateService.findV2Template(metadata, ds.getName(), ds.isHidden())))
874+
.filter(ds -> {
875+
final String dsTemplateName = MetadataIndexTemplateService.findV2Template(metadata, ds.getName(), ds.isHidden());
876+
if (templateName.equals(dsTemplateName)) {
877+
return true;
878+
}
879+
// findV2Template did not match templateName, which implies one of two things:
880+
// - indexTemplate has a lower priority than the index template matching for ds, OR
881+
// - indexTemplate does not yet exist in cluster state (i.e. because it's in the process of being
882+
// installed or updated)
883+
//
884+
// Because of the second case, we must check if indexTemplate's priority is greater than the matching
885+
// index template, in case it would take precedence after installation/update.
886+
final ComposableIndexTemplate dsTemplate = metadata.templatesV2().get(dsTemplateName);
887+
return dsTemplate == null || indexTemplate.priorityOrZero() > dsTemplate.priorityOrZero();
888+
})
869889
.map(DataStream::getName)
870890
.collect(Collectors.toList());
871891
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/YamlTemplateRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ private IngestPipelineConfig loadIngestPipeline(String name, int version, @Nulla
227227
}
228228

229229
@Override
230-
protected boolean applyRolloverAfterTemplateV2Upgrade() {
230+
protected boolean applyRolloverAfterTemplateV2Update() {
231231
return true;
232232
}
233233
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistryTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ public void testAutomaticRollover() throws Exception {
435435
assertThat(suppressed[0].getMessage(), startsWith("Failed to rollover logs-my_app-"));
436436
}
437437

438-
public void testNoRolloverForFreshInstalledIndexTemplate() throws Exception {
438+
public void testRolloverForFreshInstalledIndexTemplate() throws Exception {
439439
DiscoveryNode node = DiscoveryNodeUtils.create("node");
440440
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
441441

@@ -473,9 +473,10 @@ public void testNoRolloverForFreshInstalledIndexTemplate() throws Exception {
473473
registry.setApplyRollover(true);
474474
registry.clusterChanged(event);
475475
assertBusy(() -> assertThat(putIndexTemplateCounter.get(), equalTo(1)));
476-
// the index component is first installed, not upgraded, therefore rollover should not be triggered
476+
// rollover should be triggered even for the first installation, since the template
477+
// may now take precedence over a data stream's existing index template
477478
Thread.sleep(100L);
478-
assertThat(rolloverCounter.get(), equalTo(0));
479+
assertThat(rolloverCounter.get(), equalTo(2));
479480
}
480481

481482
public void testThatTemplatesAreNotUpgradedWhenNotNeeded() throws Exception {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/template/TestRegistryWithCustomPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void setPolicyUpgradeRequired(boolean policyUpgradeRequired) {
118118
}
119119

120120
@Override
121-
protected boolean applyRolloverAfterTemplateV2Upgrade() {
121+
protected boolean applyRolloverAfterTemplateV2Update() {
122122
return applyRollover.get();
123123
}
124124

0 commit comments

Comments
 (0)