Skip to content

Commit 033c828

Browse files
[FR] Add Support for Local Dates Flag (#4582)
* Add support for local dates flag * Use two variables * Add support for import-rules-to-repo * Revert arg formatting * Update comment * Pass Rule Path as Path Object * Update to rule loader function * Streamline metadata function * Also support dictionaries * Bump patch version * Reduce complexity * Add if path exists check * Fix version bump
1 parent ba16e27 commit 033c828

File tree

5 files changed

+52
-30
lines changed

5 files changed

+52
-30
lines changed

detection_rules/cli_utils.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -210,18 +210,11 @@ def rule_prompt(path=None, rule_type=None, required_only=True, save=True, verbos
210210
# DEFAULT_PREBUILT_RULES_DIRS[0] is a required directory just as a suggestion
211211
suggested_path = Path(DEFAULT_PREBUILT_RULES_DIRS[0]) / contents['name']
212212
path = Path(path or input(f'File path for rule [{suggested_path}]: ') or suggested_path).resolve()
213-
# Inherit maturity from the rule already exists
214-
maturity = "development"
215-
if path.exists():
216-
rules = RuleCollection()
217-
rules.load_file(path)
218-
if rules:
219-
maturity = rules.rules[0].contents.metadata.maturity
220-
213+
# Inherit maturity and optionally local dates from the rule if it already exists
221214
meta = {
222-
"creation_date": creation_date,
223-
"updated_date": creation_date,
224-
"maturity": maturity,
215+
"creation_date": kwargs.get("creation_date") or creation_date,
216+
"updated_date": kwargs.get("updated_date") or creation_date,
217+
"maturity": "development" or kwargs.get("maturity"),
225218
}
226219

227220
try:

detection_rules/kbwrap.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from .main import root
2525
from .misc import add_params, client_error, kibana_options, get_kibana_client, nested_set
2626
from .rule import downgrade_contents_from_rule, TOMLRuleContents, TOMLRule
27-
from .rule_loader import RuleCollection
27+
from .rule_loader import RuleCollection, update_metadata_from_file
2828
from .utils import format_command_options, rulename_to_filename
2929

3030
RULES_CONFIG = parse_rules_config()
@@ -199,12 +199,14 @@ def _process_imported_items(imported_items_list, item_type_description, item_key
199199
@click.option("--export-exceptions", "-e", is_flag=True, help="Include exceptions in export")
200200
@click.option("--skip-errors", "-s", is_flag=True, help="Skip errors when exporting rules")
201201
@click.option("--strip-version", "-sv", is_flag=True, help="Strip the version fields from all rules")
202+
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
203+
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
202204
@click.pass_context
203205
def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_directory: Optional[Path],
204206
exceptions_directory: Optional[Path], default_author: str,
205207
rule_id: Optional[Iterable[str]] = None, export_action_connectors: bool = False,
206-
export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False
207-
) -> List[TOMLRule]:
208+
export_exceptions: bool = False, skip_errors: bool = False, strip_version: bool = False,
209+
local_creation_date: bool = False, local_updated_date: bool = False) -> List[TOMLRule]:
208210
"""Export custom rules from Kibana."""
209211
kibana = ctx.obj["kibana"]
210212
kibana_include_details = export_exceptions or export_action_connectors
@@ -232,6 +234,8 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d
232234
return []
233235

234236
rules_results = results
237+
action_connector_results = []
238+
exception_results = []
235239
if kibana_include_details:
236240
# Assign counts to variables
237241
rules_count = results[-1]["exported_rules_count"]
@@ -259,22 +263,23 @@ def kibana_export_rules(ctx: click.Context, directory: Path, action_connectors_d
259263
rule_resource["author"] = rule_resource.get("author") or default_author or [rule_resource.get("created_by")]
260264
if isinstance(rule_resource["author"], str):
261265
rule_resource["author"] = [rule_resource["author"]]
262-
# Inherit maturity from the rule already exists
263-
maturity = "development"
266+
# Inherit maturity and optionally local dates from the rule if it already exists
267+
params = {
268+
"rule": rule_resource,
269+
"maturity": "development",
270+
}
264271
threat = rule_resource.get("threat")
265272
first_tactic = threat[0].get("tactic").get("name") if threat else ""
266273
rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=first_tactic)
267-
# check if directory / f"{rule_name}" exists
268-
if (directory / f"{rule_name}").exists():
269-
rules = RuleCollection()
270-
rules.load_file(directory / f"{rule_name}")
271-
if rules:
272-
maturity = rules.rules[0].contents.metadata.maturity
273-
274-
contents = TOMLRuleContents.from_rule_resource(
275-
rule_resource, maturity=maturity
274+
275+
save_path = directory / f"{rule_name}"
276+
params.update(
277+
update_metadata_from_file(
278+
save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date}
279+
)
276280
)
277-
rule = TOMLRule(contents=contents, path=directory / f"{rule_name}")
281+
contents = TOMLRuleContents.from_rule_resource(**params)
282+
rule = TOMLRule(contents=contents, path=save_path)
278283
except Exception as e:
279284
if skip_errors:
280285
print(f'- skipping {rule_resource.get("name")} - {type(e).__name__}')

detection_rules/main.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
)
3333
from .rule import TOMLRule, TOMLRuleContents, QueryRuleData
3434
from .rule_formatter import toml_write
35-
from .rule_loader import RuleCollection
35+
from .rule_loader import RuleCollection, update_metadata_from_file
3636
from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file
3737
from .utils import Ndjson, get_path, get_etc_path, clear_caches, load_dump, load_rule_contents, rulename_to_filename
3838

