diff --git a/stix2/base.py b/stix2/base.py index a4f3f03b..991fd660 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -36,7 +36,7 @@ def get_required_properties(properties): class _STIXBase(collections.abc.Mapping): """Base class for STIX object types""" - def _check_property(self, prop_name, prop, kwargs, allow_custom): + def _check_property(self, prop_name, prop, kwargs, allow_custom, interoperability): if prop_name not in kwargs: if hasattr(prop, 'default'): value = prop.default() @@ -46,10 +46,11 @@ def _check_property(self, prop_name, prop, kwargs, allow_custom): has_custom = False if prop_name in kwargs: + arguments = [kwargs[prop_name], allow_custom] + if isinstance(prop, self.__INTEROPERABILITY_types): + arguments.append(interoperability) try: - kwargs[prop_name], has_custom = prop.clean( - kwargs[prop_name], allow_custom, - ) + kwargs[prop_name], has_custom = prop.clean(*arguments) except InvalidValueError: # No point in wrapping InvalidValueError in another # InvalidValueError... so let those propagate. @@ -114,12 +115,20 @@ def _check_object_constraints(self): for m in self.get('granular_markings', []): validate(self, m.get('selectors')) - def __init__(self, allow_custom=False, **kwargs): + def __init__(self, allow_custom=False, interoperability=False, **kwargs): cls = self.__class__ # Use the same timestamp for any auto-generated datetimes self.__now = get_timestamp() + self.__INTEROPERABILITY_types = ( + stix2.properties.EmbeddedObjectProperty, stix2.properties.EnumProperty, + stix2.properties.ExtensionsProperty, stix2.properties.DictionaryProperty, + stix2.properties.HashesProperty, stix2.properties.IDProperty, + stix2.properties.ListProperty, stix2.properties.OpenVocabProperty, + stix2.properties.ReferenceProperty, stix2.properties.SelectorProperty, + ) + custom_props = kwargs.pop('custom_properties', {}) if custom_props and not isinstance(custom_props, dict): raise ValueError("'custom_properties' must be a dictionary") @@ -203,7 +212,7 @@ def __init__(self, allow_custom=False, **kwargs): prop = defined_properties.get(prop_name) if prop: temp_custom = self._check_property( - prop_name, prop, setting_kwargs, allow_custom, + prop_name, prop, setting_kwargs, allow_custom, interoperability, ) has_custom = has_custom or temp_custom @@ -293,7 +302,7 @@ def __deepcopy__(self, memo): if isinstance(self, _Observable): # Assume: valid references in the original object are still valid in the new version new_inner['_valid_refs'] = {'*': '*'} - return cls(allow_custom=True, **new_inner) + return cls(allow_custom=True, interoperability=False, **new_inner) def properties_populated(self): return list(self._inner.keys()) @@ -411,8 +420,8 @@ def _check_ref(self, ref, prop, prop_name): if ref_type not in allowed_types: raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type)) - def _check_property(self, prop_name, prop, kwargs, allow_custom): - has_custom = super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom) + def _check_property(self, prop_name, prop, kwargs, allow_custom, interoperability): + has_custom = super(_Observable, self)._check_property(prop_name, prop, kwargs, allow_custom, interoperability) if prop_name in kwargs: from .properties import ObjectReferenceProperty diff --git a/stix2/parsing.py b/stix2/parsing.py index 8b39f919..382fa5c6 100644 --- a/stix2/parsing.py +++ b/stix2/parsing.py @@ -7,7 +7,7 @@ from .utils import _get_dict, detect_spec_version -def parse(data, allow_custom=False, version=None): +def parse(data, allow_custom=False, interoperability=False, version=None): """Convert a string, dict or file-like object into a STIX object. Args: @@ -37,12 +37,12 @@ def parse(data, allow_custom=False, version=None): obj = _get_dict(data) # convert dict to full python-stix2 obj - obj = dict_to_stix2(obj, allow_custom, version) + obj = dict_to_stix2(obj, allow_custom, interoperability, version) return obj -def dict_to_stix2(stix_dict, allow_custom=False, version=None): +def dict_to_stix2(stix_dict, allow_custom=False, interoperability=False, version=None): """convert dictionary to full python-stix2 object Args: @@ -96,10 +96,10 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None): return stix_dict raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj_type) - return obj_class(allow_custom=allow_custom, **stix_dict) + return obj_class(allow_custom=allow_custom, interoperability=interoperability, **stix_dict) -def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): +def parse_observable(data, _valid_refs=None, allow_custom=False, interoperability=False, version=None): """Deserialize a string or file-like object into a STIX Cyber Observable object. @@ -144,4 +144,4 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): "use the CustomObservable decorator." % obj['type'], ) - return obj_class(allow_custom=allow_custom, **obj) + return obj_class(allow_custom=allow_custom, interoperability=interoperability, **obj) diff --git a/stix2/properties.py b/stix2/properties.py index be1c4b69..6f34247d 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -21,6 +21,9 @@ ) from .version import DEFAULT_VERSION +ID_REGEX_interoperability = re.compile( + r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", +) TYPE_REGEX = re.compile(r'^-?[a-z0-9]+(-[a-z0-9]+)*-?$') TYPE_21_REGEX = re.compile(r'^([a-z][a-z0-9]*)+([a-z0-9-]+)*-?$') ERROR_INVALID_ID = ( @@ -28,7 +31,7 @@ ) -def _check_uuid(uuid_str, spec_version): +def _check_uuid(uuid_str, spec_version, interoperability): """ Check whether the given UUID string is valid with respect to the given STIX spec version. STIX 2.0 requires UUIDv4; 2.1 only requires the RFC 4122 @@ -39,6 +42,9 @@ def _check_uuid(uuid_str, spec_version): :return: True if the UUID is valid, False if not :raises ValueError: If uuid_str is malformed """ + if interoperability: + return ID_REGEX_interoperability.match(uuid_str) + uuid_obj = uuid.UUID(uuid_str) ok = uuid_obj.variant == uuid.RFC_4122 @@ -48,7 +54,7 @@ def _check_uuid(uuid_str, spec_version): return ok -def _validate_id(id_, spec_version, required_prefix): +def _validate_id(id_, spec_version, required_prefix, interoperability=False): """ Check the STIX identifier for correctness, raise an exception if there are errors. @@ -71,7 +77,7 @@ def _validate_id(id_, spec_version, required_prefix): idx = id_.index("--") uuid_part = id_[idx+2:] - result = _check_uuid(uuid_part, spec_version) + result = _check_uuid(uuid_part, spec_version, interoperability) except ValueError: # replace their ValueError with ours raise ValueError(ERROR_INVALID_ID.format(id_)) @@ -170,7 +176,7 @@ class Property(object): """ - def _default_clean(self, value, allow_custom=False): + def _default_clean(self, value, allow_custom=False, interoperability=False): if value != self._fixed_value: raise ValueError("must equal '{}'.".format(self._fixed_value)) return value, False @@ -224,7 +230,7 @@ def __init__(self, contained, **kwargs): super(ListProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability=False): try: iter(value) except TypeError: @@ -237,7 +243,7 @@ def clean(self, value, allow_custom): has_custom = False if isinstance(self.contained, Property): for item in value: - valid, temp_custom = self.contained.clean(item, allow_custom) + valid, temp_custom = self.contained.clean(item, allow_custom, interoperability) result.append(valid) has_custom = has_custom or temp_custom @@ -248,7 +254,7 @@ def clean(self, value, allow_custom): elif isinstance(item, collections.abc.Mapping): # attempt a mapping-like usage... - valid = self.contained(allow_custom=allow_custom, **item) + valid = self.contained(allow_custom=allow_custom, interoperability=interoperability, **item) else: raise ValueError( @@ -275,7 +281,7 @@ class StringProperty(Property): def __init__(self, **kwargs): super(StringProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom=False): + def clean(self, value, allow_custom=False, interoperability=False): if not isinstance(value, str): value = str(value) return value, False @@ -296,8 +302,8 @@ def __init__(self, type, spec_version=DEFAULT_VERSION): self.spec_version = spec_version super(IDProperty, self).__init__() - def clean(self, value, allow_custom=False): - _validate_id(value, self.spec_version, self.required_prefix) + def clean(self, value, allow_custom=False, interoperability=False): + _validate_id(value, self.spec_version, self.required_prefix, interoperability) return value, False def default(self): @@ -311,7 +317,7 @@ def __init__(self, min=None, max=None, **kwargs): self.max = max super(IntegerProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom=False): + def clean(self, value, allow_custom=False, interoperability=False): try: value = int(value) except Exception: @@ -391,7 +397,7 @@ def __init__(self, spec_version=DEFAULT_VERSION, **kwargs): self.spec_version = spec_version super(DictionaryProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom=False): + def clean(self, value, allow_custom=False, interoperability=False): try: dictified = _get_dict(value) except ValueError: @@ -434,7 +440,7 @@ def __init__(self, spec_hash_names, spec_version=DEFAULT_VERSION, **kwargs): if alg: self.__alg_to_spec_name[alg] = spec_hash_name - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability=False): # ignore the has_custom return value here; there is no customization # of DictionaryProperties. clean_dict, _ = super().clean(value, allow_custom) @@ -541,12 +547,12 @@ def __init__(self, valid_types=None, invalid_types=None, spec_version=DEFAULT_VE super(ReferenceProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability=False): if isinstance(value, _STIXBase): value = value.id value = str(value) - _validate_id(value, self.spec_version, None) + _validate_id(value, self.spec_version, None, interoperability) obj_type = get_type_from_id(value) @@ -619,7 +625,7 @@ def clean(self, value, allow_custom): class SelectorProperty(Property): - def clean(self, value, allow_custom=False): + def clean(self, value, allow_custom=False, interoperability=False): if not SELECTOR_REGEX.match(value): raise ValueError("must adhere to selector syntax.") return value, False @@ -640,7 +646,7 @@ def __init__(self, type, **kwargs): self.type = type super(EmbeddedObjectProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability=False): if isinstance(value, dict): value = self.type(allow_custom=allow_custom, **value) elif not isinstance(value, self.type): @@ -668,8 +674,8 @@ def __init__(self, allowed, **kwargs): self.allowed = allowed super(EnumProperty, self).__init__(**kwargs) - def clean(self, value, allow_custom): - cleaned_value, _ = super(EnumProperty, self).clean(value, allow_custom) + def clean(self, value, allow_custom, interoperability=False): + cleaned_value, _ = super(EnumProperty, self).clean(value, allow_custom, interoperability) if cleaned_value not in self.allowed: raise ValueError("value '{}' is not valid for this enumeration.".format(cleaned_value)) @@ -689,9 +695,9 @@ def __init__(self, allowed, **kwargs): allowed = [allowed] self.allowed = allowed - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability=False): cleaned_value, _ = super(OpenVocabProperty, self).clean( - value, allow_custom, + value, allow_custom, interoperability, ) # Disabled: it was decided that enforcing this is too strict (might @@ -770,7 +776,7 @@ class ExtensionsProperty(DictionaryProperty): def __init__(self, spec_version=DEFAULT_VERSION, required=False): super(ExtensionsProperty, self).__init__(spec_version=spec_version, required=required) - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability=False): try: dictified = _get_dict(value) # get deep copy since we are going modify the dict and might @@ -785,7 +791,7 @@ def clean(self, value, allow_custom): cls = class_for_type(key, self.spec_version, "extensions") if cls: if isinstance(subvalue, dict): - ext = cls(allow_custom=allow_custom, **subvalue) + ext = cls(allow_custom=allow_custom, interoperability=interoperability, **subvalue) elif isinstance(subvalue, cls): # If already an instance of the registered class, assume # it's valid @@ -836,7 +842,7 @@ def __init__(self, spec_version=DEFAULT_VERSION, *args, **kwargs): self.spec_version = spec_version super(STIXObjectProperty, self).__init__(*args, **kwargs) - def clean(self, value, allow_custom): + def clean(self, value, allow_custom, interoperability=False): # Any STIX Object (SDO, SRO, or Marking Definition) can be added to # a bundle with no further checks. stix2_classes = {'_DomainObject', '_RelationshipObject', 'MarkingDefinition'} @@ -876,7 +882,7 @@ def clean(self, value, allow_custom): "containing objects of a different spec version.", ) - parsed_obj = parse(dictified, allow_custom=allow_custom) + parsed_obj = parse(dictified, allow_custom=allow_custom, interoperability=interoperability) if isinstance(parsed_obj, _STIXBase): has_custom = parsed_obj.has_custom diff --git a/stix2/test/v20/test_interoperability.py b/stix2/test/v20/test_interoperability.py new file mode 100644 index 00000000..d8eb1438 --- /dev/null +++ b/stix2/test/v20/test_interoperability.py @@ -0,0 +1,196 @@ +import datetime + +import pytz + +import stix2 + +FAKE_TIME = datetime.datetime(2017, 1, 1, 12, 34, 56, tzinfo=pytz.utc) + +ATTACK_PATTERN_ID = "attack-pattern--168b3330-fc69-11e8-b98e-0800279d6dc6" +BUNDLE_ID = "bundle--2acecf31-5262-3981-8eff-db8a1de5945b" +CAMPAIGN_ID = "campaign--f22d70fa-871d-5155-9812-89b3a48f6e50" +COURSE_OF_ACTION_ID = "course-of-action--f9ae0d21-f4c9-360e-8743-b064b2ad2a2e" +IDENTITY_ID = "identity--035a5348-2485-3bca-99ce-62da0f14c37a" +INDICATOR_ID = "indicator--412aba5b-75b4-5827-99f1-c62d91504e97" +INTRUSION_SET_ID = "intrusion-set--2d1db502-fc6c-11e8-8b3f-00216af611cf" +MALWARE_ID = "malware--64ee70a4-8cc1-5d25-8bf2-dea6c79a09c8" +MARKING_DEFINITION_ID = "marking-definition--f8427579-bd0f-3550-8eef-c3f2cb33cd0f" +OBSERVED_DATA_ID = "observed-data--9a74c83e-2c09-3513-874b-91d679be82b8" +RELATIONSHIP_ID = "relationship--ee36ba22-c954-5d25-89c8-5a435eaebeb3" +REPORT_ID = "report--dbfe2a52-fc6c-11e8-8b3f-00216af611cf" +SIGHTING_ID = "sighting--9ae1145d-b1b2-57b6-83f1-36a173a24112" +THREAT_ACTOR_ID = "threat-actor--e5313ad6-6b11-3c07-8ace-7dc52824e063" +TOOL_ID = "tool--bf0895d6-7626-361f-89dd-d404aa340bc2" +VULNERABILITY_ID = "vulnerability--20296e55-98b9-5988-851a-51eddd5022c8" + +OBJECT_REFS = [ + ATTACK_PATTERN_ID, CAMPAIGN_ID, COURSE_OF_ACTION_ID, INDICATOR_ID, INTRUSION_SET_ID, + MALWARE_ID, MARKING_DEFINITION_ID, OBSERVED_DATA_ID, RELATIONSHIP_ID, SIGHTING_ID, + THREAT_ACTOR_ID, TOOL_ID, VULNERABILITY_ID, +] + +ATTACK_PATTERN_KWARGS = dict( + type='attack-pattern', + id=ATTACK_PATTERN_ID, + name="Phishing", + created_by_ref=IDENTITY_ID, +) + +BUNDLE_KWARGS = dict( + type='bundle', + id=BUNDLE_ID, + spec_version='2.0', +) + +CAMPAIGN_KWARGS = dict( + type='campaign', + id=CAMPAIGN_ID, + created_by_ref=IDENTITY_ID, + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector.", +) + +COURSE_OF_ACTION_KWARGS = dict( + type='course-of-action', + id=COURSE_OF_ACTION_ID, + name="Block", + created_by_ref=IDENTITY_ID, +) + +IDENTITY_KWARGS = dict( + type='identity', + id=IDENTITY_ID, + name="John Smith", + identity_class="individual", +) + +INDICATOR_KWARGS = dict( + type='indicator', + id=INDICATOR_ID, + labels=['malicious-activity'], + pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']", + created_by_ref=IDENTITY_ID, +) + +INTRUSION_SET_KWARGS = dict( + type='intrusion-set', + id=INTRUSION_SET_ID, + name="Bobcat Breakin", + created_by_ref=IDENTITY_ID, +) + +MALWARE_KWARGS = dict( + type='malware', + id=MALWARE_ID, + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + labels=['ransomware'], + name="Cryptolocker", + description="A ransomware related to ...", + created_by_ref=IDENTITY_ID, +) + +MARKING_DEFINITION_KWARGS = dict( + type='marking-definition', + id=MARKING_DEFINITION_ID, + definition_type='statement', + definition={'statement': "Copyright 2016, Example Corp"}, + created_by_ref=IDENTITY_ID, +) + +OBSERVED_DATA_KWARGS = dict( + type='observed-data', + id=OBSERVED_DATA_ID, + first_observed=FAKE_TIME, + last_observed=FAKE_TIME, + number_observed=1, + objects={ + "0": { + "type": "windows-registry-key", + "key": "HKEY_LOCAL_MACHINE\\System\\Foo\\Bar", + }, + }, + created_by_ref=IDENTITY_ID, +) + +REPORT_KWARGS = dict( + type='report', + id=REPORT_ID, + labels=["campaign"], + name="Bad Cybercrime", + published=FAKE_TIME, + object_refs=OBJECT_REFS, + created_by_ref=IDENTITY_ID, +) + +RELATIONSHIP_KWARGS = dict( + type='relationship', + id=RELATIONSHIP_ID, + relationship_type="indicates", + source_ref=INDICATOR_ID, + target_ref=MALWARE_ID, + created_by_ref=IDENTITY_ID, +) + +SIGHTING_KWARGS = dict( + type='sighting', + id=SIGHTING_ID, + sighting_of_ref=INDICATOR_ID, + created_by_ref=IDENTITY_ID, + observed_data_refs=[OBSERVED_DATA_ID], + where_sighted_refs=[IDENTITY_ID], +) + +THREAT_ACTOR_KWARGS = dict( + type='threat-actor', + id=THREAT_ACTOR_ID, + labels=["crime-syndicate"], + name="Evil Org", + created_by_ref=IDENTITY_ID, +) + +TOOL_KWARGS = dict( + type='tool', + id=TOOL_ID, + labels=["remote-access"], + name="VNC", + created_by_ref=IDENTITY_ID, + interoperability=True, +) + +VULNERABILITY_KWARGS = dict( + type='vulnerability', + id=VULNERABILITY_ID, + name="Heartbleed", + created_by_ref=IDENTITY_ID, +) + + +if __name__ == '__main__': + attack_pattern = stix2.v20.AttackPattern(**ATTACK_PATTERN_KWARGS, interoperability=True) + campaign = stix2.v20.Campaign(**CAMPAIGN_KWARGS, interoperability=True) + course_of_action = stix2.v20.CourseOfAction(**COURSE_OF_ACTION_KWARGS, interoperability=True) + identity = stix2.v20.Identity(**IDENTITY_KWARGS, interoperability=True) + indicator = stix2.v20.Indicator(**INDICATOR_KWARGS, interoperability=True) + intrusion_set = stix2.v20.IntrusionSet(**INTRUSION_SET_KWARGS, interoperability=True) + malware = stix2.v20.Malware(**MALWARE_KWARGS, interoperability=True) + marking_definition = stix2.v20.MarkingDefinition(**MARKING_DEFINITION_KWARGS, interoperability=True) + observed_data = stix2.v20.ObservedData(**OBSERVED_DATA_KWARGS, interoperability=True) + relationship = stix2.v20.Relationship(**RELATIONSHIP_KWARGS, interoperability=True) + sighting = stix2.v20.Sighting(**SIGHTING_KWARGS, interoperability=True) + threat_actor = stix2.v20.ThreatActor(**THREAT_ACTOR_KWARGS, interoperability=True) + tool = stix2.v20.Tool(**TOOL_KWARGS) + vulnerability = stix2.v20.Vulnerability(**VULNERABILITY_KWARGS, interoperability=True) + report = stix2.v20.Report(**REPORT_KWARGS, interoperability=True) + bundle = stix2.v20.Bundle( + **BUNDLE_KWARGS, interoperability=True, + objects=[ + attack_pattern, campaign, course_of_action, identity, indicator, + intrusion_set, malware, marking_definition, observed_data, tool, + relationship, sighting, threat_actor, vulnerability, report, + ] + ) + stix2.parse(dict(bundle), interoperability=True) + print("All interoperability tests passed !") diff --git a/stix2/test/v21/test_interoperability.py b/stix2/test/v21/test_interoperability.py new file mode 100644 index 00000000..37256446 --- /dev/null +++ b/stix2/test/v21/test_interoperability.py @@ -0,0 +1,203 @@ +import datetime + +import pytz + +import stix2 + +FAKE_TIME = datetime.datetime(2017, 1, 1, 12, 34, 56, tzinfo=pytz.utc) + +ATTACK_PATTERN_ID = "attack-pattern--168b3330-fc69-11e8-b98e-0800279d6dc6" +BUNDLE_ID = "bundle--2acecf31-5262-3981-8eff-db8a1de5945b" +CAMPAIGN_ID = "campaign--f22d70fa-871d-5155-9812-89b3a48f6e50" +COURSE_OF_ACTION_ID = "course-of-action--f9ae0d21-f4c9-360e-8743-b064b2ad2a2e" +IDENTITY_ID = "identity--035a5348-2485-3bca-99ce-62da0f14c37a" +INDICATOR_ID = "indicator--412aba5b-75b4-5827-99f1-c62d91504e97" +INTRUSION_SET_ID = "intrusion-set--2d1db502-fc6c-11e8-8b3f-00216af611cf" +MALWARE_ID = "malware--64ee70a4-8cc1-5d25-8bf2-dea6c79a09c8" +MARKING_DEFINITION_ID = "marking-definition--f8427579-bd0f-3550-8eef-c3f2cb33cd0f" +OBSERVED_DATA_ID = "observed-data--9a74c83e-2c09-3513-874b-91d679be82b8" +RELATIONSHIP_ID = "relationship--ee36ba22-c954-5d25-89c8-5a435eaebeb3" +REPORT_ID = "report--dbfe2a52-fc6c-11e8-8b3f-00216af611cf" +SIGHTING_ID = "sighting--9ae1145d-b1b2-57b6-83f1-36a173a24112" +THREAT_ACTOR_ID = "threat-actor--e5313ad6-6b11-3c07-8ace-7dc52824e063" +TOOL_ID = "tool--bf0895d6-7626-361f-89dd-d404aa340bc2" +VULNERABILITY_ID = "vulnerability--20296e55-98b9-5988-851a-51eddd5022c8" + +OBJECT_REFS = [ + ATTACK_PATTERN_ID, CAMPAIGN_ID, COURSE_OF_ACTION_ID, INDICATOR_ID, INTRUSION_SET_ID, + MALWARE_ID, MARKING_DEFINITION_ID, OBSERVED_DATA_ID, RELATIONSHIP_ID, SIGHTING_ID, + THREAT_ACTOR_ID, TOOL_ID, VULNERABILITY_ID, +] + +ATTACK_PATTERN_KWARGS = dict( + type='attack-pattern', + id=ATTACK_PATTERN_ID, + name="Phishing", + created_by_ref=IDENTITY_ID, +) + +BUNDLE_KWARGS = dict( + type='bundle', + id=BUNDLE_ID, +) + +CAMPAIGN_KWARGS = dict( + type='campaign', + id=CAMPAIGN_ID, + created_by_ref=IDENTITY_ID, + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector.", +) + +COURSE_OF_ACTION_KWARGS = dict( + type='course-of-action', + id=COURSE_OF_ACTION_ID, + name="Block", + created_by_ref=IDENTITY_ID, +) + +IDENTITY_KWARGS = dict( + type='identity', + id=IDENTITY_ID, + name="John Smith", + identity_class="individual", +) + +INDICATOR_KWARGS = dict( + type='indicator', + id=INDICATOR_ID, + labels=['malicious-activity'], + pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']", + pattern_type="stix", + created_by_ref=IDENTITY_ID, + indicator_types=["malicious-activity"], + valid_from="2016-04-06T20:03:00.000Z", +) + +INTRUSION_SET_KWARGS = dict( + type='intrusion-set', + id=INTRUSION_SET_ID, + name="Bobcat Breakin", + created_by_ref=IDENTITY_ID, +) + +MALWARE_KWARGS = dict( + type='malware', + id=MALWARE_ID, + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + labels=['ransomware'], + name="Cryptolocker", + description="A ransomware related to ...", + created_by_ref=IDENTITY_ID, + malware_types=["malicious-activity"], + is_family=False, +) + +MARKING_DEFINITION_KWARGS = dict( + type='marking-definition', + id=MARKING_DEFINITION_ID, + definition_type='statement', + definition={'statement': "Copyright 2016, Example Corp"}, + created_by_ref=IDENTITY_ID, +) + +OBSERVED_DATA_KWARGS = dict( + type='observed-data', + id=OBSERVED_DATA_ID, + first_observed=FAKE_TIME, + last_observed=FAKE_TIME, + number_observed=1, + objects={ + "0": { + "type": "windows-registry-key", + "key": "HKEY_LOCAL_MACHINE\\System\\Foo\\Bar", + }, + }, + created_by_ref=IDENTITY_ID, +) + +REPORT_KWARGS = dict( + type='report', + id=REPORT_ID, + labels=["campaign"], + name="Bad Cybercrime", + published=FAKE_TIME, + object_refs=OBJECT_REFS, + created_by_ref=IDENTITY_ID, + report_types=["malicious-activity"], +) + +RELATIONSHIP_KWARGS = dict( + type='relationship', + id=RELATIONSHIP_ID, + relationship_type="indicates", + source_ref=INDICATOR_ID, + target_ref=MALWARE_ID, + created_by_ref=IDENTITY_ID, +) + +SIGHTING_KWARGS = dict( + type='sighting', + id=SIGHTING_ID, + sighting_of_ref=INDICATOR_ID, + created_by_ref=IDENTITY_ID, + observed_data_refs=[OBSERVED_DATA_ID], + where_sighted_refs=[IDENTITY_ID], +) + +THREAT_ACTOR_KWARGS = dict( + type='threat-actor', + id=THREAT_ACTOR_ID, + labels=["crime-syndicate"], + name="Evil Org", + created_by_ref=IDENTITY_ID, + threat_actor_types=["malicious-activity"], +) + +TOOL_KWARGS = dict( + type='tool', + id=TOOL_ID, + labels=["remote-access"], + name="VNC", + created_by_ref=IDENTITY_ID, + interoperability=True, + tool_types=["malicious-activity"], +) + +VULNERABILITY_KWARGS = dict( + type='vulnerability', + id=VULNERABILITY_ID, + name="Heartbleed", + created_by_ref=IDENTITY_ID, +) + + +if __name__ == '__main__': + attack_pattern = stix2.v21.AttackPattern(**ATTACK_PATTERN_KWARGS, interoperability=True) + campaign = stix2.v21.Campaign(**CAMPAIGN_KWARGS, interoperability=True) + course_of_action = stix2.v21.CourseOfAction(**COURSE_OF_ACTION_KWARGS, interoperability=True) + identity = stix2.v21.Identity(**IDENTITY_KWARGS, interoperability=True) + indicator = stix2.v21.Indicator(**INDICATOR_KWARGS, interoperability=True) + intrusion_set = stix2.v21.IntrusionSet(**INTRUSION_SET_KWARGS, interoperability=True) + malware = stix2.v21.Malware(**MALWARE_KWARGS, interoperability=True) + marking_definition = stix2.v21.MarkingDefinition(**MARKING_DEFINITION_KWARGS, interoperability=True) + observed_data = stix2.v21.ObservedData(**OBSERVED_DATA_KWARGS, interoperability=True) + relationship = stix2.v21.Relationship(**RELATIONSHIP_KWARGS, interoperability=True) + sighting = stix2.v21.Sighting(**SIGHTING_KWARGS, interoperability=True) + threat_actor = stix2.v21.ThreatActor(**THREAT_ACTOR_KWARGS, interoperability=True) + tool = stix2.v21.Tool(**TOOL_KWARGS) + vulnerability = stix2.v21.Vulnerability(**VULNERABILITY_KWARGS, interoperability=True) + report = stix2.v21.Report(**REPORT_KWARGS, interoperability=True) + bundle = stix2.v21.Bundle( + **BUNDLE_KWARGS, interoperability=True, + objects=[ + attack_pattern, campaign, course_of_action, identity, indicator, + intrusion_set, malware, marking_definition, observed_data, tool, + relationship, sighting, threat_actor, vulnerability, report, + ] + ) + stix2.parse(dict(bundle), interoperability=True) + print("All interoperability tests passed !")