|
5 | 5 | * 2.0. |
6 | 6 | */ |
7 | 7 |
|
8 | | -import type { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types'; |
| 8 | +import type { |
| 9 | + IndicesDataStream, |
| 10 | + IngestProcessorContainer, |
| 11 | +} from '@elastic/elasticsearch/lib/api/types'; |
9 | 12 | import type { |
10 | 13 | IngestStreamLifecycle, |
11 | 14 | StreamDefinition, |
@@ -202,7 +205,11 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> { |
202 | 205 | }, |
203 | 206 | }); |
204 | 207 |
|
205 | | - const { pipeline, template } = await this.getPipelineTargets(); |
| 208 | + const pipelineTargets = await this.getPipelineTargets(); |
| 209 | + if (!pipelineTargets) { |
| 210 | + throw new StatusError('Could not find pipeline targets', 500); |
| 211 | + } |
| 212 | + const { pipeline, template } = pipelineTargets; |
206 | 213 | actions.push({ |
207 | 214 | type: 'delete_processor_from_ingest_pipeline', |
208 | 215 | pipeline, |
@@ -253,7 +260,11 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> { |
253 | 260 | }, |
254 | 261 | }; |
255 | 262 |
|
256 | | - const { pipeline, template } = await this.getPipelineTargets(); |
| 263 | + const pipelineTargets = await this.getPipelineTargets(); |
| 264 | + if (!pipelineTargets) { |
| 265 | + throw new StatusError('Could not find pipeline targets', 500); |
| 266 | + } |
| 267 | + const { pipeline, template } = pipelineTargets; |
257 | 268 | actions.push({ |
258 | 269 | type: 'append_processor_to_ingest_pipeline', |
259 | 270 | pipeline, |
@@ -290,21 +301,32 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> { |
290 | 301 | name: streamManagedPipelineName, |
291 | 302 | }, |
292 | 303 | }); |
293 | | - const { pipeline, template } = await this.getPipelineTargets(); |
294 | | - actions.push({ |
295 | | - type: 'delete_processor_from_ingest_pipeline', |
296 | | - pipeline, |
297 | | - template, |
298 | | - dataStream: this._definition.name, |
299 | | - referencePipeline: streamManagedPipelineName, |
300 | | - }); |
| 304 | + const pipelineTargets = await this.getPipelineTargets(); |
| 305 | + if (pipelineTargets) { |
| 306 | + const { pipeline, template } = pipelineTargets; |
| 307 | + actions.push({ |
| 308 | + type: 'delete_processor_from_ingest_pipeline', |
| 309 | + pipeline, |
| 310 | + template, |
| 311 | + dataStream: this._definition.name, |
| 312 | + referencePipeline: streamManagedPipelineName, |
| 313 | + }); |
| 314 | + } |
301 | 315 | } |
302 | 316 |
|
303 | 317 | return actions; |
304 | 318 | } |
305 | 319 |
|
306 | 320 | private async getPipelineTargets() { |
307 | | - const dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name); |
| 321 | + let dataStream: IndicesDataStream; |
| 322 | + try { |
| 323 | + dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name); |
| 324 | + } catch (error) { |
| 325 | + if (isNotFoundError(error)) { |
| 326 | + return undefined; |
| 327 | + } |
| 328 | + throw error; |
| 329 | + } |
308 | 330 | const unmanagedAssets = await getUnmanagedElasticsearchAssets({ |
309 | 331 | dataStream, |
310 | 332 | scopedClusterClient: this.dependencies.scopedClusterClient, |
|
0 commit comments