Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 44 additions & 23 deletions detection_rules/kbwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def kibana_import_rules( # noqa: PLR0915
) -> tuple[dict[str, Any], list[RuleResource]]:
"""Import rules into Kibana."""

def _handle_response_errors(response: dict[str, Any]) -> None:
def _handle_response_errors(response: dict[str, Any], imported_exception_dicts: list[list[dict[str, Any]]]) -> None:
"""Handle errors from the import response."""

def _parse_list_id(s: str) -> str | None:
Expand All @@ -123,7 +123,7 @@ def _parse_list_id(s: str) -> str | None:
workaround_errors: list[str] = []
workaround_error_types: set[str] = set()

flattened_exceptions: list[dict[str, Any]] = [e for sublist in exception_dicts for e in sublist]
flattened_exceptions: list[dict[str, Any]] = [e for sublist in imported_exception_dicts for e in sublist]
all_exception_list_ids: set[str] = {exception["list_id"] for exception in flattened_exceptions}

click.echo(f"{len(response['errors'])} rule(s) failed to import!")
Expand Down Expand Up @@ -173,36 +173,57 @@ def _process_imported_items(
click.echo(f" - {ids_str}")

kibana = ctx.obj["kibana"]
rule_dicts = [r.contents.to_api_format() for r in rules]
rule_ids = {rule["rule_id"] for rule in rule_dicts}
batch_size = definitions.KIBANA_IMPORT_BATCH_SIZE
rules_list = list(rules)
imported_exception_dicts: list[list[dict[str, Any]]] = []
imported_action_connectors_dicts: list[list[dict[str, Any]]] = []
successful_rule_ids: list[str] = []
all_errors: list[dict[str, Any]] = []
results: list[RuleResource] = []

with kibana:
cl = GenericCollection.default()
exception_dicts: list[list[dict[str, Any]]] = [
d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for d in cl.items_matching(TOMLExceptionContents, rule_ids)
]
action_connectors_dicts: list[list[dict[str, Any]]] = [
d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for d in cl.items_matching(TOMLActionConnectorContents, rule_ids)
]
response, successful_rule_ids, results = RuleResource.import_rules( # type: ignore[reportUnknownMemberType]
rule_dicts,
exception_dicts,
action_connectors_dicts,
overwrite=overwrite,
overwrite_exceptions=overwrite_exceptions,
overwrite_action_connectors=overwrite_action_connectors,
)
for i in range(0, len(rules_list), batch_size):
batched_rules = rules_list[i : i + batch_size]
rule_dicts = [r.contents.to_api_format() for r in batched_rules]
rule_ids = {rule["rule_id"] for rule in rule_dicts}

exception_dicts: list[list[dict[str, Any]]] = [
d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for d in cl.items_matching(TOMLExceptionContents, rule_ids)
]
action_connectors_dicts: list[list[dict[str, Any]]] = [
d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for d in cl.items_matching(TOMLActionConnectorContents, rule_ids)
]

response, batch_successful_rule_ids, batch_results = RuleResource.import_rules( # type: ignore[reportUnknownMemberType]
rule_dicts,
exception_dicts,
action_connectors_dicts,
overwrite=overwrite,
overwrite_exceptions=overwrite_exceptions,
overwrite_action_connectors=overwrite_action_connectors,
)
response = cast("dict[str, Any]", response)

imported_exception_dicts.extend(exception_dicts)
imported_action_connectors_dicts.extend(action_connectors_dicts)
successful_rule_ids.extend(cast("list[str]", batch_successful_rule_ids))
all_errors.extend(cast("list[dict[str, Any]]", response.get("errors", [])))
results.extend(cast("list[RuleResource]", batch_results))

response: dict[str, Any] = {"errors": all_errors}

if successful_rule_ids:
click.echo(f"{len(successful_rule_ids)} rule(s) successfully imported") # type: ignore[reportUnknownArgumentType]
rule_str = "\n - ".join(successful_rule_ids) # type: ignore[reportUnknownArgumentType]
click.echo(f" - {rule_str}")
if response["errors"]:
_handle_response_errors(response) # type: ignore[reportUnknownArgumentType]
_handle_response_errors(response, imported_exception_dicts) # type: ignore[reportUnknownArgumentType]
else:
_process_imported_items(exception_dicts, "exception list(s)", "list_id") # type: ignore[reportUnknownArgumentType]
_process_imported_items(action_connectors_dicts, "action connector(s)", "id") # type: ignore[reportUnknownArgumentType]
_process_imported_items(imported_exception_dicts, "exception list(s)", "list_id") # type: ignore[reportUnknownArgumentType]
_process_imported_items(imported_action_connectors_dicts, "action connector(s)", "id") # type: ignore[reportUnknownArgumentType]

return response, results # type: ignore[reportUnknownVariableType]

Expand Down
1 change: 1 addition & 0 deletions detection_rules/schemas/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def validator_wrapper(value: Any) -> Any:
HTTP_STATUS_BAD_REQUEST = 400
ASSET_TYPE = "security_rule"
SAVED_OBJECT_TYPE = "security-rule"
KIBANA_IMPORT_BATCH_SIZE = 50

DATE_PATTERN = re.compile(r"^\d{4}/\d{2}/\d{2}$")
MATURITY_LEVELS = ["development", "experimental", "beta", "production", "deprecated"]
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.6.1"
version = "1.6.2"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
Loading