Skip to content

Commit b8725d6

Browse files
authored
Merge branch 'main' into windowspath
2 parents 471682c + 35fb650 commit b8725d6

File tree

6 files changed

+316
-8
lines changed

6 files changed

+316
-8
lines changed

contentctl/actions/validate.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11

22
import pathlib
3+
34
from contentctl.input.director import Director, DirectorOutputDto
45
from contentctl.objects.config import validate
56
from contentctl.enrichments.attack_enrichment import AttackEnrichment
67
from contentctl.enrichments.cve_enrichment import CveEnrichment
78
from contentctl.objects.atomic import AtomicTest
89
from contentctl.helper.utils import Utils
10+
from contentctl.objects.data_source import DataSource
11+
from contentctl.helper.splunk_app import SplunkApp
912

1013

1114
class Validate:
@@ -33,6 +36,9 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
3336
director = Director(director_output_dto)
3437
director.execute(input_dto)
3538
self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto)
39+
if input_dto.data_source_TA_validation:
40+
self.validate_latest_TA_information(director_output_dto.data_sources)
41+
3642
return director_output_dto
3743

3844

@@ -72,4 +78,37 @@ def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_o
7278
if len(unusedLookupFiles) > 0:
7379
raise Exception(f"The following .csv or .mlmodel files exist in '{lookupsDirectory}', but are not referenced by a lookup file: {[str(path) for path in unusedLookupFiles]}")
7480
return
75-
81+
82+
83+
def validate_latest_TA_information(self, data_sources: list[DataSource]) -> None:
    """
    Check that every supported TA that has a URL is pinned to the latest version.

    For each data source, every entry in ``supported_TA`` with a non-None ``url``
    is resolved to a numeric app UID (the last path segment of the URL) and looked
    up through ``SplunkApp``. Each unique (name, version) pair is checked once.
    Problems are collected rather than raised immediately so that a single run
    reports every mismatch or lookup failure.

    :param data_sources: data sources whose ``supported_TA`` entries are checked
    :raises Exception: if at least one TA is out of date or could not be checked;
        the message lists every collected problem
    """
    # (name, version) pairs already checked; a set gives O(1) membership tests
    validated_TAs: set[tuple[str, str]] = set()
    errors: list[str] = []
    print("----------------------")
    print("Validating latest TA:")
    print("----------------------")
    for data_source in data_sources:
        for supported_TA in data_source.supported_TA:
            # without a URL there is no app UID to look up, so there is nothing to check
            if supported_TA.url is None:
                continue

            ta_identifier = (supported_TA.name, supported_TA.version)
            if ta_identifier in validated_TAs:
                continue
            validated_TAs.add(ta_identifier)

            try:
                # The UID parse lives inside the try so that a URL whose last path
                # segment is not numeric is reported with the other errors instead
                # of aborting the whole validation with a bare ValueError.
                uid = int(str(supported_TA.url).rstrip('/').split("/")[-1])
                splunk_app = SplunkApp(app_uid=uid)
                if splunk_app.latest_version != supported_TA.version:
                    errors.append(f"Version mismatch in '{data_source.file_path}' supported TA '{supported_TA.name}'"
                                  f"\n  Latest version on Splunkbase    : {splunk_app.latest_version}"
                                  f"\n  Version specified in data source: {supported_TA.version}")
            except Exception as e:
                # deliberately broad: any failure for one TA must not hide the results for the others
                errors.append(f"Error processing checking version of TA {supported_TA.name}: {str(e)}")

    if errors:
        errorString = '\n\n'.join(errors)
        raise Exception(f"[{len(errors)}] or more TA versions are out of date or have other errors. "
                        f"Please update the following data sources with the latest versions of "
                        f"their supported tas:\n\n{errorString}")
    print("All TA versions are up to date.")
112+
113+
114+

contentctl/helper/splunk_app.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
import os
2+
import time
3+
import json
4+
import xml.etree.ElementTree as ET
5+
from typing import List, Tuple, Optional
6+
from urllib.parse import urlencode
7+
8+
import requests
9+
import urllib3
10+
import xmltodict
11+
from requests.adapters import HTTPAdapter
12+
from requests.packages.urllib3.util.retry import Retry
13+
14+
# Import-time side effect: turn off urllib3's InsecureRequestWarning for the whole process.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
15+
16+
# Module-level retry limit. NOTE(review): not referenced by any code visible in this
# module (the retry session takes its default from RetryConstant.RETRY_COUNT) - confirm
# whether external callers rely on it before removing.
MAX_RETRY = 3
17+
18+
class APIEndPoint:
    """
    Namespace of static Splunkbase URL templates.

    Templates containing ``{app_name_id}`` or ``{app_uid}`` are filled in with
    ``str.format`` by the code that issues the request.
    """

    # login endpoint
    SPLUNK_BASE_AUTH_URL = "https://splunkbase.splunk.com/api/account:login/"

    # app entries, addressed by app name identifier
    SPLUNK_BASE_FETCH_APP_BY_ENTRY_ID = "https://apps.splunk.com/api/apps/entriesbyid/{app_name_id}"

    # app page, addressed by app name identifier
    SPLUNK_BASE_GET_UID_REDIRECT = "https://apps.splunk.com/apps/id/{app_name_id}"

    # app info, addressed by numeric app UID
    SPLUNK_BASE_APP_INFO = "https://splunkbase.splunk.com/api/v1/app/{app_uid}"
