Skip to content

Commit 98764b8

Browse files
author
Patrick Bareiss
committed
Add latest TA version validation
1 parent ea9f56d commit 98764b8

File tree

3 files changed

+297
-1
lines changed

3 files changed

+297
-1
lines changed

contentctl/actions/validate.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from contentctl.enrichments.cve_enrichment import CveEnrichment
77
from contentctl.objects.atomic import AtomicTest
88
from contentctl.helper.utils import Utils
9+
from contentctl.objects.data_source import DataSource
10+
from contentctl.helper.splunk_app import SplunkApp
911

1012

1113
class Validate:
@@ -33,6 +35,11 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
3335
director = Director(director_output_dto)
3436
director.execute(input_dto)
3537
self.ensure_no_orphaned_files_in_lookups(input_dto.path, director_output_dto)
38+
if input_dto.data_source_TA_validation:
39+
if self.validate_latest_TA_information(director_output_dto.data_sources) != 1:
40+
print("All TA versions are up to date.")
41+
else:
42+
raise Exception("One or more TA versions are out of date. Please update the data source with the latest version.")
3643
return director_output_dto
3744

3845

@@ -72,4 +79,29 @@ def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_o
7279
if len(unusedLookupFiles) > 0:
7380
raise Exception(f"The following .csv or .mlmodel files exist in '{lookupsDirectory}', but are not referenced by a lookup file: {[str(path) for path in unusedLookupFiles]}")
7481
return
75-
82+
83+
84+
def validate_latest_TA_information(self, data_sources: list[DataSource]) -> int:
85+
validated_TAs: list[tuple[str, str]] = []
86+
error_occurred = False
87+
print("----------------------")
88+
print("Validating latest TA:")
89+
print("----------------------")
90+
for data_source in data_sources:
91+
for supported_TA in data_source.supported_TA:
92+
ta_identifier = (supported_TA["name"], supported_TA["version"])
93+
if ta_identifier in validated_TAs:
94+
continue
95+
if "url" in supported_TA:
96+
validated_TAs.append(ta_identifier)
97+
uid = int(supported_TA["url"].rstrip('/').split("/")[-1])
98+
try:
99+
splunk_app = SplunkApp(app_uid=uid)
100+
if splunk_app.latest_version != supported_TA["version"]:
101+
raise Exception(f"Version mismatch for TA {supported_TA['name']}: "
102+
f"Latest version on Splunkbase is {splunk_app.latest_version}, "
103+
f"but version {supported_TA['version']} is specified in the data source {data_source.name}.")
104+
except Exception as e:
105+
print(f"Error processing TA {supported_TA['name']}: {str(e)}")
106+
error_occurred = True
107+
return 1 if error_occurred else 0

