Skip to content

Commit f7add87

Browse files
authored
🌊 Disallow manual_ingest_pipeline processor for wired streams (#233902)
As discussed, we don't want to offer manual_ingest_pipeline for wired streams. This PR makes sure it can only be used for classic streams (disallowing on the API and hiding from the UI) * Tests for `manual_ingest_pipeline` were currently done on wired streams, moved that over to classic streams. * The test for testing a half-created wired stream was done with a broken `manual_ingest_pipeline`, which is now not possible anymore - this PR changes the test to manually delete the streams definition while leaving the actual data stream in place, simulating the situation of a restored data stream.
1 parent 6680b4b commit f7add87

File tree

7 files changed

+261
-222
lines changed

7 files changed

+261
-222
lines changed

‎x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_stream.ts‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import type { Streams } from '@kbn/streams-schema';
99
import { isInheritLifecycle } from '@kbn/streams-schema';
1010
import { isEqual } from 'lodash';
11+
import type { StreamlangStep } from '@kbn/streamlang/types/streamlang';
1112
import { MalformedStreamError } from '../errors/malformed_stream_error';
1213
import { RootStreamImmutabilityError } from '../errors/root_stream_immutability_error';
1314

@@ -42,3 +43,14 @@ export function validateRootStreamChanges(
4243
throw new MalformedStreamError('Root stream cannot inherit lifecycle');
4344
}
4445
}
46+
47+
export function validateNoManualIngestPipelineUsage(steps: StreamlangStep[]) {
48+
for (const step of steps) {
49+
if ('action' in step && step.action === 'manual_ingest_pipeline') {
50+
throw new MalformedStreamError('Manual ingest pipelines are not allowed');
51+
}
52+
if ('where' in step && step.where && 'steps' in step.where) {
53+
validateNoManualIngestPipelineUsage(step.where.steps);
54+
}
55+
}
56+
}

‎x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts‎

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ import {
3434
validateDescendantFields,
3535
validateSystemFields,
3636
} from '../../helpers/validate_fields';
37-
import { validateRootStreamChanges } from '../../helpers/validate_stream';
37+
import {
38+
validateNoManualIngestPipelineUsage,
39+
validateRootStreamChanges,
40+
} from '../../helpers/validate_stream';
3841
import { generateIndexTemplate } from '../../index_templates/generate_index_template';
3942
import { getIndexTemplateName } from '../../index_templates/name';
4043
import { generateIngestPipeline } from '../../ingest_pipelines/generate_ingest_pipeline';
@@ -299,6 +302,11 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
299302

300303
const existsInStartingState = startingState.has(this._definition.name);
301304

