Skip to content

Commit 3e38db0

Browse files
committed
Add support for CEL filters to dispatcher
A dispatcher is now capable to filter out invalid events using CEL expressions. This allows us to filter out invalid events on dispatcher level and stop the request before reaching Tekton pipeline listeners. JIRA: ISV-6225 Signed-off-by: Ales Raszka <[email protected]>
1 parent 38d6ff6 commit 3e38db0

File tree

12 files changed

+403
-10
lines changed

12 files changed

+403
-10
lines changed

ansible/inventory/group_vars/operator-pipeline-integration-tests-community.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ operator_pipeline_dispatcher_config:
1818
max_capacity: "{{ operator_pipeline_dispatcher_hosted_capacity }}"
1919
namespace: "{{ oc_namespace }}"
2020
callback_url: "{{ operator_pipeline_community_pipeline_callback_url }}"
21+
filter:
22+
cel_expression: "{{ operator_pipeline_hosted_pipeline_cel_filter | replace('\n', '') }}"
2123

2224
- name: Release pipeline for community operators
2325
events: "{{ operator_pipeline_dispatcher_release_pipeline_events }}"
@@ -28,3 +30,5 @@ operator_pipeline_dispatcher_config:
2830
max_capacity: "{{ operator_pipeline_dispatcher_release_capacity }}"
2931
namespace: "{{ oc_namespace }}"
3032
callback_url: "{{ operator_pipeline_community_pipeline_callback_url }}"
33+
filter:
34+
cel_expression: "{{ operator_pipeline_release_pipeline_cel_filter | replace('\n', '') }}"

ansible/inventory/group_vars/operator-pipeline-integration-tests.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ operator_pipeline_dispatcher_config:
4949
max_capacity: "{{ operator_pipeline_dispatcher_hosted_capacity }}"
5050
namespace: "{{ oc_namespace }}"
5151
callback_url: "{{ operator_pipeline_callback_url }}"
52+
filter:
53+
cel_expression: "{{ operator_pipeline_hosted_pipeline_cel_filter | replace('\n', '') }}"
54+
5255
- name: Release pipeline for certified operators
5356
events: "{{ operator_pipeline_dispatcher_release_pipeline_events }}"
5457
full_repository_name: "{{ integration_tests_git_upstream_repo }}"
@@ -58,3 +61,5 @@ operator_pipeline_dispatcher_config:
5861
max_capacity: "{{ operator_pipeline_dispatcher_release_capacity }}"
5962
namespace: "{{ oc_namespace }}"
6063
callback_url: "{{ operator_pipeline_callback_url }}"
64+
filter:
65+
cel_expression: "{{ operator_pipeline_release_pipeline_cel_filter | replace('\n', '') }}"

ansible/inventory/group_vars/operator-pipeline.yml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,34 @@ operator_pipeline_certified_operators_repository_name: "redhat-openshift-ecosyst
8585
operator_pipeline_marketplace_operators_repository_name: "redhat-openshift-ecosystem/redhat-marketplace-operators-preprod"
8686
operator_pipeline_community_operators_repository_name: "redhat-openshift-ecosystem/community-operators-pipeline-preprod"
8787

