
Commit baff6ca

fix: downgrade processing graph and end_time to optional, add auto-sorting (#1529)

* fix: downgrade processing graph and end_time to optional, add auto-sorting
* chore: lint
* fix: handle empty data_processes
* tests: remove check for required field that is now optional
* tests: coverage on the new ordering validator
1 parent 6e26181 commit baff6ca
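
A quick sketch of the end_time change (illustrative only, not part of the commit): with end_date_time now optional on DataProcess, a process validates without one. The ProcessName/ProcessStage values and the `code` object below are assumed to be set up as in tests/test_processing.py further down; the import path follows the file changed in this commit.

from datetime import datetime

from aind_data_schema.core.processing import DataProcess

# `code`, ProcessName and ProcessStage are assumed to come from the same
# setup used in tests/test_processing.py (not shown in this commit).
dp = DataProcess(
    name="compression",
    experimenters=["Dr. Dan"],
    process_type=ProcessName.COMPRESSION,
    stage=ProcessStage.PROCESSING,
    code=code,
    start_date_time=datetime.fromisoformat("2024-09-13T12:00:00"),
    # end_date_time omitted: it now defaults to None
)
assert dp.end_date_time is None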

File tree: 2 files changed, +115 −4 lines


src/aind_data_schema/core/processing.py

Lines changed: 25 additions & 3 deletions
@@ -65,7 +65,9 @@ class DataProcess(DataModel):
         default=None, title="Pipeline name", description="Pipeline names must exist in Processing.pipelines"
     )
     start_date_time: Annotated[AwareDatetimeWithDefault, TimeValidation.AFTER] = Field(..., title="Start date time")
-    end_date_time: Annotated[AwareDatetimeWithDefault, TimeValidation.AFTER] = Field(..., title="End date time")
+    end_date_time: Optional[Annotated[AwareDatetimeWithDefault, TimeValidation.AFTER]] = Field(
+        default=None, title="End date time"
+    )
     output_path: Optional[AssetPath] = Field(
         default=None, title="Output path", description="Path to processing outputs, if stored."
     )
@@ -110,8 +112,8 @@ class Processing(DataCoreModel):
     )
     notes: Optional[str] = Field(default=None, title="Notes")

-    dependency_graph: Dict[str, List[str]] = Field(
-        ...,
+    dependency_graph: Optional[Dict[str, List[str]]] = Field(
+        default=None,
         title="Dependency graph",
         description=(
             "Directed graph of processing step dependencies. Each key is a process name, and the value is a list of "
@@ -140,6 +142,26 @@ def rename_process(self, old_name: str, new_name: str) -> None:
             if old_name in value:
                 value[value.index(old_name)] = new_name

+    @model_validator(mode="after")
+    def order_processes(self) -> "Processing":
+        """Ensure that processes are ordered by start_date_time"""
+
+        if not hasattr(self, "data_processes") or not self.data_processes:
+            return self
+
+        # Check if any processes are out of order
+        start_times = [process.start_date_time for process in self.data_processes]
+        if not all(start_times[i] <= start_times[i + 1] for i in range(len(start_times) - 1)):
+            # Sort processes by start_date_time
+            self.data_processes.sort(key=lambda x: x.start_date_time)
+            self.notes = (
+                "Processes were reordered by start_date_time"
+                if not self.notes
+                else f"{self.notes}; Processes were reordered by start_date_time"
+            )
+
+        return self
+
     @classmethod
     def create_with_sequential_process_graph(cls, data_processes: List[DataProcess], **kwargs) -> "Processing":
         """Generate a sequential process graph from a list of DataProcess objects"""

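For context on the new order_processes validator above, its observable behavior can be sketched as follows (a minimal sketch mirroring the constructor calls in tests/test_processing.py below; `code`, ProcessName and ProcessStage are assumed to come from that test setup):

from datetime import datetime

from aind_data_schema.core.processing import DataProcess, Processing

# `code`, ProcessName and ProcessStage are assumed to be set up as in
# tests/test_processing.py (not shown in this commit).
early = datetime.fromisoformat("2024-09-13T12:00:00")
late = datetime.fromisoformat("2024-09-13T14:00:00")

p = Processing(
    data_processes=[
        DataProcess(
            name="analysis",
            experimenters=["Dr. Dan"],
            process_type=ProcessName.ANALYSIS,
            stage=ProcessStage.ANALYSIS,
            code=code,
            start_date_time=late,
        ),
        DataProcess(
            name="compression",
            experimenters=["Dr. Dan"],
            process_type=ProcessName.COMPRESSION,
            stage=ProcessStage.PROCESSING,
            code=code,
            start_date_time=early,
        ),
    ],
    dependency_graph={"analysis": [], "compression": []},
)

# The after-model validator sorts data_processes by start_date_time and
# records the reordering in notes.
assert [dp.name for dp in p.data_processes] == ["compression", "analysis"]
assert "Processes were reordered by start_date_time" in p.notes

This matches the behavior exercised in test_order_processes below; when processes are already ordered, the list and notes are left untouched.
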
tests/test_processing.py

Lines changed: 90 additions & 1 deletion
@@ -60,7 +60,6 @@ def test_constructors(self):
         self.assertIn("stage", repr(e.exception))
         self.assertIn("code", repr(e.exception))
         self.assertIn("start_date_time", repr(e.exception))
-        self.assertIn("end_date_time", repr(e.exception))
         self.assertIn("notes", repr(e.exception))

     def test_resource_usage(self):
@@ -361,6 +360,96 @@ def test_validate_pipeline_names(self):
         )
         self.assertIn("Pipeline name 'NonExistentPipeline' not found in pipelines list", str(e.exception))

+    def test_order_processes(self):
+        """Test the order_processes method"""
+
+        # Create processes with different start times (out of order)
+        t1 = datetime.fromisoformat("2024-09-13T14:00:00")
+        t2 = datetime.fromisoformat("2024-09-13T12:00:00")  # Earlier time
+        t3 = datetime.fromisoformat("2024-09-13T16:00:00")  # Later time
+
+        process1 = DataProcess(
+            name="process1",
+            experimenters=["Dr. Dan"],
+            process_type=ProcessName.COMPRESSION,
+            stage=ProcessStage.PROCESSING,
+            code=code,
+            start_date_time=t1,
+            end_date_time=t1,
+        )
+        process2 = DataProcess(
+            name="process2",
+            experimenters=["Dr. Dan"],
+            process_type=ProcessName.ANALYSIS,
+            stage=ProcessStage.ANALYSIS,
+            code=code,
+            start_date_time=t2,
+            end_date_time=t2,
+        )
+        process3 = DataProcess(
+            name="process3",
+            experimenters=["Dr. Dan"],
+            process_type=ProcessName.SPIKE_SORTING,
+            stage=ProcessStage.PROCESSING,
+            code=code,
+            start_date_time=t3,
+            end_date_time=t3,
+        )
+
+        # Create Processing with out-of-order processes
+        dependency_graph = {"process1": [], "process2": [], "process3": []}
+        p = Processing(data_processes=[process1, process2, process3], dependency_graph=dependency_graph)
+
+        # Check that processes were reordered by start_date_time
+        expected_order = [process2, process1, process3]  # t2 < t1 < t3
+        actual_names = [proc.name for proc in p.data_processes]
+        expected_names = [proc.name for proc in expected_order]
+        self.assertEqual(actual_names, expected_names)
+
+        # Check that notes were updated
+        self.assertIn("Processes were reordered by start_date_time", p.notes)
+
+        # Test with already ordered processes
+        process4 = DataProcess(
+            name="process4",
+            experimenters=["Dr. Dan"],
+            process_type=ProcessName.COMPRESSION,
+            stage=ProcessStage.PROCESSING,
+            code=code,
+            start_date_time=t1,
+            end_date_time=t1,
+        )
+        process5 = DataProcess(
+            name="process5",
+            experimenters=["Dr. Dan"],
+            process_type=ProcessName.ANALYSIS,
+            stage=ProcessStage.ANALYSIS,
+            code=code,
+            start_date_time=t3,
+            end_date_time=t3,
+        )
+
+        dependency_graph2 = {"process4": [], "process5": []}
+        p2 = Processing(data_processes=[process4, process5], dependency_graph=dependency_graph2)
+
+        # Check that order wasn't changed and no reordering note was added
+        self.assertEqual([proc.name for proc in p2.data_processes], ["process4", "process5"])
+        self.assertIsNone(p2.notes)
+
+        # Test with existing notes
+        dependency_graph3 = {"process1": [], "process2": [], "process3": []}
+        p3 = Processing(
+            data_processes=[process1, process2, process3], dependency_graph=dependency_graph3, notes="Existing notes"
+        )
+
+        # Check that reordering note was appended to existing notes
+        self.assertIn("Existing notes; Processes were reordered by start_date_time", p3.notes)
+
+        # Test with empty data_processes
+        p4 = Processing(data_processes=[], dependency_graph={})
+        self.assertEqual(len(p4.data_processes), 0)
+        self.assertIsNone(p4.notes)
+

 if __name__ == "__main__":
     unittest.main()
