Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions nodestream/cli/operations/execute_migration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from logging import getLogger

from ...project import Target
from ...schema.migrations import ProjectMigrations
from .operation import NodestreamCommand, Operation
Expand All @@ -10,11 +12,16 @@ def __init__(self, migrations: ProjectMigrations, target: Target):

async def perform(self, command: NodestreamCommand):
migrator = self.target.make_migrator()
with command.spin(
f"Executing migrations on target {self.target.name}...",
f"Migrations executed on target {self.target.name}.",
) as indicator:
if command.has_json_logging_set:
logger = getLogger()
async for migration in self.migrations.execute_pending(migrator):
indicator.set_message(
f"Migration {migration.name} executed successfully."
)
logger.info(f"Migration {migration.name} executed successfully.")
else:
with command.spin(
f"Executing migrations on target {self.target.name}...",
f"Migrations executed on target {self.target.name}.",
) as indicator:
async for migration in self.migrations.execute_pending(migrator):
indicator.set_message(
f"Migration {migration.name} executed successfully."
)
40 changes: 31 additions & 9 deletions nodestream/databases/copy.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
from abc import ABC, abstractmethod
from logging import getLogger
from typing import AsyncGenerator, List

from ..model import Node, RelationshipWithNodes
from ..pipeline import Extractor
from ..schema import Schema
from ..schema import Adjacency, Schema


class TypeRetriever(ABC):
@abstractmethod
def get_nodes_of_type(self, shape: str) -> AsyncGenerator[Node, None]:
def get_nodes_of_type(self, node_type: str) -> AsyncGenerator[Node, None]:
raise NotImplementedError