305+
if (this._changes.processing && this._definition.ingest.processing.steps.length > 0) {
306+
// recursively go through all steps to make sure it's not using manual_ingest_pipeline
307+
validateNoManualIngestPipelineUsage(this._definition.ingest.processing.steps);
308+
}
309+
302310
if (!existsInStartingState) {
303311
// Check for conflicts
304312
const { existsAsIndex, existsAsManagedDataStream, existsAsDataStream } =

‎x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_type_selector.tsx‎

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { FormattedMessage } from '@kbn/i18n-react';
1313
import { useController, useFormContext, useWatch } from 'react-hook-form';
1414
import type { DocLinksStart } from '@kbn/core/public';
1515
import type { ProcessorType } from '@kbn/streamlang';
16+
import { Streams } from '@kbn/streams-schema';
1617
import { useKibana } from '../../../../hooks/use_kibana';
1718
import { getDefaultFormStateByType } from '../utils';
1819
import type { ProcessorFormState } from '../types';
@@ -42,6 +43,9 @@ export const ProcessorTypeSelector = ({
4243
});
4344

4445
const processorType = useWatch<{ action: ProcessorType }>({ name: 'action' });
46+
const isWired = useStreamEnrichmentSelector((snapshot) =>
47+
Streams.WiredStream.GetResponse.is(snapshot.context.definition)
48+
);
4549

4650
const grokCollection = useStreamEnrichmentSelector((state) => state.context.grokCollection);
4751

@@ -54,19 +58,21 @@ export const ProcessorTypeSelector = ({
5458
reset(formState);
5559
};
5660

61+
const selectorOptions = React.useMemo(() => getProcessorTypeSelectorOptions(isWired), [isWired]);
62+
5763
return (
5864
<EuiFormRow
5965
fullWidth
6066
label={i18n.translate(
6167
'xpack.streams.streamDetailView.managementTab.enrichment.processor.typeSelectorLabel',
6268
{ defaultMessage: 'Processor' }
6369
)}
64-
helpText={getProcessorDescription(core.docLinks)(processorType)}
70+
helpText={getProcessorDescription(core.docLinks, isWired)(processorType)}
6571
>
6672
<EuiSuperSelect
6773
data-test-subj="streamsAppProcessorTypeSelector"
6874
disabled={disabled}
69-
options={processorTypeSelectorOptions}
75+
options={selectorOptions}
7076
isInvalid={fieldState.invalid}
7177
valueOfSelected={field.value}
7278
onChange={handleChange}
@@ -80,7 +86,7 @@ export const ProcessorTypeSelector = ({
8086
);
8187
};
8288

83-
const availableProcessors: TAvailableProcessors = {
89+
const getAvailableProcessors: (isWired: boolean) => Partial<TAvailableProcessors> = (isWired) => ({
8490
date: {
8591
type: 'date',
8692
inputDisplay: 'Date',
@@ -171,22 +177,29 @@ const availableProcessors: TAvailableProcessors = {
171177
},
172178
},
173179
...configDrivenProcessors,
174-
manual_ingest_pipeline: {
175-
type: 'manual_ingest_pipeline',
176-
inputDisplay: 'Manual pipeline configuration',
177-
getDocUrl: () => (
178-
<FormattedMessage
179-
id="xpack.streams.streamDetailView.managementTab.enrichment.processor.manualIngestPipelineHelpText"
180-
defaultMessage="Specify an array of ingest pipeline processors using JSON."
181-
/>
182-
),
183-
},
184-
};
180+
...(isWired
181+
? {}
182+
: {
183+
manual_ingest_pipeline: {
184+
type: 'manual_ingest_pipeline',
185+
inputDisplay: 'Manual pipeline configuration',
186+
getDocUrl: () => (
187+
<FormattedMessage
188+
id="xpack.streams.streamDetailView.managementTab.enrichment.processor.manualIngestPipelineHelpText"
189+
defaultMessage="Specify an array of ingest pipeline processors using JSON."
190+
/>
191+
),
192+
},
193+
}),
194+
});
185195

186-
const getProcessorDescription = (docLinks: DocLinksStart) => (type: ProcessorType) => {
187-
return availableProcessors[type].getDocUrl(docLinks);
188-
};
196+
const getProcessorDescription =
197+
(docLinks: DocLinksStart, isWired: boolean) => (type: ProcessorType) => {
198+
return getAvailableProcessors(isWired)[type]?.getDocUrl(docLinks);
199+
};
189200

190-
const processorTypeSelectorOptions = Object.values(availableProcessors).map(
191-
({ type, inputDisplay }) => ({ value: type, inputDisplay })
192-
);
201+
const getProcessorTypeSelectorOptions = (isWired: boolean) =>
202+
Object.values(getAvailableProcessors(isWired)).map(({ type, inputDisplay }) => ({
203+
value: type,
204+
inputDisplay,
205+
}));

‎x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/basic.ts‎

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -595,12 +595,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
595595

596596
after(async () => {
597597
await disableStreams(apiClient);
598-
599-
await esClient.indices.deleteDataStream({ name: 'logs.invalid_pipeline' });
600-
await esClient.indices.deleteIndexTemplate({ name: 'logs.invalid_pipeline@stream' });
601-
await esClient.cluster.deleteComponentTemplate({
602-
603-
});
604598
});
605599

606600
it('inherit fields', async () => {
@@ -678,47 +672,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
678672
);
679673
});
680674

681-
it('reports when it fails to create a stream', async () => {
682-
const body: Streams.WiredStream.UpsertRequest = {
683-
dashboards: [],
684-
queries: [],
685-
rules: [],
686-
stream: {
687-
description: 'Should cause a failure due to invalid ingest pipeline',
688-
ingest: {
689-
lifecycle: { inherit: {} },
690-
processing: {
691-
steps: [
692-
{
693-
action: 'manual_ingest_pipeline',
694-
processors: [
695-
{
696-
set: {
697-
field: 'fails',
698-
value: 'whatever',
699-
fail: 'because this property is not valid',
700-
},
701-
},
702-
],
703-
},
704-
],
705-
},
706-
wired: {
707-
fields: {},
708-
routing: [],
709-
},
710-
},
711-
},
712-
};
713-
714-
const response = await putStream(apiClient, 'logs.invalid_pipeline', body, 500);
715-
716-
expect((response as any).message).to.contain('Failed to change state:');
717-
expect((response as any).message).to.contain(
718-
`The cluster state may be inconsistent. If you experience issues, please use the resync API to restore a consistent state.`
719-
);
720-
});
721-
722675
it('does not allow super deeply nested streams', async () => {
723676
const body: Streams.WiredStream.UpsertRequest = {
724677
dashboards: [],

0 commit comments

Comments
 (0)