From 78b97306e11c0c3f86562f1b1a78f99e62d7d852 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Thu, 17 Jul 2025 12:24:47 +0200 Subject: [PATCH 01/23] make field mappings for classic streams work --- .../src/models/ingest/unwired.test.ts | 17 +++ .../src/models/ingest/unwired.ts | 9 +- .../data_streams/manage_data_streams.ts | 103 +++++++++++++++++- .../lib/streams/helpers/validate_fields.ts | 12 ++ .../execution_plan/execution_plan.ts | 20 ++++ .../execution_plan/required_permissions.ts | 12 ++ .../state_management/execution_plan/types.ts | 15 ++- .../streams/unwired_stream.ts | 55 ++++++++++ .../state_management/streams/wired_stream.ts | 14 +++ 9 files changed, 253 insertions(+), 4 deletions(-) diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/unwired.test.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/unwired.test.ts index 173200b00d249..d155b039ea934 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/unwired.test.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/unwired.test.ts @@ -21,6 +21,23 @@ describe('UnwiredStream', () => { unwired: {}, }, }, + { + name: 'unwired-stream', + description: '', + ingest: { + lifecycle: { + inherit: {}, + }, + processing: [], + unwired: { + field_overrides: { + xxx: { + type: 'keyword', + }, + }, + }, + }, + }, ])('is valid', (val) => { expect(UnwiredStream.Definition.is(val)).toBe(true); expect(UnwiredStream.Definition.right.parse(val)).toEqual(val); diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/unwired.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/unwired.ts index 280a59d3a5497..b6e410714a0ff 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/unwired.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/unwired.ts @@ -14,15 +14,20 @@ import { ElasticsearchAssets, elasticsearchAssetsSchema } from './common'; import { Validation, validation } from '../validation/validation'; import { ModelValidation, modelValidation } from '../validation/model_validation'; import { BaseStream } from '../base'; +import { FieldDefinition, fieldDefinitionSchema } from '../../fields'; /* eslint-disable @typescript-eslint/no-namespace */ export interface IngestUnwired { - unwired: {}; + unwired: { + field_overrides?: FieldDefinition; + }; } export const IngestUnwired: z.Schema = z.object({ - unwired: z.object({}), + unwired: z.object({ + field_overrides: z.optional(fieldDefinitionSchema), + }), }); export type UnwiredIngest = IngestBase & IngestUnwired; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts index c2c6e009eb682..209491177cb7e 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts @@ -15,7 +15,10 @@ import { isIlmLifecycle, isInheritLifecycle, } from '@kbn/streams-schema'; -import { IndicesSimulateTemplateTemplate } from '@elastic/elasticsearch/lib/api/types'; +import { + IndicesSimulateTemplateTemplate, + MappingProperty, +} from '@elastic/elasticsearch/lib/api/types'; import { omit } from 'lodash'; import { retryTransientEsErrors } from '../helpers/retry'; @@ -35,6 +38,15 @@ interface UpdateOrRolloverDataStreamOptions { esClient: ElasticsearchClient; name: string; logger: Logger; + forceRollover?: boolean; +} + +interface UpdateDataStreamsMappingsOptions { + esClient: ElasticsearchClient; + logger: Logger; + name: string; + mappings: Record; + forceRollover?: boolean; } export async function upsertDataStream({ esClient, name, logger }: DataStreamManagementOptions) { @@ -67,7 +79,12 @@ export async function updateOrRolloverDataStream({ esClient, name, logger, + forceRollover, }: UpdateOrRolloverDataStreamOptions) { + if (forceRollover) { + await retryTransientEsErrors(() => esClient.indices.rollover({ alias: name }), { logger }); + return; + } const dataStreams = await esClient.indices.getDataStream({ name }); for (const dataStream of dataStreams.data_streams) { // simulate index and try to patch the write index @@ -86,6 +103,9 @@ export async function updateOrRolloverDataStream({ 'index.mapping.ignore_malformed', 'index.mode', 'index.logsdb.sort_on_host_name', + 'index.sort.order', + 'index.sort.field', + 'index.mapping.source.mode', ]); await retryTransientEsErrors( @@ -124,6 +144,87 @@ export async function updateOrRolloverDataStream({ } } +// TODO: Remove once client lib has been updated +interface DataStreamMappingsUpdateResponse { + data_streams: Array<{ + name: string; + applied_to_data_stream: boolean; + error?: string; + mappings: Record; + effective_mappings: Record; + }>; +} + +// TODO: We can simplify this once https://github.com/elastic/elasticsearch/issues/131425 lands. +// With that we can only update the data stream mappings and then issue a upsert_write_index_or_rollover action +export async function updateDataStreamsMappings({ + esClient, + logger, + name, + mappings, + forceRollover, +}: UpdateDataStreamsMappingsOptions) { + // update the mappings on the data stream level + const response = (await esClient.transport.request({ + method: 'PUT', + path: `/_data_stream/${name}/_mappings`, + body: { + properties: mappings, + _meta: { + managed_by: 'streams', + }, + }, + })) as DataStreamMappingsUpdateResponse; + if (response.data_streams.length === 0) { + throw new Error(`Data stream ${name} not found`); + } + if (response.data_streams[0].error) { + throw new Error( + `Error updating data stream mappings for ${name}: ${response.data_streams[0].error}` + ); + } + if (forceRollover) { + await retryTransientEsErrors(() => esClient.indices.rollover({ alias: name }), { logger }); + return; + } + // see whether we can patch the write index. if not, we will have to roll it over + const dataStreams = await esClient.indices.getDataStream({ name }); + for (const dataStream of dataStreams.data_streams) { + const writeIndex = dataStream.indices.at(-1); + if (!writeIndex) { + continue; + } + try { + await retryTransientEsErrors( + () => + esClient.indices.putMapping({ + index: writeIndex.index_name, + properties: mappings, + }), + { + logger, + } + ); + } catch (error: any) { + if ( + typeof error.message !== 'string' || + !error.message.includes('illegal_argument_exception') + ) { + throw error; + } + try { + await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), { + logger, + }); + logger.debug(() => `Rolled over data stream: ${dataStream.name}`); + } catch (rolloverError: any) { + logger.error(`Error rolling over data stream: ${error.message}`); + throw error; + } + } + } +} + export async function updateDataStreamsLifecycle({ esClient, logger, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.ts index 4c99a6572b0ba..fd3d095f723ae 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.ts @@ -79,6 +79,18 @@ export function validateSystemFields(definition: Streams.WiredStream.Definition) } } +export function validateUnwiredFields(definition: Streams.UnwiredStream.Definition) { + if ( + Object.values(definition.ingest.unwired.field_overrides || {}).some( + (field) => field.type === 'system' + ) + ) { + throw new MalformedFieldsError( + `Stream ${definition.name} is not allowed to have system fields` + ); + } +} + export function validateDescendantFields({ descendants, fields, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/execution_plan.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/execution_plan.ts index 432298b9a43a5..3881e98a6d38d 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/execution_plan.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/execution_plan.ts @@ -15,6 +15,7 @@ import { import { deleteDataStream, updateDataStreamsLifecycle, + updateDataStreamsMappings, updateOrRolloverDataStream, upsertDataStream, } from '../../data_streams/manage_data_streams'; @@ -37,6 +38,7 @@ import type { DeleteIndexTemplateAction, DeleteIngestPipelineAction, ElasticsearchAction, + UpdateDataStreamMappingsAction, UpdateLifecycleAction, UpsertComponentTemplateAction, UpsertDatastreamAction, @@ -74,6 +76,7 @@ export class ExecutionPlan { delete_datastream: [], upsert_dot_streams_document: [], delete_dot_streams_document: [], + update_data_stream_mappings: [], }; } @@ -158,6 +161,7 @@ export class ExecutionPlan { delete_datastream, upsert_dot_streams_document, delete_dot_streams_document, + update_data_stream_mappings, ...rest } = this.actionsByType; assertEmptyObject(rest); @@ -181,6 +185,7 @@ export class ExecutionPlan { await Promise.all([ this.upsertWriteIndexOrRollover(upsert_write_index_or_rollover), this.updateLifecycle(update_lifecycle), + this.updateDataStreamMappingsAndRollover(update_data_stream_mappings), ]); await this.upsertIngestPipelines(upsert_ingest_pipeline); @@ -235,6 +240,7 @@ export class ExecutionPlan { updateOrRolloverDataStream({ esClient: this.dependencies.scopedClusterClient.asCurrentUser, logger: this.dependencies.logger, + forceRollover: action.request.forceRollover, name: action.request.name, }) ) @@ -255,6 +261,20 @@ export class ExecutionPlan { ); } + private async updateDataStreamMappingsAndRollover(actions: UpdateDataStreamMappingsAction[]) { + return Promise.all( + actions.map((action) => + updateDataStreamsMappings({ + esClient: this.dependencies.scopedClusterClient.asCurrentUser, + logger: this.dependencies.logger, + name: action.request.name, + mappings: action.request.mappings, + forceRollover: action.request.forceRollover, + }) + ) + ); + } + private async upsertDatastreams(actions: UpsertDatastreamAction[]) { return Promise.all( actions.map((action) => diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/required_permissions.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/required_permissions.ts index aa3d1ac60b960..5562a8852bd28 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/required_permissions.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/required_permissions.ts @@ -65,6 +65,7 @@ export function getRequiredPermissionsForActions({ update_lifecycle, upsert_write_index_or_rollover, delete_datastream, + update_data_stream_mappings, // we don't need to validate permissions for these actions // since they are done by the kibana system user upsert_dot_streams_document, @@ -140,6 +141,17 @@ export function getRequiredPermissionsForActions({ }); } + if (update_data_stream_mappings.length > 0) { + const indexPermissions: Record = {}; + update_data_stream_mappings.forEach((action) => { + indexPermissions[action.request.name] = ['manage']; + }); + permissions.push({ + cluster: [], + index: indexPermissions, + }); + } + if (update_lifecycle.length > 0) { const indexPermissions: Record = {}; update_lifecycle.forEach((action) => { diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts index f6abf41997ccd..9cdfb8fcc27db 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts @@ -10,6 +10,7 @@ import type { IndicesPutIndexTemplateRequest, IngestProcessorContainer, IngestPutPipelineRequest, + MappingProperty, } from '@elastic/elasticsearch/lib/api/types'; import type { IngestStreamLifecycle, Streams } from '@kbn/streams-schema'; @@ -77,6 +78,7 @@ export interface UpsertDatastreamAction { export interface UpsertWriteIndexOrRolloverAction { type: 'upsert_write_index_or_rollover'; request: { + forceRollover?: boolean; name: string; }; } @@ -89,6 +91,15 @@ export interface UpdateLifecycleAction { }; } +export interface UpdateDataStreamMappingsAction { + type: 'update_data_stream_mappings'; + request: { + name: string; + forceRollover?: boolean; + mappings: Record; + }; +} + export interface DeleteDatastreamAction { type: 'delete_datastream'; request: { @@ -122,7 +133,8 @@ export type ElasticsearchAction = | UpdateLifecycleAction | DeleteDatastreamAction | UpsertDotStreamsDocumentAction - | DeleteDotStreamsDocumentAction; + | DeleteDotStreamsDocumentAction + | UpdateDataStreamMappingsAction; export interface ActionsByType { upsert_component_template: UpsertComponentTemplateAction[]; @@ -139,4 +151,5 @@ export interface ActionsByType { delete_datastream: DeleteDatastreamAction[]; upsert_dot_streams_document: UpsertDotStreamsDocumentAction[]; delete_dot_streams_document: DeleteDotStreamsDocumentAction[]; + update_data_stream_mappings: UpdateDataStreamMappingsAction[]; } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts index ef3e0b3e6c258..60f0639a85632 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts @@ -13,6 +13,7 @@ import type { IngestStreamLifecycle } from '@kbn/streams-schema'; import { isIlmLifecycle, isInheritLifecycle, Streams } from '@kbn/streams-schema'; import _, { cloneDeep } from 'lodash'; import { isNotFoundError } from '@kbn/es-errors'; +import { MappingProperty } from '@elastic/elasticsearch/lib/api/types'; import { StatusError } from '../../errors/status_error'; import { generateClassicIngestPipelineBody } from '../../ingest_pipelines/generate_ingest_pipeline'; import { getProcessingPipelineName } from '../../ingest_pipelines/name'; @@ -26,15 +27,18 @@ import type { ValidationResult, } from '../stream_active_record/stream_active_record'; import { StreamActiveRecord } from '../stream_active_record/stream_active_record'; +import { validateUnwiredFields } from '../../helpers/validate_fields'; interface UnwiredStreamChanges extends StreamChanges { processing: boolean; + field_overrides: boolean; lifecycle: boolean; } export class UnwiredStream extends StreamActiveRecord { protected _changes: UnwiredStreamChanges = { processing: false, + field_overrides: false, lifecycle: false, }; @@ -81,6 +85,13 @@ export class UnwiredStream extends StreamActiveRecord 0) { + actions.push({ + type: 'update_data_stream_mappings', + request: { + name: this._definition.name, + mappings: this._definition.ingest.unwired.field_overrides! as Record< + string, + MappingProperty + >, + }, + }); + } actions.push({ type: 'upsert_dot_streams_document', request: this._definition, @@ -224,6 +249,19 @@ export class UnwiredStream extends StreamActiveRecord) || + {}, + forceRollover: this.fieldOverridesRemoved(startingState), + }, + }); + } + actions.push({ type: 'upsert_dot_streams_document', request: this._definition, @@ -232,6 +270,23 @@ export class UnwiredStream extends StreamActiveRecord 0; + } + private async createUpsertPipelineActions(): Promise { const actions: ElasticsearchAction[] = []; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts index ebb68fc4a3918..5479677d0e262 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts @@ -543,6 +543,7 @@ export class WiredStream extends StreamActiveRecord 0; + } + protected async doDetermineDeleteActions(): Promise { return [ { From 09ebb09549ca4fe264db0386293c714f4f5c010f Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Thu, 17 Jul 2025 12:34:10 +0200 Subject: [PATCH 02/23] validate whether mapping update will even work --- .../data_streams/manage_data_streams.ts | 2 +- .../streams/unwired_stream.ts | 37 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts index 209491177cb7e..1bea2c51c2cd7 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts @@ -145,7 +145,7 @@ export async function updateOrRolloverDataStream({ } // TODO: Remove once client lib has been updated -interface DataStreamMappingsUpdateResponse { +export interface DataStreamMappingsUpdateResponse { data_streams: Array<{ name: string; applied_to_data_stream: boolean; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts index 60f0639a85632..50df616bdfdb1 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts @@ -28,6 +28,7 @@ import type { } from '../stream_active_record/stream_active_record'; import { StreamActiveRecord } from '../stream_active_record/stream_active_record'; import { validateUnwiredFields } from '../../helpers/validate_fields'; +import { DataStreamMappingsUpdateResponse } from '../../data_streams/manage_data_streams'; interface UnwiredStreamChanges extends StreamChanges { processing: boolean; @@ -149,6 +150,42 @@ export class UnwiredStream extends StreamActiveRecord Date: Thu, 17 Jul 2025 14:55:57 +0200 Subject: [PATCH 03/23] make things work --- .../plugins/shared/data_views/common/types.ts | 5 + .../routes/internal/streams/schema/route.ts | 70 +++++--- .../schema_editor/hooks/use_schema_fields.ts | 152 +++++++++++++----- .../schema_editor/schema_editor_table.tsx | 2 +- .../stream_detail_management/classic.tsx | 10 ++ .../stream_detail_schema_editor/index.tsx | 13 +- 6 files changed, 179 insertions(+), 73 deletions(-) diff --git a/src/platform/plugins/shared/data_views/common/types.ts b/src/platform/plugins/shared/data_views/common/types.ts index 3c09e2517aa50..fe9589c9cf4de 100644 --- a/src/platform/plugins/shared/data_views/common/types.ts +++ b/src/platform/plugins/shared/data_views/common/types.ts @@ -485,6 +485,11 @@ export type FieldSpec = DataViewFieldBase & { parentName?: string; defaultFormatter?: string; + + /** + * Indicates whether the field is a metadata field. + */ + metadata_field?: boolean; }; export type DataViewFieldMap = Record; diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts index 6a3a71a05c62e..b5d035ee7a34c 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts @@ -7,6 +7,9 @@ import { getFlattenedObject } from '@kbn/std'; import { SampleDocument, fieldDefinitionConfigSchema, Streams } from '@kbn/streams-schema'; import { z } from '@kbn/zod'; +import { IScopedClusterClient } from '@kbn/core/server'; +import { SearchHit } from '@kbn/es-types'; +import { MappingProperty } from '@elastic/elasticsearch/lib/api/types'; import { STREAMS_API_PRIVILEGES } from '../../../../../common/constants'; import { SecurityError } from '../../../../lib/streams/errors/security_error'; import { checkAccess } from '../../../../lib/streams/stream_crud'; @@ -61,6 +64,12 @@ export const unmappedFieldsRoute = createServerRoute({ // Mapped fields from the stream's definition and inherited from ancestors const mappedFields = new Set(); + if (Streams.UnwiredStream.Definition.is(streamDefinition)) { + Object.keys(streamDefinition.ingest.unwired.field_overrides || {}).forEach((name) => + mappedFields.add(name) + ); + } + if (Streams.WiredStream.Definition.is(streamDefinition)) { Object.keys(streamDefinition.ingest.wired.fields).forEach((name) => mappedFields.add(name)); } @@ -179,32 +188,11 @@ export const schemaFieldsSimulationRoute = createServerRoute({ ), })); - const simulationBody = { - docs: sampleResultsAsSimulationDocs, - component_template_substitutions: { - [`${params.path.name}@stream.layer`]: { - template: { - mappings: { - dynamic: 'strict', - properties: propertiesForSimulation, - }, - }, - }, - }, - // prevent double-processing - pipeline_substitutions: { - [`${params.path.name}@stream.processing`]: { - processors: [], - }, - }, - }; - - // TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() but the ES JS lib currently has a bug. The types also aren't available yet, so we use any. - const simulation = (await scopedClusterClient.asCurrentUser.transport.request({ - method: 'POST', - path: `_ingest/_simulate`, - body: simulationBody, - })) as any; + const simulation = await simulateIngest( + sampleResultsAsSimulationDocs, + propertiesForSimulation, + scopedClusterClient + ); const hasErrors = simulation.docs.some((doc: any) => doc.doc.error !== undefined); @@ -276,3 +264,33 @@ export const internalSchemaRoutes = { ...unmappedFieldsRoute, ...schemaFieldsSimulationRoute, }; + +const DUMMY_PIPELINE_NAME = '__dummy_pipeline__'; + +async function simulateIngest( + sampleResultsAsSimulationDocs: Array>, + propertiesForSimulation: Record, + scopedClusterClient: IScopedClusterClient +) { + const simulationBody = { + docs: sampleResultsAsSimulationDocs, + mapping_addition: { + properties: propertiesForSimulation, + }, + // prevent double-processing + pipeline_substitutions: { + [DUMMY_PIPELINE_NAME]: { + processors: [], + }, + }, + }; + + // TODO: We should be using scopedClusterClient.asCurrentUser.simulate.ingest() but the ES JS lib currently has a bug. The types also aren't available yet, so we use any. + const simulation = (await scopedClusterClient.asCurrentUser.transport.request({ + method: 'POST', + path: `_ingest/_simulate?pipeline=${DUMMY_PIPELINE_NAME}`, + body: simulationBody, + })) as any; + + return simulation; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts index 11fc1a092aa58..7a04109cf8807 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts @@ -9,7 +9,7 @@ import { i18n } from '@kbn/i18n'; import { NamedFieldDefinitionConfig, Streams, getAdvancedParameters } from '@kbn/streams-schema'; import { isEqual, omit } from 'lodash'; import { useMemo, useCallback } from 'react'; -import { useAbortController } from '@kbn/react-hooks'; +import { useAbortController, useAbortableAsync } from '@kbn/react-hooks'; import { useStreamsAppFetch } from '../../../../hooks/use_streams_app_fetch'; import { useKibana } from '../../../../hooks/use_kibana'; import { SchemaField, isSchemaFieldTyped } from '../types'; @@ -20,13 +20,14 @@ export const useSchemaFields = ({ definition, refreshDefinition, }: { - definition: Streams.WiredStream.GetResponse; + definition: Streams.ingest.all.GetResponse; refreshDefinition: () => void; }) => { const { dependencies: { start: { streams: { streamsRepositoryClient }, + data: { dataViews }, }, }, core: { @@ -54,9 +55,26 @@ export const useSchemaFields = ({ [definition.stream.name, streamsRepositoryClient] ); + const { + value: dataViewFields, + loading: isLoadingDataViewFields, + refresh: refreshDataViewFields, + } = useAbortableAsync( + async ({ signal }) => { + return dataViews.getFieldsForWildcard({ + pattern: definition.stream.name, + abortSignal: signal, + forceRefresh: true, + }); + }, + [dataViews, definition.stream.name] + ); + const fields = useMemo(() => { - const inheritedFields: SchemaField[] = Object.entries(definition.inherited_fields).map( - ([name, field]) => ({ + let inheritedFields: SchemaField[] = []; + + if (Streams.WiredStream.GetResponse.is(definition)) { + inheritedFields = Object.entries(definition.inherited_fields).map(([name, field]) => ({ name, type: field.type, format: 'format' in field ? field.format : undefined, @@ -64,34 +82,62 @@ export const useSchemaFields = ({ parent: field.from, alias_for: field.alias_for, status: 'inherited', - }) - ); + })); + } - const mappedFields: SchemaField[] = Object.entries(definition.stream.ingest.wired.fields).map( - ([name, field]) => ({ - name, - type: field.type, - format: 'format' in field ? field.format : undefined, - additionalParameters: getAdvancedParameters(name, field), - parent: definition.stream.name, - status: 'mapped', - }) - ); + const mappedFields: SchemaField[] = Object.entries( + Streams.WiredStream.GetResponse.is(definition) + ? definition.stream.ingest.wired.fields + : definition.stream.ingest.unwired.field_overrides || {} + ).map(([name, field]) => ({ + name, + type: field.type, + format: 'format' in field ? field.format : undefined, + additionalParameters: getAdvancedParameters(name, field), + parent: definition.stream.name, + status: 'mapped', + })); + + const allManagedFieldsSet = new Set([ + ...inheritedFields.map((field) => field.name), + ...mappedFields.map((field) => field.name), + ]); + + const unmanagedFields: SchemaField[] = dataViewFields + ? dataViewFields + .filter((field) => !field.runtimeField && !field.metadata_field) + .filter((field) => !allManagedFieldsSet.has(field.name)) + .map((field) => ({ + name: field.name, + status: 'unmapped', + esType: field.esTypes?.[0], + parent: definition.stream.name, + })) + : []; + + const allFoundFieldsSet = new Set([ + ...inheritedFields.map((field) => field.name), + ...mappedFields.map((field) => field.name), + ...unmanagedFields.map((field) => field.name), + ]); const unmappedFields: SchemaField[] = - unmappedFieldsValue?.unmappedFields.map((field) => ({ - name: field, - parent: definition.stream.name, - status: 'unmapped', - })) ?? []; + unmappedFieldsValue?.unmappedFields + .filter((field) => !allFoundFieldsSet.has(field)) + .map((field) => ({ + name: field, + parent: definition.stream.name, + status: 'unmapped', + })) ?? []; - return [...inheritedFields, ...mappedFields, ...unmappedFields]; - }, [definition, unmappedFieldsValue]); + return [...inheritedFields, ...mappedFields, ...unmappedFields, ...unmanagedFields]; + }, [dataViewFields, definition, unmappedFieldsValue?.unmappedFields]); const refreshFields = useCallback(() => { refreshDefinition(); refreshUnmappedFields(); - }, [refreshDefinition, refreshUnmappedFields]); + refreshDataViewFields(); + }, [refreshDefinition, refreshUnmappedFields, refreshDataViewFields]); const updateField = useCallback( async (field: SchemaField) => { @@ -101,7 +147,9 @@ export const useSchemaFields = ({ } const nextFieldDefinitionConfig = convertToFieldDefinitionConfig(field); - const persistedFieldDefinitionConfig = definition.stream.ingest.wired.fields[field.name]; + const persistedFieldDefinitionConfig = Streams.WiredStream.GetResponse.is(definition) + ? definition.stream.ingest.wired.fields[field.name] + : definition.stream.ingest.unwired.field_overrides?.[field.name]; if (!hasChanges(persistedFieldDefinitionConfig, nextFieldDefinitionConfig)) { throw new Error('The field is not different, hence updating is not necessary.'); @@ -116,13 +164,25 @@ export const useSchemaFields = ({ body: { ingest: { ...definition.stream.ingest, - wired: { - ...definition.stream.ingest.wired, - fields: { - ...definition.stream.ingest.wired.fields, - [field.name]: nextFieldDefinitionConfig, - }, - }, + ...(Streams.WiredStream.GetResponse.is(definition) + ? { + wired: { + ...definition.stream.ingest.wired, + fields: { + ...definition.stream.ingest.wired.fields, + [field.name]: nextFieldDefinitionConfig, + }, + }, + } + : { + unwired: { + ...definition.stream.ingest.unwired, + field_overrides: { + ...definition.stream.ingest.unwired.field_overrides, + [field.name]: nextFieldDefinitionConfig, + }, + }, + }), }, }, }, @@ -153,7 +213,9 @@ export const useSchemaFields = ({ const unmapField = useCallback( async (fieldName: SchemaField['name']) => { try { - const persistedFieldDefinitionConfig = definition.stream.ingest.wired.fields[fieldName]; + const persistedFieldDefinitionConfig = Streams.WiredStream.GetResponse.is(definition) + ? definition.stream.ingest.wired.fields[fieldName] + : definition.stream.ingest.unwired.field_overrides?.[fieldName]; if (!persistedFieldDefinitionConfig) { throw new Error('The field is not mapped, hence it cannot be unmapped.'); @@ -168,10 +230,22 @@ export const useSchemaFields = ({ body: { ingest: { ...definition.stream.ingest, - wired: { - ...definition.stream.ingest.wired, - fields: omit(definition.stream.ingest.wired.fields, fieldName), - }, + ...(Streams.WiredStream.GetResponse.is(definition) + ? { + wired: { + ...definition.stream.ingest.wired, + fields: omit(definition.stream.ingest.wired.fields, fieldName), + }, + } + : { + unwired: { + ...definition.stream.ingest.unwired, + field_overrides: omit( + definition.stream.ingest.unwired.field_overrides, + fieldName + ), + }, + }), }, }, }, @@ -201,7 +275,7 @@ export const useSchemaFields = ({ return { fields, - isLoadingUnmappedFields, + isLoadingFields: isLoadingUnmappedFields || isLoadingDataViewFields, refreshFields, unmapField, updateField, @@ -209,7 +283,7 @@ export const useSchemaFields = ({ }; const hasChanges = ( - field: Partial, + field: Partial | undefined, fieldUpdate: Partial ) => { return !isEqual(field, fieldUpdate); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/schema_editor_table.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/schema_editor_table.tsx index 10247f144051a..422cf14848432 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/schema_editor_table.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/schema_editor_table.tsx @@ -121,7 +121,7 @@ const createCellRenderer = 'xpack.streams.streamDetailSchemaEditorFieldsTableTypeEsTypeTooltip', { defaultMessage: - 'This field is not managed by Streams, but is defined in Elasticsearch. It can be controlled via the underlying index template and component templates available in the "Advanced" tab.', + 'This field is not managed by Streams, but is defined in Elasticsearch. It can also be controlled via the underlying index template and component templates available in the "Advanced" tab.', } )} position="right" diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/classic.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/classic.tsx index 0e64c4fa42368..4b7b85abde29a 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/classic.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/classic.tsx @@ -16,12 +16,14 @@ import { UnmanagedElasticsearchAssets } from './unmanaged_elasticsearch_assets'; import { StreamsAppPageTemplate } from '../../streams_app_page_template'; import { ClassicStreamBadge, LifecycleBadge } from '../../stream_badges'; import { useStreamsDetailManagementTabs } from './use_streams_detail_management_tabs'; +import { StreamDetailSchemaEditor } from '../stream_detail_schema_editor'; const classicStreamManagementSubTabs = [ 'enrich', 'advanced', 'lifecycle', 'significantEvents', + 'schemaEditor', ] as const; type ClassicStreamManagementSubTab = (typeof classicStreamManagementSubTabs)[number]; @@ -135,6 +137,14 @@ export function ClassicStreamDetailManagement({ ), }; + tabs.schemaEditor = { + content: ( + + ), + label: i18n.translate('xpack.streams.streamDetailView.schemaEditorTab', { + defaultMessage: 'Schema editor', + }), + }; } if (otherTabs.significantEvents) { diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx index 0b13b363a0be9..a655a5f8ff471 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx @@ -11,23 +11,22 @@ import { SchemaEditor } from '../schema_editor'; import { useSchemaFields } from '../schema_editor/hooks/use_schema_fields'; interface SchemaEditorProps { - definition: Streams.WiredStream.GetResponse; + definition: Streams.ingest.all.GetResponse; refreshDefinition: () => void; } export const StreamDetailSchemaEditor = ({ definition, refreshDefinition }: SchemaEditorProps) => { const { loading } = useStreamDetail(); - const { fields, isLoadingUnmappedFields, refreshFields, unmapField, updateField } = - useSchemaFields({ - definition, - refreshDefinition, - }); + const { fields, isLoadingFields, refreshFields, unmapField, updateField } = useSchemaFields({ + definition, + refreshDefinition, + }); return ( Date: Thu, 17 Jul 2025 17:49:26 +0200 Subject: [PATCH 04/23] more fixes --- .../errors/invalid_state_error.ts | 6 +++++- .../data_management/schema_editor/constants.ts | 2 +- .../detected_fields_editor.tsx | 6 +++--- .../stream_enrichment_state_machine.ts | 15 ++++++++------- .../upsert_stream_actor.ts | 6 ++++++ .../stream_enrichment_state_machine/utils.ts | 12 +++++++----- .../stream_detail_schema_editor/index.tsx | 7 +++++++ 7 files changed, 37 insertions(+), 17 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/invalid_state_error.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/invalid_state_error.ts index 649cb74672c13..e61a5957ec018 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/invalid_state_error.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/invalid_state_error.ts @@ -9,7 +9,11 @@ import { AggregateStatusError } from '../../errors/aggregate_status_error'; export class InvalidStateError extends AggregateStatusError { constructor(errors: Error[], message: string) { - super(errors, message, 400); + let overallMessage = message; + if (errors.length > 0) { + overallMessage += `: ${errors.map((error) => error.message).join(', ')}`; + } + super(errors, overallMessage, 400); this.name = 'InvalidStateError'; } } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/constants.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/constants.ts index f2e9d591c54c8..b8144ee4713e2 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/constants.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/constants.ts @@ -78,7 +78,7 @@ export const FIELD_STATUS_MAP = { unmapped: { color: 'default', label: i18n.translate('xpack.streams.streamDetailSchemaEditorUnmappedStatusLabel', { - defaultMessage: 'Unmapped', + defaultMessage: 'Unmanaged', }), }, }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/detected_fields_editor.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/detected_fields_editor.tsx index c595dd90044a5..2ec77f43a8e80 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/detected_fields_editor.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/detected_fields_editor.tsx @@ -73,13 +73,13 @@ export const DetectedFieldsEditor = ({ detectedFields }: DetectedFieldsEditorPro )} ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts index aa3c867aa7f78..bb1ec4e4f5b96 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/stream_enrichment_state_machine.ts @@ -47,7 +47,7 @@ import { getDataSourcesSamples, getDataSourcesUrlState, getStagedProcessors, - getUpsertWiredFields, + getUpsertFields, spawnDataSource, } from './utils'; import { createUrlInitializerActor, createUrlSyncAction } from './url_state_actor'; @@ -303,10 +303,7 @@ export const streamEnrichmentMachine = setup({ }, 'stream.update': { guard: 'canUpdateStream', - actions: [ - { type: 'sendResetEventToSimulator' }, - raise({ type: 'simulation.viewDataPreview' }), - ], + actions: [raise({ type: 'simulation.viewDataPreview' })], target: 'updating', }, }, @@ -318,11 +315,15 @@ export const streamEnrichmentMachine = setup({ input: ({ context }) => ({ definition: context.definition, processors: getConfiguredProcessors(context), - fields: getUpsertWiredFields(context), + fields: getUpsertFields(context), }), onDone: { target: 'idle', - actions: [{ type: 'notifyUpsertStreamSuccess' }, { type: 'refreshDefinition' }], + actions: [ + { type: 'sendResetEventToSimulator' }, + { type: 'notifyUpsertStreamSuccess' }, + { type: 'refreshDefinition' }, + ], }, onError: { target: 'idle', diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts index 0398f904c8ee7..d95b7488b049a 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/upsert_stream_actor.ts @@ -47,6 +47,12 @@ export function createUpsertStreamActor({ ingest: { ...input.definition.stream.ingest, processing: input.processors.map(processorConverter.toAPIDefinition), + ...(input.fields && { + unwired: { + ...input.definition.stream.ingest.unwired, + field_overrides: input.fields, + }, + }), }, }, }, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts index 39abeaada9f6e..9226137559179 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/stream_enrichment_state_machine/utils.ts @@ -98,14 +98,16 @@ export function getConfiguredProcessors(context: StreamEnrichmentContextType) { .map((proc) => proc.context.processor); } -export function getUpsertWiredFields( - context: StreamEnrichmentContextType -): FieldDefinition | undefined { - if (!Streams.WiredStream.GetResponse.is(context.definition) || !context.simulatorRef) { +export function getUpsertFields(context: StreamEnrichmentContextType): FieldDefinition | undefined { + if (!context.simulatorRef) { return undefined; } - const originalFieldDefinition = { ...context.definition.stream.ingest.wired.fields }; + const originalFieldDefinition = { + ...(Streams.WiredStream.GetResponse.is(context.definition) + ? context.definition.stream.ingest.wired.fields + : context.definition.stream.ingest.unwired.field_overrides), + }; const { detectedSchemaFields } = context.simulatorRef.getSnapshot().context; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx index a655a5f8ff471..ba18d27adea6f 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx @@ -9,12 +9,16 @@ import { Streams, isRootStreamDefinition } from '@kbn/streams-schema'; import { useStreamDetail } from '../../../hooks/use_stream_detail'; import { SchemaEditor } from '../schema_editor'; import { useSchemaFields } from '../schema_editor/hooks/use_schema_fields'; +import { SUPPORTED_TABLE_COLUMN_NAMES } from '../schema_editor/constants'; interface SchemaEditorProps { definition: Streams.ingest.all.GetResponse; refreshDefinition: () => void; } +const wiredDefaultColumns = SUPPORTED_TABLE_COLUMN_NAMES; +const classicDefaultColumns = SUPPORTED_TABLE_COLUMN_NAMES.filter((column) => column !== 'parent'); + export const StreamDetailSchemaEditor = ({ definition, refreshDefinition }: SchemaEditorProps) => { const { loading } = useStreamDetail(); @@ -27,6 +31,9 @@ export const StreamDetailSchemaEditor = ({ definition, refreshDefinition }: Sche Date: Thu, 17 Jul 2025 16:13:55 +0000 Subject: [PATCH 05/23] [CI] Auto-commit changed files from 'node scripts/capture_oas_snapshot --include-path /api/status --include-path /api/alerting/rule/ --include-path /api/alerting/rules --include-path /api/actions --include-path /api/security/role --include-path /api/spaces --include-path /api/streams --include-path /api/fleet --include-path /api/dashboards --include-path /api/saved_objects/_import --include-path /api/saved_objects/_export --include-path /api/maintenance_window --update' --- oas_docs/bundle.json | 218 +++++++++++++++++++++++++++++++- oas_docs/bundle.serverless.json | 218 +++++++++++++++++++++++++++++++- 2 files changed, 432 insertions(+), 4 deletions(-) diff --git a/oas_docs/bundle.json b/oas_docs/bundle.json index 1841d1eba01ef..3e6dee0d8f7c1 100644 --- a/oas_docs/bundle.json +++ b/oas_docs/bundle.json @@ -51984,7 +51984,114 @@ "properties": { "unwired": { "additionalProperties": false, - "properties": {}, + "properties": { + "field_overrides": { + "additionalProperties": { + "allOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "anyOf": [ + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "boolean" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + }, + { + "items": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "boolean" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + }, + "type": "array" + }, + {} + ] + }, + "type": "object" + }, + { + "anyOf": [ + { + "additionalProperties": false, + "properties": { + "format": { + "minLength": 1, + "type": "string" + }, + "type": { + "enum": [ + "keyword", + "match_only_text", + "long", + "double", + "date", + "boolean", + "ip" + ], + "type": "string" + } + }, + "required": [ + "type" + ], + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "type": { + "enum": [ + "system" + ], + "type": "string" + } + }, + "required": [ + "type" + ], + "type": "object" + } + ] + } + ] + }, + "type": "object" + } + }, "type": "object" } }, @@ -58010,7 +58117,114 @@ "properties": { "unwired": { "additionalProperties": false, - "properties": {}, + "properties": { + "field_overrides": { + "additionalProperties": { + "allOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "anyOf": [ + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "boolean" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + }, + { + "items": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "boolean" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + }, + "type": "array" + }, + {} + ] + }, + "type": "object" + }, + { + "anyOf": [ + { + "additionalProperties": false, + "properties": { + "format": { + "minLength": 1, + "type": "string" + }, + "type": { + "enum": [ + "keyword", + "match_only_text", + "long", + "double", + "date", + "boolean", + "ip" + ], + "type": "string" + } + }, + "required": [ + "type" + ], + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "type": { + "enum": [ + "system" + ], + "type": "string" + } + }, + "required": [ + "type" + ], + "type": "object" + } + ] + } + ] + }, + "type": "object" + } + }, "type": "object" } }, diff --git a/oas_docs/bundle.serverless.json b/oas_docs/bundle.serverless.json index 5b96b84abd308..618f1049062e2 100644 --- a/oas_docs/bundle.serverless.json +++ b/oas_docs/bundle.serverless.json @@ -51575,7 +51575,114 @@ "properties": { "unwired": { "additionalProperties": false, - "properties": {}, + "properties": { + "field_overrides": { + "additionalProperties": { + "allOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "anyOf": [ + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "boolean" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + }, + { + "items": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "boolean" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + }, + "type": "array" + }, + {} + ] + }, + "type": "object" + }, + { + "anyOf": [ + { + "additionalProperties": false, + "properties": { + "format": { + "minLength": 1, + "type": "string" + }, + "type": { + "enum": [ + "keyword", + "match_only_text", + "long", + "double", + "date", + "boolean", + "ip" + ], + "type": "string" + } + }, + "required": [ + "type" + ], + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "type": { + "enum": [ + "system" + ], + "type": "string" + } + }, + "required": [ + "type" + ], + "type": "object" + } + ] + } + ] + }, + "type": "object" + } + }, "type": "object" } }, @@ -57601,7 +57708,114 @@ "properties": { "unwired": { "additionalProperties": false, - "properties": {}, + "properties": { + "field_overrides": { + "additionalProperties": { + "allOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "anyOf": [ + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "boolean" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + }, + { + "items": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "boolean" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + }, + "type": "array" + }, + {} + ] + }, + "type": "object" + }, + { + "anyOf": [ + { + "additionalProperties": false, + "properties": { + "format": { + "minLength": 1, + "type": "string" + }, + "type": { + "enum": [ + "keyword", + "match_only_text", + "long", + "double", + "date", + "boolean", + "ip" + ], + "type": "string" + } + }, + "required": [ + "type" + ], + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "type": { + "enum": [ + "system" + ], + "type": "string" + } + }, + "required": [ + "type" + ], + "type": "object" + } + ] + } + ] + }, + "type": "object" + } + }, "type": "object" } }, From de999b2ba37cde12cbd87e6420d2080427622aff Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Fri, 18 Jul 2025 12:14:49 +0200 Subject: [PATCH 06/23] add tests and fix types --- ...te_unwired_stream_pipeline_actions.test.ts | 1 + .../apis/streams/classic.ts | 485 ++++++++++++------ 2 files changed, 330 insertions(+), 156 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_unwired_stream_pipeline_actions.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_unwired_stream_pipeline_actions.test.ts index f85eed1523ba8..4fbe8309f3acd 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_unwired_stream_pipeline_actions.test.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_unwired_stream_pipeline_actions.test.ts @@ -1054,5 +1054,6 @@ function emptyActionsByType(): ActionsByType { delete_datastream: [], upsert_dot_streams_document: [], delete_dot_streams_document: [], + update_data_stream_mappings: [], }; } diff --git a/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/streams/classic.ts b/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/streams/classic.ts index 1b916fbf2260f..7af1f8536f507 100644 --- a/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/streams/classic.ts +++ b/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/streams/classic.ts @@ -25,202 +25,375 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { let apiClient: StreamsSupertestRepositoryClient; - describe('Classic streams', () => { + describe.only('Classic streams', () => { before(async () => { apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest); }); - it('non-wired data streams', async () => { - const doc = { - message: '2023-01-01T00:00:10.000Z error test', - }; - const response = await indexDocument(esClient, TEST_STREAM_NAME, doc); - expect(response.result).to.eql('created'); - - const { - body: { streams }, - status, - } = await apiClient.fetch('GET /api/streams 2023-10-31'); - - expect(status).to.eql(200); - - const classicStream = streams.find((stream) => stream.name === TEST_STREAM_NAME); - - expect(classicStream).to.eql({ - name: TEST_STREAM_NAME, - description: '', - ingest: { - lifecycle: { inherit: {} }, - processing: [], - unwired: {}, - }, - }); - }); + describe('Classic streams processing', () => { + it('non-wired data streams', async () => { + const doc = { + message: '2023-01-01T00:00:10.000Z error test', + }; + const response = await indexDocument(esClient, TEST_STREAM_NAME, doc); + expect(response.result).to.eql('created'); - it('Allows setting processing on classic streams', async () => { - const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { - params: { - path: { - name: TEST_STREAM_NAME, + const { + body: { streams }, + status, + } = await apiClient.fetch('GET /api/streams 2023-10-31'); + + expect(status).to.eql(200); + + const classicStream = streams.find((stream) => stream.name === TEST_STREAM_NAME); + + expect(classicStream).to.eql({ + name: TEST_STREAM_NAME, + description: '', + ingest: { + lifecycle: { inherit: {} }, + processing: [], + unwired: {}, }, - body: { - dashboards: [], - queries: [], - stream: { - description: '', - ingest: { - lifecycle: { inherit: {} }, - processing: [ - { - grok: { - if: { always: {} }, - field: 'message', - patterns: [ - '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', - ], + }); + }); + + it('Allows setting processing on classic streams', async () => { + const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { + params: { + path: { + name: TEST_STREAM_NAME, + }, + body: { + dashboards: [], + queries: [], + stream: { + description: '', + ingest: { + lifecycle: { inherit: {} }, + processing: [ + { + grok: { + if: { always: {} }, + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + }, }, - }, - ], - unwired: {}, + ], + unwired: {}, + }, }, }, }, - }, - }); + }); + + expect(putResponse.status).to.eql(200); + + expect(putResponse.body).to.have.property('acknowledged', true); - expect(putResponse.status).to.eql(200); + const getResponse = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', { + params: { path: { name: TEST_STREAM_NAME } }, + }); - expect(putResponse.body).to.have.property('acknowledged', true); + expect(getResponse.status).to.eql(200); - const getResponse = await apiClient.fetch('GET /api/streams/{name} 2023-10-31', { - params: { path: { name: TEST_STREAM_NAME } }, + const body = getResponse.body; + Streams.UnwiredStream.GetResponse.asserts(body); + + const { + dashboards, + queries, + stream, + effective_lifecycle: effectiveLifecycle, + elasticsearch_assets: elasticsearchAssets, + } = body; + + expect(dashboards).to.eql([]); + expect(queries).to.eql([]); + + expect(stream).to.eql({ + name: TEST_STREAM_NAME, + description: '', + ingest: { + lifecycle: { inherit: {} }, + processing: [ + { + grok: { + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + if: { always: {} }, + }, + }, + ], + unwired: {}, + }, + }); + + expect(effectiveLifecycle).to.eql(isServerless ? { dsl: {} } : { ilm: { policy: 'logs' } }); + + expect(elasticsearchAssets).to.eql({ + ingestPipeline: 'logs@default-pipeline', + componentTemplates: ['logs@mappings', 'logs@settings', 'logs@custom', 'ecs@mappings'], + indexTemplate: 'logs', + dataStream: 'logs-test-default', + }); + }); + + it('Executes processing on classic streams', async () => { + const doc = { + '@timestamp': '2024-01-01T00:00:10.000Z', + message: '2023-01-01T00:00:10.000Z error test', + }; + const response = await indexDocument(esClient, TEST_STREAM_NAME, doc); + expect(response.result).to.eql('created'); + + const result = await fetchDocument(esClient, TEST_STREAM_NAME, response._id); + expect(result._source).to.eql({ + '@timestamp': '2024-01-01T00:00:10.000Z', + message: '2023-01-01T00:00:10.000Z error test', + inner_timestamp: '2023-01-01T00:00:10.000Z', + message2: 'test', + log: { + level: 'error', + }, + }); }); - expect(getResponse.status).to.eql(200); - - const body = getResponse.body; - Streams.UnwiredStream.GetResponse.asserts(body); - - const { - dashboards, - queries, - stream, - effective_lifecycle: effectiveLifecycle, - elasticsearch_assets: elasticsearchAssets, - } = body; - - expect(dashboards).to.eql([]); - expect(queries).to.eql([]); - - expect(stream).to.eql({ - name: TEST_STREAM_NAME, - description: '', - ingest: { - lifecycle: { inherit: {} }, - processing: [ - { - grok: { - field: 'message', - patterns: [ - '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', - ], - if: { always: {} }, + it('Allows removing processing on classic streams', async () => { + const response = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { + params: { + path: { name: TEST_STREAM_NAME }, + body: { + queries: [], + dashboards: [], + stream: { + description: '', + ingest: { + lifecycle: { inherit: {} }, + processing: [], + unwired: {}, + }, }, }, - ], - unwired: {}, - }, + }, + }); + + expect(response.status).to.eql(200); + + expect(response.body).to.have.property('acknowledged', true); }); - expect(effectiveLifecycle).to.eql(isServerless ? { dsl: {} } : { ilm: { policy: 'logs' } }); + it('Executes processing on classic streams after removing processing', async () => { + const doc = { + // default logs pipeline fills in timestamp with current date if not set + message: '2023-01-01T00:00:10.000Z info mylogger this is the message', + }; + const response = await indexDocument(esClient, TEST_STREAM_NAME, doc); + expect(response.result).to.eql('created'); - expect(elasticsearchAssets).to.eql({ - ingestPipeline: 'logs@default-pipeline', - componentTemplates: ['logs@mappings', 'logs@settings', 'logs@custom', 'ecs@mappings'], - indexTemplate: 'logs', - dataStream: 'logs-test-default', + const result = await fetchDocument(esClient, TEST_STREAM_NAME, response._id); + expect(result._source).to.eql({ + // accept any date + '@timestamp': (result._source as { [key: string]: unknown })['@timestamp'], + message: '2023-01-01T00:00:10.000Z info mylogger this is the message', + }); }); }); - it('Executes processing on classic streams', async () => { - const doc = { - '@timestamp': '2024-01-01T00:00:10.000Z', - message: '2023-01-01T00:00:10.000Z error test', - }; - const response = await indexDocument(esClient, TEST_STREAM_NAME, doc); - expect(response.result).to.eql('created'); - - const result = await fetchDocument(esClient, TEST_STREAM_NAME, response._id); - expect(result._source).to.eql({ - '@timestamp': '2024-01-01T00:00:10.000Z', - message: '2023-01-01T00:00:10.000Z error test', - inner_timestamp: '2023-01-01T00:00:10.000Z', - message2: 'test', - log: { - level: 'error', - }, + describe('Classic streams with field overrides', () => { + it('Allows setting field overrides on classic streams', async () => { + const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { + params: { + path: { name: TEST_STREAM_NAME }, + body: { + queries: [], + dashboards: [], + stream: { + description: '', + ingest: { + lifecycle: { inherit: {} }, + processing: [], + unwired: { + field_overrides: { + 'foo.bar': { + type: 'keyword', + }, + }, + }, + }, + }, + }, + }, + }); + + expect(putResponse.status).to.eql(200); + + // Verify the field override is set via field caps + const fieldCapsResponse = await esClient.fieldCaps({ + index: TEST_STREAM_NAME, + fields: 'foo.bar', + }); + expect(fieldCapsResponse.fields).to.have.property('foo.bar'); + expect(fieldCapsResponse.fields['foo.bar']).to.have.property('keyword'); }); - }); - it('Allows removing processing on classic streams', async () => { - const response = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { - params: { - path: { name: TEST_STREAM_NAME }, - body: { - queries: [], - dashboards: [], - stream: { - description: '', - ingest: { - lifecycle: { inherit: {} }, - processing: [], - unwired: {}, + it('Does not roll over on compatible changes', async () => { + const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { + params: { + path: { name: TEST_STREAM_NAME }, + body: { + queries: [], + dashboards: [], + stream: { + description: '', + ingest: { + lifecycle: { inherit: {} }, + processing: [], + unwired: { + field_overrides: { + 'foo.bar': { + type: 'keyword', + }, + 'foo.baz': { + type: 'keyword', + }, + }, + }, + }, }, }, }, - }, + }); + + expect(putResponse.status).to.eql(200); + + // Verify the field override is set via field caps + const fieldCapsResponse = await esClient.fieldCaps({ + index: TEST_STREAM_NAME, + fields: 'foo.baz', + }); + expect(fieldCapsResponse.fields).to.have.property('foo.baz'); + expect(fieldCapsResponse.fields['foo.baz']).to.have.property('keyword'); + + // Verify the stream did not roll over + const getResponse = await esClient.indices.getDataStream({ + name: TEST_STREAM_NAME, + }); + expect(getResponse.data_streams[0].indices).to.have.length(1); }); - expect(response.status).to.eql(200); + it('Does roll over on incompatible changes', async () => { + const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { + params: { + path: { name: TEST_STREAM_NAME }, + body: { + queries: [], + dashboards: [], + stream: { + description: '', + ingest: { + lifecycle: { inherit: {} }, + processing: [], + unwired: { + field_overrides: { + 'foo.bar': { + type: 'double', + }, + 'foo.baz': { + type: 'keyword', + }, + }, + }, + }, + }, + }, + }, + }); - expect(response.body).to.have.property('acknowledged', true); - }); + expect(putResponse.status).to.eql(200); + + // Verify the field override is set via field caps + const fieldCapsResponse = await esClient.fieldCaps({ + index: TEST_STREAM_NAME, + fields: 'foo.bar', + }); + expect(fieldCapsResponse.fields).to.have.property('foo.bar'); + expect(fieldCapsResponse.fields['foo.bar']).to.have.property('double'); - it('Executes processing on classic streams after removing processing', async () => { - const doc = { - // default logs pipeline fills in timestamp with current date if not set - message: '2023-01-01T00:00:10.000Z info mylogger this is the message', - }; - const response = await indexDocument(esClient, TEST_STREAM_NAME, doc); - expect(response.result).to.eql('created'); - - const result = await fetchDocument(esClient, TEST_STREAM_NAME, response._id); - expect(result._source).to.eql({ - // accept any date - '@timestamp': (result._source as { [key: string]: unknown })['@timestamp'], - message: '2023-01-01T00:00:10.000Z info mylogger this is the message', + // Verify the stream did not roll over + const getResponse = await esClient.indices.getDataStream({ + name: TEST_STREAM_NAME, + }); + expect(getResponse.data_streams[0].indices).to.have.length(2); }); - }); - it('Allows deleting classic streams', async () => { - const deleteStreamResponse = await apiClient.fetch('DELETE /api/streams/{name} 2023-10-31', { - params: { - path: { - name: TEST_STREAM_NAME, + it('Does roll over on removing field overrides', async () => { + const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { + params: { + path: { name: TEST_STREAM_NAME }, + body: { + queries: [], + dashboards: [], + stream: { + description: '', + ingest: { + lifecycle: { inherit: {} }, + processing: [], + unwired: { + field_overrides: {}, + }, + }, + }, + }, }, - }, + }); + expect(putResponse.status).to.eql(200); + // Verify the stream rolled over + const getResponse = await esClient.indices.getDataStream({ + name: TEST_STREAM_NAME, + }); + expect(getResponse.data_streams[0].indices).to.have.length(3); + + // get the current write index + const writeIndex = getResponse.data_streams[0].indices[2]; + // Verify the field override is removed via field caps + const fieldCapsResponse = await esClient.fieldCaps({ + index: writeIndex.index_name, + fields: ['foo.bar', 'foo.baz'], + }); + expect(fieldCapsResponse.fields).to.not.have.property('foo.bar'); + expect(fieldCapsResponse.fields).to.not.have.property('foo.baz'); }); + }); - expect(deleteStreamResponse.status).to.eql(200); + describe('Deleting classic streams', () => { + it('Allows deleting classic streams', async () => { + const deleteStreamResponse = await apiClient.fetch( + 'DELETE /api/streams/{name} 2023-10-31', + { + params: { + path: { + name: TEST_STREAM_NAME, + }, + }, + } + ); + + expect(deleteStreamResponse.status).to.eql(200); - const getStreamsResponse = await apiClient.fetch('GET /api/streams 2023-10-31'); + const getStreamsResponse = await apiClient.fetch('GET /api/streams 2023-10-31'); - expect(getStreamsResponse.status).to.eql(200); + expect(getStreamsResponse.status).to.eql(200); - const classicStream = getStreamsResponse.body.streams.find( - (stream) => stream.name === TEST_STREAM_NAME - ); - expect(classicStream).to.eql(undefined); + const classicStream = getStreamsResponse.body.streams.find( + (stream) => stream.name === TEST_STREAM_NAME + ); + expect(classicStream).to.eql(undefined); + }); }); describe('Classic streams sharing template/pipeline', () => { From 9967d49e10afe9df33999a9f4656224b91eb72d5 Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Fri, 18 Jul 2025 10:32:43 +0000 Subject: [PATCH 07/23] [CI] Auto-commit changed files from 'make api-docs' --- oas_docs/output/kibana.serverless.yaml | 112 ++++++++++++++++++++++++- oas_docs/output/kibana.yaml | 112 ++++++++++++++++++++++++- 2 files changed, 220 insertions(+), 4 deletions(-) diff --git a/oas_docs/output/kibana.serverless.yaml b/oas_docs/output/kibana.serverless.yaml index 382534765c792..72b2e770ea003 100644 --- a/oas_docs/output/kibana.serverless.yaml +++ b/oas_docs/output/kibana.serverless.yaml @@ -48304,7 +48304,61 @@ paths: unwired: additionalProperties: false type: object - properties: {} + properties: + field_overrides: + additionalProperties: + allOf: + - additionalProperties: + anyOf: + - anyOf: + - type: string + - type: number + - type: boolean + - enum: + - 'null' + nullable: true + - not: {} + - items: + anyOf: + - type: string + - type: number + - type: boolean + - enum: + - 'null' + nullable: true + - not: {} + type: array + - {} + type: object + - anyOf: + - additionalProperties: false + type: object + properties: + format: + minLength: 1 + type: string + type: + enum: + - keyword + - match_only_text + - long + - double + - date + - boolean + - ip + type: string + required: + - type + - additionalProperties: false + type: object + properties: + type: + enum: + - system + type: string + required: + - type + type: object required: - unwired required: @@ -52130,7 +52184,61 @@ paths: unwired: additionalProperties: false type: object - properties: {} + properties: + field_overrides: + additionalProperties: + allOf: + - additionalProperties: + anyOf: + - anyOf: + - type: string + - type: number + - type: boolean + - enum: + - 'null' + nullable: true + - not: {} + - items: + anyOf: + - type: string + - type: number + - type: boolean + - enum: + - 'null' + nullable: true + - not: {} + type: array + - {} + type: object + - anyOf: + - additionalProperties: false + type: object + properties: + format: + minLength: 1 + type: string + type: + enum: + - keyword + - match_only_text + - long + - double + - date + - boolean + - ip + type: string + required: + - type + - additionalProperties: false + type: object + properties: + type: + enum: + - system + type: string + required: + - type + type: object required: - unwired required: diff --git a/oas_docs/output/kibana.yaml b/oas_docs/output/kibana.yaml index 5d1f339611c6e..bcf7803320b4b 100644 --- a/oas_docs/output/kibana.yaml +++ b/oas_docs/output/kibana.yaml @@ -51825,7 +51825,61 @@ paths: unwired: additionalProperties: false type: object - properties: {} + properties: + field_overrides: + additionalProperties: + allOf: + - additionalProperties: + anyOf: + - anyOf: + - type: string + - type: number + - type: boolean + - enum: + - 'null' + nullable: true + - not: {} + - items: + anyOf: + - type: string + - type: number + - type: boolean + - enum: + - 'null' + nullable: true + - not: {} + type: array + - {} + type: object + - anyOf: + - additionalProperties: false + type: object + properties: + format: + minLength: 1 + type: string + type: + enum: + - keyword + - match_only_text + - long + - double + - date + - boolean + - ip + type: string + required: + - type + - additionalProperties: false + type: object + properties: + type: + enum: + - system + type: string + required: + - type + type: object required: - unwired required: @@ -55651,7 +55705,61 @@ paths: unwired: additionalProperties: false type: object - properties: {} + properties: + field_overrides: + additionalProperties: + allOf: + - additionalProperties: + anyOf: + - anyOf: + - type: string + - type: number + - type: boolean + - enum: + - 'null' + nullable: true + - not: {} + - items: + anyOf: + - type: string + - type: number + - type: boolean + - enum: + - 'null' + nullable: true + - not: {} + type: array + - {} + type: object + - anyOf: + - additionalProperties: false + type: object + properties: + format: + minLength: 1 + type: string + type: + enum: + - keyword + - match_only_text + - long + - double + - date + - boolean + - ip + type: string + required: + - type + - additionalProperties: false + type: object + properties: + type: + enum: + - system + type: string + required: + - type + type: object required: - unwired required: From 0e1d70b99532831f1008c0eb24431066104970da Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Fri, 18 Jul 2025 15:24:28 +0200 Subject: [PATCH 08/23] remove exclusive test --- .../api_integration_deployment_agnostic/apis/streams/classic.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/streams/classic.ts b/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/streams/classic.ts index 7af1f8536f507..e4b8129be5134 100644 --- a/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/streams/classic.ts +++ b/x-pack/solutions/observability/test/api_integration_deployment_agnostic/apis/streams/classic.ts @@ -25,7 +25,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { let apiClient: StreamsSupertestRepositoryClient; - describe.only('Classic streams', () => { + describe('Classic streams', () => { before(async () => { apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest); }); From 8026417ce43be123129fa8545c6dee25c04cb1e4 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 21 Jul 2025 12:20:26 +0200 Subject: [PATCH 09/23] make field simulation work for all cases --- .../streams/component_templates/logs_layer.ts | 3 -- .../streams/unwired_stream.ts | 1 - .../routes/internal/streams/schema/route.ts | 47 ++++++++++++++++++- 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts index b74826dfef17b..4ee71ce121e83 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts @@ -91,9 +91,6 @@ export const baseMappings: Record = { structured: { type: 'flattened', }, - text: { - type: 'match_only_text', - }, }, }, attributes: { diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts index 50df616bdfdb1..e61fcb339ffab 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts @@ -150,7 +150,6 @@ export class UnwiredStream extends StreamActiveRecord>, + dataStreamName: string, propertiesForSimulation: Record, scopedClusterClient: IScopedClusterClient ) { + // fetch the index template to get the base mappings + const dataStream = await scopedClusterClient.asCurrentUser.indices.getDataStream({ + name: dataStreamName, + }); + const indexTemplate = ( + await scopedClusterClient.asCurrentUser.indices.getIndexTemplate({ + name: dataStream.data_streams[0].template, + }) + ).index_templates[0].index_template; + + // We need to build a batched index template isntead of using mapping_addition + // because of https://github.com/elastic/elasticsearch/issues/131608 + const patchedIndexTemplate = { + ...indexTemplate, + priority: + indexTemplate.priority && indexTemplate.priority > MAX_PRIORITY + ? // max priority passed as a string so we don't lose precision + (`${MAX_PRIORITY}` as unknown as number) + : indexTemplate.priority, + composed_of: [...(indexTemplate.composed_of || []), '__DUMMY_COMPONENT_TEMPLATE__'], + template: { + ...indexTemplate.template, + mappings: { + ...indexTemplate.template?.mappings, + properties: { + ...indexTemplate.template?.mappings?.properties, + ...propertiesForSimulation, + }, + }, + }, + }; const simulationBody = { docs: sampleResultsAsSimulationDocs, - mapping_addition: { - properties: propertiesForSimulation, + index_template_substitutions: { + [dataStream.data_streams[0].template]: patchedIndexTemplate, + }, + component_template_substitutions: { + __DUMMY_COMPONENT_TEMPLATE__: { + template: { + mappings: { + properties: propertiesForSimulation, + }, + }, + }, }, // prevent double-processing pipeline_substitutions: { From b542ba46e4a9a2485a7cc61a5ad781000dd87fa3 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 21 Jul 2025 18:16:56 +0200 Subject: [PATCH 10/23] review comments and make the other field simulation whole --- .../kbn-streams-schema/src/fields/index.ts | 26 +++++++++- .../component_templates/generate_layer.ts | 13 +++-- .../streams/component_templates/logs_layer.ts | 6 +-- .../data_streams/manage_data_streams.ts | 8 ++- .../state_management/execution_plan/types.ts | 4 +- .../streams/has_removed_fields.ts | 16 ++++++ .../streams/unwired_stream.ts | 30 +++++------ .../state_management/streams/wired_stream.ts | 8 +-- .../streams/processing/simulation_handler.ts | 52 ++++++++++++++++++- .../routes/internal/streams/schema/route.ts | 4 +- 10 files changed, 129 insertions(+), 38 deletions(-) create mode 100644 x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/has_removed_fields.ts diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/fields/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/fields/index.ts index 1cf80b0ca1f68..501a6f92d6b91 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/fields/index.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/fields/index.ts @@ -5,7 +5,16 @@ * 2.0. */ -import { MappingProperty } from '@elastic/elasticsearch/lib/api/types'; +import { + MappingBooleanProperty, + MappingDateProperty, + MappingDoubleNumberProperty, + MappingIpProperty, + MappingKeywordProperty, + MappingLongNumberProperty, + MappingMatchOnlyTextProperty, + MappingProperty, +} from '@elastic/elasticsearch/lib/api/types'; import { z } from '@kbn/zod'; import { NonEmptyString } from '@kbn/zod-helpers'; import { recursiveRecord } from '../shared/record_types'; @@ -55,6 +64,21 @@ export interface FieldDefinition { [x: string]: FieldDefinitionConfig; } +export type AllowedMappingProperty = + | MappingKeywordProperty + | MappingMatchOnlyTextProperty + | MappingLongNumberProperty + | MappingDoubleNumberProperty + | MappingDateProperty + | MappingBooleanProperty + | MappingIpProperty; + +export type StreamsMappingProperties = Record; + +export function isMappingProperties(value: FieldDefinition): value is StreamsMappingProperties { + return Object.values(value).every((prop) => prop.type !== 'system'); +} + export const fieldDefinitionSchema: z.Schema = z.record( z.string(), fieldDefinitionConfigSchema diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts index e5a6d2bdb3604..a330f67b82d80 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts @@ -8,9 +8,10 @@ import { ClusterPutComponentTemplateRequest, MappingDateProperty, - MappingProperty, + MappingTypeMapping, } from '@elastic/elasticsearch/lib/api/types'; import { Streams, getAdvancedParameters, isRoot, namespacePrefixes } from '@kbn/streams-schema'; +import { AllowedMappingProperty, StreamsMappingProperties } from '@kbn/streams-schema/src/fields'; import { ASSET_VERSION } from '../../../../common/constants'; import { logsSettings, otelEquivalentLookupMap } from './logs_layer'; import { getComponentTemplateName } from './name'; @@ -21,12 +22,13 @@ export function generateLayer( definition: Streams.WiredStream.Definition, isServerless: boolean ): ClusterPutComponentTemplateRequest { - const properties: Record = {}; + const properties: StreamsMappingProperties = {}; + const aliases: MappingTypeMapping['properties'] = {}; Object.entries(definition.ingest.wired.fields).forEach(([field, props]) => { if (props.type === 'system') { return; } - const property: MappingProperty = { + const property: AllowedMappingProperty = { type: props.type, }; @@ -48,7 +50,7 @@ export function generateLayer( const matchingPrefix = namespacePrefixes.find((prefix) => field.startsWith(prefix)); if (matchingPrefix) { const aliasName = field.substring(matchingPrefix.length); - properties[aliasName] = { + aliases[aliasName] = { type: 'alias', path: field, }; @@ -63,7 +65,7 @@ export function generateLayer( const aliasName = field.substring(matchingPrefix.length); const otelEquivalent = otelEquivalentLookupMap[aliasName]; if (otelEquivalent) { - properties[otelEquivalent] = { + aliases[otelEquivalent] = { type: 'alias', path: field, }; @@ -81,6 +83,7 @@ export function generateLayer( ? { ...baseMappings, ...properties, + ...aliases, } : properties, }, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts index e9821768f522f..89fdf5ea3b34a 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts @@ -6,7 +6,7 @@ */ import { EcsFlat } from '@elastic/ecs'; -import { IndicesIndexSettings, MappingProperty } from '@elastic/elasticsearch/lib/api/types'; +import { IndicesIndexSettings, MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types'; import { FieldDefinition, InheritedFieldDefinition, @@ -106,7 +106,7 @@ export const baseFields: FieldDefinition = { }, }; -export const baseMappings: Record = { +export const baseMappings: MappingTypeMapping['properties'] = { body: { type: 'object', properties: { @@ -248,7 +248,7 @@ export function addAliasesForNamespacedFields( ); // Add aliases defined in the base mappings - Object.entries(baseMappings).forEach(([key, fieldDef]) => { + Object.entries(baseMappings || {}).forEach(([key, fieldDef]) => { if (fieldDef.type === 'alias') { inheritedFields[key] = { type: baseFields[fieldDef.path!].type, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts index 1bea2c51c2cd7..9a5e3ba621ecc 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts @@ -15,11 +15,9 @@ import { isIlmLifecycle, isInheritLifecycle, } from '@kbn/streams-schema'; -import { - IndicesSimulateTemplateTemplate, - MappingProperty, -} from '@elastic/elasticsearch/lib/api/types'; +import { IndicesSimulateTemplateTemplate } from '@elastic/elasticsearch/lib/api/types'; import { omit } from 'lodash'; +import { StreamsMappingProperties } from '@kbn/streams-schema/src/fields'; import { retryTransientEsErrors } from '../helpers/retry'; interface DataStreamManagementOptions { @@ -45,7 +43,7 @@ interface UpdateDataStreamsMappingsOptions { esClient: ElasticsearchClient; logger: Logger; name: string; - mappings: Record; + mappings: StreamsMappingProperties; forceRollover?: boolean; } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts index 9cdfb8fcc27db..fcb4af7fcda26 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts @@ -10,9 +10,9 @@ import type { IndicesPutIndexTemplateRequest, IngestProcessorContainer, IngestPutPipelineRequest, - MappingProperty, } from '@elastic/elasticsearch/lib/api/types'; import type { IngestStreamLifecycle, Streams } from '@kbn/streams-schema'; +import { StreamsMappingProperties } from '@kbn/streams-schema/src/fields'; export interface UpsertComponentTemplateAction { type: 'upsert_component_template'; @@ -96,7 +96,7 @@ export interface UpdateDataStreamMappingsAction { request: { name: string; forceRollover?: boolean; - mappings: Record; + mappings: StreamsMappingProperties; }; } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/has_removed_fields.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/has_removed_fields.ts new file mode 100644 index 0000000000000..ff2c4ba55af1f --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/has_removed_fields.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { FieldDefinition } from '@kbn/streams-schema'; + +import _ from 'lodash'; + +export function hasRemovedFields(previous?: FieldDefinition, current?: FieldDefinition): boolean { + const previousKeys = Object.keys(previous || {}); + const currentKeys = Object.keys(current || {}); + return _.difference(previousKeys, currentKeys).length > 0; +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts index e61fcb339ffab..1fa9fe3b6b6b2 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts @@ -13,7 +13,7 @@ import type { IngestStreamLifecycle } from '@kbn/streams-schema'; import { isIlmLifecycle, isInheritLifecycle, Streams } from '@kbn/streams-schema'; import _, { cloneDeep } from 'lodash'; import { isNotFoundError } from '@kbn/es-errors'; -import { MappingProperty } from '@elastic/elasticsearch/lib/api/types'; +import { isMappingProperties } from '@kbn/streams-schema/src/fields'; import { StatusError } from '../../errors/status_error'; import { generateClassicIngestPipelineBody } from '../../ingest_pipelines/generate_ingest_pipeline'; import { getProcessingPipelineName } from '../../ingest_pipelines/name'; @@ -29,6 +29,7 @@ import type { import { StreamActiveRecord } from '../stream_active_record/stream_active_record'; import { validateUnwiredFields } from '../../helpers/validate_fields'; import { DataStreamMappingsUpdateResponse } from '../../data_streams/manage_data_streams'; +import { hasRemovedFields } from './has_removed_fields'; interface UnwiredStreamChanges extends StreamChanges { processing: boolean; @@ -215,15 +216,15 @@ export class UnwiredStream extends StreamActiveRecord 0) { + if ( + this._definition.ingest.unwired.field_overrides && + isMappingProperties(this._definition.ingest.unwired.field_overrides) + ) { actions.push({ type: 'update_data_stream_mappings', request: { name: this._definition.name, - mappings: this._definition.ingest.unwired.field_overrides! as Record< - string, - MappingProperty - >, + mappings: this._definition.ingest.unwired.field_overrides, }, }); } @@ -286,13 +287,15 @@ export class UnwiredStream extends StreamActiveRecord) || - {}, + mappings, forceRollover: this.fieldOverridesRemoved(startingState), }, }); @@ -314,13 +317,10 @@ export class UnwiredStream extends StreamActiveRecord 0; } private async createUpsertPipelineActions(): Promise { diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts index 5479677d0e262..ce1728f56918f 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts @@ -50,6 +50,7 @@ import type { } from '../stream_active_record/stream_active_record'; import { StreamActiveRecord } from '../stream_active_record/stream_active_record'; import { hasSupportedStreamsRoot } from '../../root_stream_definition'; +import { hasRemovedFields } from './has_removed_fields'; interface WiredStreamChanges extends StreamChanges { ownFields: boolean; @@ -609,9 +610,10 @@ export class WiredStream extends StreamActiveRecord 0; + return hasRemovedFields( + startingStateStreamDefinition.ingest.wired.fields, + this._definition.ingest.wired.fields + ); } protected async doDetermineDeleteActions(): Promise { diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts index cd91d50ee7616..fac7f725c8cce 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts @@ -19,6 +19,7 @@ import { SimulateIngestResponse, SimulateIngestSimulateIngestDocumentResult, FieldCapsResponse, + IndicesGetIndexTemplateIndexTemplateItem, } from '@elastic/elasticsearch/lib/api/types'; import { IScopedClusterClient } from '@kbn/core/server'; import { flattenObjectNestedLast, calculateObjectDiff } from '@kbn/object-utils'; @@ -38,6 +39,7 @@ import { import { mapValues, uniq, omit, isEmpty, uniqBy } from 'lodash'; import { StreamsClient } from '../../../../lib/streams/client'; import { formatToIngestProcessors } from '../../../../lib/streams/helpers/processing'; +import { MAX_PRIORITY } from '../../../../lib/streams/index_templates/generate_index_template'; export interface ProcessingSimulationParams { path: { @@ -147,7 +149,7 @@ export const simulateProcessing = async ({ streamsClient, }: SimulateProcessingDeps) => { /* 0. Retrieve required data to prepare the simulation */ - const [stream, { indexState: streamIndexState, fieldCaps: streamIndexFieldCaps }] = + const [stream, { indexState: streamIndexState, fieldCaps: streamIndexFieldCaps, indexTemplate }] = await Promise.all([ streamsClient.getStream(params.path.name), getStreamIndex(scopedClusterClient, streamsClient, params.path.name), @@ -159,6 +161,7 @@ export const simulateProcessing = async ({ const ingestSimulationBody = prepareIngestSimulationBody( simulationData, streamIndexState, + indexTemplate, params ); /** @@ -303,6 +306,7 @@ const preparePipelineSimulationBody = ( const prepareIngestSimulationBody = ( simulationData: ReturnType, streamIndex: IndicesIndexState, + indexTemplate: IndicesGetIndexTemplateIndexTemplateItem, params: ProcessingSimulationParams ): SimulateIngestRequest => { const { body } = params; @@ -313,6 +317,31 @@ const prepareIngestSimulationBody = ( const defaultPipelineName = streamIndex.settings?.index?.default_pipeline; const mappings = streamIndex.mappings; + // We need to build a patched index template isntead of using mapping_addition + // because of https://github.com/elastic/elasticsearch/issues/131608 + const patchedIndexTemplate = { + ...indexTemplate.index_template, + priority: + indexTemplate.index_template.priority && indexTemplate.index_template.priority > MAX_PRIORITY + ? // max priority passed as a string so we don't lose precision + (`${MAX_PRIORITY}` as unknown as number) + : indexTemplate.index_template.priority, + composed_of: [ + ...(indexTemplate.index_template.composed_of || []), + '__DUMMY_COMPONENT_TEMPLATE__', + ], + template: { + ...indexTemplate.index_template.template, + mappings: { + ...indexTemplate.index_template.template?.mappings, + properties: { + ...indexTemplate.index_template.template?.mappings?.properties, + ...(detected_fields && computeMappingProperties(detected_fields)), + }, + }, + }, + }; + const simulationBody: SimulateIngestRequest = { docs, ...(defaultPipelineName && { @@ -322,6 +351,20 @@ const prepareIngestSimulationBody = ( }, }, }), + index_template_substitutions: { + [indexTemplate.name]: patchedIndexTemplate, + }, + component_template_substitutions: { + __DUMMY_COMPONENT_TEMPLATE__: { + template: { + mappings: { + properties: { + ...(detected_fields && computeMappingProperties(detected_fields)), + }, + }, + }, + }, + }, // Ideally we should not need to retrieve and merge the mappings from the stream index. // But the ingest simulation API does not validate correctly the mappings unless they are specified in the simulation body. // So we need to merge the mappings from the stream index with the detected fields. @@ -728,6 +771,7 @@ const getStreamIndex = async ( ): Promise<{ indexState: IndicesIndexState; fieldCaps: FieldCapsResponse['fields']; + indexTemplate: IndicesGetIndexTemplateIndexTemplateItem; }> => { const dataStream = await streamsClient.getDataStream(streamName); const lastIndexRef = dataStream.indices.at(-1); @@ -735,7 +779,7 @@ const getStreamIndex = async ( throw new Error(`No writing index found for stream ${streamName}`); } - const [lastIndex, lastIndexFieldCaps] = await Promise.all([ + const [lastIndex, lastIndexFieldCaps, indexTemplate] = await Promise.all([ scopedClusterClient.asCurrentUser.indices.get({ index: lastIndexRef.index_name, }), @@ -743,11 +787,15 @@ const getStreamIndex = async ( index: lastIndexRef.index_name, fields: '*', }), + await scopedClusterClient.asCurrentUser.indices.getIndexTemplate({ + name: dataStream.template, + }), ]); return { indexState: lastIndex[lastIndexRef.index_name], fieldCaps: lastIndexFieldCaps.fields, + indexTemplate: indexTemplate.index_templates[0], }; }; diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts index f2ab32be527a9..79ccdca0b72d7 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts @@ -9,7 +9,7 @@ import { SampleDocument, fieldDefinitionConfigSchema, Streams } from '@kbn/strea import { z } from '@kbn/zod'; import { IScopedClusterClient } from '@kbn/core/server'; import { SearchHit } from '@kbn/es-types'; -import { MappingProperty } from '@elastic/elasticsearch/lib/api/types'; +import { StreamsMappingProperties } from '@kbn/streams-schema/src/fields'; import { MAX_PRIORITY } from '../../../../lib/streams/index_templates/generate_index_template'; import { STREAMS_API_PRIVILEGES } from '../../../../../common/constants'; import { SecurityError } from '../../../../lib/streams/errors/security_error'; @@ -272,7 +272,7 @@ const DUMMY_PIPELINE_NAME = '__dummy_pipeline__'; async function simulateIngest( sampleResultsAsSimulationDocs: Array>, dataStreamName: string, - propertiesForSimulation: Record, + propertiesForSimulation: StreamsMappingProperties, scopedClusterClient: IScopedClusterClient ) { // fetch the index template to get the base mappings From 558c65af5fafbb13a7a23761fc0a77b760df921c Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 21 Jul 2025 18:19:52 +0200 Subject: [PATCH 11/23] commit --- .../streams/server/routes/internal/streams/schema/route.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts index 79ccdca0b72d7..08d3da256eb3f 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts @@ -285,7 +285,7 @@ async function simulateIngest( }) ).index_templates[0].index_template; - // We need to build a batched index template isntead of using mapping_addition + // We need to build a patched index template instead of using mapping_addition // because of https://github.com/elastic/elasticsearch/issues/131608 const patchedIndexTemplate = { ...indexTemplate, From bfc6693e3666d67524df0b8222485216315bb48e Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 21 Jul 2025 18:51:08 +0200 Subject: [PATCH 12/23] revert change for simulation --- .../streams/processing/simulation_handler.ts | 52 +------------------ 1 file changed, 2 insertions(+), 50 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts index fac7f725c8cce..cd91d50ee7616 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts @@ -19,7 +19,6 @@ import { SimulateIngestResponse, SimulateIngestSimulateIngestDocumentResult, FieldCapsResponse, - IndicesGetIndexTemplateIndexTemplateItem, } from '@elastic/elasticsearch/lib/api/types'; import { IScopedClusterClient } from '@kbn/core/server'; import { flattenObjectNestedLast, calculateObjectDiff } from '@kbn/object-utils'; @@ -39,7 +38,6 @@ import { import { mapValues, uniq, omit, isEmpty, uniqBy } from 'lodash'; import { StreamsClient } from '../../../../lib/streams/client'; import { formatToIngestProcessors } from '../../../../lib/streams/helpers/processing'; -import { MAX_PRIORITY } from '../../../../lib/streams/index_templates/generate_index_template'; export interface ProcessingSimulationParams { path: { @@ -149,7 +147,7 @@ export const simulateProcessing = async ({ streamsClient, }: SimulateProcessingDeps) => { /* 0. Retrieve required data to prepare the simulation */ - const [stream, { indexState: streamIndexState, fieldCaps: streamIndexFieldCaps, indexTemplate }] = + const [stream, { indexState: streamIndexState, fieldCaps: streamIndexFieldCaps }] = await Promise.all([ streamsClient.getStream(params.path.name), getStreamIndex(scopedClusterClient, streamsClient, params.path.name), @@ -161,7 +159,6 @@ export const simulateProcessing = async ({ const ingestSimulationBody = prepareIngestSimulationBody( simulationData, streamIndexState, - indexTemplate, params ); /** @@ -306,7 +303,6 @@ const preparePipelineSimulationBody = ( const prepareIngestSimulationBody = ( simulationData: ReturnType, streamIndex: IndicesIndexState, - indexTemplate: IndicesGetIndexTemplateIndexTemplateItem, params: ProcessingSimulationParams ): SimulateIngestRequest => { const { body } = params; @@ -317,31 +313,6 @@ const prepareIngestSimulationBody = ( const defaultPipelineName = streamIndex.settings?.index?.default_pipeline; const mappings = streamIndex.mappings; - // We need to build a patched index template isntead of using mapping_addition - // because of https://github.com/elastic/elasticsearch/issues/131608 - const patchedIndexTemplate = { - ...indexTemplate.index_template, - priority: - indexTemplate.index_template.priority && indexTemplate.index_template.priority > MAX_PRIORITY - ? // max priority passed as a string so we don't lose precision - (`${MAX_PRIORITY}` as unknown as number) - : indexTemplate.index_template.priority, - composed_of: [ - ...(indexTemplate.index_template.composed_of || []), - '__DUMMY_COMPONENT_TEMPLATE__', - ], - template: { - ...indexTemplate.index_template.template, - mappings: { - ...indexTemplate.index_template.template?.mappings, - properties: { - ...indexTemplate.index_template.template?.mappings?.properties, - ...(detected_fields && computeMappingProperties(detected_fields)), - }, - }, - }, - }; - const simulationBody: SimulateIngestRequest = { docs, ...(defaultPipelineName && { @@ -351,20 +322,6 @@ const prepareIngestSimulationBody = ( }, }, }), - index_template_substitutions: { - [indexTemplate.name]: patchedIndexTemplate, - }, - component_template_substitutions: { - __DUMMY_COMPONENT_TEMPLATE__: { - template: { - mappings: { - properties: { - ...(detected_fields && computeMappingProperties(detected_fields)), - }, - }, - }, - }, - }, // Ideally we should not need to retrieve and merge the mappings from the stream index. // But the ingest simulation API does not validate correctly the mappings unless they are specified in the simulation body. // So we need to merge the mappings from the stream index with the detected fields. @@ -771,7 +728,6 @@ const getStreamIndex = async ( ): Promise<{ indexState: IndicesIndexState; fieldCaps: FieldCapsResponse['fields']; - indexTemplate: IndicesGetIndexTemplateIndexTemplateItem; }> => { const dataStream = await streamsClient.getDataStream(streamName); const lastIndexRef = dataStream.indices.at(-1); @@ -779,7 +735,7 @@ const getStreamIndex = async ( throw new Error(`No writing index found for stream ${streamName}`); } - const [lastIndex, lastIndexFieldCaps, indexTemplate] = await Promise.all([ + const [lastIndex, lastIndexFieldCaps] = await Promise.all([ scopedClusterClient.asCurrentUser.indices.get({ index: lastIndexRef.index_name, }), @@ -787,15 +743,11 @@ const getStreamIndex = async ( index: lastIndexRef.index_name, fields: '*', }), - await scopedClusterClient.asCurrentUser.indices.getIndexTemplate({ - name: dataStream.template, - }), ]); return { indexState: lastIndex[lastIndexRef.index_name], fieldCaps: lastIndexFieldCaps.fields, - indexTemplate: indexTemplate.index_templates[0], }; }; From 6b187a8985209b7768b4ca64458a073f13dfecd2 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Tue, 22 Jul 2025 10:35:28 +0200 Subject: [PATCH 13/23] fix types --- .../server/lib/streams/component_templates/logs_layer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts index 89fdf5ea3b34a..0f740eaf88ccf 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts @@ -106,7 +106,7 @@ export const baseFields: FieldDefinition = { }, }; -export const baseMappings: MappingTypeMapping['properties'] = { +export const baseMappings: Exclude = { body: { type: 'object', properties: { From e92f2a911a1860029c43f213de1773e7ab9a99c3 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 28 Jul 2025 11:08:08 +0200 Subject: [PATCH 14/23] fix component template building --- .../server/lib/streams/component_templates/generate_layer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts index a330f67b82d80..96e5b2f834ed0 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts @@ -85,7 +85,7 @@ export function generateLayer( ...properties, ...aliases, } - : properties, + : { ...properties, ...aliases }, }, }, version: ASSET_VERSION, From a8102523cd77747024e592033fb0a66a791fdbb2 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Fri, 8 Aug 2025 16:36:19 +0200 Subject: [PATCH 15/23] fix --- .../data_streams/manage_data_streams.ts | 4 +- .../apis/streams/classic.ts | 111 +++--------------- 2 files changed, 22 insertions(+), 93 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts index e0fd6cfac1adb..5d3b3a9646ad7 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts @@ -142,7 +142,9 @@ export async function updateDataStreamsMappings({ `Error updating data stream mappings for ${name}: ${response.data_streams[0].error}` ); } - await retryTransientEsErrors(() => esClient.indices.rollover({ alias: name }), { logger }); + await retryTransientEsErrors(() => esClient.indices.rollover({ alias: name, lazy: true }), { + logger, + }); } export async function updateDataStreamsLifecycle({ diff --git a/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts b/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts index ccc8ce27020d5..cf7d9e5081aab 100644 --- a/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts +++ b/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts @@ -230,17 +230,9 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }); expect(putResponse.status).to.eql(200); - - // Verify the field override is set via field caps - const fieldCapsResponse = await esClient.fieldCaps({ - index: TEST_STREAM_NAME, - fields: 'foo.bar', - }); - expect(fieldCapsResponse.fields).to.have.property('foo.bar'); - expect(fieldCapsResponse.fields['foo.bar']).to.have.property('keyword'); }); - it('Does not roll over on compatible changes', async () => { + it('Does a lazy rollover on field change', async () => { const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { params: { path: { name: TEST_STREAM_NAME }, @@ -270,102 +262,37 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { expect(putResponse.status).to.eql(200); - // Verify the field override is set via field caps - const fieldCapsResponse = await esClient.fieldCaps({ - index: TEST_STREAM_NAME, - fields: 'foo.baz', - }); - expect(fieldCapsResponse.fields).to.have.property('foo.baz'); - expect(fieldCapsResponse.fields['foo.baz']).to.have.property('keyword'); - // Verify the stream did not roll over const getResponse = await esClient.indices.getDataStream({ name: TEST_STREAM_NAME, }); expect(getResponse.data_streams[0].indices).to.have.length(1); - }); - it('Does roll over on incompatible changes', async () => { - const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { - params: { - path: { name: TEST_STREAM_NAME }, - body: { - queries: [], - dashboards: [], - stream: { - description: '', - ingest: { - lifecycle: { inherit: {} }, - processing: [], - classic: { - field_overrides: { - 'foo.bar': { - type: 'double', - }, - 'foo.baz': { - type: 'keyword', - }, - }, - }, - }, - }, - }, - }, - }); + // Send a document to trigger the rollover + const doc = { + '@timestamp': '2024-01-01T00:00:10.000Z', + message: '2023-01-01T00:00:10.000Z error test', + 'foo.bar': 'bar_value', + 'foo.baz': 'baz_value', + }; + const response = await indexDocument(esClient, TEST_STREAM_NAME, doc); + expect(response.result).to.eql('created'); - expect(putResponse.status).to.eql(200); + // Verify the rollover happened + const getResponseAfterIndexing = await esClient.indices.getDataStream({ + name: TEST_STREAM_NAME, + }); + expect(getResponseAfterIndexing.data_streams[0].indices).to.have.length(2); // Verify the field override is set via field caps const fieldCapsResponse = await esClient.fieldCaps({ index: TEST_STREAM_NAME, - fields: 'foo.bar', + fields: ['foo.baz', 'foo.bar'], }); expect(fieldCapsResponse.fields).to.have.property('foo.bar'); - expect(fieldCapsResponse.fields['foo.bar']).to.have.property('double'); - - // Verify the stream did not roll over - const getResponse = await esClient.indices.getDataStream({ - name: TEST_STREAM_NAME, - }); - expect(getResponse.data_streams[0].indices).to.have.length(2); - }); - - it('Does roll over on removing field overrides', async () => { - const putResponse = await apiClient.fetch('PUT /api/streams/{name} 2023-10-31', { - params: { - path: { name: TEST_STREAM_NAME }, - body: { - queries: [], - dashboards: [], - stream: { - description: '', - ingest: { - lifecycle: { inherit: {} }, - processing: [], - classic: { - field_overrides: {}, - }, - }, - }, - }, - }, - }); - expect(putResponse.status).to.eql(200); - // Verify the stream rolled over - const getResponse = await esClient.indices.getDataStream({ - name: TEST_STREAM_NAME, - }); - expect(getResponse.data_streams[0].indices).to.have.length(3); - - // get the current write index - const writeIndex = getResponse.data_streams[0].indices[2]; - // Verify the field override is removed via field caps - const fieldCapsResponse = await esClient.fieldCaps({ - index: writeIndex.index_name, - fields: ['foo.bar', 'foo.baz'], - }); - expect(fieldCapsResponse.fields).to.not.have.property('foo.bar'); - expect(fieldCapsResponse.fields).to.not.have.property('foo.baz'); + expect(fieldCapsResponse.fields['foo.bar']).to.have.property('keyword'); + expect(fieldCapsResponse.fields).to.have.property('foo.baz'); + expect(fieldCapsResponse.fields['foo.baz']).to.have.property('keyword'); }); }); From 1e4808756f79f425197a5b28f4f2420831790ac0 Mon Sep 17 00:00:00 2001 From: Kerry Gallagher <471693+Kerry350@users.noreply.github.com> Date: Thu, 14 Aug 2025 13:53:02 +0100 Subject: [PATCH 16/23] Amend Scout tests --- .../scout/ui/tests/data_management/data_mapping.spec.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_mapping.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_mapping.spec.ts index 37853a59e9e2b..a72626c5464c2 100644 --- a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_mapping.spec.ts +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_mapping.spec.ts @@ -141,7 +141,7 @@ test.describe('Stream data mapping - schema editor', { tag: ['@ess', '@svlOblt'] await pageObjects.streams.expectCellValueContains({ columnName: 'status', rowIndex: 0, - value: 'Unmapped', + value: 'Unmanaged', }); // Open the field actions menu @@ -180,7 +180,7 @@ test.describe('Stream data mapping - schema editor', { tag: ['@ess', '@svlOblt'] await pageObjects.streams.expectCellValueContains({ columnName: 'status', rowIndex: 0, - value: 'Unmapped', + value: 'Unmanaged', }); // Open the field actions menu @@ -216,7 +216,7 @@ test.describe('Stream data mapping - schema editor', { tag: ['@ess', '@svlOblt'] await pageObjects.streams.expectCellValueContains({ columnName: 'status', rowIndex: 0, - value: 'Unmapped', + value: 'Unmanaged', }); }); }); From 7282598800b84a8c81628cc0ba31625085f930a3 Mon Sep 17 00:00:00 2001 From: Kerry Gallagher <471693+Kerry350@users.noreply.github.com> Date: Thu, 21 Aug 2025 10:16:33 +0100 Subject: [PATCH 17/23] Amend imports (merge error) --- .../data_management/schema_editor/hooks/use_schema_fields.ts | 3 ++- .../data_management/stream_detail_schema_editor/index.tsx | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts index 4c1dbd83c01a1..339588cd7cc98 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts @@ -6,7 +6,8 @@ */ import { i18n } from '@kbn/i18n'; -import type { NamedFieldDefinitionConfig, Streams } from '@kbn/streams-schema'; +import type { NamedFieldDefinitionConfig } from '@kbn/streams-schema'; +import { Streams } from '@kbn/streams-schema'; import { getAdvancedParameters } from '@kbn/streams-schema'; import { isEqual, omit } from 'lodash'; import { useMemo, useCallback } from 'react'; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx index 8c4ed993daa02..3574ef62473eb 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_schema_editor/index.tsx @@ -5,7 +5,7 @@ * 2.0. */ import React from 'react'; -import type { Streams } from '@kbn/streams-schema'; +import { Streams } from '@kbn/streams-schema'; import { isRootStreamDefinition } from '@kbn/streams-schema'; import { useStreamDetail } from '../../../hooks/use_stream_detail'; import { SchemaEditor } from '../schema_editor'; From 7903657cfc5d3a9506fc3c69fea0954c7b186a9f Mon Sep 17 00:00:00 2001 From: Kerry Gallagher <471693+Kerry350@users.noreply.github.com> Date: Thu, 21 Aug 2025 10:32:08 +0100 Subject: [PATCH 18/23] Fix bug with simulation --- .../routes/internal/streams/schema/route.ts | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts index 9eb588e6b9be0..52e7d705599a9 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts @@ -11,6 +11,7 @@ import { z } from '@kbn/zod'; import type { IScopedClusterClient } from '@kbn/core/server'; import type { SearchHit } from '@kbn/es-types'; import type { StreamsMappingProperties } from '@kbn/streams-schema/src/fields'; +import { LOGS_ROOT_STREAM_NAME } from '../../../../lib/streams/root_stream_definition'; import { MAX_PRIORITY } from '../../../../lib/streams/index_templates/generate_index_template'; import { STREAMS_API_PRIVILEGES } from '../../../../../common/constants'; import { SecurityError } from '../../../../lib/streams/errors/security_error'; @@ -182,7 +183,7 @@ export const schemaFieldsSimulationRoute = createServerRoute({ const sampleResultsAsSimulationDocs = sampleResults.hits.hits.map((hit) => ({ // Direct writes to child streams are not allowed. This must be set to logs. - _index: 'logs', + _index: LOGS_ROOT_STREAM_NAME, _id: hit._id, _source: Object.fromEntries( Object.entries(getFlattenedObject(hit._source as SampleDocument)).filter( @@ -322,9 +323,21 @@ async function simulateIngest( }, }, }, - // prevent double-processing pipeline_substitutions: { [DUMMY_PIPELINE_NAME]: { + // The sampleResults are already gathered directly from the child stream index. But, we can't + // simulate an _index other than logs, this reroutes the documents back to the child stream. + // After the reroute the override below ensures no double processing happens. + processors: [ + { + reroute: { + destination: dataStreamName, + }, + }, + ], + }, + // prevent double-processing + [`${dataStreamName}@stream.processing`]: { processors: [], }, }, From 92ffccf6fc81b32f0d3904df68043f120ecebe85 Mon Sep 17 00:00:00 2001 From: Kerry Gallagher <471693+Kerry350@users.noreply.github.com> Date: Thu, 21 Aug 2025 10:50:35 +0100 Subject: [PATCH 19/23] Refine simulation fix --- .../routes/internal/streams/schema/route.ts | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts index 52e7d705599a9..3f56920dd3649 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts @@ -182,8 +182,10 @@ export const schemaFieldsSimulationRoute = createServerRoute({ const fieldDefinitionKeys = Object.keys(propertiesForSimulation); const sampleResultsAsSimulationDocs = sampleResults.hits.hits.map((hit) => ({ - // Direct writes to child streams are not allowed. This must be set to logs. - _index: LOGS_ROOT_STREAM_NAME, + // For wired streams direct writes to child streams are not allowed, we must use the "logs" index. + _index: params.path.name.startsWith(`${LOGS_ROOT_STREAM_NAME}.`) + ? LOGS_ROOT_STREAM_NAME + : params.path.name, _id: hit._id, _source: Object.fromEntries( Object.entries(getFlattenedObject(hit._source as SampleDocument)).filter( @@ -326,20 +328,28 @@ async function simulateIngest( pipeline_substitutions: { [DUMMY_PIPELINE_NAME]: { // The sampleResults are already gathered directly from the child stream index. But, we can't - // simulate an _index other than logs, this reroutes the documents back to the child stream. + // simulate an _index other than logs for wired streams, this reroutes the documents back to the child stream. // After the reroute the override below ensures no double processing happens. processors: [ - { - reroute: { - destination: dataStreamName, - }, - }, + ...(dataStreamName.startsWith(`${LOGS_ROOT_STREAM_NAME}.`) + ? [ + { + reroute: { + destination: dataStreamName, + }, + }, + ] + : []), ], }, // prevent double-processing - [`${dataStreamName}@stream.processing`]: { - processors: [], - }, + ...(dataStreamName.startsWith(`${LOGS_ROOT_STREAM_NAME}.`) + ? { + [`${dataStreamName}@stream.processing`]: { + processors: [], + }, + } + : {}), }, }; From da48023ab9fe04cf05ea908aae3bc5e35c8f1d74 Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Thu, 21 Aug 2025 10:18:48 +0000 Subject: [PATCH 20/23] [CI] Auto-commit changed files from 'node scripts/eslint_all_files --no-cache --fix' --- .../shared/kbn-streams-schema/src/models/ingest/classic.ts | 3 ++- .../lib/streams/state_management/execution_plan/types.ts | 2 +- .../lib/streams/state_management/streams/classic_stream.ts | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/classic.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/classic.ts index c59348a230d92..639d94b9612d6 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/classic.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/classic.ts @@ -15,7 +15,8 @@ import { validation } from '../validation/validation'; import type { ModelValidation } from '../validation/model_validation'; import { modelValidation } from '../validation/model_validation'; import { BaseStream } from '../base'; -import { FieldDefinition, fieldDefinitionSchema } from '../../fields'; +import type { FieldDefinition } from '../../fields'; +import { fieldDefinitionSchema } from '../../fields'; /* eslint-disable @typescript-eslint/no-namespace */ diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts index 4d15b06377cd7..2f01032592d15 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/types.ts @@ -12,7 +12,7 @@ import type { IngestPutPipelineRequest, } from '@elastic/elasticsearch/lib/api/types'; import type { IngestStreamLifecycle, Streams } from '@kbn/streams-schema'; -import { StreamsMappingProperties } from '@kbn/streams-schema/src/fields'; +import type { StreamsMappingProperties } from '@kbn/streams-schema/src/fields'; export interface UpsertComponentTemplateAction { type: 'upsert_component_template'; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts index 5f1820140521a..34bf497b9c99e 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts @@ -28,7 +28,7 @@ import type { } from '../stream_active_record/stream_active_record'; import { StreamActiveRecord } from '../stream_active_record/stream_active_record'; import { validateClassicFields } from '../../helpers/validate_fields'; -import { DataStreamMappingsUpdateResponse } from '../../data_streams/manage_data_streams'; +import type { DataStreamMappingsUpdateResponse } from '../../data_streams/manage_data_streams'; interface ClassicStreamChanges extends StreamChanges { processing: boolean; From a707e84df230802b912f54f7cd3534c3907897b1 Mon Sep 17 00:00:00 2001 From: Kerry Gallagher <471693+Kerry350@users.noreply.github.com> Date: Thu, 21 Aug 2025 14:00:07 +0100 Subject: [PATCH 21/23] Amend tests --- .../src/models/ingest/classic.test.ts | 4 +++- .../apis/streams/classic.ts | 23 ++++++++++--------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/classic.test.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/classic.test.ts index 5addddf3daf20..a0aa531964f18 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/classic.test.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/classic.test.ts @@ -30,7 +30,9 @@ describe('ClassicStream', () => { lifecycle: { inherit: {}, }, - processing: [], + processing: { + steps: [], + }, classic: { field_overrides: { xxx: { diff --git a/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts b/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts index d16918d545b85..31fdcc8a70eee 100644 --- a/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts +++ b/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts @@ -49,7 +49,9 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { description: '', ingest: { lifecycle: { inherit: {} }, - processing: [], + processing: { + steps: [], + }, classic: {}, }, }); @@ -116,17 +118,16 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { description: '', ingest: { lifecycle: { inherit: {} }, - processing: [ - { - grok: { - field: 'message', - patterns: [ - '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', - ], - if: { always: {} }, - }, + processing: { + steps: { + action: 'grok', + from: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + where: { always: {} }, }, - ], + }, classic: {}, }, }); From c55380ccaa045c2048e16618f3c64ee43bc6fd1d Mon Sep 17 00:00:00 2001 From: Kerry Gallagher <471693+Kerry350@users.noreply.github.com> Date: Wed, 27 Aug 2025 13:00:25 +0100 Subject: [PATCH 22/23] Amend test typo --- .../apis/streams/classic.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts b/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts index 31fdcc8a70eee..a803dc6081cee 100644 --- a/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts +++ b/x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/classic.ts @@ -119,14 +119,16 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { ingest: { lifecycle: { inherit: {} }, processing: { - steps: { - action: 'grok', - from: 'message', - patterns: [ - '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', - ], - where: { always: {} }, - }, + steps: [ + { + action: 'grok', + from: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + where: { always: {} }, + }, + ], }, classic: {}, }, From 4e71160d0b072c27ffa0b76585d1f312e328adcc Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 1 Sep 2025 16:49:57 +0200 Subject: [PATCH 23/23] review comments --- .../server/lib/streams/component_templates/logs_layer.ts | 2 +- .../data_management/schema_editor/constants.ts | 9 +++++++++ .../data_management/schema_editor/field_status.tsx | 6 ++++-- .../schema_editor/hooks/use_schema_fields.ts | 6 ++++-- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts index 146361ae48eac..7659be92e24f7 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts @@ -248,7 +248,7 @@ export function addAliasesForNamespacedFields( ); // Add aliases defined in the base mappings - Object.entries(baseMappings || {}).forEach(([key, fieldDef]) => { + Object.entries(baseMappings).forEach(([key, fieldDef]) => { if (fieldDef.type === 'alias') { inheritedFields[key] = { type: baseFields[fieldDef.path!].type, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/constants.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/constants.ts index b8144ee4713e2..06e8c20138c76 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/constants.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/constants.ts @@ -68,18 +68,27 @@ export const FIELD_STATUS_MAP = { label: i18n.translate('xpack.streams.streamDetailSchemaEditorInheritedStatusLabel', { defaultMessage: 'Inherited', }), + tooltip: i18n.translate('xpack.streams.streamDetailSchemaEditorInheritedStatusTooltip', { + defaultMessage: 'The mapping for this field is inherited from the parent stream.', + }), }, mapped: { color: 'success', label: i18n.translate('xpack.streams.streamDetailSchemaEditorMappedStatusLabel', { defaultMessage: 'Mapped', }), + tooltip: i18n.translate('xpack.streams.streamDetailSchemaEditorMappedStatusTooltip', { + defaultMessage: 'This field is mapped as part of this stream.', + }), }, unmapped: { color: 'default', label: i18n.translate('xpack.streams.streamDetailSchemaEditorUnmappedStatusLabel', { defaultMessage: 'Unmanaged', }), + tooltip: i18n.translate('xpack.streams.streamDetailSchemaEditorUnmappedStatusTooltip', { + defaultMessage: 'The mapping for this field is not managed by this stream or a parent.', + }), }, }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/field_status.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/field_status.tsx index 2fbc3b7713564..24fed7cf911b6 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/field_status.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/field_status.tsx @@ -6,12 +6,14 @@ */ import React from 'react'; -import { EuiBadge } from '@elastic/eui'; +import { EuiBadge, EuiToolTip } from '@elastic/eui'; import type { FieldStatus } from './constants'; import { FIELD_STATUS_MAP } from './constants'; export const FieldStatusBadge = ({ status }: { status: FieldStatus }) => { return ( - {FIELD_STATUS_MAP[status].label} + + {FIELD_STATUS_MAP[status].label} + ); }; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts index 339588cd7cc98..0c889a8e4e7df 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts @@ -108,8 +108,10 @@ export const useSchemaFields = ({ const unmanagedFields: SchemaField[] = dataViewFields ? dataViewFields - .filter((field) => !field.runtimeField && !field.metadata_field) - .filter((field) => !allManagedFieldsSet.has(field.name)) + .filter( + (field) => + !field.runtimeField && !field.metadata_field && !allManagedFieldsSet.has(field.name) + ) .map((field) => ({ name: field.name, status: 'unmapped',