diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.test.ts index 552196892503c..6ea527c666a32 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.test.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.test.ts @@ -39,10 +39,6 @@ describe('generateLayer', () => { }, "name": "logs.abc@stream.layer", "template": Object { - "lifecycle": Object { - "data_retention": "30d", - "enabled": true, - }, "mappings": Object { "dynamic": false, "properties": Object { @@ -63,10 +59,7 @@ describe('generateLayer', () => { }, }, }, - "settings": Object { - "index.lifecycle.name": undefined, - "index.lifecycle.prefer_ilm": false, - }, + "settings": Object {}, }, "version": 1, } @@ -83,9 +76,6 @@ describe('generateLayer', () => { }, "name": "logs@stream.layer", "template": Object { - "lifecycle": Object { - "data_retention": "30d", - }, "mappings": Object { "dynamic": false, "properties": Object { diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts index fe9002d366369..078cec9d85362 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts @@ -10,14 +10,7 @@ import { MappingDateProperty, MappingProperty, } from '@elastic/elasticsearch/lib/api/types'; -import { - Streams, - getAdvancedParameters, - isDslLifecycle, - isIlmLifecycle, - isRoot, - namespacePrefixes, -} from '@kbn/streams-schema'; +import { Streams, getAdvancedParameters, isRoot, namespacePrefixes } from '@kbn/streams-schema'; import { ASSET_VERSION } from '../../../../common/constants'; import { logsSettings } from './logs_layer'; import { getComponentTemplateName } from './name'; @@ -65,7 +58,6 @@ export function generateLayer( return { name: getComponentTemplateName(name), template: { - lifecycle: getTemplateLifecycle(definition, isServerless), settings: getTemplateSettings(definition, isServerless), mappings: { dynamic: false, @@ -85,53 +77,7 @@ export function generateLayer( }; } -function getTemplateLifecycle(definition: Streams.WiredStream.Definition, isServerless: boolean) { - const lifecycle = definition.ingest.lifecycle; - if (isServerless) { - // dlm cannot be disabled in serverless - return { - data_retention: isDslLifecycle(lifecycle) ? lifecycle.dsl.data_retention : undefined, - }; - } - - if (isIlmLifecycle(lifecycle)) { - return { enabled: false }; - } - - if (isDslLifecycle(lifecycle)) { - return { - enabled: true, - data_retention: lifecycle.dsl.data_retention, - }; - } - - return undefined; -} - function getTemplateSettings(definition: Streams.WiredStream.Definition, isServerless: boolean) { const baseSettings = isRoot(definition.name) ? logsSettings : {}; - const lifecycle = definition.ingest.lifecycle; - - if (isServerless) { - return baseSettings; - } - - if (isIlmLifecycle(lifecycle)) { - return { - ...baseSettings, - 'index.lifecycle.prefer_ilm': true, - 'index.lifecycle.name': lifecycle.ilm.policy, - }; - } - - if (isDslLifecycle(lifecycle)) { - return { - ...baseSettings, - 'index.lifecycle.prefer_ilm': false, - 'index.lifecycle.name': undefined, - }; - } - - // don't specify any lifecycle property when lifecyle is disabled or inherited return baseSettings; } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts index 7212adf9781ef..2732470de9689 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts @@ -138,32 +138,28 @@ export async function updateDataStreamsLifecycle({ { logger } ); - // if we transition from ilm to dlm or vice versa, the rolled over backing + // if we transition from ilm to dsl or vice versa, the rolled over backing // indices need to be updated or they'll retain the lifecycle configuration // set at the time of creation. - // this is not needed for serverless since only dlm is allowed but in stateful - // we update every indices while not always necessary. this should be optimized + // this is not needed for serverless since only dsl is allowed. if (isServerless) { return; } - const dataStreams = await esClient.indices.getDataStream({ name: names }); const isIlm = isIlmLifecycle(lifecycle); - - for (const dataStream of dataStreams.data_streams) { - logger.debug(`updating settings for data stream ${dataStream.name} backing indices`); - await retryTransientEsErrors( - () => - esClient.indices.putSettings({ - index: dataStream.indices.map((index) => index.index_name), - settings: { - 'lifecycle.prefer_ilm': isIlm, - 'lifecycle.name': isIlm ? lifecycle.ilm.policy : null, - }, - }), - { logger } - ); - } + await retryTransientEsErrors( + () => + // TODO: use client method once available + esClient.transport.request({ + method: 'PUT', + path: `/_data_stream/${names.join(',')}/_settings`, + body: { + 'index.lifecycle.name': isIlm ? lifecycle.ilm.policy : null, + 'index.lifecycle.prefer_ilm': isIlm, + }, + }), + { logger } + ); } catch (err: any) { logger.error(`Error updating data stream lifecycle: ${err.message}`); throw err; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts index 60f44cf6ed598..c49d0f5c6ff93 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts @@ -10,7 +10,7 @@ import type { IngestProcessorContainer, } from '@elastic/elasticsearch/lib/api/types'; import type { IngestStreamLifecycle } from '@kbn/streams-schema'; -import { isDslLifecycle, isInheritLifecycle, Streams } from '@kbn/streams-schema'; +import { isIlmLifecycle, isInheritLifecycle, Streams } from '@kbn/streams-schema'; import _, { cloneDeep } from 'lodash'; import { isNotFoundError } from '@kbn/es-errors'; import { StatusError } from '../../errors/status_error'; @@ -28,7 +28,7 @@ import { StreamActiveRecord, PrintableStream } from '../stream_active_record/str export class UnwiredStream extends StreamActiveRecord { private _processingChanged: boolean = false; - private _lifeCycleChanged: boolean = false; + private _lifecycleChanged: boolean = false; constructor(definition: Streams.UnwiredStream.Definition, dependencies: StateDependencies) { super(definition, dependencies); @@ -42,7 +42,7 @@ export class UnwiredStream extends StreamActiveRecord { + if (this.dependencies.isServerless && isIlmLifecycle(this.getLifecycle())) { + return { isValid: false, errors: [new Error('Using ILM is not supported in Serverless')] }; + } + + if ( + startingState.get(this._definition.name)?.definition && + this._lifecycleChanged && + isInheritLifecycle(this.getLifecycle()) + ) { + // temporary until https://github.com/elastic/kibana/issues/222440 is resolved + return { + isValid: false, + errors: [new Error('Cannot revert to default lifecycle once updated')], + }; + } + // Check for conflicts - if (this._lifeCycleChanged || this._processingChanged) { + if (this._lifecycleChanged || this._processingChanged) { try { const dataStreamResult = await this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream({ @@ -134,19 +150,6 @@ export class UnwiredStream extends StreamActiveRecord 0) { actions.push(...(await this.createUpsertPipelineActions())); } - if (!isInheritLifecycle(this.getLifeCycle())) { + if (!isInheritLifecycle(this.getLifecycle())) { actions.push({ type: 'update_lifecycle', request: { name: this._definition.name, - lifecycle: this.getLifeCycle(), + lifecycle: this.getLifecycle(), }, }); } @@ -182,11 +185,11 @@ export class UnwiredStream extends StreamActiveRecord { const actions: ElasticsearchAction[] = []; - if (this.hasChangedFields() || this.hasChangedLifeCycle()) { + if (this.hasChangedFields()) { actions.push({ type: 'upsert_component_template', request: generateLayer( @@ -543,22 +543,22 @@ export class WiredStream extends StreamActiveRecord { const actions: Array<{ name: string; action: LifecycleEditAction }> = []; const isWired = Streams.WiredStream.GetResponse.is(definition); - const isUnwired = Streams.UnwiredStream.GetResponse.is(definition); - const isIlm = isIlmLifecycle(definition.effective_lifecycle); - if (isWired || (isUnwired && !isIlm)) { - actions.push({ - name: i18n.translate('xpack.streams.streamDetailLifecycle.setRetentionDays', { - defaultMessage: 'Set specific retention days', - }), - action: 'dsl', - }); - } + actions.push({ + name: i18n.translate('xpack.streams.streamDetailLifecycle.setRetentionDays', { + defaultMessage: 'Set specific retention days', + }), + action: 'dsl', + }); - if (isWired && !isServerless) { + if (!isServerless) { actions.push({ name: i18n.translate('xpack.streams.streamDetailLifecycle.setLifecyclePolicy', { defaultMessage: 'Use a lifecycle policy', @@ -55,7 +51,7 @@ function useLifecycleState({ }); } - if (!isRoot(definition.stream.name) || (isUnwired && !isIlm)) { + if (isWired && !isRoot(definition.stream.name)) { actions.push({ name: i18n.translate('xpack.streams.streamDetailLifecycle.resetToDefault', { defaultMessage: 'Reset to default', diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts index b530ec5617e9c..38d9ba888bc95 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts @@ -153,7 +153,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { ); }); - it('inherits dlm', async () => { + it('inherits dsl', async () => { // create two branches, one that inherits from root and // another one that overrides the root lifecycle await putStream(apiClient, 'logs.inherits.lifecycle', wiredPutBody); @@ -320,7 +320,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }); }); - it('updates when transitioning from ilm to dlm', async () => { + it('updates when transitioning from ilm to dsl', async () => { const name = 'logs.ilm-with-backing-indices'; await putStream(apiClient, name, { dashboards: [], @@ -356,8 +356,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { await expectLifecycle([name], { dsl: { data_retention: '7d' }, from: name }); }); - it('updates when transitioning from dlm to ilm', async () => { - const name = 'logs.dlm-with-backing-indices'; + it('updates when transitioning from dsl to ilm', async () => { + const name = 'logs.dsl-with-backing-indices'; await putStream(apiClient, name, { dashboards: [], queries: [], @@ -412,6 +412,9 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { queries: [], }; + let clean: () => Promise; + afterEach(() => clean?.()); + const createDataStream = async (name: string, lifecycle: IngestStreamLifecycle) => { await esClient.indices.putIndexTemplate({ name, @@ -437,29 +440,85 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }); await esClient.indices.createDataStream({ name }); - return async () => { + clean = async () => { await esClient.indices.deleteDataStream({ name }); await esClient.indices.deleteIndexTemplate({ name }); }; }; - it('noop when inherit lifecycle', async () => { + it('cannot update to inherit lifecycle', async () => { const indexName = 'unwired-stream-inherit'; - const clean = await createDataStream(indexName, { dsl: { data_retention: '77d' } }); + await createDataStream(indexName, { dsl: { data_retention: '77d' } }); - try { - await putStream(apiClient, indexName, unwiredPutBody); - await expectLifecycle([indexName], { dsl: { data_retention: '77d' } }); - } finally { - await clean(); - } + // initially set to inherit which is a noop + await putStream(apiClient, indexName, unwiredPutBody); + await expectLifecycle([indexName], { dsl: { data_retention: '77d' } }); + + // update to dsl + await putStream(apiClient, indexName, { + dashboards: [], + queries: [], + stream: { + description: '', + ingest: { + ...unwiredPutBody.stream.ingest, + lifecycle: { dsl: { data_retention: '2d' } }, + }, + }, + }); + await expectLifecycle([indexName], { dsl: { data_retention: '2d' } }); + + // fail to set inherit + await putStream(apiClient, indexName, unwiredPutBody, 400); + await expectLifecycle([indexName], { dsl: { data_retention: '2d' } }); }); it('overrides dsl retention', async () => { const indexName = 'unwired-stream-override-dsl'; - const clean = await createDataStream(indexName, { dsl: { data_retention: '77d' } }); + await createDataStream(indexName, { dsl: { data_retention: '77d' } }); + + await putStream(apiClient, indexName, { + dashboards: [], + queries: [], + stream: { + description: '', + ingest: { + ...unwiredPutBody.stream.ingest, + lifecycle: { dsl: { data_retention: '11d' } }, + }, + }, + }); + + await expectLifecycle([indexName], { dsl: { data_retention: '11d' } }); + }); + + if (isServerless) { + it('does not support ilm', async () => { + const indexName = 'unwired-stream-no-ilm'; + await createDataStream(indexName, { dsl: { data_retention: '2d' } }); + + await putStream( + apiClient, + indexName, + { + dashboards: [], + queries: [], + stream: { + description: '', + ingest: { + ...wiredPutBody.stream.ingest, + lifecycle: { ilm: { policy: 'my-policy' } }, + }, + }, + }, + 400 + ); + }); + } else { + it('updates from ilm to dsl', async () => { + const indexName = 'unwired-stream-ilm-to-dsl'; + await createDataStream(indexName, { ilm: { policy: 'my-policy' } }); - try { await putStream(apiClient, indexName, { dashboards: [], queries: [], @@ -467,42 +526,31 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { description: '', ingest: { ...unwiredPutBody.stream.ingest, - lifecycle: { dsl: { data_retention: '11d' } }, + lifecycle: { dsl: { data_retention: '1d' } }, }, }, }); - await expectLifecycle([indexName], { dsl: { data_retention: '11d' } }); - } finally { - await clean(); - } - }); + await expectLifecycle([indexName], { dsl: { data_retention: '1d' } }); + }); - if (!isServerless) { - it('does not allow dsl lifecycle if the data stream is managed by ilm', async () => { - const indexName = 'unwired-stream-ilm-to-dsl'; - const clean = await createDataStream(indexName, { ilm: { policy: 'my-policy' } }); - - try { - await putStream( - apiClient, - indexName, - { - dashboards: [], - queries: [], - stream: { - description: '', - ingest: { - ...unwiredPutBody.stream.ingest, - lifecycle: { dsl: { data_retention: '1d' } }, - }, - }, + it('updates from dsl to ilm', async () => { + const indexName = 'unwired-stream-dsl-to-ilm'; + await createDataStream(indexName, { dsl: { data_retention: '10d' } }); + + await putStream(apiClient, indexName, { + dashboards: [], + queries: [], + stream: { + description: '', + ingest: { + ...unwiredPutBody.stream.ingest, + lifecycle: { ilm: { policy: 'my-policy' } }, }, - 400 - ); - } finally { - await clean(); - } + }, + }); + + await expectLifecycle([indexName], { ilm: { policy: 'my-policy' } }); }); } });