-
Notifications
You must be signed in to change notification settings - Fork 635
Expand file tree
/
Copy pathkbwrap.py
More file actions
581 lines (505 loc) · 26.5 KB
/
kbwrap.py
File metadata and controls
581 lines (505 loc) · 26.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Kibana cli commands."""
import re
import sys
from pathlib import Path
from typing import Any, cast
import click
import kql # type: ignore[reportMissingTypeStubs]
from kibana import RuleResource, Signal # type: ignore[reportMissingTypeStubs]
from .action_connector import (
TOMLActionConnector,
TOMLActionConnectorContents,
build_action_connector_objects,
parse_action_connector_results_from_api,
)
from .cli_utils import multi_collection
from .config import parse_rules_config
from .exception import TOMLException, TOMLExceptionContents, build_exception_objects, parse_exceptions_results_from_api
from .generic_loader import GenericCollection
from .main import root
from .misc import add_params, get_kibana_client, kibana_options, nested_set, raise_client_error
from .rule import TOMLRule, TOMLRuleContents, downgrade_contents_from_rule
from .rule_loader import RawRuleCollection, RuleCollection, update_metadata_from_file
from .schemas import definitions # noqa: TC001
from .utils import CUSTOM_RULES_KQL, format_command_options, rulename_to_filename
# Module-level rules configuration, parsed once at import time and shared by the
# export command below (directory defaults, filename behavior).
RULES_CONFIG = parse_rules_config()
@root.group("kibana")
@add_params(*kibana_options)
@click.pass_context
def kibana_group(ctx: click.Context, **kibana_kwargs: Any) -> None:
"""Commands for integrating with Kibana."""
_ = ctx.ensure_object(dict) # type: ignore[reportUnknownVariableType]
# only initialize an kibana client if the subcommand is invoked without help (hacky)
if sys.argv[-1] in ctx.help_option_names:
click.echo("Kibana client:")
click.echo(format_command_options(ctx))
else:
ctx.obj["kibana"] = get_kibana_client(**kibana_kwargs)
@kibana_group.command("upload-rule")
@multi_collection
@click.option("--replace-id", "-r", is_flag=True, help="Replace rule IDs with new IDs before export")
@click.pass_context
def upload_rule(ctx: click.Context, rules: RuleCollection, replace_id: bool) -> list[RuleResource]:
"""[Deprecated] Upload a list of rule .toml files to Kibana."""
kibana = ctx.obj["kibana"]
api_payloads: list[RuleResource] = []
click.secho(
"WARNING: This command is deprecated as of Elastic Stack version 9.0. Please use `kibana import-rules`.",
fg="yellow",
)
for rule in rules:
try:
payload = downgrade_contents_from_rule(rule, kibana.version, replace_id=replace_id)
except ValueError as e:
raise_client_error(f"{e} in version:{kibana.version}, for rule: {rule.name}", e, ctx=ctx)
api_payloads.append(RuleResource(payload))
with kibana:
results: list[RuleResource] = RuleResource.bulk_create_legacy(api_payloads) # type: ignore[reportUnknownMemberType]
success: list[str] = []
errors: list[str] = []
for result in results:
if "error" in result:
errors.append(f"{result['rule_id']} - {result['error']['message']}")
else:
success.append(result["rule_id"]) # type: ignore[reportUnknownArgumentType]
if success:
click.echo("Successful uploads:\n - " + "\n - ".join(success))
if errors:
click.echo("Failed uploads:\n - " + "\n - ".join(errors))
return results
@kibana_group.command("import-rules")
@multi_collection
@click.option("--overwrite", "-o", is_flag=True, help="Overwrite existing rules")
@click.option("--overwrite-exceptions", "-e", is_flag=True, help="Overwrite exceptions in existing rules")
@click.option(
"--overwrite-action-connectors",
"-ac",
is_flag=True,
help="Overwrite action connectors in existing rules",
)
@click.pass_context
def kibana_import_rules( # noqa: PLR0915
ctx: click.Context,
rules: RuleCollection,
overwrite: bool = False,
overwrite_exceptions: bool = False,
overwrite_action_connectors: bool = False,
) -> tuple[dict[str, Any], list[RuleResource]]:
"""Import rules into Kibana."""
def _handle_response_errors(response: dict[str, Any], imported_exception_dicts: list[list[dict[str, Any]]]) -> None:
"""Handle errors from the import response."""
def _parse_list_id(s: str) -> str | None:
"""Parse the list ID from the error message."""
match = re.search(r'list_id: "(.*?)"', s)
return match.group(1) if match else None
# Re-try to address known Kibana issue: https://github.com/elastic/kibana/issues/143864
workaround_errors: list[str] = []
workaround_error_types: set[str] = set()
flattened_exceptions: list[dict[str, Any]] = [e for sublist in imported_exception_dicts for e in sublist]
all_exception_list_ids: set[str] = {exception["list_id"] for exception in flattened_exceptions}
click.echo(f"{len(response['errors'])} rule(s) failed to import!")
action_connector_validation_error = "Error validating create data"
action_connector_type_error = "expected value of type [string] but got [undefined]"
for error in response["errors"]:
error_message = error["error"]["message"]
click.echo(f" - {error['rule_id']}: ({error['error']['status_code']}) {error_message}")
if "references a non existent exception list" in error_message:
list_id = _parse_list_id(error_message)
if list_id in all_exception_list_ids:
workaround_errors.append(error["rule_id"])
workaround_error_types.add("non existent exception list")
if action_connector_validation_error in error_message and action_connector_type_error in error_message:
workaround_error_types.add("connector still being built")
if workaround_errors:
workaround_errors = list(set(workaround_errors))
if "non existent exception list" in workaround_error_types:
click.echo(
f"Missing exception list errors detected for {len(workaround_errors)} rules. "
"Try re-importing using the following command and rule IDs:\n"
)
click.echo("python -m detection_rules kibana import-rules -o ", nl=False)
click.echo(" ".join(f"-id {rule_id}" for rule_id in workaround_errors))
click.echo()
if "connector still being built" in workaround_error_types:
click.echo(
f"Connector still being built errors detected for {len(workaround_errors)} rules. "
"Please try re-importing the rules again."
)
click.echo()
def _process_imported_items(
imported_items_list: list[list[dict[str, Any]]],
item_type_description: str,
item_key: str,
) -> None:
"""Displays appropriately formatted success message that all items imported successfully."""
all_ids = {item[item_key] for sublist in imported_items_list for item in sublist}
if all_ids:
click.echo(f"{len(all_ids)} {item_type_description} successfully imported")
ids_str = "\n - ".join(all_ids)
click.echo(f" - {ids_str}")
kibana = ctx.obj["kibana"]
batch_size = definitions.KIBANA_IMPORT_BATCH_SIZE
rules_list = list(rules)
imported_exception_dicts: list[list[dict[str, Any]]] = []
imported_action_connectors_dicts: list[list[dict[str, Any]]] = []
successful_rule_ids: list[str] = []
all_errors: list[dict[str, Any]] = []
results: list[RuleResource] = []
with kibana:
cl = GenericCollection.default()
for i in range(0, len(rules_list), batch_size):
batched_rules = rules_list[i : i + batch_size]
rule_dicts = [r.contents.to_api_format() for r in batched_rules]
rule_ids = {rule["rule_id"] for rule in rule_dicts}
exception_dicts: list[list[dict[str, Any]]] = [
d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for d in cl.items_matching(TOMLExceptionContents, rule_ids)
]
action_connectors_dicts: list[list[dict[str, Any]]] = [
d.contents.to_api_format() # type: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for d in cl.items_matching(TOMLActionConnectorContents, rule_ids)
]
response, batch_successful_rule_ids, batch_results = RuleResource.import_rules( # type: ignore[reportUnknownMemberType]
rule_dicts,
exception_dicts,
action_connectors_dicts,
overwrite=overwrite,
overwrite_exceptions=overwrite_exceptions,
overwrite_action_connectors=overwrite_action_connectors,
)
response = cast("dict[str, Any]", response)
imported_exception_dicts.extend(exception_dicts)
imported_action_connectors_dicts.extend(action_connectors_dicts)
successful_rule_ids.extend(cast("list[str]", batch_successful_rule_ids))
all_errors.extend(cast("list[dict[str, Any]]", response.get("errors", [])))
results.extend(cast("list[RuleResource]", batch_results))
response: dict[str, Any] = {"errors": all_errors}
if successful_rule_ids:
click.echo(f"{len(successful_rule_ids)} rule(s) successfully imported") # type: ignore[reportUnknownArgumentType]
rule_str = "\n - ".join(successful_rule_ids) # type: ignore[reportUnknownArgumentType]
click.echo(f" - {rule_str}")
if response["errors"]:
_handle_response_errors(response, imported_exception_dicts) # type: ignore[reportUnknownArgumentType]
else:
_process_imported_items(imported_exception_dicts, "exception list(s)", "list_id") # type: ignore[reportUnknownArgumentType]
_process_imported_items(imported_action_connectors_dicts, "action connector(s)", "id") # type: ignore[reportUnknownArgumentType]
return response, results # type: ignore[reportUnknownVariableType]
@kibana_group.command("export-rules")
@click.option("--directory", "-d", required=True, type=Path, help="Directory to export rules to")
@click.option(
"--action-connectors-directory", "-acd", required=False, type=Path, help="Directory to export action connectors to"
)
@click.option("--exceptions-directory", "-ed", required=False, type=Path, help="Directory to export exceptions to")
@click.option("--default-author", "-da", type=str, required=False, help="Default author for rules missing one")
@click.option("--rule-id", "-r", multiple=True, help="Optional Rule IDs to restrict export to")
@click.option(
"--rule-name",
"-rn",
required=False,
help="Optional Rule name to restrict export to (KQL, case-insensitive, supports wildcards)",
)
@click.option("--export-action-connectors", "-ac", is_flag=True, help="Include action connectors in export")
@click.option("--export-exceptions", "-e", is_flag=True, help="Include exceptions in export")
@click.option("--skip-errors", "-s", is_flag=True, help="Skip errors when exporting rules")
@click.option("--strip-version", "-sv", is_flag=True, help="Strip the version fields from all rules")
@click.option(
"--no-tactic-filename",
"-nt",
is_flag=True,
help="Exclude tactic prefix in exported filenames for rules. "
"Use same flag for import-rules to prevent warnings and disable its unit test.",
)
@click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule")
@click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule")
@click.option("--custom-rules-only", "-cro", is_flag=True, help="Only export custom rules")
@click.option(
"--export-query",
"-eq",
type=str,
required=False,
help=(
"Apply a query filter to exporting rules e.g. "
'"alert.attributes.tags: \\"test\\"" to filter for rules that have the tag "test"'
),
)
@click.option(
"--load-rule-loading",
"-lr",
is_flag=True,
help="Enable arbitrary rule loading from the rules directories (Can be very slow!)",
)
@click.pass_context
def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915
ctx: click.Context,
directory: Path,
action_connectors_directory: Path | None,
exceptions_directory: Path | None,
default_author: str,
rule_id: list[str] | None = None,
rule_name: str | None = None,
export_action_connectors: bool = False,
export_exceptions: bool = False,
skip_errors: bool = False,
strip_version: bool = False,
no_tactic_filename: bool = False,
local_creation_date: bool = False,
local_updated_date: bool = False,
custom_rules_only: bool = False,
export_query: str | None = None,
load_rule_loading: bool = False,
) -> list[TOMLRule]:
"""Export rules from Kibana."""
kibana = ctx.obj["kibana"]
kibana_include_details = export_exceptions or export_action_connectors or custom_rules_only or export_query
# Only allow one of rule_id or rule_name
if rule_name and rule_id:
raise click.UsageError("Cannot use --rule-id and --rule-name together. Please choose one.")
raw_rule_collection = RawRuleCollection()
if load_rule_loading:
raw_rule_collection = raw_rule_collection.default()
with kibana:
# Look up rule IDs by name if --rule-name was provided
if rule_name:
found = RuleResource.find(filter=f"alert.attributes.name:{rule_name}") # type: ignore[reportUnknownMemberType]
rule_id = [r["rule_id"] for r in found] # type: ignore[reportUnknownVariableType]
if not rule_id:
click.echo(
f"No rules found to export matching the provided name '{rule_name}' "
f"using filter 'alert.attributes.name:{rule_name}'"
)
return []
query = (
export_query
if not custom_rules_only
else (f"({CUSTOM_RULES_KQL}){f' and ({export_query})' if export_query else ''}")
)
results = ( # type: ignore[reportUnknownVariableType]
RuleResource.bulk_export(rule_ids=list(rule_id), query=query) # type: ignore[reportArgumentType]
if query
else RuleResource.export_rules(list(rule_id), exclude_export_details=not kibana_include_details) # type: ignore[reportArgumentType]
)
# Handle Exceptions Directory Location
if results and exceptions_directory:
exceptions_directory.mkdir(parents=True, exist_ok=True)
exceptions_directory = exceptions_directory or RULES_CONFIG.exception_dir
if not exceptions_directory and export_exceptions:
click.echo("Warning: Exceptions export requested, but no exceptions directory found")
# Handle Actions Connector Directory Location
if results and action_connectors_directory:
action_connectors_directory.mkdir(parents=True, exist_ok=True)
action_connectors_directory = action_connectors_directory or RULES_CONFIG.action_connector_dir
if not action_connectors_directory and export_action_connectors:
click.echo("Warning: Action Connector export requested, but no Action Connector directory found")
if results:
directory.mkdir(parents=True, exist_ok=True)
else:
click.echo("No rules found to export")
return []
rules_results = results # type: ignore[reportUnknownVariableType]
action_connector_results = []
exception_results = []
results_len = len(results) # type: ignore[reportUnknownVariableType]
if kibana_include_details:
# Assign counts to variables
results_len = results_len - 1
rules_count = results[-1]["exported_rules_count"] # type: ignore[reportUnknownVariableType]
exception_list_count = results[-1]["exported_exception_list_count"] # type: ignore[reportUnknownVariableType]
exception_list_item_count = results[-1]["exported_exception_list_item_count"] # type: ignore[reportUnknownVariableType]
action_connector_count = results[-1]["exported_action_connector_count"] # type: ignore[reportUnknownVariableType]
# Parse rules results and exception results from API return
rules_results = results[:rules_count] # type: ignore[reportUnknownVariableType]
exception_results = results[rules_count : rules_count + exception_list_count + exception_list_item_count] # type: ignore[reportUnknownVariableType]
rules_and_exceptions_count = rules_count + exception_list_count + exception_list_item_count # type: ignore[reportUnknownVariableType]
action_connector_results = results[ # type: ignore[reportUnknownVariableType]
rules_and_exceptions_count : rules_and_exceptions_count + action_connector_count
]
errors: list[str] = []
exported: list[TOMLRule] = []
exception_list_rule_table: dict[str, list[dict[str, Any]]] = {}
action_connector_rule_table: dict[str, list[dict[str, Any]]] = {}
for rule_resource in rules_results: # type: ignore[reportUnknownVariableType]
try:
if strip_version:
rule_resource.pop("revision", None) # type: ignore[reportUnknownMemberType]
rule_resource.pop("version", None) # type: ignore[reportUnknownMemberType]
rule_resource["author"] = rule_resource.get("author") or default_author or [rule_resource.get("created_by")] # type: ignore[reportUnknownMemberType]
if isinstance(rule_resource["author"], str):
rule_resource["author"] = [rule_resource["author"]]
# Inherit maturity and optionally local dates from the rule if it already exists
params: dict[str, Any] = {
"rule": rule_resource,
"maturity": "development",
}
threat = rule_resource.get("threat") # type: ignore[reportUnknownMemberType]
first_tactic = threat[0].get("tactic").get("name") if threat else "" # type: ignore[reportUnknownMemberType]
# Check if flag or config is set to not include tactic in the filename
no_tactic_filename = no_tactic_filename or RULES_CONFIG.no_tactic_filename
# Check if the flag is set to not include tactic in the filename
tactic_name = first_tactic if not no_tactic_filename else None # type: ignore[reportUnknownMemberType]
rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=tactic_name) # type: ignore[reportUnknownMemberType]
save_path = directory / f"{rule_name}"
# Get local rule data if load_rule_loading is enabled. If not enabled rules variable will be None.
local_rule: dict[str, Any] = params.get("rule", {})
input_rule_id: str | None = None
if local_rule:
input_rule_id = cast("definitions.UUIDString", local_rule.get("rule_id"))
if input_rule_id and input_rule_id in raw_rule_collection.id_map:
save_path = raw_rule_collection.id_map[input_rule_id].path or save_path
params.update(
update_metadata_from_file(
save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date}
)
)
contents = TOMLRuleContents.from_rule_resource(**params) # type: ignore[reportArgumentType]
rule = TOMLRule(contents=contents, path=save_path)
except Exception as e:
if skip_errors:
print(f"- skipping {rule_resource.get('name')} - {type(e).__name__}") # type: ignore[reportUnknownMemberType]
errors.append(f"- {rule_resource.get('name')} - {e}") # type: ignore[reportUnknownMemberType]
continue
raise
if rule.contents.data.exceptions_list:
# For each item in rule.contents.data.exceptions_list to the exception_list_rule_table under the list_id
for exception in rule.contents.data.exceptions_list:
exception_id = exception["list_id"]
if exception_id not in exception_list_rule_table:
exception_list_rule_table[exception_id] = []
exception_list_rule_table[exception_id].append({"id": rule.id, "name": rule.name})
if rule.contents.data.actions:
# use connector ids as rule source
for action in rule.contents.data.actions:
action_id = action["id"]
if action_id not in action_connector_rule_table:
action_connector_rule_table[action_id] = []
action_connector_rule_table[action_id].append({"id": rule.id, "name": rule.name})
exported.append(rule)
# Parse exceptions results from API return
exceptions = []
if export_exceptions:
exceptions_containers = {}
exceptions_items = {}
exceptions_containers, exceptions_items, parse_errors, _ = parse_exceptions_results_from_api(exception_results) # type: ignore[reportArgumentType]
errors.extend(parse_errors)
# Build TOMLException Objects
exceptions, e_output, e_errors = build_exception_objects(
exceptions_containers,
exceptions_items,
exception_list_rule_table,
exceptions_directory if exceptions_directory else None,
save_toml=False,
skip_errors=skip_errors,
verbose=False,
)
for line in e_output:
click.echo(line)
errors.extend(e_errors)
# Parse action connector results from API return
action_connectors = []
if export_action_connectors:
action_connector_results, _ = parse_action_connector_results_from_api(action_connector_results) # type: ignore[reportArgumentType]
# Build TOMLActionConnector Objects
action_connectors, ac_output, ac_errors = build_action_connector_objects(
action_connector_results,
action_connector_rule_table,
action_connectors_directory=action_connectors_directory if action_connectors_directory else None,
save_toml=False,
skip_errors=skip_errors,
verbose=False,
)
for line in ac_output:
click.echo(line)
errors.extend(ac_errors)
saved: list[TOMLRule] = []
for rule in exported:
try:
rule.save_toml()
except Exception as e:
if skip_errors:
print(f"- skipping {rule.contents.data.name} - {type(e).__name__}")
errors.append(f"- {rule.contents.data.name} - {e}")
continue
raise
saved.append(rule)
saved_exceptions: list[TOMLException] = []
for exception in exceptions:
try:
exception.save_toml()
except Exception as e:
if skip_errors:
print(f"- skipping {exception.rule_name} - {type(e).__name__}") # type: ignore[reportUnknownMemberType]
errors.append(f"- {exception.rule_name} - {e}") # type: ignore[reportUnknownMemberType]
continue
raise
saved_exceptions.append(exception)
saved_action_connectors: list[TOMLActionConnector] = []
for action in action_connectors:
try:
action.save_toml()
except Exception as e:
if skip_errors:
print(f"- skipping {action.name} - {type(e).__name__}")
errors.append(f"- {action.name} - {e}")
continue
raise
saved_action_connectors.append(action)
click.echo(f"{results_len} results exported") # type: ignore[reportUnknownArgumentType]
click.echo(f"{len(exported)} rules converted")
click.echo(f"{len(exceptions)} exceptions exported")
click.echo(f"{len(action_connectors)} action connectors exported")
click.echo(f"{len(saved)} rules saved to {directory}")
click.echo(f"{len(saved_exceptions)} exception lists saved to {exceptions_directory}")
click.echo(f"{len(saved_action_connectors)} action connectors saved to {action_connectors_directory}")
if errors:
err_file = directory / "_errors.txt"
_ = err_file.write_text("\n".join(errors))
click.echo(f"{len(errors)} errors saved to {err_file}")
return exported
@kibana_group.command("search-alerts")
@click.argument("query", required=False)
@click.option("--date-range", "-d", type=(str, str), default=("now-7d", "now"), help="Date range to scope search")
@click.option("--columns", "-c", multiple=True, help="Columns to display in table")
@click.option("--extend", "-e", is_flag=True, help="If columns are specified, extend the original columns")
@click.option("--max-count", "-m", default=100, help="The max number of alerts to return")
@click.pass_context
def search_alerts( # noqa: PLR0913
ctx: click.Context,
query: str,
date_range: tuple[str, str],
columns: list[str],
extend: bool,
max_count: int,
) -> None:
"""Search detection engine alerts with KQL."""
from eql.table import Table # type: ignore[reportMissingTypeStubs]
from .eswrap import MATCH_ALL, add_range_to_dsl
kibana = ctx.obj["kibana"]
start_time, end_time = date_range
kql_query = kql.to_dsl(query) if query else MATCH_ALL # type: ignore[reportUnknownMemberType]
add_range_to_dsl(kql_query["bool"].setdefault("filter", []), start_time, end_time) # type: ignore[reportUnknownArgumentType]
with kibana:
alerts = [a["_source"] for a in Signal.search({"query": kql_query}, size=max_count)["hits"]["hits"]] # type: ignore[reportUnknownMemberType]
# check for events with nested signal fields
if alerts:
table_columns = ["host.hostname"]
if "signal" in alerts[0]:
table_columns += ["signal.rule.name", "signal.status", "signal.original_time"]
elif "kibana.alert.rule.name" in alerts[0]:
table_columns += ["kibana.alert.rule.name", "kibana.alert.status", "kibana.alert.original_time"]
else:
table_columns += ["rule.name", "@timestamp"]
if columns:
columns = list(columns)
table_columns = table_columns + columns if extend else columns
# Table requires the data to be nested, but depending on the version, some data uses dotted keys, so
# they must be nested explicitly
for alert in alerts: # type: ignore[reportUnknownVariableType]
for key in table_columns:
if key in alert:
nested_set(alert, key, alert[key]) # type: ignore[reportUnknownArgumentType]
click.echo(Table.from_list(table_columns, alerts)) # type: ignore[reportUnknownMemberType]
else:
click.echo("No alerts detected")