Skip to content

Commit c6c4652

Browse files
authored
Merge pull request #429 from nodestream-proj/pipeline-specific-schemas
Updating Cypheresque to allow for filtering by pipeline(s)
2 parents b3bbffb + 8cf43f4 commit c6c4652

File tree

6 files changed

+220
-6
lines changed

6 files changed

+220
-6
lines changed

nodestream/cli/commands/print_schema.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
from cleo.helpers import option
1+
from typing import ClassVar
2+
3+
from cleo.helpers import argument, option
24

35
from ..operations import InitializeProject, PrintProjectSchema
46
from .nodestream_command import NodestreamCommand
5-
from .shared_options import PROJECT_FILE_OPTION
7+
from .shared_options import MANY_PIPELINES_ARGUMENT, PROJECT_FILE_OPTION
68

79

810
class PrintSchema(NodestreamCommand):
911
name = "print schema"
1012
description = "Print the schema for the current project"
13+
arguments: ClassVar[list[argument]] = [MANY_PIPELINES_ARGUMENT]
1114
options = [
1215
PROJECT_FILE_OPTION,
1316
option(
@@ -37,5 +40,6 @@ async def handle_async(self):
3740
format_string=self.option("format"),
3841
type_overrides_file=self.option("overrides"),
3942
output_file=self.option("out"),
43+
pipeline_names=self.argument("pipelines"),
4044
)
4145
)

nodestream/cli/operations/print_project_schema.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from pathlib import Path
2-
from typing import Optional
2+
from typing import List, Optional
33

44
from ...project import Project
55
from ...schema.printers import SchemaPrinter
@@ -14,17 +14,25 @@ def __init__(
1414
format_string: str,
1515
type_overrides_file: Optional[str] = None,
1616
output_file: Optional[str] = None,
17+
pipeline_names: Optional[List[str]] = None,
1718
) -> None:
1819
self.project = project
1920
self.format_string = format_string
2021
self.output_file = output_file
2122
self.type_overrides_file = type_overrides_file
23+
self.pipeline_names = pipeline_names or []
2224

2325
async def perform(self, command: NodestreamCommand):
2426
type_overrides_file = (
2527
Path(self.type_overrides_file) if self.type_overrides_file else None
2628
)
27-
schema = self.project.get_schema(type_overrides_file=type_overrides_file)
29+
if self.pipeline_names:
30+
schema = self.project.get_pipelines_schema(
31+
pipeline_names=self.pipeline_names,
32+
type_overrides_file=type_overrides_file,
33+
)
34+
else:
35+
schema = self.project.get_schema(type_overrides_file=type_overrides_file)
2836

2937
# Import all schema printers so that they can register themselves
3038
SchemaPrinter.import_all()

nodestream/project/project.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313
from ..pipeline import Step
1414
from ..pipeline.object_storage import ObjectStore
1515
from ..pluggable import Pluggable
16-
from ..schema import ExpandsSchema, ExpandsSchemaFromChildren, Schema
16+
from ..schema import (
17+
ExpandsSchema,
18+
ExpandsSchemaFromChildren,
19+
Schema,
20+
SchemaExpansionCoordinator,
21+
)
1722
from .pipeline_definition import PipelineDefinition
1823
from .pipeline_scope import PipelineScope
1924
from .plugin import PluginConfiguration
@@ -245,6 +250,16 @@ def get_pipeline_by_name(self, pipeline_name: str) -> PipelineDefinition:
245250
if pipeline is not None:
246251
return pipeline
247252