29+
30+
class RetryConstant:
    """
    Namespace of retry-related constants.
    """

    # value in seconds is presumed from its use as a request ``timeout`` - TODO confirm
    RETRY_INTERVAL = 15

    # number of retries
    RETRY_COUNT = 3
37+
38+
39+
class SplunkBaseError(requests.HTTPError):
    """An error raised while communicating with Splunkbase"""
42+
43+
44+
# TODO (PEX-306): validate w/ Splunkbase team if there are better APIs we can rely on being supported
45+
class SplunkApp:
    """
    A Splunk app available for download on Splunkbase

    An instance is identified by its numeric ``app_uid`` (e.g. 742), its
    ``app_name_id`` (e.g. Splunk_TA_windows), or both. Unless ``manual_setup`` is
    requested, the constructor resolves whichever identifier is missing and then
    fetches the app title, the latest version and the latest version's download URL.
    """

    class InitializationError(Exception):
        """An initialization error during SplunkApp setup"""
        pass

    @staticmethod
    def requests_retry_session(
        retries=RetryConstant.RETRY_COUNT,
        backoff_factor=1,
        status_forcelist=(500, 502, 503, 504),
        session=None,
    ):
        """
        Build (or extend) a requests session whose adapters retry failed requests

        :param retries: retry budget used for total, read and connect retries
        :param backoff_factor: backoff factor handed to the Retry policy
        :param status_forcelist: HTTP status codes that trigger a retry
        :param session: existing session to mount the adapters on; a new one is created if omitted
        :return: the session with the retrying adapter mounted for http:// and https://
        """
        session = session or requests.Session()
        retry = Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session

    def __init__(
        self,
        app_uid: Optional[int] = None,
        app_name_id: Optional[str] = None,
        manual_setup: bool = False,
    ) -> None:
        """
        :param app_uid: the numeric app UID (e.g. 742)
        :param app_name_id: the app name identifier (e.g. Splunk_TA_windows)
        :param manual_setup: if True, skip all lookups; the caller is responsible for
            invoking the set_* methods as needed
        :raises SplunkApp.InitializationError: if neither identifier is provided
        :raises SplunkBaseError: if a lookup fails (only when manual_setup is False)
        """
        if app_uid is None and app_name_id is None:
            raise SplunkApp.InitializationError(
                "Either app_uid (the numeric app UID e.g. 742) or app_name_id (the app name "
                "identifier e.g. Splunk_TA_windows) must be provided"
            )

        # init or declare instance vars
        self.app_uid: Optional[int] = app_uid
        self.app_name_id: Optional[str] = app_name_id
        self.manual_setup = manual_setup
        self.app_title: str
        self.latest_version: str
        self.latest_version_download_url: str
        self._app_info_cache: Optional[dict] = None

        # set instance vars as needed; skip if manual setup was indicated
        if not self.manual_setup:
            # each setter is a no-op for an identifier that was already supplied
            self.set_app_name_id()
            self.set_app_uid()
            self.set_app_title()
            self.set_latest_version_info()

    def __eq__(self, __value: object) -> bool:
        """Two SplunkApp instances are equal when their app_uid values are equal"""
        if isinstance(__value, SplunkApp):
            return self.app_uid == __value.app_uid
        return False

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None and makes instances unhashable;
        # hash on the same attribute that __eq__ compares.
        return hash(self.app_uid)

    def __repr__(self) -> str:
        # the download URL is only declared (not assigned) in __init__ when
        # manual_setup is used, so it may not exist yet
        download_url = getattr(self, "latest_version_download_url", None)
        return (
            f"SplunkApp(app_name_id='{self.app_name_id}', app_uid={self.app_uid}, "
            f"latest_version_download_url='{download_url}')"
        )

    def __str__(self) -> str:
        return f"<'{self.app_name_id}' ({self.app_uid})>"

    def get_app_info_by_uid(self) -> dict:
        """
        Retrieve app info via app_uid (e.g. 742)

        The parsed response is cached on the instance, so only the first call
        performs a request.

        :return: dictionary of app info
        :raises SplunkApp.InitializationError: if app_uid is not set
        :raises SplunkBaseError: if the request fails
        """
        # return the cache if already set, and raise if app_uid is not set
        if self._app_info_cache is not None:
            return self._app_info_cache
        if self.app_uid is None:
            raise SplunkApp.InitializationError("app_uid must be set in order to fetch app info")

        # NOTE: auth not required
        # Get app info by uid
        try:
            response = self.requests_retry_session().get(
                APIEndPoint.SPLUNK_BASE_APP_INFO.format(app_uid=self.app_uid),
                # RETRY_INTERVAL doubles as the per-request timeout
                timeout=RetryConstant.RETRY_INTERVAL
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            raise SplunkBaseError(f"Error fetching app info for app_uid {self.app_uid}: {str(e)}") from e

        # parse JSON and set cache
        app_info: dict = json.loads(response.content)
        self._app_info_cache = app_info

        return app_info

    def set_app_name_id(self) -> None:
        """
        Set app_name_id from the app info fetched by app_uid (no-op if already set)

        :raises SplunkBaseError: if the app info has no 'appid' key
        """
        # return if app_name_id is already set
        if self.app_name_id is not None:
            return

        # get app info by app_uid
        app_info = self.get_app_info_by_uid()

        # set app_name_id if found
        if "appid" not in app_info:
            raise SplunkBaseError(f"Invalid response from Splunkbase; missing key 'appid': {app_info}")
        self.app_name_id = app_info["appid"]

    def set_app_uid(self) -> None:
        """
        Set app_uid from the redirect issued for app_name_id (no-op if already set)

        :raises SplunkApp.InitializationError: if app_name_id is not set
        :raises SplunkBaseError: if the request fails, the response carries no
            'Location' header, or the redirect target does not end in a numeric UID
        """
        # return if app_uid is already set and raise if app_name_id was not set
        if self.app_uid is not None:
            return
        if self.app_name_id is None:
            raise SplunkApp.InitializationError("app_name_id must be set in order to fetch app_uid")

        # NOTE: auth not required
        # Get app_uid by app_name_id via a redirect
        try:
            response = self.requests_retry_session().get(
                APIEndPoint.SPLUNK_BASE_GET_UID_REDIRECT.format(app_name_id=self.app_name_id),
                # the redirect itself carries the answer, so it must not be followed
                allow_redirects=False,
                timeout=RetryConstant.RETRY_INTERVAL
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            raise SplunkBaseError(
                f"Error fetching app_uid for app_name_id '{self.app_name_id}': {str(e)}"
            ) from e

        # Extract the app_uid from the redirect path
        if "Location" not in response.headers:
            raise SplunkBaseError(
                "Invalid response from Splunkbase; missing 'Location' in redirect header"
            )

        # The UID is the last path segment of the Location header value. The headers
        # object is a mapping, so the header must be looked up before splitting.
        location = response.headers["Location"]
        try:
            # app_uid is declared as an int and compared in __eq__, so convert it
            self.app_uid = int(location.rstrip("/").split("/")[-1])
        except ValueError as e:
            raise SplunkBaseError(
                f"Invalid response from Splunkbase; could not parse app_uid from redirect "
                f"location '{location}'"
            ) from e

    def set_app_title(self) -> None:
        """
        Set app_title from the app info fetched by app_uid

        :raises SplunkBaseError: if the app info has no 'title' key
        """
        # get app info by app_uid
        app_info = self.get_app_info_by_uid()

        # set app_title if found
        if "title" not in app_info:
            raise SplunkBaseError(f"Invalid response from Splunkbase; missing key 'title': {app_info}")
        self.app_title = app_info["title"]

    def __fetch_url_latest_version_info(self) -> str:
        """
        Identify latest version of the app and return a URL pointing to download info for the build

        :return: url for download info on the latest build
        :raises SplunkBaseError: if the request fails, the feed has no entries, or no
            entry is flagged as the latest
        """
        # retrieve app entries using the app_name_id
        try:
            response = self.requests_retry_session().get(
                APIEndPoint.SPLUNK_BASE_FETCH_APP_BY_ENTRY_ID.format(app_name_id=self.app_name_id),
                timeout=RetryConstant.RETRY_INTERVAL
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            raise SplunkBaseError(
                f"Error fetching app entries for app_name_id '{self.app_name_id}': {str(e)}"
            ) from e

        # parse xml
        app_xml = xmltodict.parse(response.content)

        # a feed without entries would otherwise fail later with an opaque AttributeError
        feed = app_xml.get("feed") or {}
        app_entries = feed.get("entry")
        if app_entries is None:
            raise SplunkBaseError(
                f"Invalid response from Splunkbase; no app entries found for app_name_id "
                f"'{self.app_name_id}'"
            )

        # convert to list if only one entry exists
        if not isinstance(app_entries, list):
            app_entries = [app_entries]

        # iterate over multiple entries if present
        for entry in app_entries:
            for key in entry.get("content").get("s:dict").get("s:key"):
                if key.get("@name") == "islatest" and key.get("#text") == "True":
                    return entry.get("link").get("@href")

        # raise if no entry was found
        raise SplunkBaseError(f"No app entry found with 'islatest' tag set to True: {self.app_name_id}")

    def __fetch_url_latest_version_download(self, info_url: str) -> str:
        """
        Fetch the download URL via the provided URL to build info

        :param info_url: URL for download info for the latest build
        :return: URL for downloading the latest build
        :raises SplunkBaseError: if the request fails
        """
        # fetch download info
        try:
            response = self.requests_retry_session().get(info_url, timeout=RetryConstant.RETRY_INTERVAL)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            raise SplunkBaseError(
                f"Error fetching download info for app_name_id '{self.app_name_id}': {str(e)}"
            ) from e

        # parse XML and extract download URL
        build_xml = xmltodict.parse(response.content)
        download_url = build_xml.get("feed").get("entry").get("link").get("@href")
        return download_url

    def set_latest_version_info(self) -> None:
        """
        Set latest_version and latest_version_download_url

        :raises SplunkApp.InitializationError: if app_name_id is not set
        :raises SplunkBaseError: if either Splunkbase lookup fails
        """
        # raise if app_name_id not set
        if self.app_name_id is None:
            raise SplunkApp.InitializationError("app_name_id must be set in order to fetch latest version info")

        # fetch the info URL
        info_url = self.__fetch_url_latest_version_info()

        # parse out the version number (last path segment of the info URL) and fetch the download URL
        self.latest_version = info_url.split("/")[-1]
        self.latest_version_download_url = self.__fetch_url_latest_version_download(info_url)

contentctl/objects/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ class validate(Config_Base):
176176
build_app: bool = Field(default=True, description="Should an app be built and output in the build_path?")
177177
build_api: bool = Field(default=False, description="Should api objects be built and output in the build_path?")
178178
build_ssa: bool = Field(default=False, description="Should ssa objects be built and output in the build_path?")
179+
data_source_TA_validation: bool = Field(default=False, description="Validate latest TA information from Splunkbase")
179180

180181
def getAtomicRedTeamRepoPath(self, atomic_red_team_repo_name:str = "atomic-red-team"):
181182
return self.path/atomic_red_team_repo_name

contentctl/objects/data_source.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
from __future__ import annotations
22
from typing import Optional, Any
3-
from pydantic import Field, FilePath, model_serializer
3+
from pydantic import Field, HttpUrl, model_serializer, BaseModel
44
from contentctl.objects.security_content_object import SecurityContentObject
55
from contentctl.objects.event_source import EventSource
66

7+
8+
class TA(BaseModel):
    # One supported TA entry of a data source.
    # `#` comments are used instead of a docstring so the model's schema
    # description is left untouched.

    # name of the TA
    name: str
    # optional URL of the TA; None when no URL is specified
    url: Optional[HttpUrl] = None
    # version of the TA specified in the data source
    version: str
712
class DataSource(SecurityContentObject):
813
source: str = Field(...)
914
sourcetype: str = Field(...)
1015
separator: Optional[str] = None
1116
configuration: Optional[str] = None
12-
supported_TA: Optional[list] = None
17+
supported_TA: list[TA] = []
1318
fields: Optional[list] = None
1419
field_mappings: Optional[list] = None
1520
convert_to_log_source: Optional[list] = None

contentctl/output/data_source_writer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ def writeDataSourceCsv(data_source_objects: List[DataSource], file_path: pathlib
1818
])
1919
# Write the data
2020
for data_source in data_source_objects:
21-
if data_source.supported_TA and isinstance(data_source.supported_TA, list) and len(data_source.supported_TA) > 0:
22-
supported_TA_name = data_source.supported_TA[0].get('name', '')
23-
supported_TA_version = data_source.supported_TA[0].get('version', '')
24-
supported_TA_url = data_source.supported_TA[0].get('url', '')
21+
if len(data_source.supported_TA) > 0:
22+
supported_TA_name = data_source.supported_TA[0].name
23+
supported_TA_version = data_source.supported_TA[0].version
24+
supported_TA_url = data_source.supported_TA[0].url or ''
2525
else:
2626
supported_TA_name = ''
2727
supported_TA_version = ''

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "contentctl"
3-
version = "4.2.2"
3+
version = "4.2.3"
44
description = "Splunk Content Control Tool"
55
authors = ["STRT <[email protected]>"]
66
license = "Apache 2.0"

0 commit comments

Comments
 (0)