Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 73 additions & 47 deletions stix2/datastore/relational_db/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _check_support(stix_id):
raise DataSourceError(f"Reading {stix_type} objects is not supported.")


def _tables_for(stix_class, metadata):
def _tables_for(stix_class, metadata, db_backend):
"""
Get the core and type-specific tables for the given class

Expand All @@ -41,17 +41,24 @@ def _tables_for(stix_class, metadata):
"""
# Info about the type-specific table
type_table_name = table_name_for(stix_class)
type_schema_name = schema_for(stix_class)
type_table = metadata.tables[f"{type_schema_name}.{type_table_name}"]
type_schema_name = db_backend.schema_for(stix_class)
canon_type_table_name = canonicalize_table_name(type_table_name, type_schema_name)

type_table = metadata.tables[canon_type_table_name]

# Some fixed info about core tables
if type_schema_name == "sco":
core_table_name = "common.core_sco"
if stix2.utils.is_sco(stix_class._type, stix2.DEFAULT_VERSION):
canon_core_table_name = canonicalize_table_name(
"core_sco", db_backend.schema_for_core()
)

else:
# for SROs and SMOs too?
core_table_name = "common.core_sdo"
canon_core_table_name = canonicalize_table_name(
"core_sdo", db_backend.schema_for_core()
)

core_table = metadata.tables[core_table_name]
core_table = metadata.tables[canon_core_table_name]

return core_table, type_table

Expand Down Expand Up @@ -134,18 +141,30 @@ def _read_hashes(fk_id, hashes_table, conn):
return hashes


def _read_external_references(stix_id, metadata, conn):
def _read_external_references(stix_id, metadata, conn, db_backend):
"""
Read external references from some fixed tables in the common schema.

:param stix_id: A STIX ID used to filter table rows
:param metadata: SQLAlchemy Metadata object containing all the table
information
:param conn: An SQLAlchemy DB connection
:param db_backend: A backend object with information about how data is
stored in the database
:return: The external references, as a list of dicts
"""
ext_refs_table = metadata.tables["common.external_references"]
ext_refs_hashes_table = metadata.tables["common.external_references_hashes"]
ext_refs_table = metadata.tables[
canonicalize_table_name(
"external_references",
db_backend.schema_for_core()
)
]
ext_refs_hashes_table = metadata.tables[
canonicalize_table_name(
"external_references_hashes",
db_backend.schema_for_core()
)
]
ext_refs = []

ext_refs_columns = (col for col in ext_refs_table.c if col.key != "id")
Expand All @@ -165,43 +184,44 @@ def _read_external_references(stix_id, metadata, conn):
return ext_refs


def _read_object_marking_refs(stix_id, stix_type_class, metadata, conn):
def _read_object_marking_refs(stix_id, common_table_kind, metadata, conn, db_backend):
"""
Read object marking refs from one of a couple special tables in the common
schema.

:param stix_id: A STIX ID, used to filter table rows
:param stix_type_class: STIXTypeClass enum value, used to determine whether
:param common_table_kind: "sco" or "sdo", used to determine whether
to read the table for SDOs or SCOs
:param metadata: SQLAlchemy Metadata object containing all the table
information
:param conn: An SQLAlchemy DB connection
:param db_backend: A backend object with information about how data is
stored in the database
:return: The references as a list of strings
"""

marking_table_name = "object_marking_refs_"
if stix_type_class is stix2.utils.STIXTypeClass.SCO:
marking_table_name += "sco"
else:
marking_table_name += "sdo"
marking_table_name = canonicalize_table_name(
"object_marking_refs_" + common_table_kind,
db_backend.schema_for_core()
)

# The SCO/SDO object_marking_refs tables are mostly identical; they just
# have different foreign key constraints (to different core tables).
marking_table = metadata.tables["common." + marking_table_name]
marking_table = metadata.tables[marking_table_name]

stmt = sa.select(marking_table.c.ref_id).where(marking_table.c.id == stix_id)
refs = conn.scalars(stmt).all()

return refs


def _read_granular_markings(stix_id, stix_type_class, metadata, conn, db_backend):
def _read_granular_markings(stix_id, common_table_kind, metadata, conn, db_backend):
"""
Read granular markings from one of a couple special tables in the common
schema.

:param stix_id: A STIX ID, used to filter table rows
:param stix_type_class: STIXTypeClass enum value, used to determine whether
:param common_table_kind: "sco" or "sdo", used to determine whether
to read the table for SDOs or SCOs
:param metadata: SQLAlchemy Metadata object containing all the table
information
Expand All @@ -211,13 +231,11 @@ def _read_granular_markings(stix_id, stix_type_class, metadata, conn, db_backend
:return: Granular markings as a list of dicts
"""