253+
def get_pipeline_by_names(
254+
self, pipeline_names: List[str]
255+
) -> Iterable[PipelineDefinition]:
256+
"""Returns pipeline objects in the project."""
257+
for scope in self.scopes_by_name.values():
258+
for pipeline_name in pipeline_names:
259+
pipeline = scope.pipelines_by_name.get(pipeline_name, None)
260+
if pipeline is not None:
261+
yield pipeline
262+
248263
def delete_pipeline(
249264
self,
250265
scope_name: Optional[str],
@@ -291,6 +306,51 @@ def get_schema(self, type_overrides_file: Optional[Path] = None) -> Schema:
291306
schema.merge(overrides_schema)
292307
return schema
293308

309+
def get_pipelines_schema(
310+
self, pipeline_names: List[str], type_overrides_file: Optional[Path] = None
311+
) -> Schema:
312+
"""Returns a `GraphSchema` representing only the specified pipelines.
313+
314+
This method generates a schema from only the specified pipelines,
315+
allowing you to see what schema specific pipelines contribute without interference
316+
from other pipelines in the project.
317+
318+
Args:
319+
pipeline_names (List[str]): List of pipeline names to include in schema generation.
320+
type_overrides_file (Optional[Path], optional): A path to a YAML file containing type overrides. Defaults to None.
321+
322+
Returns:
323+
Schema: The schema representing only the specified pipelines.
324+
325+
Raises:
326+
ValueError: If none of the specified pipelines are found.
327+
"""
328+
pipeline_definitions = list(self.get_pipeline_by_names(pipeline_names))
329+
330+
if not pipeline_definitions:
331+
available_pipelines = [
332+
name
333+
for scope in self.scopes_by_name.values()
334+
for name in scope.pipelines_by_name.keys()
335+
]
336+
raise ValueError(
337+
f"None of the specified pipelines {pipeline_names} were found. Available pipelines: {available_pipelines}"
338+
)
339+
340+
coordinator = SchemaExpansionCoordinator(schema := Schema())
341+
342+
for pipeline in pipeline_definitions:
343+
pipeline.initialize_for_introspection()
344+
pipeline.expand_schema(coordinator=coordinator)
345+
346+
schema = coordinator.schema
347+
348+
if type_overrides_file is not None:
349+
overrides_schema = Schema.read_from_file(type_overrides_file)
350+
schema.merge(overrides_schema)
351+
352+
return schema
353+
294354
def get_child_expanders(self) -> Iterable[ExpandsSchema]:
295355
return self.scopes_by_name.values()
296356

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "nodestream"
3-
version = "0.14.13"
3+
version = "0.14.14"
44
description = "A Fast, Declarative ETL for Graph Databases."
55
license = "GPL-3.0-only"
66
authors = [

tests/unit/cli/operations/test_print_project_schema.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from pathlib import Path
2+
13
import pytest
24

35
from nodestream.cli.operations import PrintProjectSchema
@@ -42,3 +44,53 @@ async def test_print_project_schema_prints_schema_to_file(
4244
_, stdout, fileout = await ran_print_project_schema_operation("some/path")
4345
assert not stdout.called
4446
assert fileout.called
47+
48+
49+
@pytest.mark.asyncio
50+
async def test_print_project_schema_with_pipeline_filtering(
51+
mocker, project_with_default_scope
52+
):
53+
std_out = mocker.patch(
54+
"nodestream.schema.printers.SchemaPrinter.print_schema_to_stdout"
55+
)
56+
project_with_default_scope.get_schema = mocker.Mock(return_value="full_schema")
57+
project_with_default_scope.get_pipelines_schema = mocker.Mock(
58+
return_value="filtered_schema"
59+
)
60+
61+
operation = PrintProjectSchema(
62+
project=project_with_default_scope,
63+
format_string="plain",
64+
pipeline_names=["test_pipeline"],
65+
)
66+
await operation.perform(mocker.Mock())
67+
68+
# Should call get_pipelines_schema, not get_schema
69+
project_with_default_scope.get_pipelines_schema.assert_called_once_with(
70+
pipeline_names=["test_pipeline"], type_overrides_file=None
71+
)
72+
project_with_default_scope.get_schema.assert_not_called()
73+
std_out.assert_called_once()
74+
75+
76+
@pytest.mark.asyncio
77+
async def test_print_project_schema_with_pipeline_filtering_and_overrides(
78+
mocker, project_with_default_scope
79+
):
80+
project_with_default_scope.get_pipelines_schema = mocker.Mock(
81+
return_value="filtered_schema"
82+
)
83+
84+
operation = PrintProjectSchema(
85+
project=project_with_default_scope,
86+
format_string="plain",
87+
type_overrides_file="overrides.yaml",
88+
pipeline_names=["test_pipeline", "another_pipeline"],
89+
)
90+
await operation.perform(mocker.Mock())
91+
92+
# Should call get_pipelines_schema with correct arguments
93+
project_with_default_scope.get_pipelines_schema.assert_called_once_with(
94+
pipeline_names=["test_pipeline", "another_pipeline"],
95+
type_overrides_file=Path("overrides.yaml"),
96+
)

tests/unit/project/test_project.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,3 +378,93 @@ def test_project_get_from_storage(project, mocker):
378378
result,
379379
same_instance(project.storage_configuration.initialize_by_name.return_value),
380380
)
381+
382+
383+
def test_get_pipelines_schema_multiple_pipelines(project, mocker):
384+
mock_schema = mocker.Mock(spec=Schema)
385+
mock_coordinator = mocker.Mock()
386+
mock_coordinator.schema = mock_schema
387+
388+
mocker.patch(
389+
"nodestream.project.project.SchemaExpansionCoordinator",
390+
return_value=mock_coordinator,
391+
)
392+
393+
mock_pipeline1 = mocker.Mock()
394+
mock_pipeline1.initialize_for_introspection = mocker.Mock()
395+
mock_pipeline1.expand_schema = mocker.Mock()
396+
397+
mock_pipeline2 = mocker.Mock()
398+
mock_pipeline2.initialize_for_introspection = mocker.Mock()
399+
mock_pipeline2.expand_schema = mocker.Mock()
400+
401+
mocker.patch.object(
402+
project, "get_pipeline_by_names", return_value=[mock_pipeline1, mock_pipeline2]
403+
)
404+
405+
result = project.get_pipelines_schema(["test", "test2"])
406+
407+
mock_pipeline1.initialize_for_introspection.assert_called_once()
408+
mock_pipeline1.expand_schema.assert_called_once_with(coordinator=mock_coordinator)
409+
mock_pipeline2.initialize_for_introspection.assert_called_once()
410+
mock_pipeline2.expand_schema.assert_called_once_with(coordinator=mock_coordinator)
411+
assert_that(result, same_instance(mock_schema))
412+
413+
414+
def test_get_pipelines_schema_with_type_overrides(project, mocker):
415+
mock_base_schema = mocker.Mock(spec=Schema)
416+
mock_overrides_schema = mocker.Mock(spec=Schema)
417+
mock_coordinator = mocker.Mock()
418+
mock_coordinator.schema = mock_base_schema
419+
420+
mocker.patch(
421+
"nodestream.project.project.SchemaExpansionCoordinator",
422+
return_value=mock_coordinator,
423+
)
424+
Schema.read_from_file = mocker.Mock(return_value=mock_overrides_schema)
425+
426+
mock_pipeline = mocker.Mock()
427+
mock_pipeline.initialize_for_introspection = mocker.Mock()
428+
mock_pipeline.expand_schema = mocker.Mock()
429+
mocker.patch.object(project, "get_pipeline_by_names", return_value=[mock_pipeline])
430+
431+
overrides_path = Path("some/overrides.yaml")
432+
result = project.get_pipelines_schema(["test"], overrides_path)
433+
434+
Schema.read_from_file.assert_called_once_with(overrides_path)
435+
mock_base_schema.merge.assert_called_once_with(mock_overrides_schema)
436+
assert_that(result, same_instance(mock_base_schema))
437+
438+
439+
def test_get_pipelines_schema_nonexistent_pipeline_raises_error(project, mocker):
440+
mocker.patch.object(project, "get_pipeline_by_names", return_value=[])
441+
442+
with pytest.raises(ValueError) as exc_info:
443+
project.get_pipelines_schema(["nonexistent"])
444+
445+
error_message = str(exc_info.value)
446+
assert_that(
447+
error_message,
448+
equal_to(
449+
"None of the specified pipelines ['nonexistent'] were found. Available pipelines: ['test', 'test2']"
450+
),
451+
)
452+
453+
454+
def test_get_pipeline_by_names(project):
455+
pipelines = list(project.get_pipeline_by_names(["test", "test2"]))
456+
457+
assert_that(pipelines, has_length(2))
458+
assert_that([p.name for p in pipelines], contains_inanyorder("test", "test2"))
459+
460+
461+
def test_get_pipeline_by_names_nonexistent_pipeline(project):
462+
pipelines = list(project.get_pipeline_by_names(["nonexistent"]))
463+
assert_that(pipelines, has_length(0))
464+
465+
466+
def test_get_pipeline_by_names_mixed_existing_nonexisting(project):
467+
pipelines = list(project.get_pipeline_by_names(["test", "nonexistent", "test2"]))
468+
469+
assert_that(pipelines, has_length(2))
470+
assert_that([p.name for p in pipelines], contains_inanyorder("test", "test2"))

0 commit comments

Comments
 (0)