Skip to content

Commit efe7e90

Browse files
committed
updating projects to be simpler
1 parent cc88b1c commit efe7e90

File tree

2 files changed

+85
-75
lines changed

2 files changed

+85
-75
lines changed

nodestream/project/project.py

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313
from ..pipeline import Step
1414
from ..pipeline.object_storage import ObjectStore
1515
from ..pluggable import Pluggable
16-
from ..schema import ExpandsSchema, ExpandsSchemaFromChildren, Schema
16+
from ..schema import (
17+
ExpandsSchema,
18+
ExpandsSchemaFromChildren,
19+
Schema,
20+
SchemaExpansionCoordinator,
21+
)
1722
from .pipeline_definition import PipelineDefinition
1823
from .pipeline_scope import PipelineScope
1924
from .plugin import PluginConfiguration
@@ -245,6 +250,16 @@ def get_pipeline_by_name(self, pipeline_name: str) -> PipelineDefinition:
245250
if pipeline is not None:
246251
return pipeline
247252

253+
def get_pipeline_by_names(
254+
self, pipeline_names: List[str]
255+
) -> Iterable[PipelineDefinition]:
256+
"""Yield each pipeline definition, searched across all scopes, whose name is in pipeline_names; unknown names are skipped."""
257+
for scope in self.scopes_by_name.values():
258+
for pipeline_name in pipeline_names:
259+
pipeline = scope.pipelines_by_name.get(pipeline_name, None)
260+
if pipeline is not None:
261+
yield pipeline
262+
248263
def delete_pipeline(
249264
self,
250265
scope_name: Optional[str],
@@ -310,32 +325,9 @@ def get_pipelines_schema(
310325
Raises:
311326
ValueError: If none of the specified pipelines are found.
312327
"""
313-
# Create a temporary project to store only the specified pipelines
314-
filtered_project = Project(
315-
targets_by_name=self.targets_by_name,
316-
storage_configuration=self.storage_configuration,
317-
)
318-
319-
pipelines_found = []
320-
321-
for scope in self.scopes_by_name.values():
322-
filtered_scope = PipelineScope(
323-
name=scope.name,
324-
config=scope.config,
325-
pipeline_configuration=scope.pipeline_configuration,
326-
)
327-
328-
for pipeline_name in pipeline_names:
329-
if pipeline_name in scope.pipelines_by_name:
330-
filtered_scope.add_pipeline_definition(
331-
scope.pipelines_by_name[pipeline_name]
332-
)
333-
pipelines_found.append(pipeline_name)
334-
335-
if filtered_scope.pipelines_by_name:
336-
filtered_project.add_scope(filtered_scope)
328+
pipeline_definitions = list(self.get_pipeline_by_names(pipeline_names))
337329

338-
if not pipelines_found:
330+
if not pipeline_definitions:
339331
available_pipelines = [
340332
name
341333
for scope in self.scopes_by_name.values()
@@ -345,7 +337,19 @@ def get_pipelines_schema(
345337
f"None of the specified pipelines {pipeline_names} were found. Available pipelines: {available_pipelines}"
346338
)
347339

348-
return filtered_project.get_schema(type_overrides_file=type_overrides_file)
340+
coordinator = SchemaExpansionCoordinator(schema := Schema())
341+
342+
for pipeline in pipeline_definitions:
343+
pipeline.initialize_for_introspection()
344+
pipeline.expand_schema(coordinator=coordinator)
345+
346+
schema = coordinator.schema
347+
348+
if type_overrides_file is not None:
349+
overrides_schema = Schema.read_from_file(type_overrides_file)
350+
schema.merge(overrides_schema)
351+
352+
return schema
349353

350354
def get_child_expanders(self) -> Iterable[ExpandsSchema]:
351355
return self.scopes_by_name.values()

tests/unit/project/test_project.py

Lines changed: 54 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -379,73 +379,79 @@ def test_project_get_from_storage(project, mocker):
379379
same_instance(project.storage_configuration.initialize_by_name.return_value),
380380
)
381381

382-
383-
def test_get_pipelines_schema_single_pipeline(project, mocker):
384-
"""Test filtering schema for a single pipeline."""
382+
def test_get_pipelines_schema_multiple_pipelines(project, mocker):
385383
mock_schema = mocker.Mock(spec=Schema)
386-
387-
# Mock the pipeline initialization to avoid file system dependencies
388-
mocker.patch.object(project.__class__, "make_schema", return_value=mock_schema)
389-
mock_pipeline_init = mocker.patch(
390-
"nodestream.project.pipeline_definition.PipelineDefinition.initialize_for_introspection"
391-
)
392-
mock_pipeline = mocker.Mock()
393-
mock_pipeline.expand_schema = mocker.Mock()
394-
mock_pipeline_init.return_value = mock_pipeline
395-
396-
result = project.get_pipelines_schema(["test"])
397-
398-
# Verify the result
384+
mock_coordinator = mocker.Mock()
385+
mock_coordinator.schema = mock_schema
386+
387+
mocker.patch('nodestream.project.project.SchemaExpansionCoordinator', return_value=mock_coordinator)
388+
389+
mock_pipeline1 = mocker.Mock()
390+
mock_pipeline1.initialize_for_introspection = mocker.Mock()
391+
mock_pipeline1.expand_schema = mocker.Mock()
392+
393+
mock_pipeline2 = mocker.Mock()
394+
mock_pipeline2.initialize_for_introspection = mocker.Mock()
395+
mock_pipeline2.expand_schema = mocker.Mock()
396+
397+
mocker.patch.object(project, 'get_pipeline_by_names', return_value=[mock_pipeline1, mock_pipeline2])
398+
399+
result = project.get_pipelines_schema(["test", "test2"])
400+
401+
mock_pipeline1.initialize_for_introspection.assert_called_once()
402+
mock_pipeline1.expand_schema.assert_called_once_with(coordinator=mock_coordinator)
403+
mock_pipeline2.initialize_for_introspection.assert_called_once()
404+
mock_pipeline2.expand_schema.assert_called_once_with(coordinator=mock_coordinator)
399405
assert_that(result, same_instance(mock_schema))
400406

401407

402408
def test_get_pipelines_schema_with_type_overrides(project, mocker):
403-
"""Test pipeline schema generation with type overrides."""
404409
mock_base_schema = mocker.Mock(spec=Schema)
405410
mock_overrides_schema = mocker.Mock(spec=Schema)
406-
407-
# Mock the pipeline initialization and schema loading
408-
mocker.patch.object(project.__class__, "make_schema", return_value=mock_base_schema)
409-
mock_pipeline_init = mocker.patch(
410-
"nodestream.project.pipeline_definition.PipelineDefinition.initialize_for_introspection"
411-
)
411+
mock_coordinator = mocker.Mock()
412+
mock_coordinator.schema = mock_base_schema
413+
414+
mocker.patch('nodestream.project.project.SchemaExpansionCoordinator', return_value=mock_coordinator)
415+
Schema.read_from_file = mocker.Mock(return_value=mock_overrides_schema)
416+
412417
mock_pipeline = mocker.Mock()
418+
mock_pipeline.initialize_for_introspection = mocker.Mock()
413419
mock_pipeline.expand_schema = mocker.Mock()
414-
mock_pipeline_init.return_value = mock_pipeline
415-
416-
Schema.read_from_file = mocker.Mock(return_value=mock_overrides_schema)
417-
420+
mocker.patch.object(project, 'get_pipeline_by_names', return_value=[mock_pipeline])
421+
418422
overrides_path = Path("some/overrides.yaml")
419423
result = project.get_pipelines_schema(["test"], overrides_path)
420-
424+
421425
Schema.read_from_file.assert_called_once_with(overrides_path)
422426
mock_base_schema.merge.assert_called_once_with(mock_overrides_schema)
423427
assert_that(result, same_instance(mock_base_schema))
424428

425429

426-
def test_get_pipelines_schema_nonexistent_pipeline_raises_error(project):
427-
"""Test that specifying non-existent pipelines raises ValueError with helpful message."""
430+
def test_get_pipelines_schema_nonexistent_pipeline_raises_error(project, mocker):
431+
mocker.patch.object(project, 'get_pipeline_by_names', return_value=[])
432+
428433
with pytest.raises(ValueError) as exc_info:
429434
project.get_pipelines_schema(["nonexistent"])
430-
435+
431436
error_message = str(exc_info.value)
432-
assert_that(
433-
error_message,
434-
equal_to(
435-
"None of the specified pipelines ['nonexistent'] were found. Available pipelines: ['test', 'test2']"
436-
),
437-
)
437+
assert_that(error_message, equal_to(
438+
"None of the specified pipelines ['nonexistent'] were found. Available pipelines: ['test', 'test2']"
439+
))
438440

441+
def test_get_pipeline_by_names(project):
442+
pipelines = list(project.get_pipeline_by_names(["test", "test2"]))
443+
444+
assert_that(pipelines, has_length(2))
445+
assert_that([p.name for p in pipelines], contains_inanyorder("test", "test2"))
439446

440-
def test_get_pipelines_schema_empty_list_raises_error(project):
441-
"""Test that providing an empty pipeline list raises ValueError."""
442-
with pytest.raises(ValueError) as exc_info:
443-
project.get_pipelines_schema([])
444447

445-
error_message = str(exc_info.value)
446-
assert_that(
447-
error_message,
448-
equal_to(
449-
"None of the specified pipelines [] were found. Available pipelines: ['test', 'test2']"
450-
),
451-
)
448+
def test_get_pipeline_by_names_nonexistent_pipeline(project):
449+
pipelines = list(project.get_pipeline_by_names(["nonexistent"]))
450+
assert_that(pipelines, has_length(0))
451+
452+
453+
def test_get_pipeline_by_names_mixed_existing_nonexisting(project):
454+
pipelines = list(project.get_pipeline_by_names(["test", "nonexistent", "test2"]))
455+
456+
assert_that(pipelines, has_length(2))
457+
assert_that([p.name for p in pipelines], contains_inanyorder("test", "test2"))

0 commit comments

Comments
 (0)