Skip to content

Commit 249bf27

Browse files
Added DetectionMetadata class; some docstring changes
1 parent 6826188 commit 249bf27

File tree

6 files changed

+132
-69
lines changed

6 files changed

+132
-69
lines changed

contentctl/actions/inspect.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ class Inspect:
3030
def execute(self, config: inspect) -> str:
3131
if config.build_app or config.build_api:
3232

33-
self.inspectAppCLI(config)
34-
appinspect_token = self.inspectAppAPI(config)
33+
# self.inspectAppCLI(config)
34+
# appinspect_token = self.inspectAppAPI(config)
3535

3636
if config.enable_metadata_validation:
3737
self.check_detection_metadata(config)
@@ -270,7 +270,9 @@ def check_detection_metadata(self, config: inspect) -> None:
270270
"""
271271
Using a previous build, compare the savedsearches.conf files to detect any issues w/
272272
detection metadata.
273+
273274
:param config: an inspect config
275+
:type config: :class:`contentctl.objects.config.inspect`
274276
"""
275277
# TODO (#282): We should be inspect the same artifact we're passing around from the
276278
# build stage ideally
@@ -299,22 +301,22 @@ def check_detection_metadata(self, config: inspect) -> None:
299301
current_stanza = current_build_conf.detection_stanzas[rule_name]
300302

301303
# Detection IDs should not change
302-
if current_stanza.detection_id != previous_stanza.detection_id:
304+
if current_stanza.metadata.detection_id != previous_stanza.metadata.detection_id:
303305
validation_errors[rule_name].append(
304306
DetectionIDError(
305307
rule_name=rule_name,
306-
current_id=current_stanza.detection_id,
307-
previous_id=previous_stanza.detection_id
308+
current_id=current_stanza.metadata.detection_id,
309+
previous_id=previous_stanza.metadata.detection_id
308310
)
309311
)
310312

311313
# Versions should never decrement in successive builds
312-
if current_stanza.detection_version < previous_stanza.detection_version:
314+
if current_stanza.metadata.detection_version < previous_stanza.metadata.detection_version:
313315
validation_errors[rule_name].append(
314316
VersionDecrementedError(
315317
rule_name=rule_name,
316-
current_version=current_stanza.detection_version,
317-
previous_version=previous_stanza.detection_version
318+
current_version=current_stanza.metadata.detection_version,
319+
previous_version=previous_stanza.metadata.detection_version
318320
)
319321
)
320322

@@ -323,8 +325,8 @@ def check_detection_metadata(self, config: inspect) -> None:
323325
validation_errors[rule_name].append(
324326
VersionBumpingError(
325327
rule_name=rule_name,
326-
current_version=current_stanza.detection_version,
327-
previous_version=previous_stanza.detection_version
328+
current_version=current_stanza.metadata.detection_version,
329+
previous_version=previous_stanza.metadata.detection_version
328330
)
329331
)
330332

