1
1
import platform
2
2
import urllib .parse
3
- from typing import Any , Dict , Iterable , List , Optional , Tuple , Union
3
+ from typing import Dict , Iterable , Optional , Union
4
4
5
5
import requests
6
- from marshmallow import Schema
7
6
from requests import Response , Session , codes
8
7
9
8
from .config import (
12
11
DEFAULT_TIMEOUT ,
13
12
MULTI_DOCUMENT_LIMIT ,
14
13
)
15
- from .models import Detail , ScanResult
16
- from .schemas import DetailSchema , DocumentSchema , ScanResultSchema
14
+ from .models import Detail , Document , MultiScanResult , ScanResult
17
15
18
16
19
17
class GGClient :
20
- DETAIL_SCHEMA = DetailSchema ()
21
- DOCUMENT_SCHEMA = DocumentSchema ()
22
- SCAN_RESULT_SCHEMA = ScanResultSchema ()
23
18
_version = "undefined"
24
19
25
20
def __init__ (
26
21
self ,
27
- token : str ,
22
+ api_key : str ,
28
23
base_uri : Optional [str ] = None ,
29
24
session : Optional [requests .Session ] = None ,
30
25
user_agent : Optional [str ] = None ,
31
26
timeout : Optional [float ] = DEFAULT_TIMEOUT ,
32
27
) -> "GGClient" :
33
28
"""
34
- :param token: APIKey to be added to requests
29
+ :param api_key: API Key to be added to requests
35
30
:param base_uri: Base URI for the API, defaults to "https://api.gitguardian.com"
36
31
:param session: custom requests session, defaults to requests.Session()
37
32
:param user_agent: user agent to identify requests, defaults to ""
@@ -46,11 +41,11 @@ def __init__(
46
41
else :
47
42
base_uri = DEFAULT_BASE_URI
48
43
49
- if not isinstance (token , str ):
50
- raise TypeError ("Missing token string" )
44
+ if not isinstance (api_key , str ):
45
+ raise TypeError ("api_key is not a string" )
51
46
52
47
self .base_uri = base_uri
53
- self .token = token
48
+ self .api_key = api_key
54
49
self .session = (
55
50
session if session is isinstance (session , Session ) else requests .Session ()
56
51
)
@@ -63,77 +58,64 @@ def __init__(
63
58
self .user_agent = " " .join ([self .user_agent , user_agent ])
64
59
65
60
self .session .headers .update (
66
- {"User-Agent" : self .user_agent , "Authorization" : "Token {0}" .format (token )}
61
+ {
62
+ "User-Agent" : self .user_agent ,
63
+ "Authorization" : "Token {0}" .format (api_key ),
64
+ }
67
65
)
68
66
69
67
def request (
70
- self ,
71
- method : str ,
72
- endpoint : str ,
73
- schema : Schema = None ,
74
- version : str = DEFAULT_API_VERSION ,
75
- many : bool = False ,
76
- ** kwargs
77
- ) -> Tuple [Any , Response ]:
68
+ self , method : str , endpoint : str , version : str = DEFAULT_API_VERSION , ** kwargs
69
+ ) -> Response :
78
70
if version :
79
71
endpoint = urllib .parse .urljoin (version + "/" , endpoint )
80
72
81
73
url = urllib .parse .urljoin (self .base_uri , endpoint )
82
74
83
- response = self .session .request (
75
+ resp = self .session .request (
84
76
method = method , url = url , timeout = self .timeout , ** kwargs
85
77
)
86
78
87
- if response .headers ["content-type" ] != "application/json" :
79
+ if resp .headers ["content-type" ] != "application/json" :
88
80
raise TypeError ("Response is not JSON" )
89
81
90
- if response .status_code == codes .ok and schema :
91
- obj = schema .load (response .json (), many = many )
92
- if many :
93
- for element in obj :
94
- element .status_code = response .status_code
95
- else :
96
- obj .status_code = response .status_code
97
- else :
98
- obj = self .DETAIL_SCHEMA .load (response .json ())
99
- obj .status_code = response .status_code
100
-
101
- return obj , response
82
+ return resp
102
83
103
84
def post (
104
85
self ,
105
86
endpoint : str ,
106
87
data : str = None ,
107
- schema : Schema = None ,
108
88
version : str = DEFAULT_API_VERSION ,
109
- many : bool = False ,
110
89
** kwargs
111
- ) -> Tuple [ Any , Response ] :
90
+ ) -> Response :
112
91
return self .request (
113
- "post" ,
114
- endpoint = endpoint ,
115
- schema = schema ,
116
- json = data ,
117
- version = version ,
118
- many = many ,
119
- ** kwargs ,
92
+ "post" , endpoint = endpoint , json = data , version = version , ** kwargs ,
120
93
)
121
94
122
95
def get (
123
- self ,
124
- endpoint : str ,
125
- schema : Schema = None ,
126
- version : str = DEFAULT_API_VERSION ,
127
- many : bool = False ,
128
- ** kwargs
129
- ) -> Tuple [Any , Response ]:
130
- return self .request (
131
- method = "get" , endpoint = endpoint , schema = schema , version = version , ** kwargs
132
- )
96
+ self , endpoint : str , version : str = DEFAULT_API_VERSION , ** kwargs
97
+ ) -> Response :
98
+ return self .request (method = "get" , endpoint = endpoint , version = version , ** kwargs )
99
+
100
+ def health_check (self ) -> Detail :
101
+ """
102
+ health_check handles the /health endpoint of the API
103
+
104
+ use Detail.status_code to check the response status code of the API
105
+
106
+ 200 if server is online and api_key is valid
107
+ :return: Detail response and status code
108
+ """
109
+ resp = self .get (endpoint = "health" )
110
+
111
+ obj = Detail .SCHEMA .load (resp .json ())
112
+ obj .status_code = resp .status_code
113
+
114
+ return obj
133
115
134
116
def content_scan (
135
117
self , document : str , filename : Optional [str ] = None
136
- ) -> Tuple [ Union [Detail , ScanResult ], int ]:
118
+ ) -> Union [Detail , ScanResult ]:
137
119
"""
138
120
content_scan handles the /scan endpoint of the API
139
121
@@ -146,15 +128,21 @@ def content_scan(
146
128
if filename :
147
129
doc_dict ["filename" ] = filename
148
130
149
- request_obj = self .DOCUMENT_SCHEMA .load (doc_dict )
150
- obj , resp = self .post (
151
- endpoint = "scan" , data = request_obj , schema = self .SCAN_RESULT_SCHEMA
152
- )
153
- return obj , resp .status_code
131
+ request_obj = Document .SCHEMA .load (doc_dict )
132
+
133
+ resp = self .post (endpoint = "scan" , data = request_obj )
134
+ if resp .status_code == codes .ok :
135
+ obj = ScanResult .SCHEMA .load (resp .json ())
136
+ else :
137
+ obj = Detail .SCHEMA .load (resp .json ())
138
+
139
+ obj .status_code = resp .status_code
140
+
141
+ return obj
154
142
155
143
def multi_content_scan (
156
144
self , documents : Iterable [Dict [str , str ]],
157
- ) -> Tuple [ Union [Detail , List [ ScanResult ]], int ]:
145
+ ) -> Union [Detail , MultiScanResult ]:
158
146
"""
159
147
multi_content_scan handles the /multiscan endpoint of the API
160
148
@@ -171,26 +159,17 @@ def multi_content_scan(
171
159
)
172
160
173
161
if all (isinstance (doc , dict ) for doc in documents ):
174
- request_obj = self . DOCUMENT_SCHEMA .load (documents , many = True )
162
+ request_obj = Document . SCHEMA .load (documents , many = True )
175
163
else :
176
164
raise TypeError ("each document must be a dict" )
177
165
178
- obj , resp = self .post (
179
- endpoint = "multiscan" ,
180
- data = request_obj ,
181
- schema = self .SCAN_RESULT_SCHEMA ,
182
- many = True ,
183
- )
184
- return obj , resp .status_code
166
+ resp = self .post (endpoint = "multiscan" , data = request_obj )
185
167
186
- def health_check (self ) -> Tuple [Detail , int ]:
187
- """
188
- health_check handles the /health endpoint of the API
168
+ if resp .status_code == codes .ok :
169
+ obj = MultiScanResult .SCHEMA .load (dict (scan_results = resp .json ()))
170
+ else :
171
+ obj = Detail .SCHEMA .load (resp .json ())
189
172
190
- use Detail .status_code to check the response status code of the API
173
+ obj .status_code = resp . status_code
191
174
192
- 200 if server is online and token is valid
193
- :return: Detail response and status code
194
- """
195
- obj , resp = self .get (endpoint = "health" )
196
- return obj , resp .status_code
175
+ return obj
0 commit comments