1- # -*- coding: utf-8 -*-
21"""DomainTools enrichment module."""
32
43from datetime import datetime
5- from pathlib import Path
64from typing import Dict
75
86import domaintools
97import stix2
108import validators
11- import yaml
12- from pycti import Identity , OpenCTIConnectorHelper , get_config_variable
9+ from connector . settings import ConnectorSettings
10+ from pycti import Identity , OpenCTIConnectorHelper
1311
1412from .builder import DtBuilder
1513from .constants import DEFAULT_RISK_SCORE , DOMAIN_FIELDS , EMAIL_FIELDS , EntityType
@@ -21,42 +19,18 @@ class DomainToolsConnector:
2119 _DEFAULT_AUTHOR = "DomainTools"
2220 _CONNECTOR_RUN_INTERVAL_SEC = 60 * 60
2321
24- def __init__ (self ):
25- # Instantiate the connector helper from config
26- config_file_path = Path (__file__ ).parent .parent .resolve () / "config.yml"
27- config = (
28- yaml .load (open (config_file_path , encoding = "utf-8" ), Loader = yaml .FullLoader )
29- if config_file_path .is_file ()
30- else {}
22+ def __init__ (self , config : ConnectorSettings , helper : OpenCTIConnectorHelper ):
23+ self .config = config
24+ self .helper = helper
25+ self .api = domaintools .API (
26+ self .config .domaintools .api_username , self .config .domaintools .api_key
3127 )
32- self .helper = OpenCTIConnectorHelper (config , True )
33-
34- # DomainTools
35- api_username = get_config_variable (
36- "DOMAINTOOLS_API_USERNAME" ,
37- ["domaintools" , "api_username" ],
38- config ,
39- )
40- api_key = get_config_variable (
41- "DOMAINTOOLS_API_KEY" ,
42- ["domaintools" , "api_key" ],
43- config ,
44- )
45- self .api = domaintools .API (api_username , api_key )
46-
47- self .max_tlp = get_config_variable (
48- "DOMAINTOOLS_MAX_TLP" , ["domaintools" , "max_tlp" ], config
49- )
50-
28+ self .max_tlp = self .config .domaintools .max_tlp
5129 self .author = stix2 .Identity (
5230 id = Identity .generate_id (self ._DEFAULT_AUTHOR , "organization" ),
5331 name = self ._DEFAULT_AUTHOR ,
5432 identity_class = "organization" ,
55- description = " DomainTools is a leading provider of Whois and other DNS"
56- " profile data for threat intelligence enrichment."
57- " It is a part of the Datacenter Group (DCL Group SA)."
58- " DomainTools data helps security analysts investigate malicious"
59- " activity on their networks." ,
33+ description = " DomainTools is a leading provider of Whois and other DNS profile data for threat intelligence enrichment. It is a part of the Datacenter Group (DCL Group SA). DomainTools data helps security analysts investigate malicious activity on their networks." ,
6034 confidence = self .helper .connect_confidence_level ,
6135 )
6236 self .helper .metric .state ("idle" )
@@ -102,36 +76,28 @@ def _enrich_domaintools(self, builder, opencti_entity) -> str:
10276 raise ValueError (
10377 f"Entity type of the observable: { opencti_entity ['entity_type' ]} not supported."
10478 )
105-
10679 for entry in results :
10780 self .helper .log_info (f"Starting enrichment of domain { entry ['domain' ]} " )
108- # Retrieve common properties for all relationships.
10981 builder .reset_score ()
11082 score = entry .get ("domain_risk" , {}).get ("risk_score" , DEFAULT_RISK_SCORE )
11183 builder .set_score (score )
112- # Get the creation date / expiration date for the validity.
11384 creation_date = entry .get ("create_date" , {}).get ("value" , "" )
11485 expiration_date = entry .get ("expiration_date" , {}).get ("value" , "" )
11586 if creation_date != "" and expiration_date != "" :
11687 creation_date = datetime .strptime (creation_date , "%Y-%m-%d" )
11788 if expiration_date != "" :
11889 expiration_date = datetime .strptime (expiration_date , "%Y-%m-%d" )
119-
12090 if creation_date >= expiration_date :
12191 self .helper .log_warning (
12292 f"Expiration date { expiration_date } not after creation date { creation_date } , not using dates."
12393 )
12494 creation_date = ""
12595 expiration_date = ""
126-
127- # In case of IP enrichment, create the domain as it might not exist.
12896 domain_source_id = (
12997 builder .create_domain (entry ["domain" ])
13098 if opencti_entity ["entity_type" ] == "IPv4-Addr"
13199 else opencti_entity ["standard_id" ]
132100 )
133-
134- # Get ip
135101 for ip in entry .get ("ip" , ()):
136102 if "address" in ip :
137103 ip_id = builder .link_domain_resolves_to (
@@ -145,21 +111,15 @@ def _enrich_domaintools(self, builder, opencti_entity) -> str:
145111 if ip_id is not None :
146112 for asn in ip .get ("asn" , ()):
147113 builder .link_ip_belongs_to_asn (
148- ip_id ,
149- asn ["value" ],
150- creation_date ,
151- expiration_date ,
114+ ip_id , asn ["value" ], creation_date , expiration_date
152115 )
153-
154- # Get domains (name-server / mx)
155116 for category , description in DOMAIN_FIELDS .items ():
156117 for values in entry .get (category , ()):
157118 if (domain := values ["domain" ]["value" ]) != entry ["domain" ]:
158119 if not validators .domain (domain ):
159120 self .helper .metric .inc ("error_count" )
160121 self .helper .log_warning (
161- f"[DomainTools] domain { domain } is not correctly "
162- "formatted. Skipping."
122+ f"[DomainTools] domain { domain } is not correctly formatted. Skipping."
163123 )
164124 continue
165125 new_domain_id = builder .link_domain_resolves_to (
@@ -170,7 +130,6 @@ def _enrich_domaintools(self, builder, opencti_entity) -> str:
170130 expiration_date ,
171131 description ,
172132 )
173- # Add the related ips of the name server to the newly created domain.
174133 if new_domain_id is not None :
175134 for ip in values .get ("ip" , ()):
176135 builder .link_domain_resolves_to (
@@ -181,8 +140,6 @@ def _enrich_domaintools(self, builder, opencti_entity) -> str:
181140 expiration_date ,
182141 f"{ description } -ip" ,
183142 )
184-
185- # Emails
186143 for category , description in EMAIL_FIELDS .items ():
187144 emails = (
188145 entry .get (category , ())
@@ -197,8 +154,6 @@ def _enrich_domaintools(self, builder, opencti_entity) -> str:
197154 expiration_date ,
198155 description ,
199156 )
200-
201- # Domains of emails
202157 for domain in entry .get ("email_domain" , ()):
203158 if domain ["value" ] != entry ["domain" ]:
204159 builder .link_domain_resolves_to (
@@ -209,8 +164,6 @@ def _enrich_domaintools(self, builder, opencti_entity) -> str:
209164 expiration_date ,
210165 "email_domain" ,
211166 )
212-
213- # Redirects (red)
214167 if (red := entry .get ("redirect_domain" , {}).get ("value" , "" )) not in (
215168 domain_source_id ,
216169 "" ,
@@ -223,7 +176,6 @@ def _enrich_domaintools(self, builder, opencti_entity) -> str:
223176 expiration_date ,
224177 "redirect" ,
225178 )
226-
227179 if len (builder .bundle ) > 1 :
228180 builder .send_bundle ()
229181 self .helper .log_info (
@@ -235,31 +187,24 @@ def _enrich_domaintools(self, builder, opencti_entity) -> str:
235187 def _process_file (self , stix_objects , opencti_entity ):
236188 self .helper .metric .state ("running" )
237189 self .helper .metric .inc ("run_count" )
238-
239190 builder = DtBuilder (self .helper , self .author , stix_objects )
240-
241- # Enrichment using DomainTools API.
242191 result = self ._enrich_domaintools (builder , opencti_entity )
243192 self .helper .metric .state ("idle" )
244193 return result
245194
246195 def _process_message (self , data : Dict ):
247196 opencti_entity = data ["enrichment_entity" ]
248-
249- # Extract TLP
250197 tlp = "TLP:CLEAR"
251198 for marking_definition in opencti_entity .get ("objectMarking" , []):
252199 if marking_definition ["definition_type" ] == "TLP" :
253200 tlp = marking_definition ["definition" ]
254-
255201 if not OpenCTIConnectorHelper .check_max_tlp (tlp , self .max_tlp ):
256202 raise ValueError (
257203 "Do not send any data, TLP of the observable is greater than MAX TLP"
258204 )
259-
260205 stix_objects = data ["stix_objects" ]
261206 return self ._process_file (stix_objects , opencti_entity )
262207
263- def start (self ):
208+ def run (self ):
264209 """Start the main loop."""
265210 self .helper .listen (message_callback = self ._process_message )
0 commit comments