Skip to content

Commit e2f0fdd

Browse files
authored
🌊 Streams: Make deleting orphaned streams work (#218054)
Currently streams doesn't allow you to delete an orphaned stream because `getPipelineTargets` required the data stream to exist. This PR fixes the problem by handling the case gracefully.
1 parent 6722f14 commit e2f0fdd

File tree

2 files changed

+37
-14
lines changed
  • x-pack
    • platform/plugins/shared/streams/server/lib/streams/state_management/streams
    • test/api_integration/deployment_agnostic/apis/observability/streams

2 files changed

+37
-14
lines changed

‎x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/unwired_stream.ts‎

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
* 2.0.
66
*/
77

8-
import type { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types';
8+
import type {
9+
IndicesDataStream,
10+
IngestProcessorContainer,
11+
} from '@elastic/elasticsearch/lib/api/types';
912
import type {
1013
IngestStreamLifecycle,
1114
StreamDefinition,
@@ -202,7 +205,11 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> {
202205
},
203206
});
204207

205-
const { pipeline, template } = await this.getPipelineTargets();
208+
const pipelineTargets = await this.getPipelineTargets();
209+
if (!pipelineTargets) {
210+
throw new StatusError('Could not find pipeline targets', 500);
211+
}
212+
const { pipeline, template } = pipelineTargets;
206213
actions.push({
207214
type: 'delete_processor_from_ingest_pipeline',
208215
pipeline,
@@ -253,7 +260,11 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> {
253260
},
254261
};
255262

256-
const { pipeline, template } = await this.getPipelineTargets();
263+
const pipelineTargets = await this.getPipelineTargets();
264+
if (!pipelineTargets) {
265+
throw new StatusError('Could not find pipeline targets', 500);
266+
}
267+
const { pipeline, template } = pipelineTargets;
257268
actions.push({
258269
type: 'append_processor_to_ingest_pipeline',
259270
pipeline,
@@ -290,21 +301,32 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> {
290301
name: streamManagedPipelineName,
291302
},
292303
});
293-
const { pipeline, template } = await this.getPipelineTargets();
294-
actions.push({
295-
type: 'delete_processor_from_ingest_pipeline',
296-
pipeline,
297-
template,
298-
dataStream: this._definition.name,
299-
referencePipeline: streamManagedPipelineName,
300-
});
304+
const pipelineTargets = await this.getPipelineTargets();
305+
if (pipelineTargets) {
306+
const { pipeline, template } = pipelineTargets;
307+
actions.push({
308+
type: 'delete_processor_from_ingest_pipeline',
309+
pipeline,
310+
template,
311+
dataStream: this._definition.name,
312+
referencePipeline: streamManagedPipelineName,
313+
});
314+
}
301315
}
302316

303317
return actions;
304318
}
305319

306320
private async getPipelineTargets() {
307-
const dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name);
321+
let dataStream: IndicesDataStream;
322+
try {
323+
dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name);
324+
} catch (error) {
325+
if (isNotFoundError(error)) {
326+
return undefined;
327+
}
328+
throw error;
329+
}
308330
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
309331
dataStream,
310332
scopedClusterClient: this.dependencies.scopedClusterClient,

‎x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/classic.ts‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,14 +583,15 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
583583
expect(isUnwiredStreamDefinition(classicStream!.stream)).to.be(true);
584584
});
585585

586-
after(async () => {
587-
await apiClient.fetch('DELETE /api/streams/{name} 2023-10-31', {
586+
it('should allow deleting', async () => {
587+
const response = await apiClient.fetch('DELETE /api/streams/{name} 2023-10-31', {
588588
params: {
589589
path: {
590590
name: ORPHANED_STREAM_NAME,
591591
},
592592
},
593593
});
594+
expect(response.status).to.eql(200);
594595
});
595596
});
596597
});

0 commit comments

Comments
 (0)