88+
operator_pipeline_hosted_pipeline_cel_filter: >-
89+
(
90+
(
91+
body.action in ["opened", "reopened", "synchronize", "ready_for_review"]
92+
&& body.pull_request.base.ref == "{{ branch }}"
93+
) ||
94+
(
95+
body.action == "labeled"
96+
&& body.label.name == "pipeline/trigger-hosted"
97+
&& body.pull_request.base.ref == "{{ branch }}"
98+
)
99+
)
100+
101+
operator_pipeline_release_pipeline_cel_filter: >-
102+
(
103+
(
104+
body.action == "closed"
105+
&& body.pull_request.base.ref == "{{ branch }}"
106+
&& body.pull_request.merged == true
107+
) ||
108+
(
109+
body.action == "labeled"
110+
&& body.label.name == "pipeline/trigger-release"
111+
&& body.pull_request.base.ref == "{{ branch }}"
112+
&& body.pull_request.merged == true
113+
)
114+
)
115+
88116
operator_pipeline_dispatcher_config:
89117
- name: Hosted pipeline for certified operators
90118
events: "{{ operator_pipeline_dispatcher_hosted_pipeline_events }}"
@@ -95,6 +123,9 @@ operator_pipeline_dispatcher_config:
95123
max_capacity: "{{ operator_pipeline_dispatcher_hosted_capacity }}"
96124
namespace: "{{ oc_namespace }}"
97125
callback_url: "{{ operator_pipeline_callback_url }}"
126+
filter:
127+
cel_expression: "{{ operator_pipeline_hosted_pipeline_cel_filter | replace('\n', '') }}"
128+
98129
- name: Release pipeline for certified operators
99130
events: "{{ operator_pipeline_dispatcher_release_pipeline_events }}"
100131
full_repository_name: "{{ operator_pipeline_certified_operators_repository_name }}"
@@ -104,6 +135,8 @@ operator_pipeline_dispatcher_config:
104135
max_capacity: "{{ operator_pipeline_dispatcher_release_capacity }}"
105136
namespace: "{{ oc_namespace }}"
106137
callback_url: "{{ operator_pipeline_callback_url }}"
138+
filter:
139+
cel_expression: "{{ operator_pipeline_release_pipeline_cel_filter | replace('\n', '') }}"
107140

108141
- name: Hosted pipeline for marketplace operators
109142
events: "{{ operator_pipeline_dispatcher_hosted_pipeline_events }}"
@@ -114,6 +147,8 @@ operator_pipeline_dispatcher_config:
114147
max_capacity: "{{ operator_pipeline_dispatcher_hosted_capacity }}"
115148
namespace: "{{ oc_namespace }}"
116149
callback_url: "{{ operator_pipeline_callback_url }}"
150+
filter:
151+
cel_expression: "{{ operator_pipeline_hosted_pipeline_cel_filter | replace('\n', '') }}"
117152

118153
- name: Release pipeline for marketplace operators
119154
events: "{{ operator_pipeline_dispatcher_release_pipeline_events }}"
@@ -124,6 +159,8 @@ operator_pipeline_dispatcher_config:
124159
max_capacity: "{{ operator_pipeline_dispatcher_release_capacity }}"
125160
namespace: "{{ oc_namespace }}"
126161
callback_url: "{{ operator_pipeline_callback_url }}"
162+
filter:
163+
cel_expression: "{{ operator_pipeline_release_pipeline_cel_filter | replace('\n', '') }}"
127164

128165
- name: Hosted pipeline for community operators
129166
events: "{{ operator_pipeline_dispatcher_hosted_pipeline_events }}"
@@ -134,6 +171,8 @@ operator_pipeline_dispatcher_config:
134171
max_capacity: "{{ operator_pipeline_dispatcher_hosted_capacity }}"
135172
namespace: "{{ oc_namespace }}"
136173
callback_url: "{{ operator_pipeline_community_pipeline_callback_url }}"
174+
filter:
175+
cel_expression: "{{ operator_pipeline_hosted_pipeline_cel_filter | replace('\n', '') }}"
137176

138177
- name: Release pipeline for community operators
139178
events: "{{ operator_pipeline_dispatcher_release_pipeline_events }}"
@@ -144,3 +183,5 @@ operator_pipeline_dispatcher_config:
144183
max_capacity: "{{ operator_pipeline_dispatcher_release_capacity }}"
145184
namespace: "{{ oc_namespace }}"
146185
callback_url: "{{ operator_pipeline_community_pipeline_callback_url }}"
186+
filter:
187+
cel_expression: "{{ operator_pipeline_release_pipeline_cel_filter | replace('\n', '') }}"

