diff --git a/.github/workflows/python-ci-tests.yml b/.github/workflows/python-ci-tests.yml
index 2dc51d85..4bc4e3d8 100644
--- a/.github/workflows/python-ci-tests.yml
+++ b/.github/workflows/python-ci-tests.yml
@@ -6,7 +6,7 @@ on: [push, pull_request]
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
- POSTGRES_DB: postgres
+ POSTGRES_DB: stix
jobs:
test-job:
@@ -14,7 +14,7 @@ jobs:
services:
postgres:
- image: postgres:11
+ image: postgres
# Provide the password for postgres
env:
POSTGRES_USER: postgres
@@ -34,9 +34,9 @@ jobs:
name: Python ${{ matrix.python-version }} Build
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v4.2.2
- name: Set up Python ${{ matrix.python-version }}
- uses: actions/setup-python@v2
+ uses: actions/setup-python@v5.4.0
with:
python-version: ${{ matrix.python-version }}
- name: Install and update essential dependencies
@@ -48,7 +48,7 @@ jobs:
run: |
tox
- name: Upload coverage information to Codecov
- uses: codecov/codecov-action@v4.2.0
+ uses: codecov/codecov-action@v5.4.0
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: false # optional (default = false)
diff --git a/USING_NEO4J.md b/USING_NEO4J.md
new file mode 100644
index 00000000..42c7564d
--- /dev/null
+++ b/USING_NEO4J.md
@@ -0,0 +1,76 @@
+# Experimenting with the Neo4j graph database Python STIX DataStore
+
+The Neo4j graph database Python STIX DataStore is a proof-of-concept implementation to show how to store STIX content in a graph database.
+
+## Limitations:
+
+As a proof-of-concept it has minimal functionality.
+
+## Installing Neo4j
+
+See https://neo4j.com/docs/desktop-manual/current/installation
+
+This will install the neo4j desktop application, which contains the neo4j browser to view the database.
+
+## Installing Neo4j python library
+
+The python neo4j library used is py2neo, available in pypi at https://pypi.org/project/py2neo/. Note this library is no longer being supported and has reached the "end-of-life". A different implementation of the DataStore could be written using https://neo4j.com/docs/api/python-driver/current/.
+
+## Implementation Details
+
+We would like to that the folks at JHU/APL for their implementation of [STIX2NEO4J.py](https://github.com/opencybersecurityalliance/oca-iob/tree/main/STIX2NEO4J%20Converter), which this code is based on.
+
+Only the DataSink (for storing STIX data) part of the DataStore object has been implemented. The DataSource part is implemented as a stub. However, the graph database can be queried using the neo4j cypher langauge within
+the neo4j browser.
+
+The main concept behind any graphs is nodes and edges. STIX data is similar as it contains relationship objects (SROs) and node objects (SDOs, SCOs and SMOs). Additional edges are provided by STIX embedded relationships, which are expressed as properties in STIX node objects. This organization of data in STIX is a natural fit for graph models, such as neo4j.
+
+The order in which STIX objects are added to the graph database is arbitrary. Therefore, when an SRO or embedded relationship is added via the DataStore, the nodes that it connects may not be present in the database, so the relationship is not added to the database, but remembered by the DataStore code as an unconnected relationship. Whenever a new node is
+added to the database, the unconnected relationships must be reviewed to determine if both nodes of a relationship can now be represented using an edge in the graph database.
+
+Note that unless both the source and target nodes are eventually added,
+the relationship will not be added either.
+How to address this issue in the implementation has not been determined.
+
+## Demonstrating a neo4j database for STIX
+
+Open the neo4j desktop app and create a new project named STIX.
+
+Select local DBMS on your local machine.
+
+
+
+Create the database.
+
+
+
+Start the database.
+
+
+
+python demo.py \ is used populate a local neo4j database, which can be viewed using the neo4j browser.
+A sample bundle file bundle--21531315-283d-4604-8501-4b7166e58c84.json is provided in the docs directory.
+
+Open the neo4j browser to view the database.
+
+
+
+Query using the cypher language.
+
+
+
+Left-clicking on a node gives you a choice of adding all related nodes and edges, removing the node and its edges from the display, or locking the node position.
+
+
+
+Remove the report object node for a better view of the graph.
+
+
+
+Explore the graph.
+
+
+
+View the node properties, by mousing-over any node.
+
+
diff --git a/docs/diagrams/create-dbms.png b/docs/diagrams/create-dbms.png
new file mode 100644
index 00000000..8556b42b
Binary files /dev/null and b/docs/diagrams/create-dbms.png differ
diff --git a/docs/diagrams/dont-show-node-and-edges.png b/docs/diagrams/dont-show-node-and-edges.png
new file mode 100644
index 00000000..649decaf
Binary files /dev/null and b/docs/diagrams/dont-show-node-and-edges.png differ
diff --git a/docs/diagrams/exploring-the-graph.png b/docs/diagrams/exploring-the-graph.png
new file mode 100644
index 00000000..3a37a4b8
Binary files /dev/null and b/docs/diagrams/exploring-the-graph.png differ
diff --git a/docs/diagrams/node-actions.png b/docs/diagrams/node-actions.png
new file mode 100644
index 00000000..d4cf8d2b
Binary files /dev/null and b/docs/diagrams/node-actions.png differ
diff --git a/docs/diagrams/node-properties.png b/docs/diagrams/node-properties.png
new file mode 100644
index 00000000..07ef67e0
Binary files /dev/null and b/docs/diagrams/node-properties.png differ
diff --git a/docs/diagrams/open-browser.png b/docs/diagrams/open-browser.png
new file mode 100644
index 00000000..669f2280
Binary files /dev/null and b/docs/diagrams/open-browser.png differ
diff --git a/docs/diagrams/query-for-incident.png b/docs/diagrams/query-for-incident.png
new file mode 100644
index 00000000..036978a6
Binary files /dev/null and b/docs/diagrams/query-for-incident.png differ
diff --git a/docs/diagrams/select-dbms.png b/docs/diagrams/select-dbms.png
new file mode 100644
index 00000000..50d9e4ef
Binary files /dev/null and b/docs/diagrams/select-dbms.png differ
diff --git a/docs/diagrams/start-dbms.png b/docs/diagrams/start-dbms.png
new file mode 100644
index 00000000..9fdea865
Binary files /dev/null and b/docs/diagrams/start-dbms.png differ
diff --git a/stix2/datastore/__init__.py b/stix2/datastore/__init__.py
index 715c6e6b..5d6e4dfc 100644
--- a/stix2/datastore/__init__.py
+++ b/stix2/datastore/__init__.py
@@ -210,11 +210,7 @@ def add(self, *args, **kwargs):
stix_objs (list): a list of STIX objects
"""
- try:
- return self.sink.add(*args, **kwargs)
- except AttributeError:
- msg = "%s has no data sink to put objects in"
- raise AttributeError(msg % self.__class__.__name__)
+ return self.sink.add(*args, **kwargs)
class DataSink(metaclass=ABCMeta):
diff --git a/stix2/datastore/neo4j/STIX2NEO4J.py.doc b/stix2/datastore/neo4j/STIX2NEO4J.py.doc
new file mode 100644
index 00000000..2073eff3
--- /dev/null
+++ b/stix2/datastore/neo4j/STIX2NEO4J.py.doc
@@ -0,0 +1,153 @@
+# Reference implementation python script to load STIX 2.1 bundles into
+# Neo4J graph database
+# Code developed by JHU/APL - First Draft December 2021
+
+# DISCLAIMER
+# The script developed by JHU/APL for the demonstration are not “turn key” and are
+# not safe for deployment without being tailored to production infrastructure. These
+# files are not being delivered as software and are not appropriate for direct use on any
+# production networks. JHU/APL assumes no liability for the direct use of these files and
+# they are provided strictly as a reference implementation.
+#
+# NO WARRANTY, NO LIABILITY. THIS MATERIAL IS PROVIDED “AS IS.” JHU/APL MAKES NO
+# REPRESENTATION OR WARRANTY WITH RESPECT TO THE PERFORMANCE OF THE MATERIALS, INCLUDING
+# THEIR SAFETY, EFFECTIVENESS, OR COMMERCIAL VIABILITY, AND DISCLAIMS ALL WARRANTIES IN
+# THE MATERIAL, WHETHER EXPRESS OR IMPLIED, INCLUDING (BUT NOT LIMITED TO) ANY AND ALL
+# IMPLIED WARRANTIES OF PERFORMANCE, MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
+# AND NON-INFRINGEMENT OF INTELLECTUAL PROPERTY OR OTHER THIRD PARTY RIGHTS. ANY USER OF
+# THE MATERIAL ASSUMES THE ENTIRE RISK AND LIABILITY FOR USING THE MATERIAL. IN NO EVENT
+# SHALL JHU/APL BE LIABLE TO ANY USER OF THE MATERIAL FOR ANY ACTUAL, INDIRECT,
+# CONSEQUENTIAL, SPECIAL OR OTHER DAMAGES ARISING FROM THE USE OF, OR INABILITY TO USE,
+# THE MATERIAL, INCLUDING, BUT NOT LIMITED TO, ANY DAMAGES FOR LOST PROFITS.
+
+from getpass import getpass
+## Import python modules for this script
+import json
+from typing import List
+
+from py2neo import Graph, Node
+from tqdm import tqdm
+
+#Import variables
+BundleName = input("Enter the name you want for your bundle: ")
+NeoHost = input("Enter the hostname for Neo4j server: ")
+NeoUser = input("Neo4j User: ")
+NeoPass = getpass("Neo4j Password: ")
+JSONFILE = input("Path to STIX JSON: ")
+
+class NeoUploader(object):
+
+ def __init__(self):
+ # Connect to neo4j
+ self.sgraph = Graph(host=NeoHost, auth=(NeoUser, NeoPass))
+ self.relations = list()
+ self.relationship_ids = set()
+ self.nodes_with_object_ref = list()
+ self.nodes = list()
+ self.bundlename = BundleName
+ self.infer_relation = {
+ "parent_ref": "parent_of",
+ "created_by_ref": "created_by",
+ "src_ref": "source_of",
+ "dst_ref": "destination_of",
+ }
+ self.__load_json(JSONFILE)
+
+ def __load_json(self, fd):
+ data = None
+ with open(fd) as json_file:
+ data = json.load(json_file)
+ for entry in data["objects"]:
+ if entry["type"] == "relationship":
+ self.relations.append(entry)
+ else:
+ self.nodes.append(entry)
+
+ # Make Nodes
+ def make_nodes(self):
+ total_nodes=len(self.nodes)
+ for idx, apobj in tqdm(enumerate(self.nodes), total=total_nodes, desc="Making Nodes", unit="node"):
+ keys = apobj.keys()
+ node_contents = dict()
+ #If the SCO does not have a name field, use the type as name
+ if 'name' not in keys:
+ node_name = apobj["type"]
+ else:
+ node_name = apobj["name"]
+ # add id and type to node contents
+ node_contents["ap_id"] = apobj["id"]
+ node_contents["type"] = apobj["type"]
+ # store rest of object contents in node contents
+ for key in keys:
+ if key not in ["type", "name", "id"]:
+ # collections not allowed as neo4j property value
+ # convert nested collections to string
+ if isinstance(apobj[key], list) or isinstance(apobj[key], dict):
+ node_contents[key] = json.dumps(apobj[key])
+ else:
+ node_contents[key] = apobj[key]
+ # Make the Bundle ID a property
+ # use dictionary expansion as keywork for optional node properties
+ node = Node(
+ apobj["type"],
+ name=node_name,
+ bundlesource=self.bundlename,
+ **node_contents,
+ )
+ # if node needs new created_by relation, create the node and then the relationship
+ self.sgraph.create(node)
+ # save off these nodes for additional relationship creating
+ if 'object_refs' in keys:
+ self.nodes_with_object_ref.append(apobj)
+
+ # create relationships that exist outside of relationship objects
+ # such as Created_by and Parent_Of
+ def __make_inferred_relations(self):
+ total_nodes=len(self.nodes)
+ for idx, apobj in tqdm(enumerate(self.nodes), total=total_nodes, desc="Checking Inferred Relationships", unit="node"):
+ for k in apobj.keys():
+ k_tokens = k.split("_")
+ # find refs, but ignore external_references since they aren't objects
+ if "ref" in k_tokens[len(k_tokens) - 1] and k_tokens[len(k_tokens) - 1] != "references":
+ rel_type = "_".join(k_tokens[: -1])
+ ref_list = []
+ # refs are lists, push singular ref into list to make it iterable for loop
+ if not type(apobj[k]).__name__ == "list":
+ ref_list.append(apobj[k])
+ else:
+ ref_list = apobj[k]
+ for ref in ref_list:
+ # The "b to a" relationship is reversed in this cypher query to ensure the correct relationship direction in the graph
+ cypher_string = f'MATCH (a),(b) WHERE a.bundlesource="{self.bundlename}" AND b.bundlesource="{self.bundlename}" AND a.ap_id="{str(ref)}" AND b.ap_id="{str(apobj["id"])}" CREATE (b)-[r:{rel_type}]->(a) RETURN a,b'
+ try:
+ self.sgraph.run(cypher_string)
+ except Exception as err:
+ print(err)
+ continue
+
+ # Make Relationships
+ def make_relationships(self):
+ total_rels=len(self.relations)
+ for idx, apobj in tqdm(enumerate(self.relations), total=total_rels, desc="Making Relationships", unit="rel"):
+ # Define Relationship Type
+ reltype = str(apobj['relationship_type'])
+ # Fix Relationships with hyphens, neo4j will throw syntax error as
+ # the hyphen is interpreted as an operation in the query string
+ reltype = reltype.replace('-', '_')
+ # create the relationship
+ cypher_string = f'MATCH (a),(b) WHERE a.bundlesource="{self.bundlename}" AND b.bundlesource="{self.bundlename}" AND a.ap_id="{str(apobj["source_ref"])}" AND b.ap_id="{str(apobj["target_ref"])}" CREATE (a)-[r:{reltype}]->(b) RETURN a,b'
+ self.sgraph.run(cypher_string)
+ # maintain set of object ids that are in relationship objects
+ self.relationship_ids.add(str(apobj['source_ref']))
+ self.relationship_ids.add(str(apobj['target_ref']))
+ self.__make_inferred_relations()
+
+ # run the helper methods to upload bundle to neo4j database
+ def upload(self):
+ self.make_nodes()
+ self.make_relationships()
+
+
+if __name__ == '__main__':
+ uploader = NeoUploader()
+ uploader.upload()
diff --git a/stix2/datastore/neo4j/__init__.py b/stix2/datastore/neo4j/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/stix2/datastore/neo4j/demo.py b/stix2/datastore/neo4j/demo.py
new file mode 100644
index 00000000..2beb2b10
--- /dev/null
+++ b/stix2/datastore/neo4j/demo.py
@@ -0,0 +1,26 @@
+
+import json
+import sys
+
+from identity_contact_information import \
+ identity_contact_information # noqa F401
+# needed so the relational db code knows to create tables for this
+from incident import event, impact, incident, task # noqa F401
+from observed_string import observed_string # noqa F401
+
+import stix2
+from stix2.datastore.neo4j.neo4j import Neo4jStore
+import stix2.properties
+
+
+def main():
+ with open(sys.argv[1], "r") as f:
+ bundle = stix2.parse(json.load(f), allow_custom=True)
+ store = Neo4jStore(clear_database=True)
+
+ for obj in bundle.objects:
+ store.add(obj)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/stix2/datastore/neo4j/neo4j.py b/stix2/datastore/neo4j/neo4j.py
new file mode 100644
index 00000000..ae3fe6ac
--- /dev/null
+++ b/stix2/datastore/neo4j/neo4j.py
@@ -0,0 +1,327 @@
+import re
+
+from py2neo import Graph, Node, Relationship
+
+import stix2
+from stix2.base import _STIXBase
+from stix2.datastore import DataSink, DataSource, DataStoreMixin
+from stix2.parsing import parse
+
+
+def convert_camel_case_to_snake_case(name):
+ return re.sub(r'(?(b) RETURN a,b'
+ self.sgraph.run(cypher_string)
+ # print(f'Created {str(obj["source_ref"])} {reltype} {obj["target_ref"]}')
+ if recheck:
+ remove_sro_from_list(obj, self.relationships_to_recheck)
+ else:
+ if not recheck:
+ self.relationships_to_recheck.append(obj)
+
+ def _insert_embedded_relationships(self, obj, id, recheck=False):
+ for k in obj.keys():
+ k_tokens = k.split("_")
+ # find refs, but ignore external_references since they aren't objects
+ if "ref" in k_tokens[len(k_tokens) - 1] and k_tokens[len(k_tokens) - 1] != "references":
+ rel_type = "_".join(k_tokens[: -1]) # noqa F841
+ ref_list = []
+ # refs are lists, push singular ref into list to make it iterable for loop
+ if not type(obj[k]).__name__ == "list":
+ ref_list.append(obj[k])
+ else:
+ ref_list = obj[k]
+ for ref in ref_list:
+ if self._is_node_available(ref):
+ # The "b to a" relationship is reversed in this cypher query to ensure the correct relationship direction in the graph
+ cypher_string = f'MATCH (a),(b) WHERE a.id="{str(ref)}" AND b.id="{str(id)}" CREATE (b)-[r:{k}]->(a) RETURN a,b'
+ self.sgraph.run(cypher_string)
+ # print(f'Created * {str(id)} {k} {str(ref)}')
+ if recheck:
+ remove_sro_from_list(obj, self.relationships_to_recheck)
+ else:
+ if not recheck:
+ embedded_relationship = {
+ "source_ref": id,
+ "target_ref": ref,
+ "relationship_type": k,
+ }
+ self.relationships_to_recheck.append(embedded_relationship)
diff --git a/stix2/datastore/neo4j/neo4j_testing.py b/stix2/datastore/neo4j/neo4j_testing.py
new file mode 100644
index 00000000..8235ca0f
--- /dev/null
+++ b/stix2/datastore/neo4j/neo4j_testing.py
@@ -0,0 +1,338 @@
+import datetime as dt
+import os # noqa: F401
+
+import pytz
+
+import stix2
+from stix2.datastore.neo4j.neo4j import Neo4jStore
+import stix2.properties
+
+email_message = stix2.EmailMessage(
+ type="email-message",
+ spec_version="2.1",
+ id="email-message--0c57a381-2a17-5e61-8754-5ef96efb286c",
+ from_ref="email-addr--9b7e29b3-fd8d-562e-b3f0-8fc8134f5dda",
+ to_refs=["email-addr--d1b3bf0c-f02a-51a1-8102-11aba7959868"],
+ is_multipart=False,
+ date="2004-05-19T12:22:23.000Z",
+ subject="Did you see this?",
+ additional_header_fields={
+ "Reply-To": [
+ "steve@example.com",
+ "jane@example.com",
+ ],
+ },
+)
+
+directory_stix_object = stix2.Directory(
+ path="/foo/bar/a",
+ path_enc="latin1",
+ ctime="1980-02-23T05:43:28.2678Z",
+ atime="1991-06-09T18:06:33.915Z",
+ mtime="2000-06-28T13:06:09.5827Z",
+ contains_refs=[
+ "file--8903b558-40e3-43e2-be90-b341c12ff7ae",
+ "directory--e0604d0c-bab3-4487-b350-87ac1a3a195c",
+ ],
+ object_marking_refs=[
+ "marking-definition--1b3eec29-5376-4837-bd93-73203e65d73c",
+ ],
+)
+
+s = stix2.v21.Software(
+ id="software--28897173-7314-4eec-b1cf-2c625b635bf6",
+ name="Word",
+ cpe="cpe:2.3:a:microsoft:word:2000:*:*:*:*:*:*:*",
+ swid="com.acme.rms-ce-v4-1-5-0",
+ version="2002",
+ languages=["c", "lisp"],
+ vendor="Microsoft",
+)
+
+
+def windows_registry_key_example():
+ v1 = stix2.v21.WindowsRegistryValueType(
+ name="Foo",
+ data="qwerty",
+ data_type="REG_SZ",
+ )
+ v2 = stix2.v21.WindowsRegistryValueType(
+ name="Bar",
+ data="Fred",
+ data_type="REG_SZ",
+ )
+ w = stix2.v21.WindowsRegistryKey(
+ key="hkey_local_machine\\system\\bar\\foo",
+ values=[v1, v2],
+ )
+ return w
+
+
+def malware_with_all_required_properties():
+ ref1 = stix2.v21.ExternalReference(
+ source_name="veris",
+ external_id="0001AA7F-C601-424A-B2B8-BE6C9F5164E7",
+ hashes={
+ "SHA-256": "6db12788c37247f2316052e142f42f4b259d6561751e5f401a1ae2a6df9c674b",
+ "MD5": "3773a88f65a5e780c8dff9cdc3a056f3",
+ },
+ url="https://github.com/vz-risk/VCDB/blob/master/data/json/0001AA7F-C601-424A-B2B8-BE6C9F5164E7.json",
+ )
+ ref2 = stix2.v21.ExternalReference(
+ source_name="ACME Threat Intel",
+ description="Threat report",
+ url="http://www.example.com/threat-report.pdf",
+ )
+ now = dt.datetime(2016, 5, 12, 8, 17, 27, tzinfo=pytz.utc)
+
+ malware = stix2.v21.Malware(
+ external_references=[ref1, ref2],
+ type="malware",
+ id="malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
+ created=now,
+ modified=now,
+ name="Cryptolocker",
+ is_family=False,
+ labels=["foo", "bar"],
+ )
+ return malware
+
+
+def file_example_with_PDFExt_Object():
+ f = stix2.v21.File(
+ name="qwerty.dll",
+ magic_number_hex="504B0304",
+ extensions={
+ "pdf-ext": stix2.v21.PDFExt(
+ version="1.7",
+ document_info_dict={
+ "Title": "Sample document",
+ "Author": "Adobe Systems Incorporated",
+ "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh",
+ "Producer": "Acrobat Distiller 3.01 for Power Macintosh",
+ "CreationDate": "20070412090123-02",
+ },
+ pdfid0="DFCE52BD827ECF765649852119D",
+ pdfid1="57A1E0F9ED2AE523E313C",
+ ),
+ },
+ )
+ return f
+
+
+def extension_definition_insert():
+ return stix2.ExtensionDefinition(
+ created_by_ref="identity--8a5fb7e4-aabe-4635-8972-cbcde1fa4792",
+ name="test",
+ schema="a schema",
+ version="1.2.3",
+ extension_types=["property-extension", "new-sdo", "new-sro"],
+ object_marking_refs=[
+ "marking-definition--caa0d913-5db8-4424-aae0-43e770287d30",
+ "marking-definition--122a27a0-b96f-46bc-8fcd-f7a159757e77",
+ ],
+ granular_markings=[
+ {
+ "lang": "en_US",
+ "selectors": ["name", "schema"],
+ },
+ {
+ "marking_ref": "marking-definition--50902d70-37ae-4f85-af68-3f4095493b42",
+ "selectors": ["name", "schema"],
+ },
+ ],
+ )
+
+
+def dictionary_test():
+ return stix2.File(
+ spec_version="2.1",
+ name="picture.jpg",
+ defanged=True,
+ ctime="1980-02-23T05:43:28.2678Z",
+ extensions={
+ "raster-image-ext": {
+ "exif_tags": {
+ "Make": "Nikon",
+ "Model": "D7000",
+ "XResolution": 4928,
+ "YResolution": 3264,
+ },
+ },
+ },
+ )
+
+
+def kill_chain_test():
+ return stix2.AttackPattern(
+ spec_version="2.1",
+ id="attack-pattern--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
+ created="2016-05-12T08:17:27.000Z",
+ modified="2016-05-12T08:17:27.000Z",
+ name="Spear Phishing",
+ kill_chain_phases=[
+ {
+ "kill_chain_name": "lockheed-martin-cyber-kill-chain",
+ "phase_name": "reconnaissance",
+ },
+ ],
+ external_references=[
+ {
+ "source_name": "capec",
+ "external_id": "CAPEC-163",
+ },
+ ],
+ granular_markings=[
+ {
+ "lang": "en_US",
+ "selectors": ["kill_chain_phases"],
+ },
+ {
+ "marking_ref": "marking-definition--50902d70-37ae-4f85-af68-3f4095493b42",
+ "selectors": ["external_references"],
+ },
+ ], )
+
+
+@stix2.CustomObject(
+ 'x-custom-type',
+ properties=[
+ ("phases", stix2.properties.ListProperty(stix2.KillChainPhase)),
+ ("something_else", stix2.properties.IntegerProperty()),
+ ],
+)
+class CustomClass:
+ pass
+
+
+def custom_obj():
+ obj = CustomClass(
+ phases=[
+ {
+ "kill_chain_name": "chain name",
+ "phase_name": "the phase name",
+ },
+ ],
+ something_else=5,
+ )
+ return obj
+
+
+@stix2.CustomObject(
+ "test-object", [
+ ("prop_name", stix2.properties.ListProperty(stix2.properties.BinaryProperty())),
+ ],
+ "extension-definition--15de9cdb-3515-4271-8479-8141154c5647",
+ is_sdo=True,
+)
+class TestClass:
+ pass
+
+
+def test_binary_list():
+ return TestClass(prop_name=["AREi", "7t3M"])
+
+
+@stix2.CustomObject(
+ "test2-object", [
+ (
+ "prop_name", stix2.properties.ListProperty(
+ stix2.properties.HexProperty(),
+ ),
+ ),
+ ],
+ "extension-definition--15de9cdb-4567-4271-8479-8141154c5647",
+ is_sdo=True,
+)
+class Test2Class:
+ pass
+
+
+def test_hex_list():
+ return Test2Class(
+ prop_name=["1122", "fedc"],
+ )
+
+
+@stix2.CustomObject(
+ "test3-object", [
+ (
+ "prop_name",
+ stix2.properties.DictionaryProperty(
+ valid_types=[
+ stix2.properties.IntegerProperty,
+ stix2.properties.FloatProperty,
+ stix2.properties.StringProperty,
+ ],
+ ),
+ ),
+ (
+ "list_of_timestamps",
+ stix2.properties.ListProperty(stix2.properties.TimestampProperty()),
+ ),
+ ],
+ "extension-definition--15de9cdb-1234-4271-8479-8141154c5647",
+ is_sdo=True,
+)
+class Test3Class:
+ pass
+
+
+def test_dictionary():
+ return Test3Class(
+ prop_name={"a": 1, "b": 2.3, "c": "foo"},
+ list_of_timestamps=["2016-05-12T08:17:27.000Z", "2024-05-12T08:17:27.000Z"],
+ )
+
+
+def main():
+ store = Neo4jStore()
+
+ if store.sink:
+
+ ap = kill_chain_test()
+ store.add(ap)
+
+ x = email_message
+
+ store.add(x)
+
+ td = test_dictionary()
+
+ store.add(td)
+
+ th = test_hex_list()
+
+ store.add(th)
+
+ tb = test_binary_list()
+
+ store.add(tb)
+
+ co = custom_obj()
+
+ store.add(co)
+
+ pdf_file = file_example_with_PDFExt_Object()
+ store.add(pdf_file)
+
+ store.add(directory_stix_object)
+
+ store.add(s)
+
+ store.add(extension_definition_insert())
+
+ dict_example = dictionary_test()
+ store.add(dict_example)
+
+ malware = malware_with_all_required_properties()
+ store.add(malware)
+
+ # read_obj = store.get(directory_stix_object.id)
+ # print(read_obj)
+ else:
+ print("database does not exist")
+
+
+if __name__ == '__main__':
+ main()
diff --git a/stix2/datastore/relational_db/database_backends/database_backend_base.py b/stix2/datastore/relational_db/database_backends/database_backend_base.py
index 674c053c..797a5740 100644
--- a/stix2/datastore/relational_db/database_backends/database_backend_base.py
+++ b/stix2/datastore/relational_db/database_backends/database_backend_base.py
@@ -1,7 +1,8 @@
from typing import Any
from sqlalchemy import (
- Boolean, CheckConstraint, Float, Integer, String, Text, create_engine,
+ Boolean, CheckConstraint, Float, Integer, Sequence, String, Text,
+ create_engine,
)
from sqlalchemy_utils import create_database, database_exists, drop_database
@@ -71,6 +72,9 @@ def determine_sql_type_for_hex_property(): # noqa: F811
def determine_sql_type_for_timestamp_property(): # noqa: F811
pass
+ def create_regex_constraint_clause(self, column_name, pattern):
+ pass
+
# ------------------------------------------------------------------
# Common SQL types for STIX property classes
@@ -127,6 +131,20 @@ def array_allowed():
def create_regex_constraint_expression(self, column_name, pattern):
return CheckConstraint(self.create_regex_constraint_clause(column_name, pattern))
+ @staticmethod
+ def check_for_none(val):
+ return val is None
+
+ def create_min_max_constraint_expression(self, int_property, column_name):
+ if not self.check_for_none(int_property.min) and not self.check_for_none(int_property.max):
+ return CheckConstraint(f"{column_name} >= {int_property.min} and {column_name} <= {int_property.max}")
+ elif not self.check_for_none(int_property.min):
+ return CheckConstraint(f"{column_name} >= {int_property.min}")
+ elif not self.check_for_none(int_property.max):
+ return CheckConstraint(f"{column_name} <= {int_property.max}")
+ else:
+ return None
+
def create_regex_constraint_and_expression(self, clause1, clause2):
return (
CheckConstraint(
@@ -139,7 +157,9 @@ def process_value_for_insert(self, stix_type, value):
sql_type = stix_type.determine_sql_type(self)
if sql_type == self.determine_sql_type_for_timestamp_property() and isinstance(value, STIXdatetime):
return value.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
- elif sql_type == self.determine_sql_type_for_hex_property() and isinstance(stix_type, HexProperty):
+ elif sql_type == self.determine_sql_type_for_hex_property() and isinstance(stix_type, HexProperty) and \
+ sql_type is not Text:
+ # make sure it isn't represented as Text
return bytes.fromhex(value)
else:
return value
@@ -147,3 +167,6 @@ def process_value_for_insert(self, stix_type, value):
def next_id(self, data_sink):
with self.database_connection.begin() as trans:
return trans.execute(data_sink.sequence)
+
+ def create_sequence(self, metadata):
+ return Sequence("my_general_seq", metadata=metadata, start=1, schema=self.schema_for_core())
diff --git a/stix2/datastore/relational_db/database_backends/sqlite_backend.py b/stix2/datastore/relational_db/database_backends/sqlite_backend.py
index a839b853..eba5505a 100644
--- a/stix2/datastore/relational_db/database_backends/sqlite_backend.py
+++ b/stix2/datastore/relational_db/database_backends/sqlite_backend.py
@@ -1,6 +1,7 @@
from typing import Any
-from sqlalchemy import Text, event
+from sqlalchemy import Column, Table, Text, event, insert, select, update
+from sqlalchemy.schema import CreateTable
from .database_backend_base import DatabaseBackend
@@ -10,6 +11,8 @@ class SQLiteBackend(DatabaseBackend):
temp_sequence_count = 0
+ select_stmt_for_sequence = None
+
def __init__(self, database_connection_url=default_database_connection_url, force_recreate=False, **kwargs: Any):
super().__init__(database_connection_url, force_recreate=force_recreate, **kwargs)
@@ -60,8 +63,36 @@ def array_allowed():
def create_regex_constraint_clause(self, column_name, pattern):
return f"{column_name} REGEXP {pattern}"
- @staticmethod
- def next_id(data_sink):
+ def next_id(self, data_sink):
# hack, which is not reliable, must look for a better solution
- SQLiteBackend.temp_sequence_count += 1
- return SQLiteBackend.temp_sequence_count
+ # SQLiteBackend.temp_sequence_count += 1
+ # return SQLiteBackend.temp_sequence_count
+ # ALWAYS CALL WITHIN A TRANSACTION?????
+
+ # better solution, but probably not best
+ value = 0
+ conn = self.database_connection.connect()
+ for row in conn.execute(self.select_stmt_for_sequence):
+ value = row[0]
+ if value == 0:
+ stmt = insert(data_sink.sequence).values({"id": 1, "value": 0})
+ conn = self.database_connection.connect()
+ conn.execute(stmt)
+ conn.commit()
+ value += 1
+ conn.execute(update(data_sink.sequence).where(data_sink.sequence.c.id == 1).values({"value": value}))
+ conn.commit()
+ return value
+
+ def create_sequence(self, metadata):
+ # need id column, so update has something to work with (see above)
+ t = Table(
+ "my_general_seq",
+ metadata,
+ Column("id", SQLiteBackend.determine_sql_type_for_key_as_int()),
+ Column("value", SQLiteBackend.determine_sql_type_for_integer_property()),
+ schema=self.schema_for_core(),
+ )
+ CreateTable(t).compile(self.database_connection)
+ self.select_stmt_for_sequence = select(t.c.value)
+ return t
diff --git a/stix2/datastore/relational_db/demo.py b/stix2/datastore/relational_db/demo.py
index dec59dfb..8920934f 100644
--- a/stix2/datastore/relational_db/demo.py
+++ b/stix2/datastore/relational_db/demo.py
@@ -1,18 +1,18 @@
-import datetime as dt
-
-from database_backends.postgres_backend import PostgresBackend
-import sys
import json
+import sys
+
+from database_backends.postgres_backend import PostgresBackend # noqa F401
+from database_backends.sqlite_backend import SQLiteBackend # noqa F401
+# needed so the relational db code knows to create tables for this
+from identity_contact_information import \
+ identity_contact_information # noqa F401
+from incident import event, impact, incident, task # noqa F401
+from observed_string import observed_string # noqa F401
import stix2
from stix2.datastore.relational_db.relational_db import RelationalDBStore
import stix2.properties
-# needed so the relational db code knows to create tables for this
-from incident import incident, event, task, impact
-from identity_contact_information import identity_contact_information
-from observed_string import observed_string
-
def main():
with open(sys.argv[1], "r") as f:
diff --git a/stix2/datastore/relational_db/input_creation.py b/stix2/datastore/relational_db/input_creation.py
index 12dcb966..9dc0bc1f 100644
--- a/stix2/datastore/relational_db/input_creation.py
+++ b/stix2/datastore/relational_db/input_creation.py
@@ -3,6 +3,7 @@
from stix2.datastore.relational_db.add_method import add_method
from stix2.datastore.relational_db.utils import (
SCO_COMMON_PROPERTIES, SDO_COMMON_PROPERTIES, canonicalize_table_name,
+ shorten_extension_definition_id,
)
from stix2.properties import (
BinaryProperty, BooleanProperty, DictionaryProperty,
@@ -57,19 +58,18 @@ def is_valid_type(cls, valid_types):
return cls in valid_types or instance_in_valid_types(cls, valid_types)
-def generate_insert_for_dictionary_list(table, next_id, value):
+def generate_insert_for_dictionary_list(table, next_id, value, data_sink, contained_type):
insert_stmts = list()
for v in value:
bindings = dict()
bindings["id"] = next_id
- bindings["value"] = v
+ bindings["value"] = data_sink.db_backend.process_value_for_insert(contained_type, v)
insert_stmts.append(insert(table).values(bindings))
return insert_stmts
@add_method(DictionaryProperty)
def generate_insert_information(self, dictionary_name, stix_object, **kwargs): # noqa: F811
- bindings = dict()
data_sink = kwargs.get("data_sink")
table_name = kwargs.get("table_name")
schema_name = kwargs.get("schema_name")
@@ -96,30 +96,42 @@ def generate_insert_information(self, dictionary_name, stix_object, **kwargs):
if not valid_types or len(self.valid_types) == 1:
if is_valid_type(ListProperty, valid_types):
value_binding = "values"
+ contained_type = valid_types[0].contained
if not data_sink.db_backend.array_allowed():
next_id = data_sink.db_backend.next_id(data_sink)
table_child = data_sink.tables_dictionary[
canonicalize_table_name(table_name + "_" + dictionary_name + "_" + "values", schema_name)
]
- child_table_inserts = generate_insert_for_dictionary_list(table_child, next_id, value)
+ child_table_inserts.extend(generate_insert_for_dictionary_list(table_child, next_id, value, data_sink, contained_type))
value = next_id
+ stix_type = IntegerProperty()
+ else:
+ stix_type = ListProperty(contained_type)
+ value = [data_sink.db_backend.process_value_for_insert(contained_type, x) for x in value]
else:
value_binding = "value"
+ stix_type = StringProperty()
elif isinstance(value, int) and is_valid_type(IntegerProperty, valid_types):
value_binding = "integer_value"
+ stix_type = IntegerProperty()
elif isinstance(value, str) and is_valid_type(StringProperty, valid_types):
value_binding = "string_value"
+ stix_type = StringProperty()
elif isinstance(value, bool) and is_valid_type(BooleanProperty, valid_types):
value_binding = "boolean_value"
+ stix_type = BooleanProperty()
elif isinstance(value, float) and is_valid_type(FloatProperty, valid_types):
value_binding = "float_value"
+ stix_type = FloatProperty()
elif isinstance(value, STIXdatetime) and is_valid_type(TimestampProperty, valid_types):
value_binding = "timestamp_value"
+ stix_type = TimestampProperty()
else:
value_binding = "string_value"
+ stix_type = StringProperty()
bindings["name"] = name
- bindings[value_binding] = value
+ bindings[value_binding] = data_sink.db_backend.process_value_for_insert(stix_type, value)
insert_statements.append(insert(table).values(bindings))
@@ -153,9 +165,10 @@ def generate_insert_information(self, name, stix_object, data_sink=None, table_n
for ex_name, ex in stix_object["extensions"].items():
# ignore new extensions - they have no properties
if ex.extension_type is None or not ex.extension_type.startswith("new"):
- if ex_name.startswith("extension-definition"):
- ex_name = ex_name[0:30]
- ex_name = ex_name.replace("extension-definition-", "ext_def")
+ if ex_name.startswith("extension-definition--"):
+ # ex_name = ex_name[0:30]
+ # ex_name = ex_name.replace("extension-definition-", "ext_def")
+ ex_name = shorten_extension_definition_id(ex_name)
bindings = {
"id": stix_object["id"],
"ext_table_name": canonicalize_table_name(ex_name, schema_name),
@@ -285,11 +298,7 @@ def generate_insert_information( # noqa: F811
return insert_statements
else:
if db_backend.array_allowed():
- if isinstance(self.contained, HexProperty):
- return {name: [data_sink.db_backend.process_value_for_insert(self.contained, x) for x in stix_object[name]]}
- else:
- return {name: stix_object[name]}
-
+ return {name: [data_sink.db_backend.process_value_for_insert(self.contained, x) for x in stix_object[name]]}
else:
insert_statements = list()
table = data_sink.tables_dictionary[
@@ -300,7 +309,7 @@ def generate_insert_information( # noqa: F811
]
for elem in stix_object[name]:
bindings = {
- "id": stix_object["id"],
+ "id": foreign_key_value,
name: db_backend.process_value_for_insert(self.contained, elem),
}
insert_statements.append(insert(table).values(bindings))
@@ -510,9 +519,10 @@ def generate_insert_for_sub_object(
bindings["id"] = foreign_key_value
if parent_table_name and (not is_extension or level > 0):
type_name = parent_table_name + "_" + type_name
- if type_name.startswith("extension-definition"):
- type_name = type_name[0:30]
- type_name = type_name.replace("extension-definition-", "ext_def")
+ if type_name.startswith("extension-definition--"):
+ # type_name = type_name[0:30]
+ # type_name = type_name.replace("extension-definition-", "ext_def")
+ type_name = shorten_extension_definition_id(type_name)
sub_insert_statements = list()
for name, prop in stix_object._properties.items():
if name in stix_object:
diff --git a/stix2/datastore/relational_db/query.py b/stix2/datastore/relational_db/query.py
index 5cdf14cd..3e901ee4 100644
--- a/stix2/datastore/relational_db/query.py
+++ b/stix2/datastore/relational_db/query.py
@@ -6,7 +6,7 @@
import stix2
from stix2.datastore import DataSourceError
from stix2.datastore.relational_db.utils import (
- canonicalize_table_name, schema_for, table_name_for,
+ canonicalize_table_name, see_through_workbench, table_name_for,
)
import stix2.properties
import stix2.utils
@@ -30,7 +30,7 @@ def _check_support(stix_id):
raise DataSourceError(f"Reading {stix_type} objects is not supported.")
-def _tables_for(stix_class, metadata):
+def _tables_for(stix_class, metadata, db_backend):
"""
Get the core and type-specific tables for the given class
@@ -41,17 +41,24 @@ def _tables_for(stix_class, metadata):
"""
# Info about the type-specific table
type_table_name = table_name_for(stix_class)
- type_schema_name = schema_for(stix_class)
- type_table = metadata.tables[f"{type_schema_name}.{type_table_name}"]
+ type_schema_name = db_backend.schema_for(stix_class)
+ canon_type_table_name = canonicalize_table_name(type_table_name, type_schema_name)
+
+ type_table = metadata.tables[canon_type_table_name]
# Some fixed info about core tables
- if type_schema_name == "sco":
- core_table_name = "common.core_sco"
+ if stix2.utils.is_sco(stix_class._type, stix2.DEFAULT_VERSION):
+ canon_core_table_name = canonicalize_table_name(
+ "core_sco", db_backend.schema_for_core(),
+ )
+
else:
# for SROs and SMOs too?
- core_table_name = "common.core_sdo"
+ canon_core_table_name = canonicalize_table_name(
+ "core_sdo", db_backend.schema_for_core(),
+ )
- core_table = metadata.tables[core_table_name]
+ core_table = metadata.tables[canon_core_table_name]
return core_table, type_table
@@ -68,6 +75,9 @@ def _stix2_class_for(stix_id):
stix_type, stix_version=stix2.DEFAULT_VERSION,
)
+ if stix_class:
+ stix_class = see_through_workbench(stix_class)
+
return stix_class
@@ -134,7 +144,7 @@ def _read_hashes(fk_id, hashes_table, conn):
return hashes
-def _read_external_references(stix_id, metadata, conn):
+def _read_external_references(stix_id, metadata, conn, db_backend):
"""
Read external references from some fixed tables in the common schema.
@@ -142,10 +152,22 @@ def _read_external_references(stix_id, metadata, conn):
:param metadata: SQLAlchemy Metadata object containing all the table
information
:param conn: An SQLAlchemy DB connection
+ :param db_backend: A backend object with information about how data is
+ stored in the database
:return: The external references, as a list of dicts
"""
- ext_refs_table = metadata.tables["common.external_references"]
- ext_refs_hashes_table = metadata.tables["common.external_references_hashes"]
+ ext_refs_table = metadata.tables[
+ canonicalize_table_name(
+ "external_references",
+ db_backend.schema_for_core(),
+ )
+ ]
+ ext_refs_hashes_table = metadata.tables[
+ canonicalize_table_name(
+ "external_references_hashes",
+ db_backend.schema_for_core(),
+ )
+ ]
ext_refs = []
ext_refs_columns = (col for col in ext_refs_table.c if col.key != "id")
@@ -165,29 +187,30 @@ def _read_external_references(stix_id, metadata, conn):
return ext_refs
-def _read_object_marking_refs(stix_id, stix_type_class, metadata, conn):
+def _read_object_marking_refs(stix_id, common_table_kind, metadata, conn, db_backend):
"""
Read object marking refs from one of a couple special tables in the common
schema.
:param stix_id: A STIX ID, used to filter table rows
- :param stix_type_class: STIXTypeClass enum value, used to determine whether
+ :param common_table_kind: "sco" or "sdo", used to determine whether
to read the table for SDOs or SCOs
:param metadata: SQLAlchemy Metadata object containing all the table
information
:param conn: An SQLAlchemy DB connection
+ :param db_backend: A backend object with information about how data is
+ stored in the database
:return: The references as a list of strings
"""
- marking_table_name = "object_marking_refs_"
- if stix_type_class is stix2.utils.STIXTypeClass.SCO:
- marking_table_name += "sco"
- else:
- marking_table_name += "sdo"
+ marking_table_name = canonicalize_table_name(
+ "object_marking_refs_" + common_table_kind,
+ db_backend.schema_for_core(),
+ )
# The SCO/SDO object_marking_refs tables are mostly identical; they just
# have different foreign key constraints (to different core tables).
- marking_table = metadata.tables["common." + marking_table_name]
+ marking_table = metadata.tables[marking_table_name]
stmt = sa.select(marking_table.c.ref_id).where(marking_table.c.id == stix_id)
refs = conn.scalars(stmt).all()
@@ -195,13 +218,13 @@ def _read_object_marking_refs(stix_id, stix_type_class, metadata, conn):
return refs
-def _read_granular_markings(stix_id, stix_type_class, metadata, conn, db_backend):
+def _read_granular_markings(stix_id, common_table_kind, metadata, conn, db_backend):
"""
Read granular markings from one of a couple special tables in the common
schema.
:param stix_id: A STIX ID, used to filter table rows
- :param stix_type_class: STIXTypeClass enum value, used to determine whether
+ :param common_table_kind: "sco" or "sdo", used to determine whether
to read the table for SDOs or SCOs
:param metadata: SQLAlchemy Metadata object containing all the table
information
@@ -211,13 +234,11 @@ def _read_granular_markings(stix_id, stix_type_class, metadata, conn, db_backend
:return: Granular markings as a list of dicts
"""
- marking_table_name = "granular_marking_"
- if stix_type_class is stix2.utils.STIXTypeClass.SCO:
- marking_table_name += "sco"
- else:
- marking_table_name += "sdo"
-
- marking_table = metadata.tables["common." + marking_table_name]
+ marking_table_name = canonicalize_table_name(
+ "granular_marking_" + common_table_kind,
+ db_backend.schema_for_core(),
+ )
+ marking_table = metadata.tables[marking_table_name]
if db_backend.array_allowed():
# arrays allowed: everything combined in the same table
@@ -288,7 +309,7 @@ def _read_dictionary_property(
prop_instance,
metadata,
conn,
- db_backend
+ db_backend,
):
"""
Read a dictionary from a table.
@@ -328,11 +349,11 @@ def _read_dictionary_property(
list_table_name = f"{dict_table_name}_values"
list_table = metadata.tables[list_table_name]
stmt = sa.select(
- dict_table.c.name, list_table.c.value
+ dict_table.c.name, list_table.c.value,
).select_from(dict_table).join(
- list_table, list_table.c.id == dict_table.c.values
+ list_table, list_table.c.id == dict_table.c["values"],
).where(
- dict_table.c.id == stix_id
+ dict_table.c.id == stix_id,
)
results = conn.execute(stmt)
@@ -487,7 +508,7 @@ def _read_complex_property_value(
obj_table,
metadata,
conn,
- db_backend
+ db_backend,
):
"""
Read property values which require auxiliary tables to store. These are
@@ -583,7 +604,7 @@ def _read_complex_property_value(
prop_instance,
metadata,
conn,
- db_backend
+ db_backend,
)
elif isinstance(prop_instance, stix2.properties.EmbeddedObjectProperty):
@@ -606,7 +627,7 @@ def _read_complex_property_value(
def _read_complex_top_level_property_value(
stix_id,
- stix_type_class,
+ common_table_kind,
prop_name,
prop_instance,
type_table,
@@ -620,8 +641,8 @@ def _read_complex_top_level_property_value(
reading top-level common properties, which use special fixed tables.
:param stix_id: STIX ID of an object to read
- :param stix_type_class: The kind of object (SCO, SDO, etc). Which DB
- tables to read can depend on this.
+ :param common_table_kind: Used to find auxiliary common tables, e.g. those
+ for object markings, granular markings, etc. Either "sco" or "sdo".
:param prop_name: The name of the property to read
:param prop_instance: A Property (subclass) instance with property
config information
@@ -637,20 +658,26 @@ def _read_complex_top_level_property_value(
# Common properties: these use a fixed set of tables for all STIX objects
if prop_name == "external_references":
- prop_value = _read_external_references(stix_id, metadata, conn)
+ prop_value = _read_external_references(
+ stix_id,
+ metadata,
+ conn,
+ db_backend,
+ )
elif prop_name == "object_marking_refs":
prop_value = _read_object_marking_refs(
stix_id,
- stix_type_class,
+ common_table_kind,
metadata,
conn,
+ db_backend,
)
elif prop_name == "granular_markings":
prop_value = _read_granular_markings(
stix_id,
- stix_type_class,
+ common_table_kind,
metadata,
conn,
db_backend,
@@ -659,7 +686,10 @@ def _read_complex_top_level_property_value(
# Will apply when array columns are unsupported/disallowed by the backend
elif prop_name == "labels":
label_table = metadata.tables[
- f"common.core_{stix_type_class.name.lower()}_labels"
+ canonicalize_table_name(
+ f"core_{common_table_kind}_labels",
+ db_backend.schema_for_core(),
+ )
]
prop_value = _read_simple_array(stix_id, "label", label_table, conn)
@@ -672,7 +702,7 @@ def _read_complex_top_level_property_value(
type_table,
metadata,
conn,
- db_backend
+ db_backend,
)
return prop_value
@@ -698,16 +728,10 @@ def read_object(stix_id, metadata, conn, db_backend):
stix_type = stix2.utils.get_type_from_id(stix_id)
raise DataSourceError("Can't find registered class for type: " + stix_type)
- core_table, type_table = _tables_for(stix_class, metadata)
-
- if type_table.schema == "common":
- # Applies to extension-definition SMO, whose data is stored in the
- # common schema; it does not get its own. This type class is used to
- # determine which common tables to use; its markings are
- # in the *_sdo tables.
- stix_type_class = stix2.utils.STIXTypeClass.SDO
- else:
- stix_type_class = stix2.utils.to_enum(type_table.schema, stix2.utils.STIXTypeClass)
+ core_table, type_table = _tables_for(stix_class, metadata, db_backend)
+ # Used to find auxiliary common tables, e.g. those for object markings,
+ # granular markings, etc.
+ common_table_kind = core_table.name[-3:]
simple_props = _read_simple_properties(stix_id, core_table, type_table, conn)
if simple_props is None:
@@ -721,7 +745,7 @@ def read_object(stix_id, metadata, conn, db_backend):
if prop_name not in obj_dict:
prop_value = _read_complex_top_level_property_value(
stix_id,
- stix_type_class,
+ common_table_kind,
prop_name,
prop_instance,
type_table,
@@ -733,5 +757,10 @@ def read_object(stix_id, metadata, conn, db_backend):
if prop_value is not None:
obj_dict[prop_name] = prop_value
- stix_obj = stix_class(**obj_dict, allow_custom=True)
+ stix_obj = stix2.parse(
+ obj_dict,
+ allow_custom=True,
+ version=stix2.DEFAULT_VERSION,
+ )
+
return stix_obj
diff --git a/stix2/datastore/relational_db/relational_db.py b/stix2/datastore/relational_db/relational_db.py
index 071874cf..a65036be 100644
--- a/stix2/datastore/relational_db/relational_db.py
+++ b/stix2/datastore/relational_db/relational_db.py
@@ -1,5 +1,5 @@
from sqlalchemy import MetaData, delete
-from sqlalchemy.schema import CreateTable, Sequence
+from sqlalchemy.schema import CreateTable
from stix2.base import _STIXBase
from stix2.datastore import DataSink, DataSource, DataStoreMixin
@@ -138,7 +138,6 @@ def __init__(
create_table_objects(
self.metadata, stix_object_classes,
)
- self.sequence = Sequence("my_general_seq", metadata=self.metadata, start=1, schema=db_backend.schema_for_core())
self.allow_custom = allow_custom
@@ -155,6 +154,7 @@ def __init__(
self._instantiate_database(print_sql)
def _instantiate_database(self, print_sql=False):
+ self.sequence = self.db_backend.create_sequence(self.metadata)
self.metadata.create_all(self.db_backend.database_connection)
if print_sql:
for t in self.metadata.tables.values():
diff --git a/stix2/datastore/relational_db/relational_db_testing.py b/stix2/datastore/relational_db/relational_db_testing.py
index d4fdded7..01ffbb81 100644
--- a/stix2/datastore/relational_db/relational_db_testing.py
+++ b/stix2/datastore/relational_db/relational_db_testing.py
@@ -3,7 +3,7 @@
from database_backends.mariadb_backend import MariaDBBackend # noqa: F401
from database_backends.postgres_backend import PostgresBackend # noqa: F401
-from database_backends.sqlite_backend import SQLiteBackend
+from database_backends.sqlite_backend import SQLiteBackend # noqa: F401
import pytz
import stix2
@@ -288,9 +288,50 @@ def test_dictionary():
)
+multipart_email_msg_dict = {
+ "type": "email-message",
+ "spec_version": "2.1",
+ "id": "email-message--ef9b4b7f-14c8-5955-8065-020e0316b559",
+ "is_multipart": True,
+ "received_lines": [
+ "from mail.example.com ([198.51.100.3]) by smtp.gmail.com with ESMTPSA id \
+ q23sm23309939wme.17.2016.07.19.07.20.32 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 \
+ bits=128/128); Tue, 19 Jul 2016 07:20:40 -0700 (PDT)",
+ ],
+ "content_type": "multipart/mixed",
+ "date": "2016-06-19T14:20:40.000Z",
+ "from_ref": "email-addr--89f52ea8-d6ef-51e9-8fce-6a29236436ed",
+ "to_refs": ["email-addr--d1b3bf0c-f02a-51a1-8102-11aba7959868"],
+ "cc_refs": ["email-addr--e4ee5301-b52d-59cd-a8fa-8036738c7194"],
+ "subject": "Check out this picture of a cat!",
+ "additional_header_fields": {
+ "Content-Disposition": ["inline"],
+ "X-Mailer": ["Mutt/1.5.23"],
+ "X-Originating-IP": ["198.51.100.3"],
+ },
+ "body_multipart": [
+ {
+ "content_type": "text/plain; charset=utf-8",
+ "content_disposition": "inline",
+ "body": "Cats are funny!",
+ },
+ {
+ "content_type": "image/png",
+ "content_disposition": "attachment; filename=\"tabby.png\"",
+ "body_raw_ref": "artifact--4cce66f8-6eaa-53cb-85d5-3a85fca3a6c5",
+ },
+ {
+ "content_type": "application/zip",
+ "content_disposition": "attachment; filename=\"tabby_pics.zip\"",
+ "body_raw_ref": "file--6ce09d9c-0ad3-5ebf-900c-e3cb288955b5",
+ },
+ ],
+}
+
+
def main():
store = RelationalDBStore(
- # MariaDBBackend(f"mariadb+pymysql://admin:admin@127.0.0.1:3306/rdb", force_recreate=True),
+ # MariaDBBackend("mariadb+pymysql://admin:admin@127.0.0.1:3306/rdb", force_recreate=True),
# PostgresBackend("postgresql://localhost/stix-data-sink", force_recreate=True),
SQLiteBackend("sqlite:///stix-data-sink.db", force_recreate=True),
@@ -340,6 +381,8 @@ def main():
malware = malware_with_all_required_properties()
store.add(malware)
+ store.add(stix2.parse(multipart_email_msg_dict))
+
# read_obj = store.get(directory_stix_object.id)
# print(read_obj)
else:
diff --git a/stix2/datastore/relational_db/table_creation.py b/stix2/datastore/relational_db/table_creation.py
index b9ca5641..083b37df 100644
--- a/stix2/datastore/relational_db/table_creation.py
+++ b/stix2/datastore/relational_db/table_creation.py
@@ -5,9 +5,9 @@
from stix2.datastore.relational_db.add_method import add_method
from stix2.datastore.relational_db.utils import (
- SCO_COMMON_PROPERTIES, SDO_COMMON_PROPERTIES, canonicalize_table_name,
- determine_column_name, determine_sql_type_from_stix, flat_classes,
- get_stix_object_classes,
+ canonicalize_table_name, determine_column_name, determine_core_properties,
+ determine_sql_type_from_stix, flat_classes, get_stix_object_classes,
+ shorten_extension_definition_id,
)
from stix2.properties import (
BinaryProperty, BooleanProperty, DictionaryProperty,
@@ -275,7 +275,11 @@ def create_core_table(metadata, db_backend, stix_type_name):
Column("created", db_backend.determine_sql_type_for_timestamp_property()),
Column("modified", db_backend.determine_sql_type_for_timestamp_property()),
Column("revoked", db_backend.determine_sql_type_for_boolean_property()),
- Column("confidence", db_backend.determine_sql_type_for_integer_property()),
+ Column(
+ "confidence",
+ db_backend.determine_sql_type_for_integer_property(),
+ db_backend.create_min_max_constraint_expression(IntegerProperty(min=0, max=100), "confidence"),
+ ),
Column("lang", db_backend.determine_sql_type_for_string_property()),
]
columns.extend(sdo_columns)
@@ -481,7 +485,7 @@ def generate_table_information(self, name, db_backend, metadata, schema_name, ta
),
Column(
"value",
- db_backend.determine_sql_type_for_string_property(),
+ contained_class.determine_sql_type(db_backend),
nullable=False,
),
]
@@ -630,6 +634,7 @@ def generate_table_information(self, name, db_backend, **kwargs): # noqa: F811
return Column(
name,
self.determine_sql_type(db_backend),
+ db_backend.create_min_max_constraint_expression(self, name),
nullable=not self.required,
default=self._fixed_value if hasattr(self, "_fixed_value") else None,
)
@@ -720,6 +725,8 @@ def generate_table_information(self, name, db_backend, metadata, schema_name, ta
elif self.contained == KillChainPhase:
tables.append(create_kill_chain_phases_table(name, metadata, db_backend, schema_name, table_name))
return tables
+ elif isinstance(self.contained, DictionaryProperty):
+ raise NotImplementedError(f"A list of dictionaries property for {table_name}.{name} is not supported for the RDB DataStore yet") # noqa: E131
else:
# if ARRAY is not allowed, it is handled by a previous if clause
if isinstance(self.contained, Property):
@@ -800,19 +807,13 @@ def generate_object_table(
else:
table_name = stix_object_class.__name__
# avoid long table names
- if table_name.startswith("extension-definition"):
- table_name = table_name[0:30]
- table_name = table_name.replace("extension-definition-", "ext_def")
+ if table_name.startswith("extension-definition--"):
+ # table_name = table_name[0:30]
+ # table_name = table_name.replace("extension-definition-", "ext_def")
+ table_name = shorten_extension_definition_id(table_name)
if parent_table_name:
table_name = parent_table_name + "_" + table_name
- if is_embedded_object:
- core_properties = list()
- elif schema_name in ["sdo", "sro", "common"]:
- core_properties = SDO_COMMON_PROPERTIES
- elif schema_name == "sco":
- core_properties = SCO_COMMON_PROPERTIES
- else:
- core_properties = list()
+ core_properties = determine_core_properties(stix_object_class, is_embedded_object)
columns = list()
tables = list()
if issubclass(stix_object_class, _Observable):
diff --git a/stix2/datastore/relational_db/utils.py b/stix2/datastore/relational_db/utils.py
index 65257426..2ba661f6 100644
--- a/stix2/datastore/relational_db/utils.py
+++ b/stix2/datastore/relational_db/utils.py
@@ -7,6 +7,7 @@
IntegerProperty, Property, ReferenceProperty, StringProperty,
TimestampProperty,
)
+import stix2.v21
from stix2.v21.base import (
_DomainObject, _Extension, _MetaObject, _Observable, _RelationshipObject,
)
@@ -40,6 +41,17 @@
}
+def determine_core_properties(stix_object_class, is_embedded_object):
+ if is_embedded_object or issubclass(stix_object_class, _Extension):
+ return list()
+ elif issubclass(stix_object_class, (_MetaObject, _RelationshipObject, _DomainObject)):
+ return SDO_COMMON_PROPERTIES
+ elif issubclass(stix_object_class, _Observable):
+ return SCO_COMMON_PROPERTIES
+ else:
+ raise ValueError(f"{stix_object_class} not a STIX object")
+
+
def canonicalize_table_name(table_name, schema_name=None):
if schema_name:
full_name = schema_name + "." + table_name
@@ -68,21 +80,41 @@ def get_all_subclasses(cls):
return all_subclasses
+def see_through_workbench(cls):
+ """
+ Deal with the workbench patching the registry. This takes the given
+ "class" as obtained from the registry, and tries to find a real type.
+ The workbench replaces real types with "partial" objects, which causes
+ errors if used in type-specific contexts, e.g. issubclass().
+
+ :param cls: A registry-obtained "class" value
+ :return: A real class value
+ """
+ if hasattr(cls, "args"):
+ # The partial object's one stored positional arg is a subclass
+ # of the class we need. But it will do.
+ return cls.args[0]
+ else:
+ return cls
+
+
def get_stix_object_classes():
- yield from get_all_subclasses(_DomainObject)
- yield from get_all_subclasses(_RelationshipObject)
- yield from get_all_subclasses(_Observable)
- yield from get_all_subclasses(_MetaObject)
- # Non-object extensions (property or toplevel-property only)
- for ext_cls in get_all_subclasses(_Extension):
- if ext_cls.extension_type not in (
+ for type_, cls in stix2.v21.OBJ_MAP.items():
+ if type_ != "bundle":
+ yield see_through_workbench(cls)
+
+ # The workbench only patches SDO types, so we shouldn't have to do the
+ # same hackage with other kinds of types.
+ yield from stix2.v21.OBJ_MAP_OBSERVABLE.values()
+ yield from (
+ cls for cls in stix2.v21.EXT_MAP.values()
+ if cls.extension_type not in (
"new-sdo", "new-sco", "new-sro",
- ):
- yield ext_cls
+ )
+ )
def schema_for(stix_class):
-
if issubclass(stix_class, _DomainObject):
schema_name = "sdo"
elif issubclass(stix_class, _RelationshipObject):
@@ -95,7 +127,6 @@ def schema_for(stix_class):
schema_name = getattr(stix_class, "_applies_to", "sco")
else:
schema_name = None
-
return schema_name
@@ -169,3 +200,12 @@ def determine_column_name(cls_or_inst): # noqa: F811
return "string_value"
elif is_class_or_instance(cls_or_inst, TimestampProperty):
return "timestamp_value"
+
+
+def shorten_extension_definition_id(id):
+ id_parts = id.split("--")
+ uuid_parts = id_parts[1].split("-")
+ shortened_part = ""
+ for p in uuid_parts:
+ shortened_part = shortened_part + p[0] + p[-1]
+ return "ext_def_" + shortened_part
diff --git a/stix2/test/v20/test_datastore.py b/stix2/test/v20/test_datastore.py
index 8bb5494c..bd9e3e44 100644
--- a/stix2/test/v20/test_datastore.py
+++ b/stix2/test/v20/test_datastore.py
@@ -65,9 +65,8 @@ def test_datastore_related_to_raises():
def test_datastore_add_raises():
- with pytest.raises(AttributeError) as excinfo:
+ with pytest.raises(AttributeError):
DataStoreMixin().add(CAMPAIGN_MORE_KWARGS)
- assert "DataStoreMixin has no data sink to put objects in" == str(excinfo.value)
def test_composite_datastore_get_raises_error():
diff --git a/stix2/test/v20/test_environment.py b/stix2/test/v20/test_environment.py
index 8dd4eca7..9e7377e1 100644
--- a/stix2/test/v20/test_environment.py
+++ b/stix2/test/v20/test_environment.py
@@ -221,9 +221,8 @@ def test_environment_datastore_and_sink():
def test_environment_no_datastore():
env = stix2.Environment(factory=stix2.ObjectFactory())
- with pytest.raises(AttributeError) as excinfo:
+ with pytest.raises(AttributeError):
env.add(stix2.v20.Indicator(**INDICATOR_KWARGS))
- assert 'Environment has no data sink to put objects in' in str(excinfo.value)
with pytest.raises(AttributeError) as excinfo:
env.get(INDICATOR_ID)
diff --git a/stix2/test/v21/test_custom.py b/stix2/test/v21/test_custom.py
index 03a8357b..6ecbef5d 100644
--- a/stix2/test/v21/test_custom.py
+++ b/stix2/test/v21/test_custom.py
@@ -5,6 +5,8 @@
import stix2
import stix2.base
+import stix2.exceptions
+import stix2.properties
import stix2.registration
import stix2.registry
import stix2.v21
@@ -24,12 +26,48 @@
)
+@contextlib.contextmanager
+def _register_custom_sdo(ext_class, stix_type, props):
+ ext_class = stix2.v21.CustomObject(
+ stix_type, props, None, True,
+ )(ext_class)
+
+ try:
+ yield ext_class
+ finally:
+ del stix2.registry.STIX2_OBJ_MAPS["2.1"]["objects"][stix_type]
+
+
+@contextlib.contextmanager
+def _register_custom_sco(ext_class, stix_type, props, id_contrib_props=None):
+ ext_class = stix2.v21.CustomObservable(
+ stix_type, props, id_contrib_props,
+ )(ext_class)
+
+ try:
+ yield ext_class
+ finally:
+ del stix2.registry.STIX2_OBJ_MAPS["2.1"]["observables"][stix_type]
+
+
+@contextlib.contextmanager
+def _register_custom_marking(ext_class, marking_type, props):
+ ext_class = stix2.v21.CustomMarking(marking_type, props)(
+ ext_class,
+ )
+
+ try:
+ yield ext_class
+ finally:
+ del stix2.registry.STIX2_OBJ_MAPS["2.1"]["markings"][marking_type]
+
+
@contextlib.contextmanager
def _register_extension(ext, props):
"""
A contextmanager useful for registering an extension and then ensuring
it gets unregistered again. A random extension-definition STIX ID is
- generated for the extension and yielded as the contextmanager's value.
+ generated for the extension. The resulting extension class is yielded.
:param ext: The class which would normally be decorated with the
CustomExtension decorator.
@@ -39,18 +77,98 @@ def _register_extension(ext, props):
ext_def_id = "extension-definition--" + str(uuid.uuid4())
- stix2.v21.CustomExtension(
+ ext_class = stix2.v21.CustomExtension(
ext_def_id,
props,
)(ext)
try:
- yield ext_def_id
+ yield ext_class
finally:
# "unregister" the extension
del stix2.registry.STIX2_OBJ_MAPS["2.1"]["extensions"][ext_def_id]
+@contextlib.contextmanager
+def _register_subtype_extension(ext_class, name, props):
+ ext_class = stix2.v21.CustomExtension(name, props)(
+ ext_class,
+ )
+
+ try:
+ yield ext_class
+ finally:
+ # "unregister" the extension
+ del stix2.registry.STIX2_OBJ_MAPS["2.1"]["extensions"][name]
+
+
+@contextlib.contextmanager
+def _register_sdo_extension(ext_class, stix_type, props):
+ ext_def_id = "extension-definition--" + str(uuid.uuid4())
+
+ ext_class = stix2.v21.CustomObject(
+ stix_type, props, ext_def_id, True,
+ )(ext_class)
+
+ try:
+ yield ext_class
+ finally:
+ # "unregister" the extension
+ del stix2.registry.STIX2_OBJ_MAPS["2.1"]["objects"][stix_type]
+ del stix2.registry.STIX2_OBJ_MAPS["2.1"]["extensions"][ext_def_id]
+
+
+@contextlib.contextmanager
+def _register_sro_extension(ext_class, stix_type, props):
+ ext_def_id = "extension-definition--" + str(uuid.uuid4())
+
+ ext_class = stix2.v21.CustomObject(
+ stix_type, props, ext_def_id, False,
+ )(ext_class)
+
+ try:
+ yield ext_class
+ finally:
+ # "unregister" the extension
+ del stix2.registry.STIX2_OBJ_MAPS["2.1"]["objects"][stix_type]
+ del stix2.registry.STIX2_OBJ_MAPS["2.1"]["extensions"][ext_def_id]
+
+
+@contextlib.contextmanager
+def _register_sco_extension(ext_class, stix_type, props, id_contrib_props=None):
+ ext_def_id = "extension-definition--" + str(uuid.uuid4())
+
+ ext_class = stix2.v21.CustomObservable(
+ stix_type, props, id_contrib_props, ext_def_id,
+ )(ext_class)
+
+ try:
+ yield ext_class
+ finally:
+ # "unregister" the extension
+ del stix2.registry.STIX2_OBJ_MAPS["2.1"]["observables"][stix_type]
+ del stix2.registry.STIX2_OBJ_MAPS["2.1"]["extensions"][ext_def_id]
+
+
+@pytest.fixture
+def custom_subtype_extension_type():
+ class NewExtension:
+ def __init__(self, property2=None, **kwargs):
+ if property2 and property2 < 10:
+ raise ValueError("'property2' is too small.")
+ if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
+ raise TypeError("Must be integer!")
+
+ with _register_subtype_extension(
+ NewExtension,
+ 'x-new-ext', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ('property2', stix2.properties.IntegerProperty()),
+ ],
+ ) as ext:
+ yield ext
+
+
def test_identity_custom_property():
identity = stix2.v21.Identity(
id=IDENTITY_ID,
@@ -375,29 +493,29 @@ class NewObj():
def test_custom_marking_no_init_1():
- @stix2.v21.CustomMarking(
- 'x-new-obj', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ],
- )
- class NewObj():
+ class NewObj:
pass
- no = NewObj(property1='something')
- assert no.property1 == 'something'
+ with _register_custom_marking(
+ NewObj, 'x-new-obj', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ],
+ ) as marking_class:
+ no = marking_class(property1='something')
+ assert no.property1 == 'something'
def test_custom_marking_no_init_2():
- @stix2.v21.CustomMarking(
- 'x-new-obj2', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ],
- )
class NewObj2(object):
pass
- no2 = NewObj2(property1='something')
- assert no2.property1 == 'something'
+ with _register_custom_marking(
+ NewObj2, 'x-new-obj2', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ],
+ ) as marking_class:
+ no2 = marking_class(property1='something')
+ assert no2.property1 == 'something'
def test_custom_marking_invalid_type_name():
@@ -433,77 +551,89 @@ class NewObj3(object):
def test_register_duplicate_marking():
- with pytest.raises(DuplicateRegistrationError) as excinfo:
- @stix2.v21.CustomMarking(
- 'x-new-obj', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ],
- )
- class NewObj2():
- pass
- assert "cannot be registered again" in str(excinfo.value)
+ class NewObj:
+ pass
+
+ with _register_custom_marking(
+ NewObj, 'x-new-obj', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ],
+ ):
+ with pytest.raises(DuplicateRegistrationError) as excinfo:
+ @stix2.v21.CustomMarking(
+ 'x-new-obj', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ],
+ )
+ class NewObj2:
+ pass
+ assert "cannot be registered again" in str(excinfo.value)
# Custom Objects
-@stix2.v21.CustomObject(
- 'x-new-type', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ('property2', stix2.properties.IntegerProperty()),
- ],
-)
-class NewType(object):
- def __init__(self, property2=None, **kwargs):
- if property2 and property2 < 10:
- raise ValueError("'property2' is too small.")
- if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
- raise TypeError("Must be integer!")
+@pytest.fixture
+def custom_sdo_type():
+ class NewType(object):
+ def __init__(self, property2=None, **kwargs):
+ if property2 and property2 < 10:
+ raise ValueError("'property2' is too small.")
+ if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
+ raise TypeError("Must be integer!")
+
+ with _register_custom_sdo(
+ NewType, 'x-new-type', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ('property2', stix2.properties.IntegerProperty()),
+ ],
+ ) as sdo_class:
+ yield sdo_class
-def test_custom_object_raises_exception():
+def test_custom_object_raises_exception(custom_sdo_type):
with pytest.raises(TypeError) as excinfo:
- NewType(property1='something', property3='something', allow_custom=True)
+ custom_sdo_type(property1='something', property3='something', allow_custom=True)
assert str(excinfo.value) == "Must be integer!"
-def test_custom_object_type():
- nt = NewType(property1='something')
+def test_custom_object_type(custom_sdo_type):
+ nt = custom_sdo_type(property1='something')
assert nt.property1 == 'something'
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
- NewType(property2=42)
+ custom_sdo_type(property2=42)
assert "No values for required properties" in str(excinfo.value)
with pytest.raises(ValueError) as excinfo:
- NewType(property1='something', property2=4)
+ custom_sdo_type(property1='something', property2=4)
assert "'property2' is too small." in str(excinfo.value)
def test_custom_object_no_init_1():
- @stix2.v21.CustomObject(
- 'x-new-obj', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ],
- )
- class NewObj():
+ class NewObj:
pass
- no = NewObj(property1='something')
- assert no.property1 == 'something'
+ with _register_custom_sdo(
+ NewObj, 'x-new-obj', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ],
+ ) as sdo_class:
+ no = sdo_class(property1='something')
+ assert no.property1 == 'something'
def test_custom_object_no_init_2():
- @stix2.v21.CustomObject(
- 'x-new-obj2', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ],
- )
class NewObj2(object):
pass
- no2 = NewObj2(property1='something')
- assert no2.property1 == 'something'
+ with _register_custom_sdo(
+ NewObj2, 'x-new-obj2', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ],
+ ) as sdo_class:
+ no2 = sdo_class(property1='something')
+ assert no2.property1 == 'something'
def test_custom_object_invalid_type_name():
@@ -539,22 +669,26 @@ class NewObj3(object):
def test_custom_object_ref_property_containing_identifier():
- @stix2.v21.CustomObject(
- 'x-new-obj-with-ref', [
+ class NewObs:
+ pass
+
+ with _register_custom_sdo(
+ NewObs, 'x-new-obj-with-ref', [
('property_ref', stix2.properties.ReferenceProperty(invalid_types=[])),
],
- )
- class NewObs():
+ ):
pass
def test_custom_object_refs_property_containing_identifiers():
- @stix2.v21.CustomObject(
- 'x-new-obj-with-refs', [
+ class NewObs:
+ pass
+
+ with _register_custom_sdo(
+ NewObs, 'x-new-obj-with-refs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.ReferenceProperty(invalid_types=[]))),
],
- )
- class NewObs():
+ ):
pass
@@ -698,70 +832,72 @@ def test_parse_unregistered_custom_object_type_w_allow_custom():
# Custom SCOs
-@stix2.v21.CustomObservable(
- 'x-new-observable', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ('property2', stix2.properties.IntegerProperty()),
- ('x_property3', stix2.properties.BooleanProperty()),
- ],
-)
-class NewObservable():
- def __init__(self, property2=None, **kwargs):
- if property2 and property2 < 10:
- raise ValueError("'property2' is too small.")
- if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
- raise TypeError("Must be integer!")
+@pytest.fixture
+def custom_sco_type():
+ class NewObservable:
+ def __init__(self, property2=None, **kwargs):
+ if property2 and property2 < 10:
+ raise ValueError("'property2' is too small.")
+ if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
+ raise TypeError("Must be integer!")
+
+ with _register_custom_sco(
+ NewObservable, 'x-new-observable', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ('property2', stix2.properties.IntegerProperty()),
+ ('x_property3', stix2.properties.BooleanProperty()),
+ ],
+ ) as sco_type:
+ yield sco_type
-def test_custom_observable_object_1():
- no = NewObservable(property1='something')
+def test_custom_observable_object_1(custom_sco_type):
+ no = custom_sco_type(property1='something')
assert no.property1 == 'something'
-def test_custom_observable_object_2():
+def test_custom_observable_object_2(custom_sco_type):
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
- NewObservable(property2=42)
+ custom_sco_type(property2=42)
assert excinfo.value.properties == ['property1']
assert "No values for required properties" in str(excinfo.value)
-def test_custom_observable_object_3():
+def test_custom_observable_object_3(custom_sco_type):
with pytest.raises(ValueError) as excinfo:
- NewObservable(property1='something', property2=4)
+ custom_sco_type(property1='something', property2=4)
assert "'property2' is too small." in str(excinfo.value)
-def test_custom_observable_raises_exception():
+def test_custom_observable_raises_exception(custom_sco_type):
with pytest.raises(TypeError) as excinfo:
- NewObservable(property1='something', property3='something', allow_custom=True)
+ custom_sco_type(property1='something', property3='something', allow_custom=True)
assert str(excinfo.value) == "Must be integer!"
def test_custom_observable_object_no_init_1():
- @stix2.v21.CustomObservable(
- 'x-new-observable-2', [
+ class AnSCO:
+ pass
+ with _register_custom_sco(
+ AnSCO, 'x-new-observable-2', [
('property1', stix2.properties.StringProperty()),
],
- )
- class NewObs():
- pass
-
- no = NewObs(property1='something')
- assert no.property1 == 'something'
+ ) as sco_type:
+ no = sco_type(property1='something')
+ assert no.property1 == 'something'
def test_custom_observable_object_no_init_2():
- @stix2.v21.CustomObservable(
- 'x-new-obs2', [
+ class AnSCO:
+ pass
+ with _register_custom_sco(
+ AnSCO, 'x-new-obs2', [
('property1', stix2.properties.StringProperty()),
],
- )
- class NewObs2(object):
- pass
-
- no2 = NewObs2(property1='something')
- assert no2.property1 == 'something'
+ ) as sco_type:
+ no2 = sco_type(property1='something')
+ assert no2.property1 == 'something'
def test_invalid_custom_property_in_custom_observable_object():
@@ -809,22 +945,26 @@ class NewObs3(object):
def test_custom_observable_object_ref_property_as_identifier():
- @stix2.v21.CustomObservable(
- 'x-new-obs-with-ref', [
+ class NewObs:
+ pass
+
+ with _register_custom_sco(
+ NewObs, 'x-new-obs-with-ref', [
('property_ref', stix2.properties.ReferenceProperty(invalid_types=[])),
],
- )
- class NewObs():
+ ):
pass
def test_custom_observable_object_refs_property_containing_identifiers():
- @stix2.v21.CustomObservable(
- 'x-new-obs-with-refs', [
+ class NewObs:
+ pass
+
+ with _register_custom_sco(
+ NewObs, 'x-new-obs-with-refs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.ReferenceProperty(invalid_types=[]))),
],
- )
- class NewObs():
+ ):
pass
@@ -902,12 +1042,12 @@ class NewObject2(object):
pass
-def test_parse_custom_observable_object():
+def test_parse_custom_observable_object(custom_sco_type):
nt_string = """{
"type": "x-new-observable",
"property1": "something"
}"""
- nt = stix2.parse(nt_string, [], version='2.1')
+ nt = stix2.parse(nt_string, False, version='2.1')
assert isinstance(nt, stix2.base._STIXBase)
assert nt.property1 == 'something'
@@ -968,15 +1108,15 @@ def test_parse_invalid_custom_observable_object():
assert "Can't parse object with no 'type' property" in str(excinfo.value)
-def test_observable_custom_property():
+def test_observable_custom_property(custom_sco_type):
with pytest.raises(ValueError) as excinfo:
- NewObservable(
+ custom_sco_type(
property1='something',
custom_properties="foobar",
)
assert "'custom_properties' must be a dictionary" in str(excinfo.value)
- no = NewObservable(
+ no = custom_sco_type(
property1='something',
custom_properties={
"foo": "bar",
@@ -985,9 +1125,9 @@ def test_observable_custom_property():
assert no.foo == "bar"
-def test_observable_custom_property_invalid():
+def test_observable_custom_property_invalid(custom_sco_type):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
- NewObservable(
+ custom_sco_type(
property1='something',
x_foo="bar",
)
@@ -995,8 +1135,8 @@ def test_observable_custom_property_invalid():
assert "Unexpected properties for" in str(excinfo.value)
-def test_observable_custom_property_allowed():
- no = NewObservable(
+def test_observable_custom_property_allowed(custom_sco_type):
+ no = custom_sco_type(
property1='something',
x_foo="bar",
allow_custom=True,
@@ -1004,8 +1144,8 @@ def test_observable_custom_property_allowed():
assert no.x_foo == "bar"
-def test_observed_data_with_custom_observable_object():
- no = NewObservable(property1='something')
+def test_observed_data_with_custom_observable_object(custom_sco_type):
+ no = custom_sco_type(property1='something')
ob_data = stix2.v21.ObservedData(
first_observed=FAKE_TIME,
last_observed=FAKE_TIME,
@@ -1017,115 +1157,103 @@ def test_observed_data_with_custom_observable_object():
def test_custom_observable_object_det_id_1():
- @stix2.v21.CustomObservable(
- 'x-det-id-observable-1', [
+ class DetIdObs1:
+ pass
+
+ with _register_custom_sco(
+ DetIdObs1, 'x-det-id-observable-1', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
], [
'property1',
],
- )
- class DetIdObs1():
- pass
+ ) as sco_class:
- dio_1 = DetIdObs1(property1='I am property1!', property2=42)
- dio_2 = DetIdObs1(property1='I am property1!', property2=24)
- assert dio_1.property1 == dio_2.property1 == 'I am property1!'
- assert dio_1.id == dio_2.id
+ dio_1 = sco_class(property1='I am property1!', property2=42)
+ dio_2 = sco_class(property1='I am property1!', property2=24)
+ assert dio_1.property1 == dio_2.property1 == 'I am property1!'
+ assert dio_1.id == dio_2.id
- uuid_obj = uuid.UUID(dio_1.id[-36:])
- assert uuid_obj.variant == uuid.RFC_4122
- assert uuid_obj.version == 5
+ uuid_obj = uuid.UUID(dio_1.id[-36:])
+ assert uuid_obj.variant == uuid.RFC_4122
+ assert uuid_obj.version == 5
- dio_3 = DetIdObs1(property1='I am property1!', property2=42)
- dio_4 = DetIdObs1(property1='I am also property1!', property2=24)
- assert dio_3.property1 == 'I am property1!'
- assert dio_4.property1 == 'I am also property1!'
- assert dio_3.id != dio_4.id
+ dio_3 = sco_class(property1='I am property1!', property2=42)
+ dio_4 = sco_class(property1='I am also property1!', property2=24)
+ assert dio_3.property1 == 'I am property1!'
+ assert dio_4.property1 == 'I am also property1!'
+ assert dio_3.id != dio_4.id
def test_custom_observable_object_det_id_2():
- @stix2.v21.CustomObservable(
- 'x-det-id-observable-2', [
+ class DetIdObs2:
+ pass
+
+ with _register_custom_sco(
+ DetIdObs2, 'x-det-id-observable-2', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
], [
'property1', 'property2',
],
- )
- class DetIdObs2():
- pass
+ ) as sco_class:
- dio_1 = DetIdObs2(property1='I am property1!', property2=42)
- dio_2 = DetIdObs2(property1='I am property1!', property2=42)
- assert dio_1.property1 == dio_2.property1 == 'I am property1!'
- assert dio_1.property2 == dio_2.property2 == 42
- assert dio_1.id == dio_2.id
+ dio_1 = sco_class(property1='I am property1!', property2=42)
+ dio_2 = sco_class(property1='I am property1!', property2=42)
+ assert dio_1.property1 == dio_2.property1 == 'I am property1!'
+ assert dio_1.property2 == dio_2.property2 == 42
+ assert dio_1.id == dio_2.id
- dio_3 = DetIdObs2(property1='I am property1!', property2=42)
- dio_4 = DetIdObs2(property1='I am also property1!', property2=42)
- assert dio_3.property1 == 'I am property1!'
- assert dio_4.property1 == 'I am also property1!'
- assert dio_3.property2 == dio_4.property2 == 42
- assert dio_3.id != dio_4.id
+ dio_3 = sco_class(property1='I am property1!', property2=42)
+ dio_4 = sco_class(property1='I am also property1!', property2=42)
+ assert dio_3.property1 == 'I am property1!'
+ assert dio_4.property1 == 'I am also property1!'
+ assert dio_3.property2 == dio_4.property2 == 42
+ assert dio_3.id != dio_4.id
def test_custom_observable_object_no_id_contrib_props():
- @stix2.v21.CustomObservable(
- 'x-det-id-observable-3', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ],
- )
- class DetIdObs3():
+ class DetIdObs3:
pass
- dio = DetIdObs3(property1="I am property1!")
+ with _register_custom_sco(
+ DetIdObs3, 'x-det-id-observable-3', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ],
+ ) as sco_class:
+ dio = sco_class(property1="I am property1!")
- uuid_obj = uuid.UUID(dio.id[-36:])
- assert uuid_obj.variant == uuid.RFC_4122
- assert uuid_obj.version == 4
+ uuid_obj = uuid.UUID(dio.id[-36:])
+ assert uuid_obj.variant == uuid.RFC_4122
+ assert uuid_obj.version == 4
# Custom Extensions
-@stix2.v21.CustomExtension(
- 'x-new-ext', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ('property2', stix2.properties.IntegerProperty()),
- ],
-)
-class NewExtension():
- def __init__(self, property2=None, **kwargs):
- if property2 and property2 < 10:
- raise ValueError("'property2' is too small.")
- if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
- raise TypeError("Must be integer!")
-
-
-def test_custom_extension_raises_exception():
+def test_custom_extension_raises_exception(custom_subtype_extension_type):
with pytest.raises(TypeError) as excinfo:
- NewExtension(property1='something', property3='something', allow_custom=True)
+ custom_subtype_extension_type(property1='something', property3='something', allow_custom=True)
assert str(excinfo.value) == "Must be integer!"
-def test_custom_extension():
- ext = NewExtension(property1='something')
+def test_custom_extension(custom_subtype_extension_type):
+ ext = custom_subtype_extension_type(property1='something')
assert ext.property1 == 'something'
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
- NewExtension(property2=42)
+ custom_subtype_extension_type(property2=42)
assert excinfo.value.properties == ['property1']
assert str(excinfo.value) == "No values for required properties for NewExtension: (property1)."
with pytest.raises(ValueError) as excinfo:
- NewExtension(property1='something', property2=4)
+ custom_subtype_extension_type(property1='something', property2=4)
assert str(excinfo.value) == "'property2' is too small."
-def test_custom_extension_wrong_observable_type():
+def test_custom_extension_wrong_observable_type(custom_subtype_extension_type):
# NewExtension is an extension of DomainName, not File
- ext = NewExtension(property1='something')
+ ext = custom_subtype_extension_type(property1='something')
with pytest.raises(InvalidValueError) as excinfo:
stix2.v21.File(
name="abc.txt",
@@ -1150,16 +1278,17 @@ def test_custom_extension_wrong_observable_type():
],
)
def test_custom_extension_with_list_and_dict_properties_observable_type(data):
- @stix2.v21.CustomExtension(
- 'x-some-extension-ext', [
- ('keys', stix2.properties.ListProperty(stix2.properties.DictionaryProperty, required=True)),
- ],
- )
class SomeCustomExtension:
pass
- example = SomeCustomExtension(keys=[{'test123': 123, 'test345': 'aaaa'}])
- assert data == example.serialize(pretty=True)
+ with _register_subtype_extension(
+ SomeCustomExtension,
+ "x-some-extension-ext", [
+ ('keys', stix2.properties.ListProperty(stix2.properties.DictionaryProperty, required=True)),
+ ],
+ ) as ext_type:
+ example = ext_type(keys=[{'test123': 123, 'test345': 'aaaa'}])
+ assert data == example.serialize(pretty=True)
def test_custom_extension_invalid_type_name():
@@ -1216,29 +1345,31 @@ class BarExtension():
def test_custom_extension_no_init_1():
- @stix2.v21.CustomExtension(
- 'x-new-extension-ext', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ],
- )
- class NewExt():
+ class NewExt:
pass
- ne = NewExt(property1="foobar")
- assert ne.property1 == "foobar"
+ with _register_subtype_extension(
+ NewExt,
+ 'x-new-extension-ext', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ],
+ ) as ext_class:
+ ne = ext_class(property1="foobar")
+ assert ne.property1 == "foobar"
def test_custom_extension_no_init_2():
- @stix2.v21.CustomExtension(
- 'x-new2-ext', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ],
- )
class NewExt2(object):
pass
- ne2 = NewExt2(property1="foobar")
- assert ne2.property1 == "foobar"
+ with _register_subtype_extension(
+ NewExt2,
+ 'x-new-extension-ext', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ],
+ ) as ext_class:
+ ne = ext_class(property1="foobar")
+ assert ne.property1 == "foobar"
def test_invalid_custom_property_in_extension():
@@ -1254,7 +1385,7 @@ class NewExt():
assert "must begin with an alpha character." in str(excinfo.value)
-def test_parse_observable_with_custom_extension():
+def test_parse_observable_with_custom_extension(custom_subtype_extension_type):
input_str = """{
"type": "domain-name",
"value": "example.com",
@@ -1423,9 +1554,17 @@ class CustomObject2(object):
assert '@CustomObject decorator' in str(excinfo)
-def test_extension_property_location():
- assert 'extensions' in stix2.v21.OBJ_MAP_OBSERVABLE['x-new-observable']._properties
- assert 'extensions' not in stix2.v21.EXT_MAP['x-new-ext']._properties
+def test_extension_property_location(custom_sco_type):
+ class AnExt:
+ pass
+ with _register_subtype_extension(
+ AnExt, 'x-new-ext', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ('property2', stix2.properties.IntegerProperty()),
+ ],
+ ):
+ assert 'extensions' in stix2.v21.OBJ_MAP_OBSERVABLE['x-new-observable']._properties
+ assert 'extensions' not in stix2.v21.EXT_MAP['x-new-ext']._properties
@pytest.mark.parametrize(
@@ -1446,36 +1585,40 @@ def test_extension_property_location():
],
)
def test_custom_object_nested_dictionary(data):
- @stix2.v21.CustomObject(
- 'x-example', [
- ('dictionary', stix2.properties.DictionaryProperty()),
- ],
- )
class Example(object):
def __init__(self, **kwargs):
pass
- example = Example(
- id='x-example--336d8a9f-91f1-46c5-b142-6441bb9f8b8d',
- created='2018-06-12T16:20:58.059Z',
- modified='2018-06-12T16:20:58.059Z',
- dictionary={'key': {'key_b': 'value', 'key_a': 'value'}},
- )
+ with _register_custom_sdo(
+ Example, 'x-example', [
+ ('dictionary', stix2.properties.DictionaryProperty()),
+ ],
+ ) as sdo_class:
+ example = sdo_class(
+ id='x-example--336d8a9f-91f1-46c5-b142-6441bb9f8b8d',
+ created='2018-06-12T16:20:58.059Z',
+ modified='2018-06-12T16:20:58.059Z',
+ dictionary={'key': {'key_b': 'value', 'key_a': 'value'}},
+ )
- assert data == example.serialize(pretty=True)
+ assert data == example.serialize(pretty=True)
-@stix2.v21.CustomObject(
- 'x-new-type-2', [
- ('property1', stix2.properties.StringProperty()),
- ('property2', stix2.properties.IntegerProperty()),
- ],
-)
-class NewType3(object):
- pass
+@pytest.fixture
+def custom_sdo_type2():
+ class NewType3(object):
+ pass
+
+ with _register_custom_sdo(
+ NewType3, 'x-new-type-2', [
+ ('property1', stix2.properties.StringProperty()),
+ ('property2', stix2.properties.IntegerProperty()),
+ ],
+ ) as sdo_class:
+ yield sdo_class
-def test_register_custom_object_with_version():
+def test_register_custom_object_with_version(custom_sdo_type2):
custom_obj_1 = {
"type": "x-new-type-2",
"id": "x-new-type-2--00000000-0000-4000-8000-000000000007",
@@ -1488,7 +1631,7 @@ def test_register_custom_object_with_version():
assert cust_obj_1.spec_version == "2.1"
-def test_register_duplicate_object_with_version():
+def test_register_duplicate_object_with_version(custom_sdo_type2):
with pytest.raises(DuplicateRegistrationError) as excinfo:
@stix2.v21.CustomObject(
'x-new-type-2', [
@@ -1501,60 +1644,74 @@ class NewType2(object):
assert "cannot be registered again" in str(excinfo.value)
-@stix2.v21.CustomObservable(
- 'x-new-observable-3', [
- ('property1', stix2.properties.StringProperty()),
- ],
-)
-class NewObservable3(object):
- pass
-
-
def test_register_observable():
- custom_obs = NewObservable3(property1="Test Observable")
+ class NewObservable3:
+ pass
+
+ with _register_custom_sco(
+ NewObservable3, 'x-new-observable-3', [
+ ('property1', stix2.properties.StringProperty()),
+ ],
+ ) as sco_class:
+ custom_obs = sco_class(property1="Test Observable")
- assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS['2.1']['observables']
+ assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS['2.1']['observables']
def test_register_duplicate_observable():
- with pytest.raises(DuplicateRegistrationError) as excinfo:
- @stix2.v21.CustomObservable(
- 'x-new-observable-2', [
- ('property1', stix2.properties.StringProperty()),
- ],
- )
- class NewObservable2(object):
- pass
- assert "cannot be registered again" in str(excinfo.value)
+ class AnSCO:
+ pass
+ with _register_custom_sco(
+ AnSCO, 'x-new-observable-2', [
+ ('property1', stix2.properties.StringProperty()),
+ ],
+ ):
+ with pytest.raises(DuplicateRegistrationError) as excinfo:
+ @stix2.v21.CustomObservable(
+ 'x-new-observable-2', [
+ ('property1', stix2.properties.StringProperty()),
+ ],
+ )
+ class NewObservable2(object):
+ pass
+ assert "cannot be registered again" in str(excinfo.value)
def test_register_observable_custom_extension():
- @stix2.v21.CustomExtension(
- 'x-new-2-ext', [
- ('property1', stix2.properties.StringProperty(required=True)),
- ('property2', stix2.properties.IntegerProperty()),
- ],
- )
- class NewExtension2():
+ class NewExtension2:
pass
- example = NewExtension2(property1="Hi there")
+ with _register_subtype_extension(
+ NewExtension2, 'x-new-2-ext', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ('property2', stix2.properties.IntegerProperty()),
+ ],
+ ) as ext_class:
+ example = ext_class(property1="Hi there")
- assert 'domain-name' in stix2.registry.STIX2_OBJ_MAPS['2.1']['observables']
- assert example._type in stix2.registry.STIX2_OBJ_MAPS['2.1']['extensions']
+ assert 'domain-name' in stix2.registry.STIX2_OBJ_MAPS['2.1']['observables']
+ assert example._type in stix2.registry.STIX2_OBJ_MAPS['2.1']['extensions']
def test_register_duplicate_observable_extension():
- with pytest.raises(DuplicateRegistrationError) as excinfo:
- @stix2.v21.CustomExtension(
- 'x-new-2-ext', [
+ class SubtypeExt:
+ pass
+ with _register_subtype_extension(
+ SubtypeExt, 'x-new-2-ext', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
],
- )
- class NewExtension2():
- pass
- assert "cannot be registered again" in str(excinfo.value)
+ ):
+ with pytest.raises(DuplicateRegistrationError) as excinfo:
+ @stix2.v21.CustomExtension(
+ 'x-new-2-ext', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ('property2', stix2.properties.IntegerProperty()),
+ ],
+ )
+ class NewExtension2():
+ pass
+ assert "cannot be registered again" in str(excinfo.value)
def test_unregistered_top_level_extension_passes_with_allow_custom_false():
@@ -1614,181 +1771,192 @@ def test_unregistered_embedded_extension_passes_with_allow_custom_false():
def test_registered_top_level_extension_passes_with_allow_custom_false():
- @stix2.v21.CustomExtension(
- 'extension-definition--d83fce45-ef58-4c6c-a3f4-1fbc32e98c6e', [
- ('rank', stix2.properties.IntegerProperty(required=True)),
- ('toxicity', stix2.properties.IntegerProperty(required=True)),
- ],
- )
class ExtensionFoo1:
extension_type = 'toplevel-property-extension'
- indicator = stix2.v21.Indicator(
- id='indicator--e97bfccf-8970-4a3c-9cd1-5b5b97ed5d0c',
- created='2014-02-20T09:16:08.989000Z',
- modified='2014-02-20T09:16:08.989000Z',
- name='File hash for Poison Ivy variant',
- description='This file hash indicates that a sample of Poison Ivy is present.',
- labels=[
- 'malicious-activity',
+ with _register_extension(
+ ExtensionFoo1, [
+ ('rank', stix2.properties.IntegerProperty(required=True)),
+ ('toxicity', stix2.properties.IntegerProperty(required=True)),
],
- rank=5,
- toxicity=8,
- pattern='[file:hashes.\'SHA-256\' = \'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c\']',
- pattern_type='stix',
- valid_from='2014-02-20T09:00:00.000000Z',
- extensions={
- 'extension-definition--d83fce45-ef58-4c6c-a3f4-1fbc32e98c6e': {
- 'extension_type': 'toplevel-property-extension',
+ ) as ext_class:
+ ext_id = ext_class._type
+
+ indicator = stix2.v21.Indicator(
+ id='indicator--e97bfccf-8970-4a3c-9cd1-5b5b97ed5d0c',
+ created='2014-02-20T09:16:08.989000Z',
+ modified='2014-02-20T09:16:08.989000Z',
+ name='File hash for Poison Ivy variant',
+ description='This file hash indicates that a sample of Poison Ivy is present.',
+ labels=[
+ 'malicious-activity',
+ ],
+ rank=5,
+ toxicity=8,
+ pattern='[file:hashes.\'SHA-256\' = \'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c\']',
+ pattern_type='stix',
+ valid_from='2014-02-20T09:00:00.000000Z',
+ extensions={
+ ext_id: {
+ 'extension_type': 'toplevel-property-extension',
+ },
},
- },
- allow_custom=False,
- )
- assert indicator.rank == 5
- assert indicator.toxicity == 8
- assert indicator.extensions['extension-definition--d83fce45-ef58-4c6c-a3f4-1fbc32e98c6e']['extension_type'] == 'toplevel-property-extension'
- assert isinstance(indicator.extensions['extension-definition--d83fce45-ef58-4c6c-a3f4-1fbc32e98c6e'], ExtensionFoo1)
+ allow_custom=False,
+ )
+ assert indicator.rank == 5
+ assert indicator.toxicity == 8
+ assert indicator.extensions[ext_id]['extension_type'] == 'toplevel-property-extension'
+ assert isinstance(indicator.extensions[ext_id], ext_class)
def test_registered_embedded_extension_passes_with_allow_custom_false():
- @stix2.v21.CustomExtension(
- 'extension-definition--d83fce45-ef58-4c6c-a3ff-1fbc32e98c6e', [
- ('rank', stix2.properties.IntegerProperty(required=True)),
- ('toxicity', stix2.properties.IntegerProperty(required=True)),
- ],
- )
class ExtensionFoo1:
extension_type = "property-extension"
- indicator = stix2.v21.Indicator(
- id='indicator--e97bfccf-8970-4a3c-9cd1-5b5b97ed5d0c',
- created='2014-02-20T09:16:08.989000Z',
- modified='2014-02-20T09:16:08.989000Z',
- name='File hash for Poison Ivy variant',
- description='This file hash indicates that a sample of Poison Ivy is present.',
- labels=[
- 'malicious-activity',
+ with _register_extension(
+ ExtensionFoo1, [
+ ('rank', stix2.properties.IntegerProperty(required=True)),
+ ('toxicity', stix2.properties.IntegerProperty(required=True)),
],
- pattern='[file:hashes.\'SHA-256\' = \'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c\']',
- pattern_type='stix',
- valid_from='2014-02-20T09:00:00.000000Z',
- extensions={
- 'extension-definition--d83fce45-ef58-4c6c-a3ff-1fbc32e98c6e': {
- 'extension_type': 'property-extension',
- 'rank': 5,
- 'toxicity': 8,
+ ) as ext_class:
+
+ ext_id = ext_class._type
+
+ indicator = stix2.v21.Indicator(
+ id='indicator--e97bfccf-8970-4a3c-9cd1-5b5b97ed5d0c',
+ created='2014-02-20T09:16:08.989000Z',
+ modified='2014-02-20T09:16:08.989000Z',
+ name='File hash for Poison Ivy variant',
+ description='This file hash indicates that a sample of Poison Ivy is present.',
+ labels=[
+ 'malicious-activity',
+ ],
+ pattern='[file:hashes.\'SHA-256\' = \'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c\']',
+ pattern_type='stix',
+ valid_from='2014-02-20T09:00:00.000000Z',
+ extensions={
+ ext_id: {
+ 'extension_type': 'property-extension',
+ 'rank': 5,
+ 'toxicity': 8,
+ },
},
- },
- allow_custom=False,
- )
- assert indicator.extensions['extension-definition--d83fce45-ef58-4c6c-a3ff-1fbc32e98c6e']['rank'] == 5
- assert indicator.extensions['extension-definition--d83fce45-ef58-4c6c-a3ff-1fbc32e98c6e']['toxicity'] == 8
- assert indicator.extensions['extension-definition--d83fce45-ef58-4c6c-a3ff-1fbc32e98c6e']['extension_type'] == 'property-extension'
- assert isinstance(indicator.extensions['extension-definition--d83fce45-ef58-4c6c-a3ff-1fbc32e98c6e'], ExtensionFoo1)
+ allow_custom=False,
+ )
+ assert indicator.extensions[ext_id]['rank'] == 5
+ assert indicator.extensions[ext_id]['toxicity'] == 8
+ assert indicator.extensions[ext_id]['extension_type'] == 'property-extension'
+ assert isinstance(indicator.extensions[ext_id], ext_class)
def test_registered_new_extension_sdo_allow_custom_false():
- @stix2.v21.CustomObject(
- 'my-favorite-sdo', [
+ class MyFavSDO:
+ pass
+
+ with _register_sdo_extension(
+ MyFavSDO, 'my-favorite-sdo', [
('name', stix2.properties.StringProperty(required=True)),
('some_property_name1', stix2.properties.StringProperty(required=True)),
('some_property_name2', stix2.properties.StringProperty()),
- ], 'extension-definition--d83fce45-ef58-4c6c-a3f4-1fbc32e9999',
- )
- class MyFavSDO:
- pass
+ ],
+ ) as sdo_class:
+ ext_id = sdo_class.with_extension
- my_favorite_sdo = {
- 'type': 'my-favorite-sdo',
- 'spec_version': '2.1',
- 'id': 'my-favorite-sdo--c5ba9dba-5ad9-4bbe-9825-df4cb8675774',
- 'created': '2014-02-20T09:16:08.989000Z',
- 'modified': '2014-02-20T09:16:08.989000Z',
- 'name': 'This is the name of my favorite',
- 'some_property_name1': 'value1',
- 'some_property_name2': 'value2',
- # 'extensions': {
- # 'extension-definition--d83fce45-ef58-4c6c-a3f4-1fbc32e9999': ExtensionDefinitiond83fce45ef584c6ca3f41fbc32e98c6e()
- # }
- }
- sdo_object = stix2.parse(my_favorite_sdo)
- assert isinstance(sdo_object, MyFavSDO)
- assert isinstance(
- sdo_object.extensions['extension-definition--d83fce45-ef58-4c6c-a3f4-1fbc32e9999'],
- stix2.v21.EXT_MAP['extension-definition--d83fce45-ef58-4c6c-a3f4-1fbc32e9999'],
- )
+ my_favorite_sdo = {
+ 'type': 'my-favorite-sdo',
+ 'spec_version': '2.1',
+ 'id': 'my-favorite-sdo--c5ba9dba-5ad9-4bbe-9825-df4cb8675774',
+ 'created': '2014-02-20T09:16:08.989000Z',
+ 'modified': '2014-02-20T09:16:08.989000Z',
+ 'name': 'This is the name of my favorite',
+ 'some_property_name1': 'value1',
+ 'some_property_name2': 'value2',
+ # 'extensions': {
+ # 'extension-definition--d83fce45-ef58-4c6c-a3f4-1fbc32e9999': ExtensionDefinitiond83fce45ef584c6ca3f41fbc32e98c6e()
+ # }
+ }
+ sdo_object = stix2.parse(my_favorite_sdo)
+ assert isinstance(sdo_object, sdo_class)
+ assert isinstance(
+ sdo_object.extensions[ext_id],
+ stix2.v21.EXT_MAP[ext_id],
+ )
- sdo_serialized = sdo_object.serialize()
- assert '"extensions": {"extension-definition--d83fce45-ef58-4c6c-a3f4-1fbc32e9999": {"extension_type": "new-sdo"}}' in sdo_serialized
+ sdo_serialized = sdo_object.serialize()
+ assert f'"extensions": {{"{ext_id}": {{"extension_type": "new-sdo"}}}}' in sdo_serialized
def test_registered_new_extension_sro_allow_custom_false():
- @stix2.v21.CustomObject(
- 'my-favorite-sro', [
+ class MyFavSRO:
+ pass
+
+ with _register_sro_extension(
+ MyFavSRO, 'my-favorite-sro', [
('name', stix2.properties.StringProperty(required=True)),
('some_property_name1', stix2.properties.StringProperty(required=True)),
('some_property_name2', stix2.properties.StringProperty()),
- ], 'extension-definition--e96690a5-dc13-4f27-99dd-0f2188ad74ce', False,
- )
- class MyFavSRO:
- pass
+ ],
+ ) as sro_class:
+ ext_id = sro_class.with_extension
- my_favorite_sro = {
- 'type': 'my-favorite-sro',
- 'spec_version': '2.1',
- 'id': 'my-favorite-sro--c5ba9dba-5ad9-4bbe-9825-df4cb8675774',
- 'created': '2014-02-20T09:16:08.989000Z',
- 'modified': '2014-02-20T09:16:08.989000Z',
- 'name': 'This is the name of my favorite',
- 'some_property_name1': 'value1',
- 'some_property_name2': 'value2',
- # 'extensions': {
- # 'extension-definition--e96690a5-dc13-4f27-99dd-0f2188ad74ce': ExtensionDefinitiond83fce45ef584c6ca3f41fbc32e98c6e()
- # }
- }
- sro_object = stix2.parse(my_favorite_sro)
- assert isinstance(sro_object, MyFavSRO)
- assert isinstance(
- sro_object.extensions['extension-definition--e96690a5-dc13-4f27-99dd-0f2188ad74ce'],
- stix2.v21.EXT_MAP['extension-definition--e96690a5-dc13-4f27-99dd-0f2188ad74ce'],
- )
+ my_favorite_sro = {
+ 'type': 'my-favorite-sro',
+ 'spec_version': '2.1',
+ 'id': 'my-favorite-sro--c5ba9dba-5ad9-4bbe-9825-df4cb8675774',
+ 'created': '2014-02-20T09:16:08.989000Z',
+ 'modified': '2014-02-20T09:16:08.989000Z',
+ 'name': 'This is the name of my favorite',
+ 'some_property_name1': 'value1',
+ 'some_property_name2': 'value2',
+ # 'extensions': {
+ # 'extension-definition--e96690a5-dc13-4f27-99dd-0f2188ad74ce': ExtensionDefinitiond83fce45ef584c6ca3f41fbc32e98c6e()
+ # }
+ }
+ sro_object = stix2.parse(my_favorite_sro)
+ assert isinstance(sro_object, sro_class)
+ assert isinstance(
+ sro_object.extensions[ext_id],
+ stix2.v21.EXT_MAP[ext_id],
+ )
- sdo_serialized = sro_object.serialize()
- assert '"extensions": {"extension-definition--e96690a5-dc13-4f27-99dd-0f2188ad74ce": {"extension_type": "new-sro"}}' in sdo_serialized
+ sro_serialized = sro_object.serialize()
+ assert f'"extensions": {{"{ext_id}": {{"extension_type": "new-sro"}}}}' in sro_serialized
def test_registered_new_extension_sco_allow_custom_false():
- @stix2.v21.CustomObservable(
- 'my-favorite-sco', [
- ('name', stix2.properties.StringProperty(required=True)),
- ('some_network_protocol_field', stix2.properties.StringProperty(required=True)),
- ], ['name', 'some_network_protocol_field'], 'extension-definition--a932fcc6-e032-177c-126f-cb970a5a1fff',
- )
class MyFavSCO:
pass
- my_favorite_sco = {
- 'type': 'my-favorite-sco',
- 'spec_version': '2.1',
- 'id': 'my-favorite-sco--f9dbe89c-0030-4a9d-8b78-0dcd0a0de874',
- 'name': 'This is the name of my favorite SCO',
- 'some_network_protocol_field': 'value',
- # 'extensions': {
- # 'extension-definition--a932fcc6-e032-177c-126f-cb970a5a1fff': {
- # 'is_extension_so': true
- # }
- # }
- }
+ with _register_sco_extension(
+ MyFavSCO, 'my-favorite-sco', [
+ ('name', stix2.properties.StringProperty(required=True)),
+ ('some_network_protocol_field', stix2.properties.StringProperty(required=True)),
+ ], ['name', 'some_network_protocol_field'],
+ ) as sco_class:
+ ext_id = sco_class.with_extension
- sco_object = stix2.parse(my_favorite_sco)
- assert isinstance(sco_object, MyFavSCO)
- assert isinstance(
- sco_object.extensions['extension-definition--a932fcc6-e032-177c-126f-cb970a5a1fff'],
- stix2.v21.EXT_MAP['extension-definition--a932fcc6-e032-177c-126f-cb970a5a1fff'],
- )
+ my_favorite_sco = {
+ 'type': 'my-favorite-sco',
+ 'spec_version': '2.1',
+ 'id': 'my-favorite-sco--f9dbe89c-0030-4a9d-8b78-0dcd0a0de874',
+ 'name': 'This is the name of my favorite SCO',
+ 'some_network_protocol_field': 'value',
+ # 'extensions': {
+ # 'extension-definition--a932fcc6-e032-177c-126f-cb970a5a1fff': {
+ # 'is_extension_so': true
+ # }
+ # }
+ }
+
+ sco_object = stix2.parse(my_favorite_sco)
+ assert isinstance(sco_object, sco_class)
+ assert isinstance(
+ sco_object.extensions[ext_id],
+ stix2.v21.EXT_MAP[ext_id],
+ )
- sco_serialized = sco_object.serialize()
- assert '"extensions": {"extension-definition--a932fcc6-e032-177c-126f-cb970a5a1fff": {"extension_type": "new-sco"}}' in sco_serialized
+ sco_serialized = sco_object.serialize()
+ assert f'"extensions": {{"{ext_id}": {{"extension_type": "new-sco"}}}}' in sco_serialized
def test_registered_new_extension_marking_allow_custom_false():
@@ -1800,7 +1968,8 @@ class MyFavMarking:
'some_marking_field': stix2.properties.StringProperty(required=True),
}
- with _register_extension(MyFavMarking, props) as ext_def_id:
+ with _register_extension(MyFavMarking, props) as ext_class:
+ ext_def_id = ext_class._type
my_favorite_marking = {
'type': 'marking-definition',
@@ -1835,7 +2004,8 @@ class CustomMarking:
"foo": stix2.properties.StringProperty(required=True),
}
- with _register_extension(CustomMarking, props) as ext_def_id:
+ with _register_extension(CustomMarking, props) as ext_class:
+ ext_def_id = ext_class._type
marking_dict = {
"type": "marking-definition",
@@ -1864,7 +2034,8 @@ class TestExt:
),
}
- with _register_extension(TestExt, props) as ext_def_id:
+ with _register_extension(TestExt, props) as ext_class:
+ ext_def_id = ext_class._type
obj = stix2.v21.Identity(
name="test",
@@ -1957,7 +2128,8 @@ class TestExt:
),
}
- with _register_extension(TestExt, props) as ext_def_id:
+ with _register_extension(TestExt, props) as ext_class:
+ ext_def_id = ext_class._type
obj = stix2.v21.Identity(
name="test",
@@ -2051,7 +2223,8 @@ class ExtensionsExtension:
"extensions": stix2.properties.ExtensionsProperty(spec_version="2.1"),
}
- with _register_extension(ExtensionsExtension, ext_props) as ext_id:
+ with _register_extension(ExtensionsExtension, ext_props) as ext_class:
+ ext_id = ext_class._type
# extension-definition is not defined with an "extensions" property.
obj_dict = {
diff --git a/stix2/test/v21/test_datastore.py b/stix2/test/v21/test_datastore.py
index 8bb5494c..bd9e3e44 100644
--- a/stix2/test/v21/test_datastore.py
+++ b/stix2/test/v21/test_datastore.py
@@ -65,9 +65,8 @@ def test_datastore_related_to_raises():
def test_datastore_add_raises():
- with pytest.raises(AttributeError) as excinfo:
+ with pytest.raises(AttributeError):
DataStoreMixin().add(CAMPAIGN_MORE_KWARGS)
- assert "DataStoreMixin has no data sink to put objects in" == str(excinfo.value)
def test_composite_datastore_get_raises_error():
diff --git a/stix2/test/v21/test_datastore_relational_db.py b/stix2/test/v21/test_datastore_relational_db.py
index 7e17c25a..efd5f68a 100644
--- a/stix2/test/v21/test_datastore_relational_db.py
+++ b/stix2/test/v21/test_datastore_relational_db.py
@@ -10,19 +10,52 @@
from stix2.datastore.relational_db.database_backends.postgres_backend import (
PostgresBackend,
)
+from stix2.datastore.relational_db.database_backends.sqlite_backend import (
+ SQLiteBackend,
+)
from stix2.datastore.relational_db.relational_db import RelationalDBStore
import stix2.properties
import stix2.registry
import stix2.v21
-_DB_CONNECT_URL = f"postgresql://{os.getenv('POSTGRES_USER', 'postgres')}:{os.getenv('POSTGRES_PASSWORD', 'postgres')}@0.0.0.0:5432/postgres"
-store = RelationalDBStore(
- PostgresBackend(_DB_CONNECT_URL, True),
- True,
- None,
- False,
+@pytest.fixture(
+ scope="module",
+ params=["postgresql", "sqlite"],
)
+def db_backend(request):
+ if request.param == "postgresql":
+ user = os.getenv('POSTGRES_USER', 'postgres')
+ pass_ = os.getenv('POSTGRES_PASSWORD', 'postgres')
+ dbname = os.getenv('POSTGRES_DB', 'postgres')
+
+ connect_url = f"postgresql://{user}:{pass_}@0.0.0.0:5432/{dbname}"
+ backend = PostgresBackend(connect_url, force_recreate=True)
+
+ elif request.param == "sqlite":
+ connect_url = "sqlite://" # in-memory DB
+ backend = SQLiteBackend(connect_url, force_recreate=True)
+
+ else:
+ raise ValueError(request.param)
+
+ return backend
+
+
+@pytest.fixture
+def store(db_backend):
+ store = RelationalDBStore(
+ db_backend,
+ True,
+ None,
+ True,
+ )
+
+ try:
+ yield store
+ finally:
+ store.metadata.drop_all(db_backend.database_connection)
+
# Artifacts
basic_artifact_dict = {
@@ -48,7 +81,7 @@
}
-def test_basic_artifact():
+def test_basic_artifact(store):
artifact_stix_object = stix2.parse(basic_artifact_dict)
store.add(artifact_stix_object)
read_obj = json.loads(store.get(artifact_stix_object['id']).serialize())
@@ -57,7 +90,7 @@ def test_basic_artifact():
assert basic_artifact_dict[attrib] == read_obj[attrib]
-def test_encrypted_artifact():
+def test_encrypted_artifact(store):
artifact_stix_object = stix2.parse(encrypted_artifact_dict)
store.add(artifact_stix_object)
read_obj = json.loads(store.get(artifact_stix_object['id']).serialize())
@@ -77,7 +110,7 @@ def test_encrypted_artifact():
}
-def test_autonomous_system():
+def test_autonomous_system(store):
as_obj = stix2.parse(as_dict)
store.add(as_obj)
read_obj = json.loads(store.get(as_obj['id']).serialize())
@@ -102,7 +135,7 @@ def test_autonomous_system():
}
-def test_directory():
+def test_directory(store):
directory_obj = stix2.parse(directory_dict)
store.add(directory_obj)
read_obj = json.loads(store.get(directory_obj['id']).serialize())
@@ -123,7 +156,7 @@ def test_directory():
}
-def test_domain_name():
+def test_domain_name(store):
domain_name_obj = stix2.parse(domain_name_dict)
store.add(domain_name_obj)
read_obj = json.loads(store.get(domain_name_obj['id']).serialize())
@@ -143,7 +176,7 @@ def test_domain_name():
}
-def test_email_addr():
+def test_email_addr(store):
email_addr_stix_object = stix2.parse(email_addr_dict)
store.add(email_addr_stix_object)
read_obj = json.loads(store.get(email_addr_stix_object['id']).serialize())
@@ -228,7 +261,7 @@ def test_email_addr():
}
-def test_email_msg():
+def test_email_msg(store):
email_msg_stix_object = stix2.parse(email_msg_dict)
store.add(email_msg_stix_object)
read_obj = json.loads(store.get(email_msg_stix_object['id']).serialize())
@@ -242,7 +275,7 @@ def test_email_msg():
assert email_msg_dict[attrib] == read_obj[attrib]
-def test_multipart_email_msg():
+def test_multipart_email_msg(store):
multipart_email_msg_stix_object = stix2.parse(multipart_email_msg_dict)
store.add(multipart_email_msg_stix_object)
read_obj = json.loads(store.get(multipart_email_msg_stix_object['id']).serialize())
@@ -281,7 +314,7 @@ def test_multipart_email_msg():
}
-def test_file():
+def test_file(store):
file_stix_object = stix2.parse(file_dict)
store.add(file_stix_object)
read_obj = json.loads(store.get(file_stix_object['id']).serialize())
@@ -309,7 +342,7 @@ def test_file():
}
-def test_ipv4():
+def test_ipv4(store):
ipv4_stix_object = stix2.parse(ipv4_dict)
store.add(ipv4_stix_object)
read_obj = store.get(ipv4_stix_object['id'])
@@ -318,7 +351,7 @@ def test_ipv4():
assert ipv4_dict[attrib] == read_obj[attrib]
-def test_ipv6():
+def test_ipv6(store):
ipv6_stix_object = stix2.parse(ipv6_dict)
store.add(ipv6_stix_object)
read_obj = store.get(ipv6_stix_object['id'])
@@ -336,7 +369,7 @@ def test_ipv6():
}
-def test_mutex():
+def test_mutex(store):
mutex_stix_object = stix2.parse(mutex_dict)
store.add(mutex_stix_object)
read_obj = store.get(mutex_stix_object['id'])
@@ -376,7 +409,7 @@ def test_mutex():
}
-def test_network_traffic():
+def test_network_traffic(store):
network_traffic_stix_object = stix2.parse(network_traffic_dict)
store.add(network_traffic_stix_object)
read_obj = store.get(network_traffic_stix_object['id'])
@@ -414,7 +447,7 @@ def test_network_traffic():
}
-def test_process():
+def test_process(store):
process_stix_object = stix2.parse(process_dict)
store.add(process_stix_object)
read_obj = json.loads(store.get(process_stix_object['id']).serialize())
@@ -438,7 +471,7 @@ def test_process():
}
-def test_software():
+def test_software(store):
software_stix_object = stix2.parse(software_dict)
store.add(software_stix_object)
read_obj = json.loads(store.get(software_stix_object['id']).serialize())
@@ -455,7 +488,7 @@ def test_software():
}
-def test_url():
+def test_url(store):
url_stix_object = stix2.parse(url_dict)
store.add(url_stix_object)
read_obj = json.loads(store.get(url_stix_object['id']).serialize())
@@ -486,7 +519,7 @@ def test_url():
}
-def test_user_account():
+def test_user_account(store):
user_account_stix_object = stix2.parse(user_account_dict)
store.add(user_account_stix_object)
read_obj = json.loads(store.get(user_account_stix_object['id']).serialize())
@@ -526,7 +559,7 @@ def test_user_account():
}
-def test_windows_registry():
+def test_windows_registry(store):
windows_registry_stix_object = stix2.parse(windows_registry_dict)
store.add(windows_registry_stix_object)
read_obj = json.loads(store.get(windows_registry_stix_object['id']).serialize())
@@ -584,7 +617,7 @@ def test_windows_registry():
}
-def test_basic_x509_certificate():
+def test_basic_x509_certificate(store):
basic_x509_certificate_stix_object = stix2.parse(basic_x509_certificate_dict)
store.add(basic_x509_certificate_stix_object)
read_obj = json.loads(store.get(basic_x509_certificate_stix_object['id']).serialize())
@@ -598,7 +631,7 @@ def test_basic_x509_certificate():
assert basic_x509_certificate_dict[attrib] == read_obj[attrib]
-def test_x509_certificate_with_extensions():
+def test_x509_certificate_with_extensions(store):
extensions_x509_certificate_stix_object = stix2.parse(extensions_x509_certificate_dict)
store.add(extensions_x509_certificate_stix_object)
read_obj = json.loads(store.get(extensions_x509_certificate_stix_object['id']).serialize())
@@ -612,12 +645,12 @@ def test_x509_certificate_with_extensions():
assert extensions_x509_certificate_dict[attrib] == read_obj[attrib]
-def test_source_get_not_exists():
+def test_source_get_not_exists(store):
obj = store.get("identity--00000000-0000-0000-0000-000000000000")
assert obj is None
-def test_source_no_registration():
+def test_source_no_registration(store):
with pytest.raises(DataSourceError):
# error, since no registered class can be found
store.get("doesnt-exist--a9e52398-3312-4377-90c2-86d49446c0d0")
@@ -656,15 +689,13 @@ class TestClass:
try:
yield TestClass
- except: # noqa: E722
+ finally:
ext_id = kwargs.get("extension_name")
if not ext_id and len(args) >= 3:
ext_id = args[2]
_unregister("objects", TestClass._type, ext_id)
- raise
-
@contextlib.contextmanager
def _register_observable(*args, **kwargs):
@@ -682,15 +713,13 @@ class TestClass:
try:
yield TestClass
- except: # noqa: E722
+ finally:
ext_id = kwargs.get("extension_name")
if not ext_id and len(args) >= 4:
ext_id = args[3]
_unregister("observables", TestClass._type, ext_id)
- raise
-
# "Base" properties used to derive property variations for testing (e.g. in a
# list, in a dictionary, in an embedded object, etc). Also includes sample
@@ -875,13 +904,10 @@ class TestClass:
_unregister(reg_section, TestClass._type, ext_id)
-def test_property(object_variation):
- """
- Try to more exhaustively test many different property configurations:
- ensure schemas can be created and values can be stored and retrieved.
- """
+@pytest.fixture
+def property_store(db_backend, object_variation):
rdb_store = RelationalDBStore(
- PostgresBackend(_DB_CONNECT_URL, True),
+ db_backend,
True,
None,
True,
@@ -889,13 +915,25 @@ def test_property(object_variation):
type(object_variation),
)
- rdb_store.add(object_variation)
- read_obj = rdb_store.get(object_variation["id"])
+ try:
+ yield rdb_store
+ finally:
+ rdb_store.metadata.drop_all(db_backend.database_connection)
+
+
+def test_property(property_store, object_variation):
+ """
+ Try to more exhaustively test many different property configurations:
+ ensure schemas can be created and values can be stored and retrieved.
+ """
+
+ property_store.add(object_variation)
+ read_obj = property_store.get(object_variation["id"])
assert read_obj == object_variation
-def test_dictionary_property_complex():
+def test_dictionary_property_complex(db_backend):
"""
Test a dictionary property with multiple valid_types
"""
@@ -921,7 +959,7 @@ def test_dictionary_property_complex():
)
rdb_store = RelationalDBStore(
- PostgresBackend(_DB_CONNECT_URL, True),
+ db_backend,
True,
None,
True,
@@ -929,12 +967,15 @@ def test_dictionary_property_complex():
cls,
)
- rdb_store.add(obj)
- read_obj = rdb_store.get(obj["id"])
- assert read_obj == obj
+ try:
+ rdb_store.add(obj)
+ read_obj = rdb_store.get(obj["id"])
+ assert read_obj == obj
+ finally:
+ rdb_store.metadata.drop_all(db_backend.database_connection)
-def test_extension_definition():
+def test_extension_definition(store):
obj = stix2.ExtensionDefinition(
created_by_ref="identity--8a5fb7e4-aabe-4635-8972-cbcde1fa4792",
labels=["label1", "label2"],
diff --git a/stix2/test/v21/test_environment.py b/stix2/test/v21/test_environment.py
index 502f2b77..61754658 100644
--- a/stix2/test/v21/test_environment.py
+++ b/stix2/test/v21/test_environment.py
@@ -230,9 +230,8 @@ def test_environment_datastore_and_sink():
def test_environment_no_datastore():
env = stix2.Environment(factory=stix2.ObjectFactory())
- with pytest.raises(AttributeError) as excinfo:
+ with pytest.raises(AttributeError):
env.add(stix2.v21.Indicator(**INDICATOR_KWARGS))
- assert 'Environment has no data sink to put objects in' in str(excinfo.value)
with pytest.raises(AttributeError) as excinfo:
env.get(INDICATOR_ID)
diff --git a/stix2/test/v21/test_observed_data.py b/stix2/test/v21/test_observed_data.py
index 91baa69e..48f94908 100644
--- a/stix2/test/v21/test_observed_data.py
+++ b/stix2/test/v21/test_observed_data.py
@@ -1108,9 +1108,9 @@ def test_network_traffic_http_request_example():
request_value="/download.html",
request_version="http/1.1",
request_header={
- "Accept-Encoding": "gzip,deflate",
- "User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113",
- "Host": "www.example.com",
+ "Accept-Encoding": ["gzip,deflate"],
+ "User-Agent": ["Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113"],
+ "Host": ["www.example.com"],
},
)
nt = stix2.v21.NetworkTraffic(
@@ -1121,9 +1121,9 @@ def test_network_traffic_http_request_example():
assert nt.extensions['http-request-ext'].request_method == "get"
assert nt.extensions['http-request-ext'].request_value == "/download.html"
assert nt.extensions['http-request-ext'].request_version == "http/1.1"
- assert nt.extensions['http-request-ext'].request_header['Accept-Encoding'] == "gzip,deflate"
- assert nt.extensions['http-request-ext'].request_header['User-Agent'] == "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113"
- assert nt.extensions['http-request-ext'].request_header['Host'] == "www.example.com"
+ assert nt.extensions['http-request-ext'].request_header['Accept-Encoding'] == ["gzip,deflate"]
+ assert nt.extensions['http-request-ext'].request_header['User-Agent'] == ["Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113"]
+ assert nt.extensions['http-request-ext'].request_header['Host'] == ["www.example.com"]
def test_network_traffic_icmp_example():
@@ -1183,7 +1183,7 @@ def test_incorrect_socket_options():
socket_type="SOCK_STREAM",
options={"SO_RCVTIMEO": '100'},
)
- assert "Dictionary Property does not support this value's type" in str(excinfo.value)
+ assert "Invalid value for SocketExt 'options': Invalid value: '100'" in str(excinfo.value)
def test_network_traffic_tcp_example():
diff --git a/stix2/v21/sdo.py b/stix2/v21/sdo.py
index 209337e7..1dd32f1b 100644
--- a/stix2/v21/sdo.py
+++ b/stix2/v21/sdo.py
@@ -50,7 +50,7 @@ class AttackPattern(_DomainObject):
('kill_chain_phases', ListProperty(KillChainPhase)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -80,7 +80,7 @@ class Campaign(_DomainObject):
('objective', StringProperty()),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -116,7 +116,7 @@ class CourseOfAction(_DomainObject):
('description', StringProperty()),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -144,7 +144,7 @@ class Grouping(_DomainObject):
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SDO", "SRO"], spec_version='2.1'), required=True)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -174,7 +174,7 @@ class Identity(_DomainObject):
('contact_information', StringProperty()),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -201,7 +201,7 @@ class Incident(_DomainObject):
('kill_chain_phases', ListProperty(KillChainPhase)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -234,7 +234,7 @@ class Indicator(_DomainObject):
('kill_chain_phases', ListProperty(KillChainPhase)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -292,7 +292,7 @@ class Infrastructure(_DomainObject):
('last_seen', TimestampProperty()),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -335,7 +335,7 @@ class IntrusionSet(_DomainObject):
('secondary_motivations', ListProperty(OpenVocabProperty(ATTACK_MOTIVATION))),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -380,7 +380,7 @@ class Location(_DomainObject):
('postal_code', StringProperty()),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -490,7 +490,7 @@ class Malware(_DomainObject):
('sample_refs', ListProperty(ReferenceProperty(valid_types=['artifact', 'file'], spec_version='2.1'))),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -546,7 +546,7 @@ class MalwareAnalysis(_DomainObject):
('sample_ref', ReferenceProperty(valid_types="SCO", spec_version='2.1')),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -579,7 +579,7 @@ class Note(_DomainObject):
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SDO", "SRO"], spec_version='2.1'), required=True)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -608,7 +608,7 @@ class ObservedData(_DomainObject):
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SRO"], spec_version='2.1'))),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -661,7 +661,7 @@ class Opinion(_DomainObject):
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SDO", "SRO"], spec_version='2.1'), required=True)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -690,7 +690,7 @@ class Report(_DomainObject):
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SDO", "SRO"], spec_version='2.1'), required=True)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -727,7 +727,7 @@ class ThreatActor(_DomainObject):
('personal_motivations', ListProperty(OpenVocabProperty(ATTACK_MOTIVATION))),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -767,7 +767,7 @@ class Tool(_DomainObject):
('tool_version', StringProperty()),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -793,7 +793,7 @@ class Vulnerability(_DomainObject):
('description', StringProperty()),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
@@ -846,7 +846,7 @@ def wrapper(cls):
+ [
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
- ('confidence', IntegerProperty()),
+ ('confidence', IntegerProperty(min=0, max=100)),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
diff --git a/tox.ini b/tox.ini
index 5b160a7c..6c0960fd 100644
--- a/tox.ini
+++ b/tox.ini
@@ -3,22 +3,22 @@ envlist = py38,py39,py310,py311,py312,packaging,pre-commit-check
[testenv]
deps =
- tox
pytest
pytest-cov
coverage
- taxii2-client
- rapidfuzz
- haversine
medallion
- sqlalchemy
- sqlalchemy_utils
- psycopg2
+
+extras =
+ taxii
+ semantic
+ relationaldb
commands =
python -m pytest --cov=stix2 stix2/test/ --cov-report term-missing -W ignore::stix2.exceptions.STIXDeprecationWarning
-passenv = GITHUB_*
+passenv =
+ GITHUB_*
+ POSTGRES_*
[testenv:packaging]
deps =