@@ -128,10 +128,13 @@ def generate_rules_index(ctx: click.Context, query, overwrite, save_files=True):
128128
@click.option("--skip-errors", "-ske", is_flag=True, help="Skip rule import errors")
129129
@click.option("--default-author", "-da", type=str, required=False, help="Default author for rules missing one")
130130
@click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule")
131+
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
132+
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
131133
def import_rules_into_repo(input_file: click.Path, required_only: bool, action_connector_import: bool,
132134
exceptions_import: bool, directory: click.Path, save_directory: click.Path,
133135
action_connectors_directory: click.Path, exceptions_directory: click.Path,
134-
skip_errors: bool, default_author: str, strip_none_values: bool):
136+
skip_errors: bool, default_author: str, strip_none_values: bool, local_creation_date: bool,
137+
local_updated_date: bool):
135138
"""Import rules from json, toml, or yaml files containing Kibana exported rule(s)."""
136139
errors = []
137140
rule_files = glob.glob(os.path.join(directory, "**", "*.*"), recursive=True) if directory else []
@@ -179,6 +182,12 @@ def import_rules_into_repo(input_file: click.Path, required_only: bool, action_c
179182
if isinstance(contents["author"], str):
180183
contents["author"] = [contents["author"]]
181184

185+
contents.update(
186+
update_metadata_from_file(
187+
Path(rule_path), {"creation_date": local_creation_date, "updated_date": local_updated_date}
188+
)
189+
)
190+
182191
output = rule_prompt(
183192
rule_path,
184193
required_only=required_only,

detection_rules/rule_loader.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
from . import utils
1919
from .config import parse_rules_config
2020
from .rule import (
21-
DeprecatedRule, DeprecatedRuleContents, DictRule, TOMLRule, TOMLRuleContents
21+
DeprecatedRule, DeprecatedRuleContents, DictRule, TOMLRule,
22+
TOMLRuleContents
2223
)
2324
from .schemas import definitions
2425
from .utils import cached, get_path
@@ -116,6 +117,20 @@ def load_locks_from_tag(remote: str, tag: str, version_lock: str = 'detection_ru
116117
return commit_hash, version, deprecated
117118

118119

120+
def update_metadata_from_file(rule_path: Path, fields_to_update: dict) -> dict:
121+
"""Update metadata fields for a rule with local contents."""
122+
contents = {}
123+
if not rule_path.exists():
124+
return contents
125+
local_metadata = RuleCollection().load_file(rule_path).contents.metadata.to_dict()
126+
if local_metadata:
127+
contents["maturity"] = local_metadata.get("maturity", "development")
128+
for field_name, should_update in fields_to_update.items():
129+
if should_update and field_name in local_metadata:
130+
contents[field_name] = local_metadata[field_name]
131+
return contents
132+
133+
119134
@dataclass
120135
class BaseCollection:
121136
"""Base class for collections."""

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "detection_rules"
3-
version = "1.0.9"
3+
version = "1.0.10"
44
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
55
readme = "README.md"
66
requires-python = ">=3.12"

0 commit comments

Comments
 (0)