marking_table_name = "granular_marking_"
if stix_type_class is stix2.utils.STIXTypeClass.SCO:
marking_table_name += "sco"
else:
marking_table_name += "sdo"

marking_table = metadata.tables["common." + marking_table_name]
marking_table_name = canonicalize_table_name(
"granular_marking_" + common_table_kind,
db_backend.schema_for_core()
)
marking_table = metadata.tables[marking_table_name]

if db_backend.array_allowed():
# arrays allowed: everything combined in the same table
Expand Down Expand Up @@ -330,7 +348,7 @@ def _read_dictionary_property(
stmt = sa.select(
dict_table.c.name, list_table.c.value
).select_from(dict_table).join(
list_table, list_table.c.id == dict_table.c.values
list_table, list_table.c.id == dict_table.c["values"]
).where(
dict_table.c.id == stix_id
)
Expand Down Expand Up @@ -606,7 +624,7 @@ def _read_complex_property_value(

def _read_complex_top_level_property_value(
stix_id,
stix_type_class,
common_table_kind,
prop_name,
prop_instance,
type_table,
Expand All @@ -620,8 +638,8 @@ def _read_complex_top_level_property_value(
reading top-level common properties, which use special fixed tables.

:param stix_id: STIX ID of an object to read
:param stix_type_class: The kind of object (SCO, SDO, etc). Which DB
tables to read can depend on this.
:param common_table_kind: Used to find auxiliary common tables, e.g. those
for object markings, granular markings, etc. Either "sco" or "sdo".
:param prop_name: The name of the property to read
:param prop_instance: A Property (subclass) instance with property
config information
Expand All @@ -637,20 +655,26 @@ def _read_complex_top_level_property_value(

# Common properties: these use a fixed set of tables for all STIX objects
if prop_name == "external_references":
prop_value = _read_external_references(stix_id, metadata, conn)
prop_value = _read_external_references(
stix_id,
metadata,
conn,
db_backend
)

elif prop_name == "object_marking_refs":
prop_value = _read_object_marking_refs(
stix_id,
stix_type_class,
common_table_kind,
metadata,
conn,
db_backend,
)

elif prop_name == "granular_markings":
prop_value = _read_granular_markings(
stix_id,
stix_type_class,
common_table_kind,
metadata,
conn,
db_backend,
Expand All @@ -659,7 +683,10 @@ def _read_complex_top_level_property_value(
# Will apply when array columns are unsupported/disallowed by the backend
elif prop_name == "labels":
label_table = metadata.tables[
f"common.core_{stix_type_class.name.lower()}_labels"
canonicalize_table_name(
f"core_{common_table_kind}_labels",
db_backend.schema_for_core()
)
]
prop_value = _read_simple_array(stix_id, "label", label_table, conn)

Expand Down Expand Up @@ -698,16 +725,10 @@ def read_object(stix_id, metadata, conn, db_backend):
stix_type = stix2.utils.get_type_from_id(stix_id)
raise DataSourceError("Can't find registered class for type: " + stix_type)

core_table, type_table = _tables_for(stix_class, metadata)

if type_table.schema == "common":
# Applies to extension-definition SMO, whose data is stored in the
# common schema; it does not get its own. This type class is used to
# determine which common tables to use; its markings are
# in the *_sdo tables.
stix_type_class = stix2.utils.STIXTypeClass.SDO
else:
stix_type_class = stix2.utils.to_enum(type_table.schema, stix2.utils.STIXTypeClass)
core_table, type_table = _tables_for(stix_class, metadata, db_backend)
# Used to find auxiliary common tables, e.g. those for object markings,
# granular markings, etc.
common_table_kind = core_table.name[-3:]

simple_props = _read_simple_properties(stix_id, core_table, type_table, conn)
if simple_props is None:
Expand All @@ -721,7 +742,7 @@ def read_object(stix_id, metadata, conn, db_backend):
if prop_name not in obj_dict:
prop_value = _read_complex_top_level_property_value(
stix_id,
stix_type_class,
common_table_kind,
prop_name,
prop_instance,
type_table,
Expand All @@ -733,5 +754,10 @@ def read_object(stix_id, metadata, conn, db_backend):
if prop_value is not None:
obj_dict[prop_name] = prop_value

stix_obj = stix_class(**obj_dict, allow_custom=True)
stix_obj = stix2.parse(
obj_dict,
allow_custom=True,
version=stix2.DEFAULT_VERSION
)

return stix_obj
Loading
Loading