|
3 | 3 | from enum import Enum
|
4 | 4 | from typing import Any, Dict, List, Optional, Union
|
5 | 5 | from dataclasses import dataclass, asdict, field
|
6 |
| - |
| 6 | +from socketdev.exceptions import APIFailure |
7 | 7 |
|
8 | 8 | from ..utils import IntegrationType, Utils
|
9 | 9 |
|
@@ -710,137 +710,137 @@ def create_params_string(self, params: dict) -> str:
|
710 | 710 | def get(self, org_slug: str, params: dict, use_types: bool = False) -> Union[dict, GetFullScanMetadataResponse]:
|
711 | 711 | params_arg = self.create_params_string(params)
|
712 | 712 | path = "orgs/" + org_slug + "/full-scans" + str(params_arg)
|
713 |
| - response = self.api.do_request(path=path) |
714 |
| - |
715 |
| - if response.status_code == 200: |
716 |
| - result = response.json() |
717 |
| - if use_types: |
718 |
| - return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) |
719 |
| - return result |
720 |
| - |
721 |
| - error_message = response.json().get("error", {}).get("message", "Unknown error") |
722 |
| - log.error(f"Error getting full scan metadata: {response.status_code}, message: {error_message}") |
723 |
| - if use_types: |
724 |
| - return GetFullScanMetadataResponse.from_dict( |
725 |
| - {"success": False, "status": response.status_code, "message": error_message} |
726 |
| - ) |
| 713 | + try: |
| 714 | + response = self.api.do_request(path=path) |
| 715 | + if response.status_code == 200: |
| 716 | + result = response.json() |
| 717 | + if use_types: |
| 718 | + return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) |
| 719 | + return result |
| 720 | + except APIFailure as e: |
| 721 | + log.error(f"Socket SDK: API failure while getting full scan metadata {e}") |
| 722 | + raise |
| 723 | + except Exception as e: |
| 724 | + log.error(f"Socket SDK: Unexpected error while getting full scan metadata {e}") |
| 725 | + raise |
| 726 | + |
727 | 727 | return {}
|
728 | 728 |
|
729 | 729 | def post(self, files: list, params: FullScanParams, use_types: bool = False) -> Union[dict, CreateFullScanResponse]:
|
730 | 730 | Utils.validate_integration_type(params.integration_type if params.integration_type else "api")
|
731 | 731 | org_slug = str(params.org_slug)
|
732 | 732 | params_dict = params.to_dict()
|
733 | 733 | params_dict.pop("org_slug")
|
734 |
| - |
735 | 734 | params_arg = self.create_params_string(params_dict)
|
736 |
| - |
737 | 735 | path = "orgs/" + org_slug + "/full-scans" + str(params_arg)
|
738 | 736 |
|
739 |
| - response = self.api.do_request(path=path, method="POST", files=files) |
740 |
| - |
741 |
| - if response.status_code == 201: |
742 |
| - result = response.json() |
743 |
| - if use_types: |
744 |
| - return CreateFullScanResponse.from_dict({"success": True, "status": 201, "data": result}) |
745 |
| - return result |
| 737 | + try: |
| 738 | + response = self.api.do_request(path=path, method="POST", files=files) |
| 739 | + if response.status_code == 201: |
| 740 | + result = response.json() |
| 741 | + if use_types: |
| 742 | + return CreateFullScanResponse.from_dict({"success": True, "status": 201, "data": result}) |
| 743 | + return result |
| 744 | + except APIFailure as e: |
| 745 | + log.error(f"Socket SDK: API failure while posting full scan {e}") |
| 746 | + raise |
| 747 | + except Exception as e: |
| 748 | + log.error(f"Socket SDK: Unexpected error while posting full scan {e}") |
| 749 | + raise |
746 | 750 |
|
747 |
| - error_message = response.json().get("error", {}).get("message", "Unknown error") |
748 |
| - log.error(f"Error posting {files} to the Fullscans API: {response.status_code}, message: {error_message}") |
749 |
| - if use_types: |
750 |
| - return CreateFullScanResponse.from_dict( |
751 |
| - {"success": False, "status": response.status_code, "message": error_message} |
752 |
| - ) |
753 | 751 | return {}
|
754 | 752 |
|
755 | 753 | def delete(self, org_slug: str, full_scan_id: str) -> dict:
|
756 | 754 | path = "orgs/" + org_slug + "/full-scans/" + full_scan_id
|
| 755 | + try: |
| 756 | + response = self.api.do_request(path=path, method="DELETE") |
| 757 | + if response.status_code == 200: |
| 758 | + return response.json() |
| 759 | + except APIFailure as e: |
| 760 | + log.error(f"Socket SDK: API failure while deleting full scan {e}") |
| 761 | + raise |
| 762 | + except Exception as e: |
| 763 | + log.error(f"Socket SDK: Unexpected error while deleting full scan {e}") |
| 764 | + raise |
757 | 765 |
|
758 |
| - response = self.api.do_request(path=path, method="DELETE") |
759 |
| - |
760 |
| - if response.status_code == 200: |
761 |
| - result = response.json() |
762 |
| - return result |
763 |
| - |
764 |
| - error_message = response.json().get("error", {}).get("message", "Unknown error") |
765 |
| - log.error(f"Error deleting full scan: {response.status_code}, message: {error_message}") |
766 | 766 | return {}
|
767 | 767 |
|
768 | 768 | def stream_diff(
|
769 | 769 | self, org_slug: str, before: str, after: str, use_types: bool = False
|
770 | 770 | ) -> Union[dict, StreamDiffResponse]:
|
771 | 771 | path = f"orgs/{org_slug}/full-scans/diff?before={before}&after={after}"
|
| 772 | + try: |
| 773 | + response = self.api.do_request(path=path, method="GET") |
| 774 | + if response.status_code == 200: |
| 775 | + result = response.json() |
| 776 | + if use_types: |
| 777 | + return StreamDiffResponse.from_dict({"success": True, "status": 200, "data": result}) |
| 778 | + return result |
| 779 | + except APIFailure as e: |
| 780 | + log.error(f"Socket SDK: API failure while streaming diff {e}") |
| 781 | + raise |
| 782 | + except Exception as e: |
| 783 | + log.error(f"Socket SDK: Unexpected error while streaming diff {e}") |
| 784 | + raise |
772 | 785 |
|
773 |
| - response = self.api.do_request(path=path, method="GET") |
774 |
| - |
775 |
| - if response.status_code == 200: |
776 |
| - result = response.json() |
777 |
| - if use_types: |
778 |
| - return StreamDiffResponse.from_dict({"success": True, "status": 200, "data": result}) |
779 |
| - return result |
780 |
| - |
781 |
| - error_message = response.json().get("error", {}).get("message", "Unknown error") |
782 |
| - log.error(f"Error streaming diff: {response.status_code}, message: {error_message}") |
783 |
| - if use_types: |
784 |
| - return StreamDiffResponse.from_dict( |
785 |
| - {"success": False, "status": response.status_code, "message": error_message} |
786 |
| - ) |
787 | 786 | return {}
|
788 | 787 |
|
789 | 788 | def stream(self, org_slug: str, full_scan_id: str, use_types: bool = False) -> Union[dict, FullScanStreamResponse]:
|
790 | 789 | path = "orgs/" + org_slug + "/full-scans/" + full_scan_id
|
791 |
| - response = self.api.do_request(path=path, method="GET") |
792 |
| - |
793 |
| - if response.status_code == 200: |
794 |
| - try: |
795 |
| - stream_str = [] |
796 |
| - artifacts = {} |
797 |
| - result = response.text |
798 |
| - result = result.strip('"').strip() |
799 |
| - for line in result.split("\n"): |
800 |
| - if line != '"' and line != "" and line is not None: |
801 |
| - item = json.loads(line) |
802 |
| - stream_str.append(item) |
803 |
| - for val in stream_str: |
804 |
| - artifacts[val["id"]] = val |
805 |
| - |
806 |
| - if use_types: |
807 |
| - return FullScanStreamResponse.from_dict({"success": True, "status": 200, "artifacts": artifacts}) |
808 |
| - return artifacts |
| 790 | + try: |
| 791 | + response = self.api.do_request(path=path, method="GET") |
| 792 | + if response.status_code == 200: |
| 793 | + try: |
| 794 | + stream_str = [] |
| 795 | + artifacts = {} |
| 796 | + result = response.text.strip('"').strip() |
| 797 | + for line in result.split("\n"): |
| 798 | + if line != '"' and line != "" and line is not None: |
| 799 | + item = json.loads(line) |
| 800 | + stream_str.append(item) |
| 801 | + for val in stream_str: |
| 802 | + artifacts[val["id"]] = val |
| 803 | + |
| 804 | + if use_types: |
| 805 | + return FullScanStreamResponse.from_dict( |
| 806 | + {"success": True, "status": 200, "artifacts": artifacts} |
| 807 | + ) |
| 808 | + return artifacts |
| 809 | + |
| 810 | + except Exception as e: |
| 811 | + error_message = f"Error parsing stream response: {str(e)}" |
| 812 | + log.error(error_message) |
| 813 | + if use_types: |
| 814 | + return FullScanStreamResponse.from_dict( |
| 815 | + {"success": False, "status": response.status_code, "message": error_message} |
| 816 | + ) |
| 817 | + return {} |
| 818 | + |
| 819 | + except APIFailure as e: |
| 820 | + log.error(f"Socket SDK: API failure while streaming full scan {e}") |
| 821 | + raise |
| 822 | + except Exception as e: |
| 823 | + log.error(f"Socket SDK: Unexpected error while streaming full scan {e}") |
| 824 | + raise |
809 | 825 |
|
810 |
| - except Exception as e: |
811 |
| - error_message = f"Error parsing stream response: {str(e)}" |
812 |
| - log.error(error_message) |
813 |
| - if use_types: |
814 |
| - return FullScanStreamResponse.from_dict( |
815 |
| - {"success": False, "status": response.status_code, "message": error_message} |
816 |
| - ) |
817 |
| - return {} |
818 |
| - |
819 |
| - error_message = response.json().get("error", {}).get("message", "Unknown error") |
820 |
| - log.error(f"Error streaming full scan: {response.status_code}, message: {error_message}") |
821 |
| - if use_types: |
822 |
| - return FullScanStreamResponse.from_dict( |
823 |
| - {"success": False, "status": response.status_code, "message": error_message} |
824 |
| - ) |
825 | 826 | return {}
|
826 | 827 |
|
827 | 828 | def metadata(
|
828 | 829 | self, org_slug: str, full_scan_id: str, use_types: bool = False
|
829 | 830 | ) -> Union[dict, GetFullScanMetadataResponse]:
|
830 | 831 | path = "orgs/" + org_slug + "/full-scans/" + full_scan_id + "/metadata"
|
| 832 | + try: |
| 833 | + response = self.api.do_request(path=path, method="GET") |
| 834 | + if response.status_code == 200: |
| 835 | + result = response.json() |
| 836 | + if use_types: |
| 837 | + return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) |
| 838 | + return result |
| 839 | + except APIFailure as e: |
| 840 | + log.error(f"Socket SDK: API failure while getting metadata {e}") |
| 841 | + raise |
| 842 | + except Exception as e: |
| 843 | + log.error(f"Socket SDK: Unexpected error while getting metadata {e}") |
| 844 | + raise |
831 | 845 |
|
832 |
| - response = self.api.do_request(path=path, method="GET") |
833 |
| - |
834 |
| - if response.status_code == 200: |
835 |
| - result = response.json() |
836 |
| - if use_types: |
837 |
| - return GetFullScanMetadataResponse.from_dict({"success": True, "status": 200, "data": result}) |
838 |
| - return result |
839 |
| - |
840 |
| - error_message = response.json().get("error", {}).get("message", "Unknown error") |
841 |
| - log.error(f"Error getting metadata: {response.status_code}, message: {error_message}") |
842 |
| - if use_types: |
843 |
| - return GetFullScanMetadataResponse.from_dict( |
844 |
| - {"success": False, "status": response.status_code, "message": error_message} |
845 |
| - ) |
846 | 846 | return {}
|
0 commit comments