operator-pipeline-images/operatorcert/webhook_dispatcher/api.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import flask
1010
from flask import jsonify, request
11+
1112
from operatorcert.webhook_dispatcher.config import load_config
1213
from operatorcert.webhook_dispatcher.database import get_database, get_db_session
1314
from operatorcert.webhook_dispatcher.models import (
@@ -43,6 +44,7 @@ def event_to_dict(event: WebhookEvent) -> dict[str, Any]:
4344
return {
4445
"id": event.id,
4546
"delivery_id": event.delivery_id,
47+
"action": event.action,
4648
"repository_full_name": event.repository_full_name,
4749
"pull_request_number": event.pull_request_number,
4850
"processed": event.processed,

operator-pipeline-images/operatorcert/webhook_dispatcher/config.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
"""A module for configuration models."""
22

33
import os
4-
from typing import List, Optional
4+
from typing import Any, List, Optional
55
from urllib.parse import quote_plus
66

7+
import celpy
8+
import celpy.celparser
79
import yaml
8-
from pydantic import BaseModel, Field
10+
from pydantic import BaseModel, ConfigDict, Field, field_validator
911

1012

1113
class DatabaseConfig(BaseModel):
@@ -60,6 +62,42 @@ class CapacityConfig(BaseModel):
6062
namespace: str
6163

6264

65+
class Filter(BaseModel):
66+
"""Filter configuration for webhook events."""
67+
68+
model_config = ConfigDict(arbitrary_types_allowed=True)
69+
70+
cel_expression: celpy.Expression[Any] | None = Field(
71+
default=None,
72+
description="CEL expression to filter events. "
73+
"If empty, no filtering is applied.",
74+
)
75+
76+
@field_validator("cel_expression", mode="before")
77+
@classmethod
78+
def validate_cel_expression_and_compile(
79+
cls, value: str
80+
) -> celpy.Expression[Any] | None:
81+
"""
82+
Validate and parse the CEL expression syntax.
83+
84+
Raise ValueError if the expression is invalid.
85+
86+
Args:
87+
value (celpy.Expression[Any] | None): A parsed cel expression if
88+
available or None.
89+
"""
90+
if not value:
91+
return None
92+
93+
try:
94+
env = celpy.Environment()
95+
ast = env.compile(value)
96+
return ast
97+
except celpy.celparser.CELParseError as e:
98+
raise ValueError(f"Invalid CEL expression: {e}") from e
99+
100+
63101
class DispatcherConfigItem(BaseModel):
64102
"""Configuration for a webhook dispatcher item."""
65103

@@ -68,6 +106,7 @@ class DispatcherConfigItem(BaseModel):
68106
full_repository_name: str
69107
callback_url: str
70108
capacity: CapacityConfig
109+
filter: Optional[Filter] = None
71110

72111

73112
class DispatcherConfig(BaseModel):

operator-pipeline-images/operatorcert/webhook_dispatcher/dispatcher.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
and dispatches them to the appropriate pipelines based on the configuration.
44
"""
55

6+
import asyncio
67
import logging
7-
import time
88
from datetime import datetime
9+
from typing import Any
910

11+
import celpy
12+
import celpy.adapter
13+
import celpy.evaluation
1014
from operatorcert.webhook_dispatcher.config import (
1115
DispatcherConfig,
1216
DispatcherConfigItem,
@@ -59,7 +63,7 @@ async def run(self) -> None:
5963

6064
except Exception: # pylint: disable=broad-except
6165
LOGGER.exception("Error in event dispatcher")
62-
time.sleep(5)
66+
await asyncio.sleep(5)
6367

6468
async def process_repository_events(
6569
self, repository_events: dict[int, list[WebhookEvent]]
@@ -154,9 +158,50 @@ def assign_configs_to_event(
154158
continue
155159
if event.action not in item.events:
156160
continue
161+
if item.filter and item.filter.cel_expression:
162+
if not self.match_cel_expression(
163+
event,
164+
item.filter.cel_expression,
165+
):
166+
LOGGER.debug(
167+
"Event %s did not match the CEL expression for config %s",
168+
event.id,
169+
item.name,
170+
)
171+
continue
157172
configs.append(item)
158173
return configs
159174

175+
def match_cel_expression(
176+
self,
177+
event: WebhookEvent,
178+
cel_expression: celpy.Expression[Any],
179+
) -> bool:
180+
"""
181+
Match the event against a CEL expression and return whether it matches.
182+
183+
Args:
184+
event (WebhookEvent): A WebhookEvent object containing event data.
185+
cel_expression (celpy.Expression): A CEL expression to match against the event.
186+
187+
Returns:
188+
bool: A True if the event matches the CEL expression, False otherwise.
189+
"""
190+
try:
191+
env = celpy.Environment()
192+
program = env.program(cel_expression)
193+
194+
context = {
195+
"body": celpy.adapter.json_to_cel(event.payload),
196+
"headers": celpy.adapter.json_to_cel(event.request_headers),
197+
}
198+
199+
result = program.evaluate(context)
200+
return bool(result)
201+
except (celpy.evaluation.CELEvalError, celpy.evaluation.CELUnsupportedError):
202+
LOGGER.exception("Failed to evaluate CEL.")
203+
return False
204+
160205
def convert_to_pipeline_events(self, event: WebhookEvent) -> list[PipelineEvent]:
161206
"""
162207
Convert a WebhookEvent to a PipelineEvents with the assigned configuration.
@@ -198,6 +243,10 @@ async def process_pipeline_event(self, pipeline_event: PipelineEvent) -> None:
198243
# Don't mark the event as processed, so it will be retried in the next loop
199244
return
200245

246+
# Give some time for the pipeline to start so it can be tracked
247+
# by the capacity manager in the next loop
248+
await asyncio.sleep(10)
249+
201250
event.processed = True
202251
event.processing_error = None
203252
event.processed_at = datetime.now()

operator-pipeline-images/tests/webhook_dispatcher/test_api.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
CapacityConfig,
99
DispatcherConfig,
1010
DispatcherConfigItem,
11+
Filter,
1112
SecurityConfig,
1213
WebhookDispatcherConfig,
1314
)
@@ -30,6 +31,7 @@ def test_get_config(mock_load_config: MagicMock) -> None:
3031
max_capacity=10,
3132
namespace="test",
3233
),
34+
filter=Filter(cel_expression="body.action == 'pull_request'"), # type: ignore[arg-type]
3335
)
3436
],
3537
),
@@ -48,6 +50,7 @@ def test_event_to_dict() -> None:
4850
event = WebhookEvent(
4951
id=1,
5052
delivery_id="123",
53+
action="opened",
5154
repository_full_name="test/test",
5255
pull_request_number=123,
5356
processed=False,
@@ -59,6 +62,7 @@ def test_event_to_dict() -> None:
5962
assert api.event_to_dict(event) == {
6063
"id": 1,
6164
"delivery_id": "123",
65+
"action": "opened",
6266
"repository_full_name": "test/test",
6367
"pull_request_number": 123,
6468
"processed": False,
@@ -97,6 +101,7 @@ def test_github_pipeline_webhook(
97101
max_capacity=10,
98102
namespace="test",
99103
),
104+
filter=None,
100105
)
101106
],
102107
),

