Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding lifecycle configuration to the template is now redundant since we update the data stream directly in both ilm and dsl cases

Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,7 @@ import {
MappingDateProperty,
MappingProperty,
} from '@elastic/elasticsearch/lib/api/types';
import {
Streams,
getAdvancedParameters,
isDslLifecycle,
isIlmLifecycle,
isRoot,
namespacePrefixes,
} from '@kbn/streams-schema';
import { Streams, getAdvancedParameters, isRoot, namespacePrefixes } from '@kbn/streams-schema';
import { ASSET_VERSION } from '../../../../common/constants';
import { logsSettings } from './logs_layer';
import { getComponentTemplateName } from './name';
Expand Down Expand Up @@ -65,7 +58,6 @@ export function generateLayer(
return {
name: getComponentTemplateName(name),
template: {
lifecycle: getTemplateLifecycle(definition, isServerless),
settings: getTemplateSettings(definition, isServerless),
mappings: {
dynamic: false,
Expand All @@ -85,53 +77,7 @@ export function generateLayer(
};
}

function getTemplateLifecycle(definition: Streams.WiredStream.Definition, isServerless: boolean) {
const lifecycle = definition.ingest.lifecycle;
if (isServerless) {
// dlm cannot be disabled in serverless
return {
data_retention: isDslLifecycle(lifecycle) ? lifecycle.dsl.data_retention : undefined,
};
}

if (isIlmLifecycle(lifecycle)) {
return { enabled: false };
}

if (isDslLifecycle(lifecycle)) {
return {
enabled: true,
data_retention: lifecycle.dsl.data_retention,
};
}

return undefined;
}

function getTemplateSettings(definition: Streams.WiredStream.Definition, isServerless: boolean) {
const baseSettings = isRoot(definition.name) ? logsSettings : {};
const lifecycle = definition.ingest.lifecycle;

if (isServerless) {
return baseSettings;
}

if (isIlmLifecycle(lifecycle)) {
return {
...baseSettings,
'index.lifecycle.prefer_ilm': true,
'index.lifecycle.name': lifecycle.ilm.policy,
};
}

if (isDslLifecycle(lifecycle)) {
return {
...baseSettings,
'index.lifecycle.prefer_ilm': false,
'index.lifecycle.name': undefined,
};
}

// don't specify any lifecycle property when lifecyle is disabled or inherited
return baseSettings;
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,32 +138,28 @@ export async function updateDataStreamsLifecycle({
{ logger }
);

// if we transition from ilm to dlm or vice versa, the rolled over backing
// if we transition from ilm to dsl or vice versa, the rolled over backing
// indices need to be updated or they'll retain the lifecycle configuration
// set at the time of creation.
// this is not needed for serverless since only dlm is allowed but in stateful
// we update every indices while not always necessary. this should be optimized
// this is not needed for serverless since only dsl is allowed.
if (isServerless) {
return;
}

const dataStreams = await esClient.indices.getDataStream({ name: names });
const isIlm = isIlmLifecycle(lifecycle);

for (const dataStream of dataStreams.data_streams) {
logger.debug(`updating settings for data stream ${dataStream.name} backing indices`);
await retryTransientEsErrors(
() =>
esClient.indices.putSettings({
index: dataStream.indices.map((index) => index.index_name),
settings: {
'lifecycle.prefer_ilm': isIlm,
'lifecycle.name': isIlm ? lifecycle.ilm.policy : null,
},
}),
{ logger }
);
}
await retryTransientEsErrors(
() =>
// TODO: use client method once available
esClient.transport.request({
method: 'PUT',
path: `/_data_stream/${names.join(',')}/_settings`,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this updates backing indices settings as well

body: {
'index.lifecycle.name': isIlm ? lifecycle.ilm.policy : null,
'index.lifecycle.prefer_ilm': isIlm,
},
}),
{ logger }
);
} catch (err: any) {
logger.error(`Error updating data stream lifecycle: ${err.message}`);
throw err;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type {
IngestProcessorContainer,
} from '@elastic/elasticsearch/lib/api/types';
import type { IngestStreamLifecycle } from '@kbn/streams-schema';
import { isDslLifecycle, isInheritLifecycle, Streams } from '@kbn/streams-schema';
import { isInheritLifecycle, Streams } from '@kbn/streams-schema';
import _, { cloneDeep } from 'lodash';
import { isNotFoundError } from '@kbn/es-errors';
import { StatusError } from '../../errors/status_error';
Expand All @@ -28,7 +28,7 @@ import { StreamActiveRecord, PrintableStream } from '../stream_active_record/str

export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Definition> {
private _processingChanged: boolean = false;
private _lifeCycleChanged: boolean = false;
private _lifecycleChanged: boolean = false;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to align naming


constructor(definition: Streams.UnwiredStream.Definition, dependencies: StateDependencies) {
super(definition, dependencies);
Expand All @@ -42,7 +42,7 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
return {
...super.toPrintable(),
processingChanged: this._processingChanged,
lifeCycleChanged: this._lifeCycleChanged,
lifecycleChanged: this._lifecycleChanged,
};
}

Expand Down Expand Up @@ -77,7 +77,7 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
startingStateStreamDefinition.ingest.processing
);

this._lifeCycleChanged =
this._lifecycleChanged =
!startingStateStreamDefinition ||
!_.isEqual(this._definition.ingest.lifecycle, startingStateStreamDefinition.ingest.lifecycle);

Expand All @@ -101,7 +101,7 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
startingState: State
): Promise<ValidationResult> {
// Check for conflicts
if (this._lifeCycleChanged || this._processingChanged) {
if (this._lifecycleChanged || this._processingChanged) {
try {
const dataStreamResult =
await this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream({
Expand Down Expand Up @@ -134,19 +134,6 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
}
}

if (this._lifeCycleChanged && isDslLifecycle(this.getLifeCycle())) {
const dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name);
if (dataStream.ilm_policy !== undefined) {
return {
isValid: false,
errors: [
new Error(
'Cannot apply DSL lifecycle to a data stream that is already managed by an ILM policy'
),
],
};
}
}
return { isValid: true, errors: [] };
}

Expand All @@ -166,12 +153,12 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
if (this._definition.ingest.processing.length > 0) {
actions.push(...(await this.createUpsertPipelineActions()));
}
if (!isInheritLifecycle(this.getLifeCycle())) {
if (!isInheritLifecycle(this.getLifecycle())) {
actions.push({
type: 'update_lifecycle',
request: {
name: this._definition.name,
lifecycle: this.getLifeCycle(),
lifecycle: this.getLifecycle(),
},
});
}
Expand All @@ -182,11 +169,11 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
return actions;
}

public hasChangedLifeCycle(): boolean {
return this._lifeCycleChanged;
public hasChangedLifecycle(): boolean {
return this._lifecycleChanged;
}

public getLifeCycle(): IngestStreamLifecycle {
public getLifecycle(): IngestStreamLifecycle {
return this._definition.ingest.lifecycle;
}

Expand Down Expand Up @@ -223,12 +210,12 @@ export class UnwiredStream extends StreamActiveRecord<Streams.UnwiredStream.Defi
});
}

if (this._lifeCycleChanged) {
if (this._lifecycleChanged) {
actions.push({
type: 'update_lifecycle',
request: {
name: this._definition.name,
lifecycle: this.getLifeCycle(),
lifecycle: this.getLifecycle(),
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { isNotFoundError } from '@kbn/es-errors';
import { IngestStreamLifecycle, Streams } from '@kbn/streams-schema';
import { IngestStreamLifecycle, Streams, isInheritLifecycle } from '@kbn/streams-schema';
import {
getAncestors,
getAncestorsAndSelf,
Expand Down Expand Up @@ -48,7 +48,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
private _ownFieldsChanged: boolean = false;
private _routingChanged: boolean = false;
private _processingChanged: boolean = false;
private _lifeCycleChanged: boolean = false;
private _lifecycleChanged: boolean = false;

constructor(definition: Streams.WiredStream.Definition, dependencies: StateDependencies) {
super(definition, dependencies);
Expand All @@ -62,7 +62,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
return {
...super.toPrintable(),
processingChanged: this._processingChanged,
lifeCycleChanged: this._lifeCycleChanged,
lifecycleChanged: this._lifecycleChanged,
routingChanged: this._routingChanged,
ownFieldsChanged: this._ownFieldsChanged,
};
Expand Down Expand Up @@ -122,7 +122,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
startingStateStreamDefinition.ingest.processing
);

this._lifeCycleChanged =
this._lifecycleChanged =
!startingStateStreamDefinition ||
!_.isEqual(this._definition.ingest.lifecycle, startingStateStreamDefinition.ingest.lifecycle);

Expand Down Expand Up @@ -381,7 +381,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti

validateSystemFields(this._definition);

if (this.dependencies.isServerless && isIlmLifecycle(this.getLifeCycle())) {
if (this.dependencies.isServerless && isIlmLifecycle(this.getLifecycle())) {
return { isValid: false, errors: [new Error('Using ILM is not supported in Serverless')] };
}

Expand Down Expand Up @@ -472,7 +472,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
type: 'update_lifecycle',
request: {
name: this._definition.name,
lifecycle: this.getLifeCycle(),
lifecycle: this.getLifecycle(),
},
},
{
Expand All @@ -486,11 +486,11 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
return this._ownFieldsChanged;
}

public hasChangedLifeCycle(): boolean {
return this._lifeCycleChanged;
public hasChangedLifecycle(): boolean {
return this._lifecycleChanged;
}

public getLifeCycle(): IngestStreamLifecycle {
public getLifecycle(): IngestStreamLifecycle {
return this._definition.ingest.lifecycle;
}

Expand All @@ -500,7 +500,7 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
startingStateStream: WiredStream
): Promise<ElasticsearchAction[]> {
const actions: ElasticsearchAction[] = [];
if (this.hasChangedFields() || this.hasChangedLifeCycle()) {
if (this.hasChangedFields()) {
actions.push({
type: 'upsert_component_template',
request: generateLayer(
Expand Down Expand Up @@ -539,22 +539,22 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
});
}
const ancestorsAndSelf = getAncestorsAndSelf(this._definition.name).reverse();
let hasAncestorWithChangedLifeCycle = false;
let hasAncestorWithChangedLifecycle = false;
for (const ancestor of ancestorsAndSelf) {
const ancestorStream = desiredState.get(ancestor) as WiredStream | undefined;
// as soon as at least one ancestor has an updated lifecycle, we need to update the lifecycle of the stream
// once we find the ancestor actually defining the lifecycle
if (ancestorStream && ancestorStream.hasChangedLifeCycle()) {
hasAncestorWithChangedLifeCycle = true;
if (ancestorStream && ancestorStream.hasChangedLifecycle()) {
hasAncestorWithChangedLifecycle = true;
}
// look for the first non-inherit lifecycle, that's the one defining the effective lifecycle
if (ancestorStream && !('inherit' in ancestorStream.getLifeCycle())) {
if (hasAncestorWithChangedLifeCycle) {
if (ancestorStream && !isInheritLifecycle(ancestorStream.getLifecycle())) {
if (hasAncestorWithChangedLifecycle) {
actions.push({
type: 'update_lifecycle',
request: {
name: this._definition.name,
lifecycle: ancestorStream.getLifeCycle(),
lifecycle: ancestorStream.getLifecycle(),
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,15 @@ function useLifecycleState({

const lifecycleActions = useMemo(() => {
const actions: Array<{ name: string; action: LifecycleEditAction }> = [];
const isWired = Streams.WiredStream.GetResponse.is(definition);
const isUnwired = Streams.UnwiredStream.GetResponse.is(definition);
const isIlm = isIlmLifecycle(definition.effective_lifecycle);

if (isWired || (isUnwired && !isIlm)) {
actions.push({
name: i18n.translate('xpack.streams.streamDetailLifecycle.setRetentionDays', {
defaultMessage: 'Set specific retention days',
}),
action: 'dsl',
});
}
actions.push({
name: i18n.translate('xpack.streams.streamDetailLifecycle.setRetentionDays', {
defaultMessage: 'Set specific retention days',
}),
action: 'dsl',
});

if (isWired && !isServerless) {
if (!isServerless) {
actions.push({
name: i18n.translate('xpack.streams.streamDetailLifecycle.setLifecyclePolicy', {
defaultMessage: 'Use a lifecycle policy',
Expand All @@ -54,7 +49,7 @@ function useLifecycleState({
});
}

if (!isRoot(definition.stream.name) || (isUnwired && !isIlm)) {
if (!isRoot(definition.stream.name)) {
actions.push({
name: i18n.translate('xpack.streams.streamDetailLifecycle.resetToDefault', {
defaultMessage: 'Reset to default',
Expand Down
Loading