contentctl/helper/splunk_app.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
import os
2+
import time
3+
import json
4+
import xml.etree.ElementTree as ET
5+
from typing import List, Tuple, Optional
6+
from urllib.parse import urlencode
7+
8+
import requests
9+
import urllib3
10+
import xmltodict
11+
from requests.adapters import HTTPAdapter
12+
from requests.packages.urllib3.util.retry import Retry
13+
14+
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
15+
16+
MAX_RETRY = 3
17+
18+
class APIEndPoint:
19+
"""
20+
Class which contains Static Endpoint
21+
"""
22+
23+
SPLUNK_BASE_AUTH_URL = "https://splunkbase.splunk.com/api/account:login/"
24+
SPLUNK_BASE_FETCH_APP_BY_ENTRY_ID = (
25+
"https://apps.splunk.com/api/apps/entriesbyid/{app_name_id}"
26+
)
27+
SPLUNK_BASE_GET_UID_REDIRECT = "https://apps.splunk.com/apps/id/{app_name_id}"
28+
SPLUNK_BASE_APP_INFO = "https://splunkbase.splunk.com/api/v1/app/{app_uid}"
29+
30+
class RetryConstant:
31+
"""
32+
Class which contains Retry Constant
33+
"""
34+
35+
RETRY_COUNT = 3
36+
RETRY_INTERVAL = 15
37+
38+
39+
class SplunkBaseError(requests.HTTPError):
40+
"""An error raise in communicating with Splunkbase"""
41+
pass
42+
43+
44+
# TODO (PEX-306): validate w/ Splunkbase team if there are better APIs we can rely on being supported
45+
class SplunkApp:
46+
"""
47+
A Splunk app available for download on Splunkbase
48+
"""
49+
50+
class InitializationError(Exception):
51+
"""An initialization error during SplunkApp setup"""
52+
pass
53+
54+
@staticmethod
55+
def requests_retry_session(
56+
retries=RetryConstant.RETRY_COUNT,
57+
backoff_factor=1,
58+
status_forcelist=(500, 502, 503, 504),
59+
session=None,
60+
):
61+
session = session or requests.Session()
62+
retry = Retry(
63+
total=retries,
64+
read=retries,
65+
connect=retries,
66+
backoff_factor=backoff_factor,
67+
status_forcelist=status_forcelist,
68+
)
69+
adapter = HTTPAdapter(max_retries=retry)
70+
session.mount('http://', adapter)
71+
session.mount('https://', adapter)
72+
return session
73+
74+
def __init__(
75+
self,
76+
app_uid: Optional[int] = None,
77+
app_name_id: Optional[str] = None,
78+
manual_setup: bool = False,
79+
) -> None:
80+
if app_uid is None and app_name_id is None:
81+
raise SplunkApp.InitializationError(
82+
"Either app_uid (the numeric app UID e.g. 742) or app_name_id (the app name "
83+
"idenitifier e.g. Splunk_TA_windows) must be provided"
84+
)
85+
86+
# init or declare instance vars
87+
self.app_uid: Optional[int] = app_uid
88+
self.app_name_id: Optional[str] = app_name_id
89+
self.manual_setup = manual_setup
90+
self.app_title: str
91+
self.latest_version: str
92+
self.latest_version_download_url: str
93+
self._app_info_cache: Optional[dict] = None
94+
95+
# set instance vars as needed; skip if manual setup was indicated
96+
if not self.manual_setup:
97+
self.set_app_name_id()
98+
self.set_app_uid()
99+
self.set_app_title()
100+
self.set_latest_version_info()
101+
102+
def __eq__(self, __value: object) -> bool:
103+
if isinstance(__value, SplunkApp):
104+
return self.app_uid == __value.app_uid
105+
return False
106+
107+
def __repr__(self) -> str:
108+
return (
109+
f"SplunkApp(app_name_id='{self.app_name_id}', app_uid={self.app_uid}, "
110+
f"latest_version_download_url='{self.latest_version_download_url}')"
111+
)
112+
113+
def __str__(self) -> str:
114+
return f"<'{self.app_name_id}' ({self.app_uid})"
115+
116+
def get_app_info_by_uid(self) -> dict:
117+
"""
118+
Retrieve app info via app_uid (e.g. 742)
119+
:return: dictionary of app info
120+
"""
121+
# return cache if already set and raise and raise is app_uid is not set
122+
if self._app_info_cache is not None:
123+
return self._app_info_cache
124+
elif self.app_uid is None:
125+
raise SplunkApp.InitializationError("app_uid must be set in order to fetch app info")
126+
127+
# NOTE: auth not required
128+
# Get app info by uid
129+
try:
130+
response = self.requests_retry_session().get(
131+
APIEndPoint.SPLUNK_BASE_APP_INFO.format(app_uid=self.app_uid),
132+
timeout=RetryConstant.RETRY_INTERVAL
133+
)
134+
response.raise_for_status()
135+
except requests.exceptions.RequestException as e:
136+
raise SplunkBaseError(f"Error fetching app info for app_uid {self.app_uid}: {str(e)}")
137+
138+
# parse JSON and set cache
139+
self._app_info_cache: dict = json.loads(response.content)
140+
141+
return self._app_info_cache
142+
143+
def set_app_name_id(self) -> None:
144+
"""
145+
Set app_name_id
146+
"""
147+
# return if app_name_id is already set
148+
if self.app_name_id is not None:
149+
return
150+
151+
# get app info by app_uid
152+
app_info = self.get_app_info_by_uid()
153+
154+
# set app_name_id if found
155+
if "appid" in app_info:
156+
self.app_name_id = app_info["appid"]
157+
else:
158+
raise SplunkBaseError(f"Invalid response from Splunkbase; missing key 'appid': {app_info}")
159+
160+
def set_app_uid(self) -> None:
161+
"""
162+
Set app_uid
163+
"""
164+
# return if app_uid is already set and raise if app_name_id was not set
165+
if self.app_uid is not None:
166+
return
167+
elif self.app_name_id is None:
168+
raise SplunkApp.InitializationError("app_name_id must be set in order to fetch app_uid")
169+
170+
# NOTE: auth not required
171+
# Get app_uid by app_name_id via a redirect
172+
try:
173+
response = self.requests_retry_session().get(
174+
APIEndPoint.SPLUNK_BASE_GET_UID_REDIRECT.format(app_name_id=self.app_name_id),
175+
allow_redirects=False,
176+
timeout=RetryConstant.RETRY_INTERVAL
177+
)
178+
response.raise_for_status()
179+
except requests.exceptions.RequestException as e:
180+
raise SplunkBaseError(f"Error fetching app_uid for app_name_id '{self.app_name_id}': {str(e)}")
181+
182+
# Extract the app_uid from the redirect path
183+
if "Location" in response.headers:
184+
self.app_uid = response.headers.split("/")[-1]
185+
else:
186+
raise SplunkBaseError(
187+
"Invalid response from Splunkbase; missing 'Location' in redirect header"
188+
)
189+
190+
def set_app_title(self) -> None:
191+
"""
192+
Set app_title
193+
"""
194+
# get app info by app_uid
195+
app_info = self.get_app_info_by_uid()
196+
197+
# set app_title if found
198+
if "title" in app_info:
199+
self.app_title = app_info["title"]
200+
else:
201+
raise SplunkBaseError(f"Invalid response from Splunkbase; missing key 'title': {app_info}")
202+
203+
def __fetch_url_latest_version_info(self) -> str:
204+
"""
205+
Identify latest version of the app and return a URL pointing to download info for the build
206+
:return: url for download info on the latest build
207+
"""
208+
# retrieve app entries using the app_name_id
209+
try:
210+
response = self.requests_retry_session().get(
211+
APIEndPoint.SPLUNK_BASE_FETCH_APP_BY_ENTRY_ID.format(app_name_id=self.app_name_id),
212+
timeout=RetryConstant.RETRY_INTERVAL
213+
)
214+
response.raise_for_status()
215+
except requests.exceptions.RequestException as e:
216+
raise SplunkBaseError(f"Error fetching app entries for app_name_id '{self.app_name_id}': {str(e)}")
217+
218+
# parse xml
219+
app_xml = xmltodict.parse(response.content)
220+
221+
# convert to list if only one entry exists
222+
app_entries = app_xml.get("feed").get("entry")
223+
if not isinstance(app_entries, list):
224+
app_entries = [app_entries]
225+
226+
# iterate over multiple entries if present
227+
for entry in app_entries:
228+
for key in entry.get("content").get("s:dict").get("s:key"):
229+
if key.get("@name") == "islatest" and key.get("#text") == "True":
230+
return entry.get("link").get("@href")
231+
232+
# raise if no entry was found
233+
raise SplunkBaseError(f"No app entry found with 'islatest' tag set to True: {self.app_name_id}")
234+
235+
def __fetch_url_latest_version_download(self, info_url: str) -> str:
236+
"""
237+
Fetch the download URL via the provided URL to build info
238+
:param info_url: URL for download info for the latest build
239+
:return: URL for downloading the latest build
240+
"""
241+
# fetch download info
242+
try:
243+
response = self.requests_retry_session().get(info_url, timeout=RetryConstant.RETRY_INTERVAL)
244+
response.raise_for_status()
245+
except requests.exceptions.RequestException as e:
246+
raise SplunkBaseError(f"Error fetching download info for app_name_id '{self.app_name_id}': {str(e)}")
247+
248+
# parse XML and extract download URL
249+
build_xml = xmltodict.parse(response.content)
250+
download_url = build_xml.get("feed").get("entry").get("link").get("@href")
251+
return download_url
252+
253+
def set_latest_version_info(self) -> None:
254+
# raise if app_name_id not set
255+
if self.app_name_id is None:
256+
raise SplunkApp.InitializationError("app_name_id must be set in order to fetch latest version info")
257+
258+
# fetch the info URL
259+
info_url = self.__fetch_url_latest_version_info()
260+
261+
# parse out the version number and fetch the download URL
262+
self.latest_version = info_url.split("/")[-1]
263+
self.latest_version_download_url = self.__fetch_url_latest_version_download(info_url)

contentctl/objects/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ class validate(Config_Base):
176176
build_app: bool = Field(default=True, description="Should an app be built and output in the build_path?")
177177
build_api: bool = Field(default=False, description="Should api objects be built and output in the build_path?")
178178
build_ssa: bool = Field(default=False, description="Should ssa objects be built and output in the build_path?")
179+
data_source_TA_validation: bool = Field(default=False, description="Validate latest TA information from Splunkbase")
179180

180181
def getAtomicRedTeamRepoPath(self, atomic_red_team_repo_name:str = "atomic-red-team"):
181182
return self.path/atomic_red_team_repo_name

0 commit comments

Comments
 (0)