22
33import json
44import logging
5+ import urllib .parse
56from typing import Any , Sequence
67
78import requests
@@ -63,16 +64,18 @@ def do_request(
6364 body : bytes | None = None ,
6465 headers : dict [str , Any ] | None = None ,
6566 retries : int = 3 ,
67+ params : dict [str , Any ] | None = None ,
6668 ) -> bytes :
6769 """Issues a request to the given URL.
6870
6971 Args:
7072 method: The HTTP method to use.
7173 url: The URL to issue the request to.
72- json : The JSON payload to include in the request.
74+ json_data : The JSON payload to include in the request.
7375 body: The body to include in the request.
7476 headers: Extra headers to include in the request.
7577 retries: The number of times to retry the request.
78+ params: The query parameters to include in the request.
7679
7780 Returns:
7881 The response from the API; a bytes object.
@@ -90,6 +93,8 @@ def do_request(
9093 request_kwargs ["json" ] = json_data
9194 if body :
9295 request_kwargs ["body" ] = body
96+ if params :
97+ url = f"{ url } ?{ urllib .parse .urlencode (params )} "
9398
9499 try :
95100 if method == "POST" :
@@ -145,6 +150,28 @@ def refresh_auth(self):
145150 else :
146151 logger .warning ("No auth function set, cannot refresh auth." )
147152
153+ def find_indicator (self , name : str , type : str ) -> YetiObject | None :
154+ """Finds an indicator in Yeti by name and type.
155+
156+ Args:
157+ name: The name of the indicator to find.
158+ type: The type of the indicator to find.
159+
160+ Returns:
161+ The response from the API; a dict representing the indicator.
162+ """
163+ try :
164+ response = self .do_request (
165+ "GET" ,
166+ f"{ self ._url_root } /api/v2/indicators/" ,
167+ params = {"name" : name , "type" : type },
168+ )
169+ except errors .YetiApiError as e :
170+ if e .status_code == 404 :
171+ return None
172+ raise
173+ return json .loads (response )
174+
148175 def search_indicators (
149176 self ,
150177 name : str | None = None ,
@@ -163,7 +190,7 @@ def search_indicators(
163190 tags: The tags of the indicator to search for.
164191
165192 Returns:
166- The response from the API; a dict representing the indicator .
193+ The response from the API; a list of dicts representing indicators .
167194 """
168195
169196 if not any ([name , indicator_type , pattern , tags ]):
@@ -188,6 +215,28 @@ def search_indicators(
188215 )
189216 return json .loads (response )["indicators" ]
190217
218+ def find_entity (self , name : str , type : str ) -> YetiObject | None :
219+ """Finds an entity in Yeti by name.
220+
221+ Args:
222+ name: The name of the entity to find.
223+ type: The type of the entity to find.
224+
225+ Returns:
226+ The response from the API; a dict representing the entity.
227+ """
228+ try :
229+ response = self .do_request (
230+ "GET" ,
231+ f"{ self ._url_root } /api/v2/entities/" ,
232+ params = {"name" : name , "type" : type },
233+ )
234+ except errors .YetiApiError as e :
235+ if e .status_code == 404 :
236+ return None
237+ raise
238+ return json .loads (response )
239+
191240 def search_entities (self , name : str ) -> list [YetiObject ]:
192241 params = {"query" : {"name" : name }, "count" : 0 }
193242 response = self .do_request (
@@ -197,6 +246,28 @@ def search_entities(self, name: str) -> list[YetiObject]:
197246 )
198247 return json .loads (response )["entities" ]
199248
249+ def find_observable (self , value : str , type : str ) -> YetiObject | None :
250+ """Finds an observable in Yeti by value and type.
251+
252+ Args:
253+ value: The value of the observable to find.
254+ type: The type of the observable to find.
255+
256+ Returns:
257+ The response from the API; a dict representing the observable.
258+ """
259+ try :
260+ response = self .do_request (
261+ "GET" ,
262+ f"{ self ._url_root } /api/v2/observables/" ,
263+ params = {"value" : value , "type" : type },
264+ )
265+ except errors .YetiApiError as e :
266+ if e .status_code == 404 :
267+ return None
268+ raise
269+ return json .loads (response )
270+
200271 def search_observables (self , value : str ) -> list [YetiObject ]:
201272 """Searches for an observable in Yeti.
202273
@@ -330,6 +401,28 @@ def get_yara_bundle_with_overlays(
330401
331402 return json .loads (result )
332403
404+ def find_dfiq (self , name : str , dfiq_type : str ) -> YetiObject | None :
405+ """Finds a DFIQ in Yeti by name and type.
406+
407+ Args:
408+ name: The name of the DFIQ to find.
409+ dfiq_type: The type of the DFIQ to find.
410+
411+ Returns:
412+ The response from the API; a dict representing the DFIQ object.
413+ """
414+ try :
415+ response = self .do_request (
416+ "GET" ,
417+ f"{ self ._url_root } /api/v2/dfiq/" ,
418+ params = {"name" : name , "type" : dfiq_type },
419+ )
420+ except errors .YetiApiError as e :
421+ if e .status_code == 404 :
422+ return None
423+ raise
424+ return json .loads (response )
425+
333426 def search_dfiq (self , name : str , dfiq_type : str | None = None ) -> list [YetiObject ]:
334427 """Searches for a DFIQ in Yeti.
335428
0 commit comments