Skip to content

Commit 269b8e0

Browse files
flash1293CAWilson94
authored andcommitted
🌊 Streams: Actually disable routing when status == 'disabled' (elastic#234545)
I noticed that with elastic#231992 , all the hard work of passing the status of a routing item around is done, but it's not actually reflected in the ingest pipeline. This PR is fixing this problem by filtering out all disabled routing items when constructing the pipeline and adding a test that disabled routing is actually disabled in practice.
1 parent e56fbfb commit 269b8e0

File tree

2 files changed

+51
-17
lines changed

2 files changed

+51
-17
lines changed

‎x-pack/platform/plugins/shared/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts‎

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@ interface GenerateReroutePipelineParams {
1717
export function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) {
1818
return {
1919
id: getReroutePipelineName(definition.name),
20-
processors: definition.ingest.wired.routing.map((child) => {
21-
return {
22-
reroute: {
23-
destination: child.destination,
24-
if: conditionToPainless(child.where),
25-
},
26-
};
27-
}),
20+
processors: definition.ingest.wired.routing
21+
.filter((child) => child.status !== 'disabled')
22+
.map((child) => {
23+
return {
24+
reroute: {
25+
destination: child.destination,
26+
if: conditionToPainless(child.where),
27+
},
28+
};
29+
}),
2830
_meta: {
2931
description: `Reroute pipeline for the ${definition.name} stream`,
3032
managed: true,

‎x-pack/platform/test/api_integration_deployment_agnostic/apis/streams/basic.ts‎

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -277,16 +277,21 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
277277
expect(response).to.have.property('acknowledged', true);
278278
});
279279

280+
const accessLogDoc = {
281+
'@timestamp': '2024-01-01T00:00:20.000Z',
282+
message: JSON.stringify({
283+
'log.level': 'info',
284+
'log.logger': 'nginx',
285+
message: 'test',
286+
}),
287+
};
288+
280289
it('Index an Nginx access log message, should goto logs.nginx.access', async () => {
281-
const doc = {
282-
'@timestamp': '2024-01-01T00:00:20.000Z',
283-
message: JSON.stringify({
284-
'log.level': 'info',
285-
'log.logger': 'nginx',
286-
message: 'test',
287-
}),
288-
};
289-
const result = await indexAndAssertTargetStream(esClient, 'logs.nginx.access', doc);
290+
const result = await indexAndAssertTargetStream(
291+
esClient,
292+
'logs.nginx.access',
293+
accessLogDoc
294+
);
290295
expect(result._source).to.eql({
291296
'@timestamp': '2024-01-01T00:00:20.000Z',
292297
body: { text: 'test' },
@@ -298,6 +303,33 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
298303
});
299304
});
300305

306+
it('Does not index to logs.nginx.access if routing is disabled', async () => {
307+
await putStream(apiClient, 'logs.nginx', {
308+
...emptyAssets,
309+
stream: {
310+
description: '',
311+
ingest: {
312+
lifecycle: { inherit: {} },
313+
processing: {
314+
steps: [],
315+
},
316+
wired: {
317+
fields: {},
318+
routing: [
319+
{
320+
destination: 'logs.nginx.access',
321+
where: { field: 'severity_text', eq: 'info' },
322+
status: 'disabled',
323+
},
324+
],
325+
},
326+
},
327+
},
328+
});
329+
330+
await indexAndAssertTargetStream(esClient, 'logs.nginx', accessLogDoc);
331+
});
332+
301333
it('Fork logs to logs.nginx.error with invalid condition', async () => {
302334
const body = {
303335
stream: {

0 commit comments

Comments
 (0)