Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import useGetPermissionsByScope from 'hooks/useScopePermissions';
import RuleDeprecationInfo from 'components/rules/RuleDeprecationInfo';
import usePermissions from 'hooks/usePermissions';

import PipelineProcessingErrors from './PipelineProcessingErrors';

import ButtonToolbar from '../bootstrap/ButtonToolbar';
import { Spinner } from '../common';

Expand Down Expand Up @@ -141,15 +143,19 @@ const PipelineListItem = ({ pipeline, pipelines, connections, streams, onDeleteP
noConnectionsMessage={<em>Not connected</em>}
/>
</StreamListTD>
<td>
<PipelineProcessingErrors pipeline={pipeline} />
</td>
<td>{_formatStages()}</td>
<td>
<ButtonToolbar>
<LinkContainer to={Routes.SYSTEM.PIPELINES.PIPELINE(id)}>
<LinkContainer to={Routes.SYSTEM.PIPELINES.PIPELINE(id)} aria-label='Edit Pipeline'>
<Button disabled={!isPermitted('pipeline:edit')} bsSize="xsmall">
Edit
</Button>
</LinkContainer>
<Button
aria-label='Delete Pipeline'
disabled={!isPermitted('pipeline:delete') || isNotDeletable}
bsStyle="danger"
bsSize="xsmall"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
import * as React from 'react';
import { render, screen } from 'wrappedTestingLibrary';

import type { PipelineType } from 'components/pipelines/types';
import usePipelineRulesMetadata from 'components/rules/hooks/usePipelineRulesMetadata';
import { useStore } from 'stores/connect';
import { MetricsActions } from 'stores/metrics/MetricsStore';

import PipelineProcessingErrors, { getPipelineRuleFailureMetricNames } from './PipelineProcessingErrors';

jest.mock('components/rules/hooks/usePipelineRulesMetadata');
jest.mock('stores/connect', () => ({
__esModule: true,
useStore: jest.fn(),
}));
jest.mock('stores/metrics/MetricsStore', () => ({
MetricsStore: {},
MetricsActions: {
addGlobal: jest.fn(),
removeGlobal: jest.fn(),
},
}));
jest.mock('components/metrics', () => ({
CounterRate: ({ metric }: { metric: { count: number } }) => (
<span data-testid="pipeline-processing-errors-rate">{metric?.count} errors/s</span>
),
}));

describe('PipelineProcessingErrors', () => {
const pipeline: PipelineType = {
id: 'pipeline-1',
title: 'Test Pipeline',
description: 'Test pipeline description',
source: '',
created_at: '2024-01-01T00:00:00.000Z',
modified_at: '2024-01-01T00:00:00.000Z',
stages: [
{ stage: 0, match: 'ALL', rules: ['rule-1-title'] },
{ stage: 2, match: 'EITHER', rules: ['rule-2-title'] },
],
errors: null,
has_deprecated_functions: false,
_scope: 'DEFAULT',
};

const mockPipelineRulesMetadata = {
functions: [],
streams: [],
rules: ['rule-1', 'rule-2'],
deprecated_functions: [],
};

beforeEach(() => {
(usePipelineRulesMetadata as jest.Mock).mockReturnValue({
data: mockPipelineRulesMetadata,
isLoading: false,
refetch: jest.fn(),
});

(useStore as jest.Mock).mockReturnValue({
metrics: {
node1: {
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.0.failed': {
type: 'meter',
metric: { rate: { total: 2 } },
},
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-2.pipeline-1.2.failed': {
type: 'meter',
metric: { rate: { total: 3 } },
},
},
node2: {
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.0.failed': {
type: 'meter',
metric: { rate: { total: 5 } },
},
},
},
});

jest.clearAllMocks();
});

it('builds unique rule failure metric names per stage', () => {
const names = getPipelineRuleFailureMetricNames(
{
...pipeline,
stages: [
{ stage: 0, match: 'ALL', rules: [] },
{ stage: 0, match: 'EITHER', rules: [] },
{ stage: 1, match: 'PASS', rules: [] },
],
},
['rule-1', 'rule-1'],
);

expect(names).toEqual([
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.0.failed',
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.1.failed',
]);
});

it('registers metrics and renders total failures across nodes', () => {
const { unmount } = render(<PipelineProcessingErrors pipeline={pipeline} />);

const expectedMetricNames = [
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.0.failed',
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-1.pipeline-1.2.failed',
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-2.pipeline-1.0.failed',
'org.graylog.plugins.pipelineprocessor.ast.Rule.rule-2.pipeline-1.2.failed',
];

expect(MetricsActions.addGlobal).toHaveBeenCalledTimes(expectedMetricNames.length);
expectedMetricNames.forEach((name) => {
expect(MetricsActions.addGlobal).toHaveBeenCalledWith(name);
});

expect(screen.getByTestId('pipeline-processing-errors-rate')).toHaveTextContent('10 errors/s');
expect(screen.getByText('(10 total)')).toBeInTheDocument();

unmount();

expect(MetricsActions.removeGlobal).toHaveBeenCalledTimes(expectedMetricNames.length);
expectedMetricNames.forEach((name) => {
expect(MetricsActions.removeGlobal).toHaveBeenCalledWith(name);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
import * as React from 'react';
import { useEffect, useMemo } from 'react';
import numeral from 'numeral';

import { CounterRate } from 'components/metrics';
import usePipelineRulesMetadata from 'components/rules/hooks/usePipelineRulesMetadata';
import type { PipelineType } from 'components/pipelines/types';
import { useStore } from 'stores/connect';
import type { ClusterMetric, Metric, NodeMetric } from 'stores/metrics/MetricsStore';
import { MetricsActions, MetricsStore } from 'stores/metrics/MetricsStore';

type Props = {
pipeline: PipelineType;
};

const INITIAL_METRIC = {
full_name: '',
count: 0,
};

const METRIC_PREFIX = 'org.graylog.plugins.pipelineprocessor.ast.Rule';
const METRIC_SUFFIX = 'failed';

const metricName = (ruleId: string, pipelineId: string, stage: number) =>
`${METRIC_PREFIX}.${ruleId}.${pipelineId}.${stage}.${METRIC_SUFFIX}`;

const metricCount = (metric?: Metric): number => {
if (!metric) {
return 0;
}

switch (metric.type) {
case 'counter':
return metric.metric.count;
case 'gauge':
return metric.metric.value;
case 'histogram':
return metric.metric.count;
case 'meter':
return metric.metric.rate.total;
case 'timer':
return metric.metric.rate.total;
default:
return 0;
}
};

export const getPipelineRuleFailureMetricNames = (pipeline: PipelineType, ruleIds: string[]): Array<string> => {
const stages = Array.from(new Set(pipeline.stages.map(({ stage }) => stage)));
const uniqueRuleIds = Array.from(new Set(ruleIds));

return uniqueRuleIds.flatMap((ruleId) => stages.map((stage) => metricName(ruleId, pipeline.id, stage)));
};

const PipelineProcessingErrors = ({ pipeline }: Props) => {
const { data: pipelineRulesMetadata } = usePipelineRulesMetadata(pipeline.id, {
enabled: !!pipeline.id,
});
const { metrics }: { metrics: ClusterMetric } = useStore(MetricsStore, (state) => ({
metrics: state.metrics ?? {},
}));

const metricNames = useMemo(
() => getPipelineRuleFailureMetricNames(pipeline, pipelineRulesMetadata?.rules ?? []),
[pipeline, pipelineRulesMetadata?.rules],
);

useEffect(() => {
metricNames.forEach((name) => MetricsActions.addGlobal(name));

return () => {
metricNames.forEach((name) => MetricsActions.removeGlobal(name));
};
}, [metricNames]);

const totalErrors = useMemo(
() =>
Object.values(metrics ?? {}).reduce((clusterTotal: number, nodeMetrics: NodeMetric) => {
const nodeTotal = metricNames.reduce(
(sum: number, currentMetricName: string) => sum + metricCount(nodeMetrics[currentMetricName]),
0,
);

return clusterTotal + nodeTotal;
}, 0),
[metricNames, metrics],
);

return (
<span>
<CounterRate
metric={{
...INITIAL_METRIC,
full_name: `pipeline.${pipeline.id}.failed`,
count: totalErrors,
}}
suffix="errors/s"
/>
<br />
<span className="number-format">({numeral(totalErrors).format('0')} total)</span>
</span>
);
};

export default PipelineProcessingErrors;
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ const ProcessingTimelineComponent = () => {
onDeletePipeline={() => _deletePipeline(pipelineItem)}
/>
);
const headers = ['Pipeline', 'Connected to Streams', 'Processing Timeline', 'Actions'];
const headers = ['Pipeline', 'Connected to Streams', 'Processing Errors', 'Processing Timeline', 'Actions'];

return (
<div>
Expand Down
Loading