1
+ import os
2
+ import time
3
+ import json
4
+ import xml .etree .ElementTree as ET
5
+ from typing import List , Tuple , Optional
6
+ from urllib .parse import urlencode
7
+
8
+ import requests
9
+ import urllib3
10
+ import xmltodict
11
+ from requests .adapters import HTTPAdapter
12
+ from requests .packages .urllib3 .util .retry import Retry
13
+
14
+ urllib3 .disable_warnings (urllib3 .exceptions .InsecureRequestWarning )
15
+
16
+ MAX_RETRY = 3
17
+
18
+ class APIEndPoint :
19
+ """
20
+ Class which contains Static Endpoint
21
+ """
22
+
23
+ SPLUNK_BASE_AUTH_URL = "https://splunkbase.splunk.com/api/account:login/"
24
+ SPLUNK_BASE_FETCH_APP_BY_ENTRY_ID = (
25
+ "https://apps.splunk.com/api/apps/entriesbyid/{app_name_id}"
26
+ )
27
+ SPLUNK_BASE_GET_UID_REDIRECT = "https://apps.splunk.com/apps/id/{app_name_id}"
28
+ SPLUNK_BASE_APP_INFO = "https://splunkbase.splunk.com/api/v1/app/{app_uid}"
29
+
30
+ class RetryConstant :
31
+ """
32
+ Class which contains Retry Constant
33
+ """
34
+
35
+ RETRY_COUNT = 3
36
+ RETRY_INTERVAL = 15
37
+
38
+
39
+ class SplunkBaseError (requests .HTTPError ):
40
+ """An error raise in communicating with Splunkbase"""
41
+ pass
42
+
43
+
44
+ # TODO (PEX-306): validate w/ Splunkbase team if there are better APIs we can rely on being supported
45
+ class SplunkApp :
46
+ """
47
+ A Splunk app available for download on Splunkbase
48
+ """
49
+
50
+ class InitializationError (Exception ):
51
+ """An initialization error during SplunkApp setup"""
52
+ pass
53
+
54
+ @staticmethod
55
+ def requests_retry_session (
56
+ retries = RetryConstant .RETRY_COUNT ,
57
+ backoff_factor = 1 ,
58
+ status_forcelist = (500 , 502 , 503 , 504 ),
59
+ session = None ,
60
+ ):
61
+ session = session or requests .Session ()
62
+ retry = Retry (
63
+ total = retries ,
64
+ read = retries ,
65
+ connect = retries ,
66
+ backoff_factor = backoff_factor ,
67
+ status_forcelist = status_forcelist ,
68
+ )
69
+ adapter = HTTPAdapter (max_retries = retry )
70
+ session .mount ('http://' , adapter )
71
+ session .mount ('https://' , adapter )
72
+ return session
73
+
74
+ def __init__ (
75
+ self ,
76
+ app_uid : Optional [int ] = None ,
77
+ app_name_id : Optional [str ] = None ,
78
+ manual_setup : bool = False ,
79
+ ) -> None :
80
+ if app_uid is None and app_name_id is None :
81
+ raise SplunkApp .InitializationError (
82
+ "Either app_uid (the numeric app UID e.g. 742) or app_name_id (the app name "
83
+ "idenitifier e.g. Splunk_TA_windows) must be provided"
84
+ )
85
+
86
+ # init or declare instance vars
87
+ self .app_uid : Optional [int ] = app_uid
88
+ self .app_name_id : Optional [str ] = app_name_id
89
+ self .manual_setup = manual_setup
90
+ self .app_title : str
91
+ self .latest_version : str
92
+ self .latest_version_download_url : str
93
+ self ._app_info_cache : Optional [dict ] = None
94
+
95
+ # set instance vars as needed; skip if manual setup was indicated
96
+ if not self .manual_setup :
97
+ self .set_app_name_id ()
98
+ self .set_app_uid ()
99
+ self .set_app_title ()
100
+ self .set_latest_version_info ()
101
+
102
+ def __eq__ (self , __value : object ) -> bool :
103
+ if isinstance (__value , SplunkApp ):
104
+ return self .app_uid == __value .app_uid
105
+ return False
106
+
107
+ def __repr__ (self ) -> str :
108
+ return (
109
+ f"SplunkApp(app_name_id='{ self .app_name_id } ', app_uid={ self .app_uid } , "
110
+ f"latest_version_download_url='{ self .latest_version_download_url } ')"
111
+ )
112
+
113
+ def __str__ (self ) -> str :
114
+ return f"<'{ self .app_name_id } ' ({ self .app_uid } )"
115
+
116
+ def get_app_info_by_uid (self ) -> dict :
117
+ """
118
+ Retrieve app info via app_uid (e.g. 742)
119
+ :return: dictionary of app info
120
+ """
121
+ # return cache if already set and raise and raise is app_uid is not set
122
+ if self ._app_info_cache is not None :
123
+ return self ._app_info_cache
124
+ elif self .app_uid is None :
125
+ raise SplunkApp .InitializationError ("app_uid must be set in order to fetch app info" )
126
+
127
+ # NOTE: auth not required
128
+ # Get app info by uid
129
+ try :
130
+ response = self .requests_retry_session ().get (
131
+ APIEndPoint .SPLUNK_BASE_APP_INFO .format (app_uid = self .app_uid ),
132
+ timeout = RetryConstant .RETRY_INTERVAL
133
+ )
134
+ response .raise_for_status ()
135
+ except requests .exceptions .RequestException as e :
136
+ raise SplunkBaseError (f"Error fetching app info for app_uid { self .app_uid } : { str (e )} " )
137
+
138
+ # parse JSON and set cache
139
+ self ._app_info_cache : dict = json .loads (response .content )
140
+
141
+ return self ._app_info_cache
142
+
143
+ def set_app_name_id (self ) -> None :
144
+ """
145
+ Set app_name_id
146
+ """
147
+ # return if app_name_id is already set
148
+ if self .app_name_id is not None :
149
+ return
150
+
151
+ # get app info by app_uid
152
+ app_info = self .get_app_info_by_uid ()
153
+
154
+ # set app_name_id if found
155
+ if "appid" in app_info :
156
+ self .app_name_id = app_info ["appid" ]
157
+ else :
158
+ raise SplunkBaseError (f"Invalid response from Splunkbase; missing key 'appid': { app_info } " )
159
+
160
+ def set_app_uid (self ) -> None :
161
+ """
162
+ Set app_uid
163
+ """
164
+ # return if app_uid is already set and raise if app_name_id was not set
165
+ if self .app_uid is not None :
166
+ return
167
+ elif self .app_name_id is None :
168
+ raise SplunkApp .InitializationError ("app_name_id must be set in order to fetch app_uid" )
169
+
170
+ # NOTE: auth not required
171
+ # Get app_uid by app_name_id via a redirect
172
+ try :
173
+ response = self .requests_retry_session ().get (
174
+ APIEndPoint .SPLUNK_BASE_GET_UID_REDIRECT .format (app_name_id = self .app_name_id ),
175
+ allow_redirects = False ,
176
+ timeout = RetryConstant .RETRY_INTERVAL
177
+ )
178
+ response .raise_for_status ()
179
+ except requests .exceptions .RequestException as e :
180
+ raise SplunkBaseError (f"Error fetching app_uid for app_name_id '{ self .app_name_id } ': { str (e )} " )
181
+
182
+ # Extract the app_uid from the redirect path
183
+ if "Location" in response .headers :
184
+ self .app_uid = response .headers .split ("/" )[- 1 ]
185
+ else :
186
+ raise SplunkBaseError (
187
+ "Invalid response from Splunkbase; missing 'Location' in redirect header"
188
+ )
189
+
190
+ def set_app_title (self ) -> None :
191
+ """
192
+ Set app_title
193
+ """
194
+ # get app info by app_uid
195
+ app_info = self .get_app_info_by_uid ()
196
+
197
+ # set app_title if found
198
+ if "title" in app_info :
199
+ self .app_title = app_info ["title" ]
200
+ else :
201
+ raise SplunkBaseError (f"Invalid response from Splunkbase; missing key 'title': { app_info } " )
202
+
203
+ def __fetch_url_latest_version_info (self ) -> str :
204
+ """
205
+ Identify latest version of the app and return a URL pointing to download info for the build
206
+ :return: url for download info on the latest build
207
+ """
208
+ # retrieve app entries using the app_name_id
209
+ try :
210
+ response = self .requests_retry_session ().get (
211
+ APIEndPoint .SPLUNK_BASE_FETCH_APP_BY_ENTRY_ID .format (app_name_id = self .app_name_id ),
212
+ timeout = RetryConstant .RETRY_INTERVAL
213
+ )
214
+ response .raise_for_status ()
215
+ except requests .exceptions .RequestException as e :
216
+ raise SplunkBaseError (f"Error fetching app entries for app_name_id '{ self .app_name_id } ': { str (e )} " )
217
+
218
+ # parse xml
219
+ app_xml = xmltodict .parse (response .content )
220
+
221
+ # convert to list if only one entry exists
222
+ app_entries = app_xml .get ("feed" ).get ("entry" )
223
+ if not isinstance (app_entries , list ):
224
+ app_entries = [app_entries ]
225
+
226
+ # iterate over multiple entries if present
227
+ for entry in app_entries :
228
+ for key in entry .get ("content" ).get ("s:dict" ).get ("s:key" ):
229
+ if key .get ("@name" ) == "islatest" and key .get ("#text" ) == "True" :
230
+ return entry .get ("link" ).get ("@href" )
231
+
232
+ # raise if no entry was found
233
+ raise SplunkBaseError (f"No app entry found with 'islatest' tag set to True: { self .app_name_id } " )
234
+
235
+ def __fetch_url_latest_version_download (self , info_url : str ) -> str :
236
+ """
237
+ Fetch the download URL via the provided URL to build info
238
+ :param info_url: URL for download info for the latest build
239
+ :return: URL for downloading the latest build
240
+ """
241
+ # fetch download info
242
+ try :
243
+ response = self .requests_retry_session ().get (info_url , timeout = RetryConstant .RETRY_INTERVAL )
244
+ response .raise_for_status ()
245
+ except requests .exceptions .RequestException as e :
246
+ raise SplunkBaseError (f"Error fetching download info for app_name_id '{ self .app_name_id } ': { str (e )} " )
247
+
248
+ # parse XML and extract download URL
249
+ build_xml = xmltodict .parse (response .content )
250
+ download_url = build_xml .get ("feed" ).get ("entry" ).get ("link" ).get ("@href" )
251
+ return download_url
252
+
253
+ def set_latest_version_info (self ) -> None :
254
+ # raise if app_name_id not set
255
+ if self .app_name_id is None :
256
+ raise SplunkApp .InitializationError ("app_name_id must be set in order to fetch latest version info" )
257
+
258
+ # fetch the info URL
259
+ info_url = self .__fetch_url_latest_version_info ()
260
+
261
+ # parse out the version number and fetch the download URL
262
+ self .latest_version = info_url .split ("/" )[- 1 ]
263
+ self .latest_version_download_url = self .__fetch_url_latest_version_download (info_url )
0 commit comments