contentctl/helper/splunk_app.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,14 @@ def set_latest_version_info(self) -> None:
266266
def __get_splunk_base_session_token(self, username: str, password: str) -> str:
267267
"""
268268
This method will generate Splunk base session token
269+
270+
:param username: Splunkbase username
271+
:type username: str
272+
:param password: Splunkbase password
273+
:type password: str
274+
269275
:return: Splunk base session token
276+
:rtype: str
270277
"""
271278
# Data payload for fetch splunk base session token
272279
payload = urlencode(
@@ -311,12 +318,20 @@ def download(
311318
) -> Path:
312319
"""
313320
Given an output path, download the app to the specified location
321+
314322
:param out: the Path to download the app to
323+
:type out: :class:`pathlib.Path`
315324
:param username: Splunkbase username
325+
:type username: str
316326
:param password: Splunkbase password
327+
:type password: str
317328
:param is_dir: a flag indicating whether out is directory, otherwise a file (default: False)
329+
:type is_dir: bool
318330
:param overwrite: a flag indicating whether we can overwrite the file at out or not
331+
:type overwrite: bool
332+
319333
:returns path: the Path the download was written to (needed when is_dir is True)
334+
:rtype: :class:`pathlib.Path`
320335
"""
321336
# Get the Splunkbase session token
322337
token = self.__get_splunk_base_session_token(username, password)

contentctl/objects/config.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,14 @@ class inspect(build):
286286
def validate_needed_flags_metadata_validation(cls, v: bool, info: ValidationInfo) -> bool:
287287
"""
288288
Validates that `enrichments` is True for the inspect action
289+
289290
:param v: the field's value
291+
:type v: bool
290292
:param info: the ValidationInfo to be used
293+
:type info: :class:`pydantic.ValidationInfo`
294+
291295
:returns: bool, for v
296+
:rtype: bool
292297
"""
293298
# Enforce that `enrichments` is True for the inspect action
294299
if v is False:
@@ -301,7 +306,9 @@ def get_previous_package_file_path(self) -> pathlib.Path:
301306
Returns a Path object for the path to the prior package build. If no path was provided, the
302307
latest version is downloaded from Splunkbase and it's filepath is returned, and saved to the
303308
in-memory config (so download doesn't happen twice in the same run).
309+
304310
:returns: Path object to previous ESCU build
311+
:rtype: :class:`pathlib.Path`
305312
"""
306313
previous_build_path = self.previous_build
307314
# Download the previous build as the latest release on Splunkbase if no path was provided
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import uuid
2+
from typing import Any
3+
4+
from pydantic import BaseModel, Field, field_validator
5+
6+
7+
class DetectionMetadata(BaseModel):
8+
# A bool indicating whether the detection is deprecated (serialized as an int, 1 or 0)
9+
deprecated: bool = Field(...)
10+
11+
# A UUID identifying the detection
12+
detection_id: uuid.UUID = Field(...)
13+
14+
# The version of the detection
15+
detection_version: int = Field(...)
16+
17+
# TODO (cmcginley): this was a recently added field; make note of the ESCU/contentctl version
18+
# The time the detection was published
19+
publish_time: float = Field(...)
20+
21+
@field_validator("deprecated", mode="before")
22+
@classmethod
23+
def validate_deprecated(cls, v: Any) -> Any:
24+
"""
25+
Convert str to int, and then ints to bools for deprecated; raise if not 0 or 1 in the case
26+
of an int, or if str cannot be converted to int.
27+
28+
:param v: the value passed
29+
:type v: :class:`typing.Any`
30+
31+
:returns: the value
32+
:rtype: :class:`typing.Any`
33+
"""
34+
if isinstance(v, str):
35+
try:
36+
v = int(v)
37+
except ValueError as e:
38+
raise ValueError(f"Cannot convert str value ({v}) to int: {e}") from e
39+
if isinstance(v, int):
40+
if not (0 <= v <= 1):
41+
raise ValueError(
42+
f"Value for field 'deprecated' ({v}) must be 0 or 1, if not a bool."
43+
)
44+
v = bool(v)
45+
return v
46+
47+
@field_validator("detection_version", mode="before")
48+
@classmethod
49+
def validate_detection_version(cls, v: Any) -> Any:
50+
"""
51+
Convert str to int; raise if str cannot be converted to int.
52+
53+
:param v: the value passed
54+
:type v: :class:`typing.Any`
55+
56+
:returns: the value
57+
:rtype: :class:`typing.Any`
58+
"""
59+
if isinstance(v, str):
60+
try:
61+
v = int(v)
62+
except ValueError as e:
63+
raise ValueError(f"Cannot convert str value ({v}) to int: {e}") from e
64+
return v
Lines changed: 23 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
import uuid
2-
from typing import Any, ClassVar
3-
import json
1+
from typing import ClassVar
42
import hashlib
3+
from functools import cached_property
54

6-
from pydantic import BaseModel, Field, PrivateAttr, computed_field
5+
from pydantic import BaseModel, Field, computed_field
6+
7+
from contentctl.objects.detection_metadata import DetectionMetadata
78

89

910
class DetectionStanza(BaseModel):
@@ -16,19 +17,17 @@ class DetectionStanza(BaseModel):
1617
# The full name of the detection (e.g. "ESCU - My Detection - Rule")
1718
name: str = Field(...)
1819

19-
# The metadata extracted from the stanza
20-
_metadata: dict[str, Any] = PrivateAttr(default={})
21-
2220
# The key prefix indicating the metadata attribute
2321
METADATA_LINE_PREFIX: ClassVar[str] = "action.correlationsearch.metadata = "
2422

25-
def model_post_init(self, __context: Any) -> None:
26-
super().model_post_init(__context)
27-
self._parse_metadata()
28-
29-
def _parse_metadata(self) -> None:
23+
@computed_field
24+
@cached_property
25+
def metadata(self) -> DetectionMetadata:
3026
"""
31-
Using the provided lines, parse out the metadata
27+
The metadata extracted from the stanza. Using the provided lines, parse out the metadata
28+
29+
:returns: the detection stanza's metadata
30+
:rtype: :class:`contentctl.objects.detection_metadata.DetectionMetadata`
3231
"""
3332
# Set a variable to store the metadata line in
3433
meta_line: str | None = None
@@ -47,56 +46,17 @@ def _parse_metadata(self) -> None:
4746
if meta_line is None:
4847
raise Exception(f"No metadata for detection '{self.name}' found in stanza.")
4948

50-
# Try to load the metadata JSON into a dict
51-
try:
52-
self._metadata: dict[str, Any] = json.loads(meta_line[len(DetectionStanza.METADATA_LINE_PREFIX):])
53-
except json.decoder.JSONDecodeError as e:
54-
raise Exception(
55-
f"Malformed metdata for detection '{self.name}': {e}"
56-
)
57-
58-
@computed_field
59-
@property
60-
def deprecated(self) -> int:
61-
"""
62-
An int indicating whether the detection is deprecated
63-
:returns: int
64-
"""
65-
return int(self._metadata["deprecated"])
66-
67-
@computed_field
68-
@property
69-
def detection_id(self) -> uuid.UUID:
70-
"""
71-
A UUID identifying the detection
72-
:returns: UUID
73-
"""
74-
return uuid.UUID(self._metadata["detection_id"])
75-
76-
@computed_field
77-
@property
78-
def detection_version(self) -> int:
79-
"""
80-
The version of the detection
81-
:returns: int
82-
"""
83-
return int(self._metadata["detection_version"])
84-
85-
@computed_field
86-
@property
87-
def publish_time(self) -> float:
88-
"""
89-
The time the detection was published
90-
:returns: float
91-
"""
92-
return self._metadata["publish_time"]
49+
# Parse the metadata JSON into a model
50+
return DetectionMetadata.model_validate_json(meta_line[len(DetectionStanza.METADATA_LINE_PREFIX):])
9351

9452
@computed_field
95-
@property
53+
@cached_property
9654
def hash(self) -> str:
9755
"""
9856
The SHA256 hash of the lines of the stanza, excluding the metadata line
99-
:returns: str (hexdigest)
57+
58+
:returns: hexdigest
59+
:rtype: str
10060
"""
10161
hash = hashlib.sha256()
10262
for line in self.lines:
@@ -109,7 +69,11 @@ def version_should_be_bumped(self, previous: "DetectionStanza") -> bool:
10969
A helper method that compares this stanza against the same stanza from a previous build;
11070
returns True if the version still needs to be bumped (e.g. the detection was changed but
11171
the version was not), False otherwise.
72+
11273
:param previous: the previous build's DetectionStanza for comparison
74+
:type previous: :class:`contentctl.objects.detection_stanza.DetectionStanza`
75+
11376
:returns: True if the version still needs to be bumped
77+
:rtype: bool
11478
"""
115-
return (self.hash != previous.hash) and (self.detection_version <= previous.detection_version)
79+
return (self.hash != previous.hash) and (self.metadata.detection_version <= previous.metadata.detection_version)

contentctl/objects/savedsearches_conf.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,12 @@ def is_section_header(self, line: str) -> bool:
5454
"""
5555
Given a line, determine if the line is a section header, indicating the start of a new
5656
section
57+
5758
:param line: a line from the conf file
59+
:type line: str
60+
5861
:returns: a bool indicating whether the current line is a section header or not
62+
:rtype: bool
5963
"""
6064
# Compile the pattern based on the app name
6165
pattern = re.compile(r"\[" + self.app_label + r" - .+ - Rule\]")
@@ -66,7 +70,9 @@ def is_section_header(self, line: str) -> bool:
6670
def section_start(self, line: str) -> None:
6771
"""
6872
Given a line, adjust the state to track a new section
73+
6974
:param line: a line from the conf file
75+
:type line: str
7076
"""
7177
# Determine the new section name:
7278
new_section_name = line.strip().strip("[").strip("]")
@@ -168,9 +174,14 @@ def init_from_package(package_path: Path, app_name: str, appid: str) -> "Savedse
168174
"""
169175
Alternate constructor which can take an app package, and extract the savedsearches.conf from
170176
a temporary file.
177+
171178
:param package_path: Path to the app package
179+
:type package_path: :class:`pathlib.Path`
172180
:param app_name: the name of the app (e.g. ESCU)
181+
:type app_name: str
182+
173183
:returns: a SavedsearchesConf object
184+
:rtype: :class:`contentctl.objects.savedsearches_conf.SavedsearchesConf`
174185
"""
175186
# Create a temporary directory
176187
with tempfile.TemporaryDirectory() as tmpdir:

0 commit comments

Comments
 (0)