operator-pipeline-images/tests/webhook_dispatcher/test_config.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
from unittest.mock import MagicMock, patch
22

3-
from operatorcert.webhook_dispatcher.config import load_config
3+
import lark
4+
from operatorcert.webhook_dispatcher.config import Filter, load_config
5+
import pytest
6+
from pydantic import ValidationError
47

58

69
@patch("operatorcert.webhook_dispatcher.config.yaml.safe_load")
@@ -21,6 +24,9 @@ def test_load_config(mock_open: MagicMock, mock_yaml_load: MagicMock) -> None:
2124
"pipeline_name": "test",
2225
"namespace": "test",
2326
},
27+
"filter": {
28+
"cel_expression": "body.action == 'push'",
29+
},
2430
}
2531
]
2632
},
@@ -34,3 +40,17 @@ def test_load_config(mock_open: MagicMock, mock_yaml_load: MagicMock) -> None:
3440
assert config.dispatcher.items[0].name == "test"
3541
assert config.dispatcher.items[0].events == ["push"]
3642
assert config.dispatcher.items[0].full_repository_name == "test/test"
43+
44+
45+
def test_cel_expression_compilation() -> None:
46+
# Valid expression
47+
filter_config = Filter(cel_expression='body.action == "push"') # type: ignore[arg-type]
48+
assert isinstance(filter_config.cel_expression, lark.Tree)
49+
50+
# Empty expression
51+
filter_config_no_expr = Filter(cel_expression="") # type: ignore[arg-type]
52+
assert filter_config_no_expr.cel_expression is None
53+
54+
# Invalid expression
55+
with pytest.raises(ValidationError):
56+
Filter(cel_expression="body.action = push") # type: ignore[arg-type]

0 commit comments

Comments
 (0)