Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ describe('generateLayer', () => {
},
"name": "[email protected]",
"template": Object {
"lifecycle": Object {
"data_retention": "30d",
"enabled": true,
},
"mappings": Object {
"dynamic": false,
"properties": Object {
Expand All @@ -63,10 +59,7 @@ describe('generateLayer', () => {
},
},
},
"settings": Object {
"index.lifecycle.name": undefined,
"index.lifecycle.prefer_ilm": false,
},
"settings": Object {},
},
"version": 1,
}
Expand All @@ -83,9 +76,6 @@ describe('generateLayer', () => {
},
"name": "[email protected]",
"template": Object {
"lifecycle": Object {
"data_retention": "30d",
},
"mappings": Object {
"dynamic": false,
"properties": Object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,7 @@ import {
MappingDateProperty,
MappingProperty,
} from '@elastic/elasticsearch/lib/api/types';
import {
Streams,
getAdvancedParameters,
isDslLifecycle,
isIlmLifecycle,
isRoot,
namespacePrefixes,
} from '@kbn/streams-schema';
import { Streams, getAdvancedParameters, isRoot, namespacePrefixes } from '@kbn/streams-schema';
import { ASSET_VERSION } from '../../../../common/constants';
import { logsSettings } from './logs_layer';
import { getComponentTemplateName } from './name';
Expand Down Expand Up @@ -65,7 +58,6 @@ export function generateLayer(
return {
name: getComponentTemplateName(name),
template: {
lifecycle: getTemplateLifecycle(definition, isServerless),
settings: getTemplateSettings(definition, isServerless),
mappings: {
dynamic: false,
Expand All @@ -85,53 +77,7 @@ export function generateLayer(
};
}

function getTemplateLifecycle(definition: Streams.WiredStream.Definition, isServerless: boolean) {
const lifecycle = definition.ingest.lifecycle;
if (isServerless) {
// dlm cannot be disabled in serverless
return {
data_retention: isDslLifecycle(lifecycle) ? lifecycle.dsl.data_retention : undefined,
};
}

if (isIlmLifecycle(lifecycle)) {
return { enabled: false };
}

if (isDslLifecycle(lifecycle)) {
return {
enabled: true,
data_retention: lifecycle.dsl.data_retention,
};
}

return undefined;
}

function getTemplateSettings(definition: Streams.WiredStream.Definition, isServerless: boolean) {
const baseSettings = isRoot(definition.name) ? logsSettings : {};
const lifecycle = definition.ingest.lifecycle;

if (isServerless) {
return baseSettings;
}

if (isIlmLifecycle(lifecycle)) {
return {
...baseSettings,
'index.lifecycle.prefer_ilm': true,
'index.lifecycle.name': lifecycle.ilm.policy,
};
}

if (isDslLifecycle(lifecycle)) {
return {
...baseSettings,
'index.lifecycle.prefer_ilm': false,
'index.lifecycle.name': undefined,
};
}

// don't specify any lifecycle property when lifecyle is disabled or inherited
return baseSettings;
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,32 +138,28 @@ export async function updateDataStreamsLifecycle({
{ logger }
);

// if we transition from ilm to dlm or vice versa, the rolled over backing
// if we transition from ilm to dsl or vice versa, the rolled over backing
// indices need to be updated or they'll retain the lifecycle configuration
// set at the time of creation.
// this is not needed for serverless since only dlm is allowed but in stateful
// we update every indices while not always necessary. this should be optimized
// this is not needed for serverless since only dsl is allowed.
if (isServerless) {
return;
}

const dataStreams = await esClient.indices.getDataStream({ name: names });
const isIlm = isIlmLifecycle(lifecycle);

for (const dataStream of dataStreams.data_streams) {
logger.debug(`updating settings for data stream ${dataStream.name} backing indices`);
await retryTransientEsErrors(
() =>
esClient.indices.putSettings({
index: dataStream.indices.map((index) => index.index_name),
settings: {
'lifecycle.prefer_ilm': isIlm,
'lifecycle.name': isIlm ? lifecycle.ilm.policy : null,
},
}),
{ logger }
);
}
await retryTransientEsErrors(
() =>
// TODO: use client method once available
esClient.transport.request({
method: 'PUT',
path: `/_data_stream/${names.join(',')}/_settings`,
body: {
'index.lifecycle.name': isIlm ? lifecycle.ilm.policy : null,
'index.lifecycle.prefer_ilm': isIlm,
},
}),
{ logger }
);
} catch (err: any) {
logger.error(`Error updating data stream lifecycle: ${err.message}`);
throw err;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type {
IngestProcessorContainer,
} from '@elastic/elasticsearch/lib/api/types';
import type { IngestStreamLifecycle } from '@kbn/streams-schema';
import { isDslLifecycle, isInheritLifecycle, Streams } from '@kbn/streams-schema';
import { isIlmLifecycle, isInheritLifecycle, Streams } from '@kbn/streams-schema';
import _, { cloneDeep } from 'lodash';
import { isNotFoundError } from '@kbn/es-errors';
import { StatusError } from '../../errors/status_error';
Expand All @@ -28,7 +28,7 @@ import { StreamActiveRecord, PrintableStream } from '../stream_active_record/str

export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Definition> {
private _processingChanged: boolean = false;
private _lifeCycleChanged: boolean = false;
private _lifecycleChanged: boolean = false;

constructor(definition: Streams.UnwiredStream.Definition, dependencies: StateDependencies) {
super(definition, dependencies);
Expand All @@ -42,7 +42,7 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
return {
...super.toPrintable(),
processingChanged: this._processingChanged,
lifeCycleChanged: this._lifeCycleChanged,
lifecycleChanged: this._lifecycleChanged,
};
}

Expand Down Expand Up @@ -77,7 +77,7 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
startingStateStreamDefinition.ingest.processing
);

this._lifeCycleChanged =
this._lifecycleChanged =
!startingStateStreamDefinition ||
!_.isEqual(this._definition.ingest.lifecycle, startingStateStreamDefinition.ingest.lifecycle);

Expand All @@ -100,8 +100,24 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
desiredState: State,
startingState: State
): Promise<ValidationResult> {
if (this.dependencies.isServerless && isIlmLifecycle(this.getLifecycle())) {
return { isValid: false, errors: [new Error('Using ILM is not supported in Serverless')] };
}

if (
startingState.get(this._definition.name)?.definition &&
this._lifecycleChanged &&
isInheritLifecycle(this.getLifecycle())
) {
// temporary until https://github.com/elastic/kibana/issues/222440 is resolved
return {
isValid: false,
errors: [new Error('Cannot revert to default lifecycle once updated')],
};
}

// Check for conflicts
if (this._lifeCycleChanged || this._processingChanged) {
if (this._lifecycleChanged || this._processingChanged) {
try {
const dataStreamResult =
await this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream({
Expand Down Expand Up @@ -134,19 +150,6 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
}
}

if (this._lifeCycleChanged && isDslLifecycle(this.getLifeCycle())) {
const dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name);
if (dataStream.ilm_policy !== undefined) {
return {
isValid: false,
errors: [
new Error(
'Cannot apply DSL lifecycle to a data stream that is already managed by an ILM policy'
),
],
};
}
}
return { isValid: true, errors: [] };
}

Expand All @@ -166,12 +169,12 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
if (this._definition.ingest.processing.length > 0) {
actions.push(...(await this.createUpsertPipelineActions()));
}
if (!isInheritLifecycle(this.getLifeCycle())) {
if (!isInheritLifecycle(this.getLifecycle())) {
actions.push({
type: 'update_lifecycle',
request: {
name: this._definition.name,
lifecycle: this.getLifeCycle(),
lifecycle: this.getLifecycle(),
},
});
}
Expand All @@ -182,11 +185,11 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
return actions;
}

public hasChangedLifeCycle(): boolean {
return this._lifeCycleChanged;
public hasChangedLifecycle(): boolean {
return this._lifecycleChanged;
}

public getLifeCycle(): IngestStreamLifecycle {
public getLifecycle(): IngestStreamLifecycle {
return this._definition.ingest.lifecycle;
}

Expand Down Expand Up @@ -223,12 +226,12 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
});
}

if (this._lifeCycleChanged) {
if (this._lifecycleChanged && !isInheritLifecycle(this.getLifecycle())) {
actions.push({
type: 'update_lifecycle',
request: {
name: this._definition.name,
lifecycle: this.getLifeCycle(),
lifecycle: this.getLifecycle(),
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { isNotFoundError } from '@kbn/es-errors';
import { IngestStreamLifecycle, Streams } from '@kbn/streams-schema';
import { IngestStreamLifecycle, Streams, isInheritLifecycle } from '@kbn/streams-schema';
import {
getAncestors,
getAncestorsAndSelf,
Expand Down Expand Up @@ -48,7 +48,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
private _ownFieldsChanged: boolean = false;
private _routingChanged: boolean = false;
private _processingChanged: boolean = false;
private _lifeCycleChanged: boolean = false;
private _lifecycleChanged: boolean = false;

constructor(definition: Streams.WiredStream.Definition, dependencies: StateDependencies) {
super(definition, dependencies);
Expand All @@ -62,7 +62,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
return {
...super.toPrintable(),
processingChanged: this._processingChanged,
lifeCycleChanged: this._lifeCycleChanged,
lifecycleChanged: this._lifecycleChanged,
routingChanged: this._routingChanged,
ownFieldsChanged: this._ownFieldsChanged,
};
Expand Down Expand Up @@ -122,7 +122,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
startingStateStreamDefinition.ingest.processing
);

this._lifeCycleChanged =
this._lifecycleChanged =
!startingStateStreamDefinition ||
!_.isEqual(this._definition.ingest.lifecycle, startingStateStreamDefinition.ingest.lifecycle);

Expand Down Expand Up @@ -381,7 +381,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti

validateSystemFields(this._definition);

if (this.dependencies.isServerless && isIlmLifecycle(this.getLifeCycle())) {
if (this.dependencies.isServerless && isIlmLifecycle(this.getLifecycle())) {
return { isValid: false, errors: [new Error('Using ILM is not supported in Serverless')] };
}

Expand Down Expand Up @@ -474,7 +474,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
type: 'update_lifecycle',
request: {
name: this._definition.name,
lifecycle: this.getLifeCycle(),
lifecycle: this.getLifecycle(),
},
},
{
Expand All @@ -488,11 +488,11 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
return this._ownFieldsChanged;
}

public hasChangedLifeCycle(): boolean {
return this._lifeCycleChanged;
public hasChangedLifecycle(): boolean {
return this._lifecycleChanged;
}

public getLifeCycle(): IngestStreamLifecycle {
public getLifecycle(): IngestStreamLifecycle {
return this._definition.ingest.lifecycle;
}

Expand All @@ -502,7 +502,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
startingStateStream: WiredStream
): Promise<ElasticsearchAction[]> {
const actions: ElasticsearchAction[] = [];
if (this.hasChangedFields() || this.hasChangedLifeCycle()) {
if (this.hasChangedFields()) {
actions.push({
type: 'upsert_component_template',
request: generateLayer(
Expand Down Expand Up @@ -543,22 +543,22 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
});
}
const ancestorsAndSelf = getAncestorsAndSelf(this._definition.name).reverse();
let hasAncestorWithChangedLifeCycle = false;
let hasAncestorWithChangedLifecycle = false;
for (const ancestor of ancestorsAndSelf) {
const ancestorStream = desiredState.get(ancestor) as WiredStream | undefined;
// as soon as at least one ancestor has an updated lifecycle, we need to update the lifecycle of the stream
// once we find the ancestor actually defining the lifecycle
if (ancestorStream && ancestorStream.hasChangedLifeCycle()) {
hasAncestorWithChangedLifeCycle = true;
if (ancestorStream && ancestorStream.hasChangedLifecycle()) {
hasAncestorWithChangedLifecycle = true;
}
// look for the first non-inherit lifecycle, that's the one defining the effective lifecycle
if (ancestorStream && !('inherit' in ancestorStream.getLifeCycle())) {
if (hasAncestorWithChangedLifeCycle) {
if (ancestorStream && !isInheritLifecycle(ancestorStream.getLifecycle())) {
if (hasAncestorWithChangedLifecycle) {
actions.push({
type: 'update_lifecycle',
request: {
name: this._definition.name,
lifecycle: ancestorStream.getLifeCycle(),
lifecycle: ancestorStream.getLifecycle(),
},
});
}
Expand Down
Loading