@@ -179,3 +179,127 @@ def read(self, **kwargs):
179179 "[opencti_security_coverage] Missing parameters: id or filters"
180180 )
181181 return None
182+
183+ """
184+ Create a Security coverage object
185+
186+ :return Security Coverage object
187+ """
188+
189+ def create (self , ** kwargs ):
190+ stix_id = kwargs .get ("stix_id" , None )
191+ name = kwargs .get ("name" , None )
192+ description = kwargs .get ("description" , None )
193+ created_by = kwargs .get ("createdBy" , None )
194+ object_marking = kwargs .get ("objectMarking" , None )
195+ object_label = kwargs .get ("objectLabel" , None )
196+ object_covered = kwargs .get ("objectCovered" , None )
197+ external_references = kwargs .get ("externalReferences" , None )
198+ coverage_last_result = kwargs .get ("coverage_last_result" , None )
199+ coverage_valid_from = kwargs .get ("coverage_valid_from" , None )
200+ coverage_valid_to = kwargs .get ("coverage_valid_to" , None )
201+ coverage_information = kwargs .get ("coverage_information" , None )
202+
203+ if name is not None and object_covered is not None :
204+ self .opencti .app_logger .info ("Creating Security Coverage" , {"name" : name })
205+ query = """
206+ mutation SecurityCoverageAdd($input: SecurityCoverageAddInput!) {
207+ securityCoverageAdd(input: $input) {
208+ id
209+ standard_id
210+ entity_type
211+ parent_types
212+ }
213+ }
214+ """
215+ result = self .opencti .query (
216+ query ,
217+ {
218+ "input" : {
219+ "stix_id" : stix_id ,
220+ "name" : name ,
221+ "description" : description ,
222+ "createdBy" : created_by ,
223+ "objectMarking" : object_marking ,
224+ "objectLabel" : object_label ,
225+ "objectCovered" : object_covered ,
226+ "externalReferences" : external_references ,
227+ "coverage_last_result" : coverage_last_result ,
228+ "coverage_valid_from" : coverage_valid_from ,
229+ "coverage_valid_to" : coverage_valid_to ,
230+ "coverage_information" : coverage_information ,
231+ }
232+ },
233+ )
234+ return self .opencti .process_multiple_fields (result ["data" ]["securityCoverageAdd" ])
235+ else :
236+ self .opencti .app_logger .error (
237+ "[opencti_security_coverage] "
238+ "Missing parameters: name or object_covered"
239+ )
240+
241+ """
242+ Import a Security coverage from a STIX2 object
243+
244+ :param stixObject: the Stix-Object Security coverage
245+ :return Security coverage object
246+ """
247+
248+ def import_from_stix2 (self , ** kwargs ):
249+ stix_object = kwargs .get ("stixObject" , None )
250+ extras = kwargs .get ("extras" , {})
251+ if stix_object is not None :
252+ # Search in extensions
253+ if "x_opencti_stix_ids" not in stix_object :
254+ stix_object ["x_opencti_stix_ids" ] = (
255+ self .opencti .get_attribute_in_extension ("stix_ids" , stix_object )
256+ )
257+ if "x_opencti_granted_refs" not in stix_object :
258+ stix_object ["x_opencti_granted_refs" ] = (
259+ self .opencti .get_attribute_in_extension ("granted_refs" , stix_object )
260+ )
261+
262+ raw_coverages = stix_object ["coverage" ] if "coverage" in stix_object else []
263+ coverage_information = list (map (lambda cov : {"coverage_name" :cov ["name" ],"coverage_score" :cov ["score" ]}, raw_coverages ))
264+
265+ return self .create (
266+ stix_id = stix_object ["id" ],
267+ name = stix_object ["name" ],
268+ coverage_last_result = stix_object ["last_result" ] if "last_result" in stix_object else None ,
269+ coverage_valid_from = stix_object ["valid_from" ] if "valid_from" in stix_object else None ,
270+ coverage_valid_to = stix_object ["valid_to" ] if "valid_to" in stix_object else None ,
271+ coverage_information = coverage_information ,
272+ description = (
273+ self .opencti .stix2 .convert_markdown (stix_object ["description" ])
274+ if "description" in stix_object
275+ else None
276+ ),
277+ createdBy = (
278+ extras ["created_by_id" ] if "created_by_id" in extras else None
279+ ),
280+ objectMarking = (
281+ extras ["object_marking_ids" ]
282+ if "object_marking_ids" in extras
283+ else None
284+ ),
285+ objectLabel = (
286+ extras ["object_label_ids" ] if "object_label_ids" in extras else None
287+ ),
288+ objectCovered = (
289+ stix_object ["covered_ref" ] if "covered_ref" in stix_object else None
290+ ),
291+ externalReferences = (
292+ extras ["external_references_ids" ]
293+ if "external_references_ids" in extras
294+ else None
295+ ),
296+ x_opencti_stix_ids = (
297+ stix_object ["x_opencti_stix_ids" ]
298+ if "x_opencti_stix_ids" in stix_object
299+ else None
300+ ),
301+ )
302+ else :
303+ self .opencti .app_logger .error (
304+ "[opencti_security_coverage] Missing parameters: stixObject"
305+ )
0 commit comments