Skip to content

Commit b4fe85d

Browse files
committed
Cypheresque - adding ability to limit output by pipeline(s)
1 parent 5c229d7 commit b4fe85d

File tree

3 files changed

+69
-4
lines changed

3 files changed

+69
-4
lines changed

nodestream/cli/commands/print_schema.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
from ..operations import InitializeProject, PrintProjectSchema
44
from .nodestream_command import NodestreamCommand
5-
from .shared_options import PROJECT_FILE_OPTION
5+
from .shared_options import PROJECT_FILE_OPTION, MANY_PIPELINES_ARGUMENT
66

77

88
class PrintSchema(NodestreamCommand):
99
name = "print schema"
1010
description = "Print the schema for the current project"
11+
arguments = [MANY_PIPELINES_ARGUMENT]
1112
options = [
1213
PROJECT_FILE_OPTION,
1314
option(
@@ -37,5 +38,6 @@ async def handle_async(self):
3738
format_string=self.option("format"),
3839
type_overrides_file=self.option("overrides"),
3940
output_file=self.option("out"),
41+
pipeline_names=self.argument("pipelines"),
4042
)
4143
)

nodestream/cli/operations/print_project_schema.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from pathlib import Path
2-
from typing import Optional
2+
from typing import List, Optional
33

44
from ...project import Project
55
from ...schema.printers import SchemaPrinter
@@ -14,18 +14,26 @@ def __init__(
1414
format_string: str,
1515
type_overrides_file: Optional[str] = None,
1616
output_file: Optional[str] = None,
17+
pipeline_names: Optional[List[str]] = None,
1718
) -> None:
1819
self.project = project
1920
self.format_string = format_string
2021
self.output_file = output_file
2122
self.type_overrides_file = type_overrides_file
23+
self.pipeline_names = pipeline_names or []
2224

2325
async def perform(self, command: NodestreamCommand):
2426
type_overrides_file = (
2527
Path(self.type_overrides_file) if self.type_overrides_file else None
2628
)
27-
schema = self.project.get_schema(type_overrides_file=type_overrides_file)
28-
29+
if self.pipeline_names:
30+
schema = self.project.get_pipelines_schema(
31+
pipeline_names=self.pipeline_names,
32+
type_overrides_file=type_overrides_file
33+
)
34+
else:
35+
schema = self.project.get_schema(type_overrides_file=type_overrides_file)
36+
2937
# Import all schema printers so that they can register themselves
3038
SchemaPrinter.import_all()
3139
printer = SchemaPrinter.from_name(self.format_string)

nodestream/project/project.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,61 @@ def get_schema(self, type_overrides_file: Optional[Path] = None) -> Schema:
291291
schema.merge(overrides_schema)
292292
return schema
293293

294+
def get_pipelines_schema(
    self,
    pipeline_names: List[str],
    type_overrides_file: Optional[Path] = None,
) -> Schema:
    """Return a `Schema` built from only the specified pipelines.

    A temporary project is assembled that carries this project's targets
    and storage configuration but only the requested pipeline
    definitions, and the schema is generated from that project. This
    shows what the selected pipelines contribute without interference
    from the other pipelines in the project.

    Args:
        pipeline_names: Names of the pipelines to include in schema
            generation. Repeated names are considered once. Names that
            match no pipeline are ignored as long as at least one other
            name matches.
        type_overrides_file: A path to a YAML file containing type
            overrides that are merged into the generated schema.
            Defaults to None.

    Returns:
        The schema representing only the specified pipelines.

    Raises:
        ValueError: If none of the specified pipelines are found.
    """
    # Drop repeated names (keeping first-seen order) so that no pipeline
    # definition is registered with a scope more than once.
    requested_names = list(dict.fromkeys(pipeline_names))

    # Temporary project that will hold only the requested pipelines.
    filtered_project = Project(
        targets_by_name=self.targets_by_name,
        storage_configuration=self.storage_configuration,
    )

    found_any = False
    for scope in self.scopes_by_name.values():
        filtered_scope = PipelineScope(
            name=scope.name,
            config=scope.config,
            pipeline_configuration=scope.pipeline_configuration,
        )

        for pipeline_name in requested_names:
            if pipeline_name in scope.pipelines_by_name:
                filtered_scope.add_pipeline_definition(
                    scope.pipelines_by_name[pipeline_name]
                )
                found_any = True

        # Scopes contributing none of the requested pipelines are left out.
        if filtered_scope.pipelines_by_name:
            filtered_project.add_scope(filtered_scope)

    if not found_any:
        available_pipelines = [
            name
            for scope in self.scopes_by_name.values()
            for name in scope.pipelines_by_name
        ]
        raise ValueError(
            f"None of the specified pipelines {pipeline_names} were found. "
            f"Available pipelines: {available_pipelines}"
        )

    schema = filtered_project.make_schema()
    if type_overrides_file is not None:
        overrides_schema = Schema.read_from_file(type_overrides_file)
        schema.merge(overrides_schema)
    return schema
348+
294349
def get_child_expanders(self) -> Iterable[ExpandsSchema]:
    """Return the values of `scopes_by_name` as the child schema expanders."""
    scopes = self.scopes_by_name
    return scopes.values()
296351

0 commit comments

Comments
 (0)