Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ def filter_workflows(self, request: Request, organization: Organization) -> Quer
# If specific IDs are provided, skip query and project filtering
return queryset

if raw_detectorlist := request.GET.getlist("detector"):
try:
detector_ids = [int(id) for id in raw_detectorlist]
except ValueError:
raise ValidationError({"detector": ["Invalid detector ID format"]})
queryset = queryset.filter(detectorworkflow__detector_id__in=detector_ids).distinct()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also add a filter here for the users organization -- that way we can prevent any possible security issues where a user asks for a detector not part of their org.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are already adding the org filter at the top of filter_workflows

queryset: QuerySet[Workflow] = Workflow.objects.filter(organization_id=organization.id)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, i didn't notice it was the same queryset 👍


if raw_query := request.GET.get("query"):
for filter in parse_workflow_query(raw_query):
assert isinstance(filter, SearchFilter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ def test_simple(self) -> None:
hits = int(response["X-Hits"])
assert hits == 3

def test_only_returns_workflows_from_organization(self) -> None:
other_org = self.create_organization()
self.create_workflow(organization_id=other_org.id, name="Other Org Workflow")

response = self.get_success_response(self.organization.slug)
assert len(response.data) == 3
workflow_names = {w["name"] for w in response.data}
assert "Other Org Workflow" not in workflow_names
assert workflow_names == {
self.workflow.name,
self.workflow_two.name,
self.workflow_three.name,
}

def test_empty_result(self) -> None:
response = self.get_success_response(
self.organization.slug, qs_params={"query": "aaaaaaaaaaaaa"}
Expand Down Expand Up @@ -346,6 +360,53 @@ def test_query_filter_by_action(self) -> None:
assert len(response3.data) == 2
assert {self.workflow.name, self.workflow_two.name} == {w["name"] for w in response3.data}

def test_filter_by_detector(self) -> None:
Copy link
Contributor

@saponifi3d saponifi3d Jan 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thoughts on adding a test case for selecting a detector not part of your org and ensure it returns the correct status codes? (my guess is we'd either want to 401 or 404)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add a test case for it certainly. I think the current behavior is correct (which is that it will just return an empty list)

project_1 = self.create_project(organization=self.organization)
project_2 = self.create_project(organization=self.organization)
project_3 = self.create_project(organization=self.organization)

detector_1 = self.create_detector(project=project_1, name="Detector 1")
detector_2 = self.create_detector(project=project_2, name="Detector 2")
detector_3 = self.create_detector(project=project_3, name="Detector 3")

self.create_detector_workflow(workflow=self.workflow, detector=detector_1)
self.create_detector_workflow(workflow=self.workflow_two, detector=detector_2)
self.create_detector_workflow(workflow=self.workflow_three, detector=detector_3)

# Filter by single detector
response = self.get_success_response(
self.organization.slug,
qs_params={"detector": str(detector_1.id)},
)
assert len(response.data) == 1
assert response.data[0]["name"] == self.workflow.name

# Filter by multiple detectors
response2 = self.get_success_response(
self.organization.slug,
qs_params=[
("detector", str(detector_1.id)),
("detector", str(detector_2.id)),
],
)
assert len(response2.data) == 2
assert {w["name"] for w in response2.data} == {self.workflow.name, self.workflow_two.name}

# Filter by non-existent detector ID returns no results
response3 = self.get_success_response(
self.organization.slug,
qs_params={"detector": "999999"},
)
assert len(response3.data) == 0

# Invalid detector ID format returns error
response4 = self.get_error_response(
self.organization.slug,
qs_params={"detector": "not-an-id"},
status_code=400,
)
assert response4.data == {"detector": ["Invalid detector ID format"]}

def test_compound_query(self) -> None:
self.create_detector_workflow(
workflow=self.workflow, detector=self.create_detector(project=self.project)
Expand Down
Loading