|  | 
| 2 | 2 | import json | 
| 3 | 3 | import re | 
| 4 | 4 | import time | 
|  | 5 | +from datetime import datetime | 
| 5 | 6 | from warnings import warn | 
| 6 | 7 | 
 | 
|  | 8 | +from requests.exceptions import RetryError | 
| 7 | 9 | from requests_toolbelt.multipart.encoder import MultipartEncoder | 
| 8 | 10 | 
 | 
| 9 | 11 | try: | 
| @@ -1224,3 +1226,81 @@ def download_cve_report_csv(self, vuln_type="os", scope_type="static"): | 
| 1224 | 1226 |             return [False, self.lasterr] | 
| 1225 | 1227 | 
 | 
| 1226 | 1228 |         return [True, res.content.decode("utf-8")] | 
|  | 1229 | + | 
|  | 1230 | +    def get_image_scanning_results(self, image_name, policy_id=None): | 
|  | 1231 | +        ''' | 
|  | 1232 | +        Args: | 
|  | 1233 | +            image_name (str): Image name to retrieve the scanning results from | 
|  | 1234 | +            policy_id (str): Policy ID to check against. If not specified, will check against all policies. | 
|  | 1235 | +
 | 
|  | 1236 | +        Returns: | 
|  | 1237 | +            A tuple of (bool, str). | 
|  | 1238 | +            The first parameter, if true, means that the result is correct, while | 
|  | 1239 | +            if false, means that there's been an error. The second parameter | 
|  | 1240 | +            will hold the response of the API call. | 
|  | 1241 | +        ''' | 
|  | 1242 | +        try: | 
|  | 1243 | +            ok, res = self.get_image(image_name) | 
|  | 1244 | +            if not ok: | 
|  | 1245 | +                return ok, res | 
|  | 1246 | + | 
|  | 1247 | +            image_digest = res[0]["imageDigest"] | 
|  | 1248 | +            image_tag = res[0]["image_detail"][0]["fulltag"] | 
|  | 1249 | +        except RetryError: | 
|  | 1250 | +            return [False, "could not retrieve image digest for the given image name, " | 
|  | 1251 | +                           "ensure that the image has been scanned"] | 
|  | 1252 | + | 
|  | 1253 | +        url = f"{self.url}/api/scanning/v1/images/{image_digest}/policyEvaluation" | 
|  | 1254 | +        params = { | 
|  | 1255 | +            "tag": image_tag, | 
|  | 1256 | +        } | 
|  | 1257 | + | 
|  | 1258 | +        res = self.http.get(url, headers=self.hdrs, params=params, verify=self.ssl_verify) | 
|  | 1259 | +        if not self._checkResponse(res): | 
|  | 1260 | +            return [False, self.lasterr] | 
|  | 1261 | + | 
|  | 1262 | +        json_res = res.json() | 
|  | 1263 | + | 
|  | 1264 | +        result = { | 
|  | 1265 | +            "image_digest": json_res["imageDigest"], | 
|  | 1266 | +            "image_id": json_res["imageId"], | 
|  | 1267 | +            "status": json_res["status"], | 
|  | 1268 | +            "image_tag": image_tag, | 
|  | 1269 | +            "total_stop": json_res["nStop"], | 
|  | 1270 | +            "total_warn": json_res["nWarn"], | 
|  | 1271 | +            "last_evaluation": datetime.utcfromtimestamp(json_res["at"]), | 
|  | 1272 | +            "policy_id": "*", | 
|  | 1273 | +            "policy_name": "All policies", | 
|  | 1274 | +            "warn_results": [], | 
|  | 1275 | +            "stop_results": [] | 
|  | 1276 | +        } | 
|  | 1277 | + | 
|  | 1278 | +        if policy_id: | 
|  | 1279 | +            policy_results = [result for result in json_res["results"] if result["policyId"] == policy_id] | 
|  | 1280 | +            if policy_results: | 
|  | 1281 | +                filtered_result_by_policy_id = policy_results[0] | 
|  | 1282 | +                result["total_stop"] = filtered_result_by_policy_id["nStop"] | 
|  | 1283 | +                result["total_warn"] = filtered_result_by_policy_id["nWarn"] | 
|  | 1284 | +                result["warn_results"] = [rule_result["checkOutput"] | 
|  | 1285 | +                                          for gate_result in filtered_result_by_policy_id["gateResults"] | 
|  | 1286 | +                                          for rule_result in gate_result["ruleResults"] | 
|  | 1287 | +                                          if rule_result["gateAction"] == "warn"] | 
|  | 1288 | +                result["stop_results"] = [rule_result["checkOutput"] | 
|  | 1289 | +                                          for gate_result in filtered_result_by_policy_id["gateResults"] | 
|  | 1290 | +                                          for rule_result in gate_result["ruleResults"] | 
|  | 1291 | +                                          if rule_result["gateAction"] == "stop"] | 
|  | 1292 | +            else: | 
|  | 1293 | +                return [False, "the specified policy ID doesn't exist"] | 
|  | 1294 | +        else: | 
|  | 1295 | +            result["warn_results"] = [rule_result["checkOutput"] | 
|  | 1296 | +                                      for result in json_res["results"] | 
|  | 1297 | +                                      for gate_result in result["gateResults"] | 
|  | 1298 | +                                      for rule_result in gate_result["ruleResults"] | 
|  | 1299 | +                                      if rule_result["gateAction"] == "warn"] | 
|  | 1300 | +            result["stop_results"] = [rule_result["checkOutput"] | 
|  | 1301 | +                                      for result in json_res["results"] | 
|  | 1302 | +                                      for gate_result in result["gateResults"] | 
|  | 1303 | +                                      for rule_result in gate_result["ruleResults"] | 
|  | 1304 | +                                      if rule_result["gateAction"] == "stop"] | 
|  | 1305 | + | 
|  | 1306 | +        return [True, result] | 
0 commit comments