11# SPDX-License-Identifier: BSD-3-Clause
2+ # Copyright 2025 MoDaCor Authors
3+ #
4+ # Redistribution and use in source and binary forms, with or without modification,
5+ # are permitted provided that the following conditions are met:
6+ # 1. Redistributions of source code must retain the above copyright notice, this
7+ # list of conditions and the following disclaimer.
8+ # 2. Redistributions in binary form must reproduce the above copyright notice,
9+ # this list of conditions and the following disclaimer in the documentation
10+ # and/or other materials provided with the distribution.
11+ # 3. Neither the name of the copyright holder nor the names of its contributors
12+ # may be used to endorse or promote products derived from this software without
13+ # specific prior written permission.
14+ # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND
15+ # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16+ # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17+ # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
18+ # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
19+ # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
20+ # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
21+ # ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22+ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
23+ # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
224
325
426__all__ = ["ProcessStep" ]
931from abc import abstractmethod
1032from numbers import Integral
1133from pathlib import Path
12- from typing import Any
34+ from typing import Any , Iterable , Type
1335
1436from attrs import define , field
1537from attrs import validators as v
1840from .databundle import DataBundle
1941from .messagehandler import MessageHandler
2042from .process_step_describer import ProcessStepDescriber
43+ from .processing_data import ProcessingData
2144from .validators import is_list_of_ints
2245
2346
2447@define
2548class ProcessStep :
2649 """A base class defining a processing step"""
2750
    # Class attributes for the process step
    #
    # CONFIG_KEYS is the schema of the configuration entries this step accepts.
    # Each configuration key maps to a spec dictionary with the entries:
    #   "type"           - type a value (or every element of an iterable value)
    #                      must be an instance of
    #   "allow_iterable" - whether a non-string iterable of "type" items is accepted
    #   "allow_none"     - whether None is accepted as the value
    #   "default"        - value placed in the dictionary built by default_config()
    # The schema is read by is_process_step_dict() and default_config().
    CONFIG_KEYS = {
        "with_processing_keys": {
            "type": str,
            "allow_iterable": True,
            "allow_none": True,
            "default": None,
        },
        "output_processing_key": {
            "type": str,
            "allow_iterable": False,
            "allow_none": True,
            "default": None,
        },
    }

    # Attributes supplied at instantiation.
    # io_sources has no default, so it must be provided by the caller.
    io_sources: IoSources = field()
2969
3070 # class attribute for a machine-readable description of the process step
@@ -36,7 +76,10 @@ class ProcessStep:
3676 )
3777
3878 # dynamic instance configuration
39- configuration : dict = field (factory = dict , validator = v .instance_of (dict ))
79+ configuration : dict = field (
80+ factory = dict ,
81+ validator = lambda inst , attrs , val : inst .is_process_step_dict ,
82+ )
4083
4184 # flags and attributes for running the pipeline
4285 requires_steps : list [int ] = field (factory = list , validator = is_list_of_ints )
@@ -54,6 +97,15 @@ class ProcessStep:
5497
    # internal variables:
    # __prepared is set to True by execute() after prepare_execution() has run
    # and is reset to False by modify_config().
    __prepared: bool = field(default=False, validator=v.instance_of(bool))
    # processing_data holds the object passed to the most recent execute() call;
    # it is None until execute() has been called.
    processing_data: ProcessingData | None = field(
        default=None, validator=v.optional(v.instance_of(ProcessingData))
    )
103+
104+ def __attrs_post_init__ (self ):
105+ """
106+ Post-initialization method to set up the process step.
107+ """
108+ self .configuration = self .default_config ()
57109
58110 def prepare_execution (self ):
59111 """
@@ -64,30 +116,24 @@ def prepare_execution(self):
64116 """
65117 pass
66118
67- def can_execute (self , input_field_names : list [str ]) -> bool :
68- """
69- Check if the process step can be executed
70-
71- The default implementation always returns True and any ProcessStep
72- that has dependency checks should override this method.
73- """
74- return True
75-
76119 @abstractmethod
77- def calculate (self , data : DataBundle , ** kwargs : Any ) -> dict [str , Any ]:
120+ def calculate (self ) -> dict [str , DataBundle ]:
78121 """Calculate the process step on the given data"""
79122 raise NotImplementedError ("Subclasses must implement this method" )
80123
81- def execute (self , data : DataBundle , ** kwargs : Any ) -> DataBundle :
124+ def execute (self , data : ProcessingData ) -> None :
82125 """Execute the process step on the given data"""
126+ self .processing_data = data
83127 if not self .__prepared :
84128 self .prepare_execution ()
85129 self .__prepared = True
86- self .produced_outputs = self .calculate (data , ** kwargs )
130+ self .produced_outputs = self .calculate ()
87131 for _key , value in self .produced_outputs .items ():
88- data .data [_key ] = value
132+ if _key in data :
133+ data [_key ].update (value )
134+ else :
135+ data [_key ] = value
89136 self .executed = True
90- return data
91137
92138 def reset (self ):
93139 """Reset the process step to its initial state"""
@@ -102,3 +148,36 @@ def modify_config(self, key: str, value: Any):
102148 else :
103149 raise KeyError (f"Key { key } not found in configuration" ) # noqa
104150 self .__prepared = False
151+
152+ @classmethod
153+ def is_process_step_dict (cls , instance : Type | None , attribute : str | None , item : Any ) -> bool :
154+ """
155+ Check if the value is a dictionary with the correct keys and types.
156+ """
157+ if not isinstance (item , dict ):
158+ return False
159+ for _key , _value in item .items ():
160+ if _key not in cls .CONFIG_KEYS :
161+ return False
162+ _config = cls .CONFIG_KEYS [_key ]
163+ if _value is None :
164+ if _config ["allow_none" ]:
165+ continue
166+ return False
167+ if isinstance (_value , Iterable ) and not isinstance (_value , str ):
168+ if not (
169+ _config ["allow_iterable" ]
170+ and all ([isinstance (_i , _config ["type" ]) for _i in _value ])
171+ ):
172+ return False
173+ continue
174+ if not isinstance (_value , _config ["type" ]):
175+ return False
176+ return True
177+
178+ @classmethod
179+ def default_config (cls ) -> dict [str , Any ]:
180+ """
181+ Create an initial dictionary for the process step configuration.
182+ """
183+ return {_k : _v ["default" ] for _k , _v in cls .CONFIG_KEYS .items ()}
0 commit comments