Skip to content

Commit fa573c8

Browse files
klacabanekibanamachine
authored andcommitted
[streams][lifecycle] ilm for classic streams (elastic#221364)
## Summary Enable ILM for classic streams. We can now overwrite template settings at the data stream level with `PUT _data_stream/{name}/_settings` so we can enable full lifecycle capabilities on classic stream. --------- Co-authored-by: kibanamachine <[email protected]>
1 parent 7b5315e commit fa573c8

File tree

7 files changed

+162
-183
lines changed

7 files changed

+162
-183
lines changed

x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.test.ts

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ describe('generateLayer', () => {
3939
},
4040
"name": "[email protected]",
4141
"template": Object {
42-
"lifecycle": Object {
43-
"data_retention": "30d",
44-
"enabled": true,
45-
},
4642
"mappings": Object {
4743
"dynamic": false,
4844
"properties": Object {
@@ -63,10 +59,7 @@ describe('generateLayer', () => {
6359
},
6460
},
6561
},
66-
"settings": Object {
67-
"index.lifecycle.name": undefined,
68-
"index.lifecycle.prefer_ilm": false,
69-
},
62+
"settings": Object {},
7063
},
7164
"version": 1,
7265
}
@@ -83,9 +76,6 @@ describe('generateLayer', () => {
8376
},
8477
"name": "[email protected]",
8578
"template": Object {
86-
"lifecycle": Object {
87-
"data_retention": "30d",
88-
},
8979
"mappings": Object {
9080
"dynamic": false,
9181
"properties": Object {

x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts

Lines changed: 1 addition & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,7 @@ import {
1010
MappingDateProperty,
1111
MappingProperty,
1212
} from '@elastic/elasticsearch/lib/api/types';
13-
import {
14-
Streams,
15-
getAdvancedParameters,
16-
isDslLifecycle,
17-
isIlmLifecycle,
18-
isRoot,
19-
namespacePrefixes,
20-
} from '@kbn/streams-schema';
13+
import { Streams, getAdvancedParameters, isRoot, namespacePrefixes } from '@kbn/streams-schema';
2114
import { ASSET_VERSION } from '../../../../common/constants';
2215
import { logsSettings } from './logs_layer';
2316
import { getComponentTemplateName } from './name';
@@ -65,7 +58,6 @@ export function generateLayer(
6558
return {
6659
name: getComponentTemplateName(name),
6760
template: {
68-
lifecycle: getTemplateLifecycle(definition, isServerless),
6961
settings: getTemplateSettings(definition, isServerless),
7062
mappings: {
7163
dynamic: false,
@@ -85,53 +77,7 @@ export function generateLayer(
8577
};
8678
}
8779

88-
function getTemplateLifecycle(definition: Streams.WiredStream.Definition, isServerless: boolean) {
89-
const lifecycle = definition.ingest.lifecycle;
90-
if (isServerless) {
91-
// dlm cannot be disabled in serverless
92-
return {
93-
data_retention: isDslLifecycle(lifecycle) ? lifecycle.dsl.data_retention : undefined,
94-
};
95-
}
96-
97-
if (isIlmLifecycle(lifecycle)) {
98-
return { enabled: false };
99-
}
100-
101-
if (isDslLifecycle(lifecycle)) {
102-
return {
103-
enabled: true,
104-
data_retention: lifecycle.dsl.data_retention,
105-
};
106-
}
107-
108-
return undefined;
109-
}
110-
11180
function getTemplateSettings(definition: Streams.WiredStream.Definition, isServerless: boolean) {
11281
const baseSettings = isRoot(definition.name) ? logsSettings : {};
113-
const lifecycle = definition.ingest.lifecycle;
114-
115-
if (isServerless) {
116-
return baseSettings;
117-
}
118-
119-
if (isIlmLifecycle(lifecycle)) {
120-
return {
121-
...baseSettings,
122-
'index.lifecycle.prefer_ilm': true,
123-
'index.lifecycle.name': lifecycle.ilm.policy,
124-
};
125-
}
126-
127-
if (isDslLifecycle(lifecycle)) {
128-
return {
129-
...baseSettings,
130-
'index.lifecycle.prefer_ilm': false,
131-
'index.lifecycle.name': undefined,
132-
};
133-
}
134-
135-
// don't specify any lifecycle property when lifecyle is disabled or inherited
13682
return baseSettings;
13783
}

x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -138,32 +138,28 @@ export async function updateDataStreamsLifecycle({
138138
{ logger }
139139
);
140140

141-
// if we transition from ilm to dlm or vice versa, the rolled over backing
141+
// if we transition from ilm to dsl or vice versa, the rolled over backing
142142
// indices need to be updated or they'll retain the lifecycle configuration
143143
// set at the time of creation.
144-
// this is not needed for serverless since only dlm is allowed but in stateful
145-
// we update every indices while not always necessary. this should be optimized
144+
// this is not needed for serverless since only dsl is allowed.
146145
if (isServerless) {
147146
return;
148147
}
149148

150-
const dataStreams = await esClient.indices.getDataStream({ name: names });
151149
const isIlm = isIlmLifecycle(lifecycle);
152-
153-
for (const dataStream of dataStreams.data_streams) {
154-
logger.debug(`updating settings for data stream ${dataStream.name} backing indices`);
155-
await retryTransientEsErrors(
156-
() =>
157-
esClient.indices.putSettings({
158-
index: dataStream.indices.map((index) => index.index_name),
159-
settings: {
160-
'lifecycle.prefer_ilm': isIlm,
161-
'lifecycle.name': isIlm ? lifecycle.ilm.policy : null,
162-
},
163-
}),
164-
{ logger }
165-
);
166-
}
150+
await retryTransientEsErrors(
151+
() =>
152+
// TODO: use client method once available
153+
esClient.transport.request({
154+
method: 'PUT',
155+
path: `/_data_stream/${names.join(',')}/_settings`,
156+
body: {
157+
'index.lifecycle.name': isIlm ? lifecycle.ilm.policy : null,
158+
'index.lifecycle.prefer_ilm': isIlm,
159+
},
160+
}),
161+
{ logger }
162+
);
167163
} catch (err: any) {
168164
logger.error(`Error updating data stream lifecycle: ${err.message}`);
169165
throw err;

x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type {
1010
IngestProcessorContainer,
1111
} from '@elastic/elasticsearch/lib/api/types';
1212
import type { IngestStreamLifecycle } from '@kbn/streams-schema';
13-
import { isDslLifecycle, isInheritLifecycle, Streams } from '@kbn/streams-schema';
13+
import { isIlmLifecycle, isInheritLifecycle, Streams } from '@kbn/streams-schema';
1414
import _, { cloneDeep } from 'lodash';
1515
import { isNotFoundError } from '@kbn/es-errors';
1616
import { StatusError } from '../../errors/status_error';
@@ -28,7 +28,7 @@ import { StreamActiveRecord, PrintableStream } from '../stream_active_record/str
2828

2929
export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Definition> {
3030
private _processingChanged: boolean = false;
31-
private _lifeCycleChanged: boolean = false;
31+
private _lifecycleChanged: boolean = false;
3232

3333
constructor(definition: Streams.UnwiredStream.Definition, dependencies: StateDependencies) {
3434
super(definition, dependencies);
@@ -42,7 +42,7 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
4242
return {
4343
...super.toPrintable(),
4444
processingChanged: this._processingChanged,
45-
lifeCycleChanged: this._lifeCycleChanged,
45+
lifecycleChanged: this._lifecycleChanged,
4646
};
4747
}
4848

@@ -77,7 +77,7 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
7777
startingStateStreamDefinition.ingest.processing
7878
);
7979

80-
this._lifeCycleChanged =
80+
this._lifecycleChanged =
8181
!startingStateStreamDefinition ||
8282
!_.isEqual(this._definition.ingest.lifecycle, startingStateStreamDefinition.ingest.lifecycle);
8383

@@ -100,8 +100,24 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
100100
desiredState: State,
101101
startingState: State
102102
): Promise<ValidationResult> {
103+
if (this.dependencies.isServerless && isIlmLifecycle(this.getLifecycle())) {
104+
return { isValid: false, errors: [new Error('Using ILM is not supported in Serverless')] };
105+
}
106+
107+
if (
108+
startingState.get(this._definition.name)?.definition &&
109+
this._lifecycleChanged &&
110+
isInheritLifecycle(this.getLifecycle())
111+
) {
112+
// temporary until https://github.com/elastic/kibana/issues/222440 is resolved
113+
return {
114+
isValid: false,
115+
errors: [new Error('Cannot revert to default lifecycle once updated')],
116+
};
117+
}
118+
103119
// Check for conflicts
104-
if (this._lifeCycleChanged || this._processingChanged) {
120+
if (this._lifecycleChanged || this._processingChanged) {
105121
try {
106122
const dataStreamResult =
107123
await this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream({
@@ -134,19 +150,6 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
134150
}
135151
}
136152

137-
if (this._lifeCycleChanged && isDslLifecycle(this.getLifeCycle())) {
138-
const dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name);
139-
if (dataStream.ilm_policy !== undefined) {
140-
return {
141-
isValid: false,
142-
errors: [
143-
new Error(
144-
'Cannot apply DSL lifecycle to a data stream that is already managed by an ILM policy'
145-
),
146-
],
147-
};
148-
}
149-
}
150153
return { isValid: true, errors: [] };
151154
}
152155

@@ -166,12 +169,12 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
166169
if (this._definition.ingest.processing.length > 0) {
167170
actions.push(...(await this.createUpsertPipelineActions()));
168171
}
169-
if (!isInheritLifecycle(this.getLifeCycle())) {
172+
if (!isInheritLifecycle(this.getLifecycle())) {
170173
actions.push({
171174
type: 'update_lifecycle',
172175
request: {
173176
name: this._definition.name,
174-
lifecycle: this.getLifeCycle(),
177+
lifecycle: this.getLifecycle(),
175178
},
176179
});
177180
}
@@ -182,11 +185,11 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
182185
return actions;
183186
}
184187

185-
public hasChangedLifeCycle(): boolean {
186-
return this._lifeCycleChanged;
188+
public hasChangedLifecycle(): boolean {
189+
return this._lifecycleChanged;
187190
}
188191

189-
public getLifeCycle(): IngestStreamLifecycle {
192+
public getLifecycle(): IngestStreamLifecycle {
190193
return this._definition.ingest.lifecycle;
191194
}
192195

@@ -223,12 +226,12 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
223226
});
224227
}
225228

226-
if (this._lifeCycleChanged) {
229+
if (this._lifecycleChanged && !isInheritLifecycle(this.getLifecycle())) {
227230
actions.push({
228231
type: 'update_lifecycle',
229232
request: {
230233
name: this._definition.name,
231-
lifecycle: this.getLifeCycle(),
234+
lifecycle: this.getLifecycle(),
232235
},
233236
});
234237
}

x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
*/
77

88
import { isNotFoundError } from '@kbn/es-errors';
9-
import { IngestStreamLifecycle, Streams } from '@kbn/streams-schema';
9+
import { IngestStreamLifecycle, Streams, isInheritLifecycle } from '@kbn/streams-schema';
1010
import {
1111
getAncestors,
1212
getAncestorsAndSelf,
@@ -48,7 +48,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
4848
private _ownFieldsChanged: boolean = false;
4949
private _routingChanged: boolean = false;
5050
private _processingChanged: boolean = false;
51-
private _lifeCycleChanged: boolean = false;
51+
private _lifecycleChanged: boolean = false;
5252

5353
constructor(definition: Streams.WiredStream.Definition, dependencies: StateDependencies) {
5454
super(definition, dependencies);
@@ -62,7 +62,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
6262
return {
6363
...super.toPrintable(),
6464
processingChanged: this._processingChanged,
65-
lifeCycleChanged: this._lifeCycleChanged,
65+
lifecycleChanged: this._lifecycleChanged,
6666
routingChanged: this._routingChanged,
6767
ownFieldsChanged: this._ownFieldsChanged,
6868
};
@@ -122,7 +122,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
122122
startingStateStreamDefinition.ingest.processing
123123
);
124124

125-
this._lifeCycleChanged =
125+
this._lifecycleChanged =
126126
!startingStateStreamDefinition ||
127127
!_.isEqual(this._definition.ingest.lifecycle, startingStateStreamDefinition.ingest.lifecycle);
128128

@@ -381,7 +381,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
381381

382382
validateSystemFields(this._definition);
383383

384-
if (this.dependencies.isServerless && isIlmLifecycle(this.getLifeCycle())) {
384+
if (this.dependencies.isServerless && isIlmLifecycle(this.getLifecycle())) {
385385
return { isValid: false, errors: [new Error('Using ILM is not supported in Serverless')] };
386386
}
387387

@@ -472,7 +472,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
472472
type: 'update_lifecycle',
473473
request: {
474474
name: this._definition.name,
475-
lifecycle: this.getLifeCycle(),
475+
lifecycle: this.getLifecycle(),
476476
},
477477
},
478478
{
@@ -486,11 +486,11 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
486486
return this._ownFieldsChanged;
487487
}
488488

489-
public hasChangedLifeCycle(): boolean {
490-
return this._lifeCycleChanged;
489+
public hasChangedLifecycle(): boolean {
490+
return this._lifecycleChanged;
491491
}
492492

493-
public getLifeCycle(): IngestStreamLifecycle {
493+
public getLifecycle(): IngestStreamLifecycle {
494494
return this._definition.ingest.lifecycle;
495495
}
496496

@@ -500,7 +500,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
500500
startingStateStream: WiredStream
501501
): Promise<ElasticsearchAction[]> {
502502
const actions: ElasticsearchAction[] = [];
503-
if (this.hasChangedFields() || this.hasChangedLifeCycle()) {
503+
if (this.hasChangedFields()) {
504504
actions.push({
505505
type: 'upsert_component_template',
506506
request: generateLayer(
@@ -539,22 +539,22 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
539539
});
540540
}
541541
const ancestorsAndSelf = getAncestorsAndSelf(this._definition.name).reverse();
542-
let hasAncestorWithChangedLifeCycle = false;
542+
let hasAncestorWithChangedLifecycle = false;
543543
for (const ancestor of ancestorsAndSelf) {
544544
const ancestorStream = desiredState.get(ancestor) as WiredStream | undefined;
545545
// as soon as at least one ancestor has an updated lifecycle, we need to update the lifecycle of the stream
546546
// once we find the ancestor actually defining the lifecycle
547-
if (ancestorStream && ancestorStream.hasChangedLifeCycle()) {
548-
hasAncestorWithChangedLifeCycle = true;
547+
if (ancestorStream && ancestorStream.hasChangedLifecycle()) {
548+
hasAncestorWithChangedLifecycle = true;
549549
}
550550
// look for the first non-inherit lifecycle, that's the one defining the effective lifecycle
551-
if (ancestorStream && !('inherit' in ancestorStream.getLifeCycle())) {
552-
if (hasAncestorWithChangedLifeCycle) {
551+
if (ancestorStream && !isInheritLifecycle(ancestorStream.getLifecycle())) {
552+
if (hasAncestorWithChangedLifecycle) {
553553
actions.push({
554554
type: 'update_lifecycle',
555555
request: {
556556
name: this._definition.name,
557-
lifecycle: ancestorStream.getLifeCycle(),
557+
lifecycle: ancestorStream.getLifecycle(),
558558
},
559559
});
560560
}

0 commit comments

Comments
 (0)