55import json
66import os
77import uuid
8- from typing import List
8+ from typing import List , Any , Dict , Optional , Union
99
1010import datefinder
1111import dateutil .parser
1212import pytz
1313
14+ from pycti .entities .opencti_identity import Identity
1415from pycti .utils .constants import IdentityTypes , LocationTypes , StixCyberObservableTypes
1516from pycti .utils .opencti_stix2_splitter import OpenCTIStix2Splitter
1617from pycti .utils .opencti_stix2_update import OpenCTIStix2Update
@@ -35,13 +36,13 @@ def __init__(self, opencti):
3536
3637 ######### UTILS
3738 # region utils
38- def unknown_type (self , stix_object ) :
39+ def unknown_type (self , stix_object : Dict ) -> None :
3940 self .opencti .log (
4041 "error" ,
4142 'Unknown object type "' + stix_object ["type" ] + '", doing nothing...' ,
4243 )
4344
44- def convert_markdown (self , text ) -> str :
45+ def convert_markdown (self , text : str ) -> str :
4546 """converts input text to markdown style code annotation
4647
4748 :param text: input text
@@ -52,31 +53,33 @@ def convert_markdown(self, text) -> str:
5253
5354 return text .replace ("<code>" , "`" ).replace ("</code>" , "`" )
5455
55- def format_date (self , date ) :
56+ def format_date (self , date : Any = None ) -> str :
5657 """converts multiple input date formats to OpenCTI style dates
5758
5859 :param date: input date
59- :type date:
60+ :type date: Any [datetime, date, str or none]
6061 :return: OpenCTI style date
61- :rtype: datetime
62+ :rtype: string
6263 """
63-
64- if isinstance ( date , datetime . date ):
65- return date . isoformat ( timespec = "milliseconds" ). replace ( "+00:00" , "Z" )
66- if date is not None :
67- return (
68- dateutil . parser . parse ( date )
69- . isoformat ( timespec = "milliseconds" )
70- . replace ( "+00:00" , "Z" )
71- )
64+ if isinstance ( date , datetime . datetime ):
65+ date_value = date
66+ elif isinstance ( date , datetime . date ):
67+ date_value = datetime . datetime . combine ( date , datetime . datetime . min . time ())
68+ elif isinstance ( date , str ):
69+ try :
70+ date_value = dateutil . parser . parse ( date )
71+ except ( dateutil . parser . ParserError , TypeError , OverflowError ) as e :
72+ raise ValueError ( f" { e } : { date } does not contain a valid date string" )
7273 else :
73- return (
74- datetime .datetime .utcnow ()
75- .isoformat (timespec = "milliseconds" )
76- .replace ("+00:00" , "Z" )
77- )
74+ date_value = datetime .datetime .utcnow ()
75+
76+ if not date_value .tzinfo :
77+ self .opencti .log ("No timezone found. Setting to UTC" , "info" )
78+ date_value = date_value .replace (tzinfo = datetime .timezone .utc )
79+
80+ return date_value .isoformat (timespec = "milliseconds" ).replace ("+00:00" , "Z" )
7881
79- def filter_objects (self , uuids : list , objects : list ) -> list :
82+ def filter_objects (self , uuids : List , objects : List ) -> List :
8083 """filters objects based on UUIDs
8184
8285 :param uuids: list of UUIDs
@@ -94,7 +97,7 @@ def filter_objects(self, uuids: list, objects: list) -> list:
9497 result .append (item )
9598 return result
9699
97- def pick_aliases (self , stix_object ) -> list :
100+ def pick_aliases (self , stix_object : Dict ) -> Optional [ List ] :
98101 """check stix2 object for multiple aliases and return a list
99102
100103 :param stix_object: valid stix2 object
@@ -115,7 +118,7 @@ def pick_aliases(self, stix_object) -> list:
115118 return None
116119
117120 def check_max_marking_definition (
118- self , max_marking_definition_entity : str , entity_marking_definitions : list
121+ self , max_marking_definition_entity : Dict , entity_marking_definitions : List
119122 ) -> bool :
120123 """checks if a list of marking definitions conforms with a given max level
121124
@@ -151,7 +154,9 @@ def check_max_marking_definition(
151154 return True
152155 return False
153156
154- def import_bundle_from_file (self , file_path : str , update = False , types = None ) -> List :
157+ def import_bundle_from_file (
158+ self , file_path : str , update : bool = False , types : List = None
159+ ) -> Optional [List ]:
155160 """import a stix2 bundle from a file
156161
157162 :param file_path: valid path to the file
@@ -171,7 +176,11 @@ def import_bundle_from_file(self, file_path: str, update=False, types=None) -> L
171176 return self .import_bundle (data , update , types )
172177
173178 def import_bundle_from_json (
174- self , json_data , update = False , types = None , retry_number = None
179+ self ,
180+ json_data : Union [str , bytes ],
181+ update : bool = False ,
182+ types : List = None ,
183+ retry_number : int = None ,
175184 ) -> List :
176185 """import a stix2 bundle from JSON data
177186
@@ -192,7 +201,7 @@ def import_bundle_from_json(
192201 types ,
193202 )
194203
195- def resolve_author (self , title ) :
204+ def resolve_author (self , title : str ) -> Optional [ Identity ] :
196205 if "fireeye" in title .lower () or "mandiant" in title .lower ():
197206 return self .get_author ("FireEye" )
198207 if "eset" in title .lower ():
@@ -233,7 +242,7 @@ def resolve_author(self, title):
233242 return self .get_author ("The MITRE Corporation" )
234243 return None
235244
236- def get_author (self , name ) :
245+ def get_author (self , name : str ) -> Identity :
237246 if name in self .mapping_cache :
238247 return self .mapping_cache [name ]
239248 else :
@@ -245,7 +254,9 @@ def get_author(self, name):
245254 self .mapping_cache [name ] = author
246255 return author
247256
248- def extract_embedded_relationships (self , stix_object , types = None ) -> dict :
257+ def extract_embedded_relationships (
258+ self , stix_object : Dict , types : List = None
259+ ) -> Dict :
249260 """extracts embedded relationship objects from a stix2 entity
250261
251262 :param stix_object: valid stix2 object
@@ -462,7 +473,9 @@ def extract_embedded_relationships(self, stix_object, types=None) -> dict:
462473 # endregion
463474
464475 # region import
465- def import_object (self , stix_object , update = False , types = None ) -> list :
476+ def import_object (
477+ self , stix_object : Dict , update : bool = False , types : List = None
478+ ) -> Optional [List ]:
466479 """import a stix2 object
467480
468481 :param stix_object: valid stix2 object
@@ -539,7 +552,7 @@ def import_object(self, stix_object, update=False, types=None) -> list:
539552 )
540553
541554 if stix_object_results is None :
542- return stix_object_results
555+ return None
543556
544557 if not isinstance (stix_object_results , list ):
545558 stix_object_results = [stix_object_results ]
@@ -629,7 +642,9 @@ def import_object(self, stix_object, update=False, types=None) -> list:
629642
630643 return stix_object_results
631644
632- def import_observable (self , stix_object , update = False , types = None ):
645+ def import_observable (
646+ self , stix_object : Dict , update : bool = False , types : List = None
647+ ) -> None :
633648 # Extract
634649 embedded_relationships = self .extract_embedded_relationships (stix_object , types )
635650 created_by_id = embedded_relationships ["created_by" ]
@@ -708,7 +723,9 @@ def import_observable(self, stix_object, update=False, types=None):
708723 else :
709724 return None
710725
711- def import_relationship (self , stix_relation , update = False , types = None ):
726+ def import_relationship (
727+ self , stix_relation : Dict , update : bool = False , types : List = None
728+ ) -> None :
712729 # Extract
713730 embedded_relationships = self .extract_embedded_relationships (
714731 stix_relation , types
@@ -792,7 +809,14 @@ def import_relationship(self, stix_relation, update=False, types=None):
792809 stixObjectOrStixRelationshipId = stix_relation ["target_ref" ],
793810 )
794811
795- def import_sighting (self , stix_sighting , from_id , to_id , update = False , types = None ):
812+ def import_sighting (
813+ self ,
814+ stix_sighting : Dict ,
815+ from_id : str ,
816+ to_id : str ,
817+ update : bool = False ,
818+ types : List = None ,
819+ ) -> None :
796820 # Extract
797821 embedded_relationships = self .extract_embedded_relationships (
798822 stix_sighting , types
@@ -900,7 +924,7 @@ def import_sighting(self, stix_sighting, from_id, to_id, update=False, types=Non
900924 # endregion
901925
902926 # region export
903- def generate_export (self , entity ) :
927+ def generate_export (self , entity : Dict ) -> Dict :
904928 # Handle model deviation
905929 # Identities
906930 if IdentityTypes .has_value (entity ["entity_type" ]):
@@ -996,11 +1020,11 @@ def generate_export(self, entity):
9961020
9971021 def prepare_export (
9981022 self ,
999- entity ,
1000- mode = "simple" ,
1001- max_marking_definition_entity = None ,
1002- no_custom_attributes = False ,
1003- ):
1023+ entity : Dict ,
1024+ mode : str = "simple" ,
1025+ max_marking_definition_entity : Dict = None ,
1026+ no_custom_attributes : bool = False ,
1027+ ) -> List :
10041028 if (
10051029 self .check_max_marking_definition (
10061030 max_marking_definition_entity ,
@@ -1333,12 +1357,12 @@ def prepare_export(
13331357
13341358 def export_entity (
13351359 self ,
1336- entity_type ,
1337- entity_id ,
1338- mode = "simple" ,
1339- max_marking_definition = None ,
1340- no_custom_attributes = False ,
1341- ):
1360+ entity_type : Dict ,
1361+ entity_id : str ,
1362+ mode : str = "simple" ,
1363+ max_marking_definition : Dict = None ,
1364+ no_custom_attributes : bool = False ,
1365+ ) -> Dict :
13421366 max_marking_definition_entity = (
13431367 self .opencti .marking_definition .read (id = max_marking_definition )
13441368 if max_marking_definition is not None
@@ -1394,14 +1418,14 @@ def export_entity(
13941418
13951419 def export_list (
13961420 self ,
1397- entity_type ,
1398- search = None ,
1399- filters = None ,
1400- order_by = None ,
1401- order_mode = None ,
1402- max_marking_definition = None ,
1403- types = None ,
1404- ):
1421+ entity_type : Dict ,
1422+ search : Dict = None ,
1423+ filters : List = None ,
1424+ order_by : str = None ,
1425+ order_mode : str = None ,
1426+ max_marking_definition : Dict = None ,
1427+ types : List = None ,
1428+ ) -> Dict :
14051429 max_marking_definition_entity = (
14061430 self .opencti .marking_definition .read (id = max_marking_definition )
14071431 if max_marking_definition is not None
@@ -1484,7 +1508,9 @@ def export_list(
14841508 bundle ["objects" ] = bundle ["objects" ] + entity_bundle_filtered
14851509 return bundle
14861510
1487- def import_bundle (self , stix_bundle , update = False , types = None ) -> List :
1511+ def import_bundle (
1512+ self , stix_bundle : Dict , update : bool = False , types : List = None
1513+ ) -> List :
14881514 # Check if the bundle is correctly formatted
14891515 if "type" not in stix_bundle or stix_bundle ["type" ] != "bundle" :
14901516 raise ValueError ("JSON data type is not a STIX2 bundle" )
0 commit comments