@abstractmethod
def get_relationships_of_type(
self, type: str
def get_relationships_of_type_between(
self, from_node_type: str, to_node_type: str, relationship_type: str
) -> AsyncGenerator[RelationshipWithNodes, None]:
raise NotImplementedError

Expand All @@ -30,17 +31,36 @@ def __init__(
self.relationship_types = relationship_types_to_copy
self.node_types = node_types_to_copy
self.schema = schema
self.logger = getLogger(__name__)
self.logger.info(
f"Copying {len(self.node_types)} node types and {len(self.relationship_types)} relationship types"
)

async def extract_records(self):
for node_type in self.node_types:
nodes = self.type_retriever.get_nodes_of_type(node_type)
async for node in nodes:
self.logger.info(f"Copying nodes of type {node_type}")
async for node in self.type_retriever.get_nodes_of_type(node_type):
yield self.convert_node_to_ingest(node)

for rel_type in self.relationship_types:
rels = self.type_retriever.get_relationships_of_type(rel_type)
async for relationship in rels:
yield self.convert_relationship_to_ingest(relationship)
# Prefer schema-driven adjacency expansion when available so we can
# fully specify from/to node types for the retriever. Note that it
# is impossible to have a relationship without an adjacency, we only
# support copyting from a nodestream schema.
self.logger.info(f"Copying relationships of type {rel_type}")
adjacencies: List[Adjacency] = list(
self.schema.get_adjacencies_by_relationship_type(rel_type)
)

for adjacency in adjacencies:
async for (
relationship
) in self.type_retriever.get_relationships_of_type_between(
adjacency.from_node_type,
adjacency.to_node_type,
adjacency.relationship_type,
):
yield self.convert_relationship_to_ingest(relationship)

def reorganize_node_key_properties(self, node: Node):
# This is a bit of a hack, but it's the only way to make sure that the
Expand All @@ -49,6 +69,8 @@ def reorganize_node_key_properties(self, node: Node):
# we have to push a lot of work down to the database connector to ensure
# that the keys are in the right place. Its easier to just do it here.
type_def = self.schema.get_node_type_by_name(node.type)
if type_def is None:
return
for key_name in type_def.keys:
node.key_values[key_name] = node.properties[key_name]
del node.properties[key_name]
Expand Down
5 changes: 5 additions & 0 deletions nodestream/databases/null.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class NullRetriver(TypeRetriever):
def get_nodes_of_type(self, _: str) -> AsyncGenerator[Node, None]:
return empty_async_generator()

def get_relationships_of_type_between(
self, __: str, ___: str, ____: str
) -> AsyncGenerator[RelationshipWithNodes, None]:
return empty_async_generator()

def get_relationships_of_type(
self, _: str
) -> AsyncGenerator[RelationshipWithNodes, None]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,22 +222,19 @@ def expand_schema(self, coordinator: SchemaExpansionCoordinator):
)

if self.node_type.is_static:
# Always expand the schema for the main related node type.
coordinator.on_node_schema(
self.expand_related_node_schema,
node_type=self.node_type.value,
)

# Expand schemas for node additional types with the same structure
for additional_type in self.node_additional_types:
coordinator.on_node_schema(
self.expand_related_node_schema,
node_type=additional_type,
)

# Register additional types so relationships are also connected to them
coordinator.register_additional_types(
self.node_type.value,
self.node_additional_types,
# Delegate additional-type expansion to the coordinator so that the
# decision about whether to include additional types in the schema
# is centralized there.
coordinator.expand_additional_types(
base_type=self.node_type.value,
additional_types=self.node_additional_types,
fn=self.expand_related_node_schema,
)

if self.relationship_type.is_static and self.node_type.is_static:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,25 @@ def expand_schema(self, coordinator: SchemaExpansionCoordinator):
if not self.node_type.is_static:
return

# First, bind the source-node alias to the concrete base node type and
# apply this interpretation's keys / properties / indexes to it. Any
# additional properties contributed via `properties` interpretations are
# accumulated on the alias and later propagated to additional types by
# the `SchemaExpansionCoordinator`.
coordinator.on_node_schema(
self.expand_source_node_schema,
alias=self.SOURCE_NODE_TYPE_ALIAS,
node_type=self.node_type.value,
)

# Expand schemas for additional types with the same structure
for additional_type in self.additional_types:
coordinator.on_node_schema(
self.expand_source_node_schema,
node_type=additional_type,
)
# Delegate additional-type expansion to the coordinator so that the
# decision about whether to Cinclude additional types in the schema is
# centralized there.
coordinator.expand_additional_types(
base_type=self.node_type.value,
additional_types=self.additional_types,
fn=self.expand_source_node_schema,
)

# Register additional types so relationships are also connected to them
coordinator.register_additional_types(
Expand Down
2 changes: 1 addition & 1 deletion nodestream/model/graph_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def into_ingest(self) -> "DesiredIngestion":
self.relationship,
outbound=True,
node_creation_rule=NodeCreationRule.MATCH_ONLY,
relationship_creation_rule=RelationshipCreationRule.CREATE,
relationship_creation_rule=RelationshipCreationRule.EAGER,
)
return ingest

Expand Down
2 changes: 2 additions & 0 deletions nodestream/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
PropertyType,
Schema,
SchemaExpansionCoordinator,
UnboundAdjacency,
)

__all__ = (
Expand All @@ -24,4 +25,5 @@
"Adjacency",
"PropertyMetadata",
"PropertyType",
"UnboundAdjacency",
)
121 changes: 108 additions & 13 deletions nodestream/schema/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,15 @@ def relationships(self) -> Iterable[GraphObjectSchema]:
def adjacencies(self) -> Iterable[Adjacency]:
return self.cardinalities.keys()

def get_adjacencies_by_relationship_type(
self, relationship_type: str
) -> Iterable[Adjacency]:
return (
adjacency
for adjacency in self.adjacencies
if adjacency.relationship_type == relationship_type
)

def put_node_type(self, node_type: GraphObjectSchema):
"""Add a node type to the schema.

Expand Down Expand Up @@ -531,7 +540,9 @@ def get_by_type_and_object_type(
self.type_schemas[key] = GraphObjectSchema(name)
return self.type_schemas[key]

def get_node_type_by_name(self, node_type_name: str) -> GraphObjectSchema:
def get_node_type_by_name(
self, node_type_name: str | None
) -> Optional[GraphObjectSchema]:
"""Get a node type by name.

If the node type does not exist, a new node type will be created.
Expand All @@ -542,6 +553,8 @@ def get_node_type_by_name(self, node_type_name: str) -> GraphObjectSchema:
Returns:
The node type.
"""
if node_type_name is None:
return None
return self.get_by_type_and_object_type(GraphObjectType.NODE, node_type_name)

def get_relationship_type_by_name(
Expand Down Expand Up @@ -677,13 +690,22 @@ class UnboundAdjacency:
from_cardinality: Cardinality
to_cardinality: Cardinality

def bind(self, schema: Schema, aliases: LayeredDict[str, str]):
def bind(
self, schema: Schema, aliases: LayeredDict[str, str]
) -> tuple[Adjacency, AdjacencyCardinality]:
from_type = aliases.get(self.from_type_or_alias, self.from_type_or_alias)
to_type = aliases.get(self.to_type_or_alias, self.to_type_or_alias)
schema.add_adjacency(
Adjacency(from_type, to_type, self.relationship_type),
AdjacencyCardinality(self.from_cardinality, self.to_cardinality),
adjacency = Adjacency(
from_node_type=from_type,
to_node_type=to_type,
relationship_type=self.relationship_type,
)
cardinality = AdjacencyCardinality(
self.from_cardinality,
self.to_cardinality,
)
schema.add_adjacency(adjacency, cardinality)
return adjacency, cardinality


@dataclass(slots=True, frozen=True)
Expand All @@ -703,7 +725,9 @@ class SchemaExpansionCoordinator:
default_factory=LayeredList
)
# Maps node types to their additional types for relationship expansion
additional_types_map: Dict[str, Tuple[str, ...]] = field(default_factory=dict)
additional_types_map: LayeredDict[str, Tuple[str, ...]] = field(
default_factory=LayeredDict
)

def on_node_schema(
self,
Expand Down Expand Up @@ -814,11 +838,13 @@ def increment_context_level(self):
self.unbound_adjacencies.increment_context_level()
self.unbound_aliases.increment_context_level()
self.aliases.increment_context_level()
self.additional_types_map.increment_context_level()

def decrement_context_level(self):
self.unbound_adjacencies.decrement_context_level()
self.unbound_aliases.decrement_context_level()
self.aliases.decrement_context_level()
self.additional_types_map.decrement_context_level()

def register_additional_types(
self, main_type: str, additional_types: Tuple[str, ...]
Expand All @@ -832,19 +858,66 @@ def register_additional_types(
main_type: The main node type.
additional_types: The additional types associated with the main type.
"""
if additional_types:
self.additional_types_map[main_type] = additional_types
if additional_types and self.include_additional_types:
# Merge with any existing additional types for this main_type in the
# current context level so that multiple registrations in the same
# context accumulate rather than overwrite. Preserve a stable
# insertion order to keep schema expansion deterministic.
existing = self.additional_types_map.get(main_type, tuple())
merged = existing + tuple(t for t in additional_types if t not in existing)
self.additional_types_map[main_type] = merged

def expand_additional_types(
self,
base_type: str,
additional_types: Tuple[str, ...],
fn: Callable[[GraphObjectSchema], None],
) -> None:
"""Expand and register additional types for a base node type.

When `include_additional_types` is enabled, this helper will:

- Invoke `fn` for each additional type's node schema, so that the
additional types share the same structural definition as the base.
- Register the additional types so that adjacencies and alias-level
properties are also expanded to them during `clear_aliases`.
"""
if not additional_types or not self.include_additional_types:
return

for additional_type in additional_types:
self.on_node_schema(fn, node_type=additional_type)

self.register_additional_types(base_type, additional_types)

def clear_aliases(self):
for unbound_adjacency in self.unbound_adjacencies:
unbound_adjacency.bind(self.schema, self.aliases)
# First, bind all unbound adjacencies for this context into concrete
# adjacencies on the underlying schema, keeping track of exactly which
# adjacencies were created as part of this pass.
base_adjacencies = self._bind_unbound_adjacencies()

if not self.include_additional_types:
return

# Then duplicate those adjacencies and any alias-level properties for
# additional types registered in this context.
self._expand_adjacencies_for_additional_types(base_adjacencies)
self._expand_properties_for_additional_types()

def _bind_unbound_adjacencies(
self,
) -> list[tuple[Adjacency, AdjacencyCardinality]]:
base_adjacencies: list[tuple[Adjacency, AdjacencyCardinality]] = []
for unbound_adjacency in self.unbound_adjacencies:
base_adjacencies.append(unbound_adjacency.bind(self.schema, self.aliases))
return base_adjacencies

def _expand_adjacencies_for_additional_types(
self, base_adjacencies: list[tuple[Adjacency, AdjacencyCardinality]]
) -> None:
# Create duplicate adjacencies for additional types
adjacencies_to_add = []
for adjacency, cardinality in list(self.schema.cardinalities.items()):
adjacencies_to_add: list[tuple[Adjacency, AdjacencyCardinality]] = []
for adjacency, cardinality in base_adjacencies:
from_types = [adjacency.from_node_type]
to_types = [adjacency.to_node_type]

Expand Down Expand Up @@ -873,10 +946,32 @@ def clear_aliases(self):
)
adjacencies_to_add.append((new_adjacency, cardinality))

# Add all the new adjacencies
for adjacency, cardinality in adjacencies_to_add:
self.schema.add_adjacency(adjacency, cardinality)

def _expand_properties_for_additional_types(self) -> None:
"""Propagate alias-level properties to additional types in this context.

Properties defined via `properties` interpretations are accumulated on
aliases (for example, the `source_node` alias). When we register
additional types for a base node type, those alias-level properties
should also be applied to the additional types, but only within the
current schema-expansion context.
"""
for alias, base_type in self.aliases.effective_items.items():
alias_schema = self.unbound_aliases.get(alias, None)
if alias_schema is None:
continue

additional_types = self.additional_types_map.get(base_type, tuple())
if not additional_types:
continue

for additional_type in additional_types:
target_schema = self.schema.get_node_type_by_name(additional_type)
for property_name, metadata in alias_schema.properties.items():
target_schema.add_property(property_name, metadata)


class ExpandsSchema:
"""An interface for an object that expands a schema."""
Expand Down
Loading
Loading