1616from aind_data_schema_models .modalities import Modality
1717from aind_data_schema_models .organizations import Organization
1818from pydantic import Field , SkipValidation , model_validator
19+ from pydantic_core import PydanticUndefined
1920
2021from aind_data_schema .base import AwareDatetimeWithDefault , DataCoreModel , DataModel
2122from aind_data_schema .components .identifiers import Person
@@ -187,29 +188,30 @@ def from_raw(
187188
188189 """
189190
191+ if not data_description .data_level == DataLevel .RAW :
192+ raise ValueError (f"Input data_description must have data_level=RAW, got { data_description .data_level } " )
193+
def get_or_default(field_name: str) -> Any:
    """
    Resolve the value of one field for the new DataDescription.

    Sources are consulted in this order, and the first one that yields
    a non-None value wins:

    1. The value passed for the field in kwargs.
    2. The attribute of the same name on the input data_description.
    3. The default declared on the DataDescription model field,
       including a default produced by a default_factory.

    Parameters
    ----------
    field_name : str
        Name of the DataDescription field to resolve

    Returns
    -------
    Any
        The resolved value (may be None when the declared default is None)

    Raises
    ------
    ValueError
        If neither kwargs nor data_description provide a value and the
        field is required (no default and no default_factory).

    """
    override = kwargs.get(field_name)
    if override is not None:
        return override

    inherited = getattr(data_description, field_name, None)
    if inherited is not None:
        return inherited

    field_info = DataDescription.model_fields.get(field_name)
    # FieldInfo.default is PydanticUndefined for default_factory fields too,
    # so ask pydantic whether the field is required instead of comparing
    # the raw default against the sentinel.
    if field_info.is_required():
        raise ValueError(
            f"Required field {field_name} must have a value "
            "in the original DataDescription or be passed as an argument"
        )
    return field_info.get_default(call_default_factory=True)
213215
214216 creation_time = (
215217 datetime .now (tz = timezone .utc ) if kwargs .get ("creation_time" ) is None else kwargs ["creation_time" ]
@@ -225,14 +227,105 @@ def get_or_default(field_name: str) -> Any:
225227 raise ValueError (f"Derived name({ derived_name } ) does not match allowed Regex pattern" )
226228
227229 # Upgrade source_data
230+ current_source_data = data_description .source_data or []
228231 if source_data is not None :
229- new_source_data = (
230- source_data if not data_description .source_data else data_description .source_data + source_data
231- )
232+ new_source_data = current_source_data + source_data
232233 else :
233- new_source_data = (
234- [original_name ] if not data_description .source_data else data_description .source_data + [original_name ]
235- )
234+ new_source_data = current_source_data + [original_name ]
235+
236+ return cls (
237+ subject_id = get_or_default ("subject_id" ),
238+ creation_time = creation_time ,
239+ tags = get_or_default ("tags" ),
240+ name = derived_name ,
241+ institution = get_or_default ("institution" ),
242+ funding_source = get_or_default ("funding_source" ),
243+ data_level = DataLevel .DERIVED ,
244+ group = get_or_default ("group" ),
245+ investigators = get_or_default ("investigators" ),
246+ project_name = get_or_default ("project_name" ),
247+ restrictions = get_or_default ("restrictions" ),
248+ modalities = get_or_default ("modalities" ),
249+ data_summary = get_or_default ("data_summary" ),
250+ source_data = new_source_data ,
251+ )
252+
@classmethod
def from_derived(
    cls, data_description: "DataDescription", process_name: str, source_data: Optional[List[str]] = None, **kwargs
) -> "DataDescription":
    """
    Create a DataLevel.DERIVED DataDescription from another DataLevel.DERIVED DataDescription object.

    The new name is built from the "input" portion of the existing derived
    name (as returned by parse_name), the process name and the creation
    time, so derived names are not chained onto each other.

    Parameters
    ----------
    data_description : DataDescription
        The DERIVED DataDescription object to use as the base for the new Derived
    process_name : str
        Name of the process that created the data
    source_data : Optional[List[str]]
        Names appended to the existing data_description.source_data.
        If None, data_description.name is appended instead.
    kwargs
        DataDescription fields can be explicitly set and will override
        values pulled from DataDescription. A "creation_time" entry, if
        given, must be a datetime; otherwise the current UTC time is used.

    Returns
    -------
    DataDescription
        New DERIVED DataDescription with name based on the original input, not the full derived name

    Raises
    ------
    ValueError
        If data_description is not DataLevel.DERIVED, if creation_time is
        not a datetime, if the new name does not match the derived name
        pattern, or if a required field has no value available.

    """
    if data_description.data_level != DataLevel.DERIVED:
        raise ValueError(f"Input data_description must have data_level=DERIVED, got {data_description.data_level}")

    def get_or_default(field_name: str) -> Any:
        """
        Resolve a field value: kwargs first, then the input
        data_description, then the default declared on the model field
        (including one produced by a default_factory). Raises a
        ValueError if the field is required and no value is available.
        """
        override = kwargs.get(field_name)
        if override is not None:
            return override

        inherited = getattr(data_description, field_name, None)
        if inherited is not None:
            return inherited

        field_info = DataDescription.model_fields.get(field_name)
        # FieldInfo.default is PydanticUndefined for default_factory fields
        # too, so ask pydantic whether the field is actually required.
        if field_info.is_required():
            raise ValueError(
                f"Required field {field_name} must have a value "
                "in the original DataDescription or be passed as an argument"
            )
        return field_info.get_default(call_default_factory=True)

    creation_time = kwargs.get("creation_time")
    if creation_time is None:
        creation_time = datetime.now(tz=timezone.utc)

    if not isinstance(creation_time, datetime):
        raise ValueError(f"creation_time({creation_time}) must be a datetime object")

    # Parse the existing derived name to extract the original input
    parsed_name = cls.parse_name(data_description.name, DataLevel.DERIVED)
    original_input = parsed_name["input"]

    # Create new derived name using the original input (not the full derived name)
    derived_name = f"{original_input}_{process_name}_{datetime_to_name_string(creation_time)}"
    if not re.match(DataRegex.DERIVED.value, derived_name):  # pragma: no cover
        raise ValueError(f"Derived name({derived_name}) does not match allowed Regex pattern")

    # Upgrade source_data; list concatenation builds a new list, so the
    # input data_description is left untouched
    current_source_data = data_description.source_data or []
    if source_data is not None:
        new_source_data = current_source_data + source_data
    else:
        new_source_data = current_source_data + [data_description.name]

    return cls(
        subject_id=get_or_default("subject_id"),
        creation_time=creation_time,
        tags=get_or_default("tags"),
        name=derived_name,
        institution=get_or_default("institution"),
        funding_source=get_or_default("funding_source"),
        data_level=DataLevel.DERIVED,
        group=get_or_default("group"),
        investigators=get_or_default("investigators"),
        project_name=get_or_default("project_name"),
        restrictions=get_or_default("restrictions"),
        modalities=get_or_default("modalities"),
        data_summary=get_or_default("data_summary"),
        source_data=new_source_data,
    )
346+
@classmethod
def from_data_description(
    cls, data_description: "DataDescription", process_name: str, source_data: Optional[List[str]] = None, **kwargs
) -> "DataDescription":
    """
    Build a DataLevel.DERIVED DataDescription from a RAW or DERIVED one.

    Dispatches on data_description.data_level: RAW inputs are handed to
    from_raw, DERIVED inputs to from_derived. All arguments are forwarded
    unchanged.

    Parameters
    ----------
    data_description : DataDescription
        The DataDescription object to use as the base for the new Derived
    process_name : str
        Name of the process that created the data
    source_data : Optional[List[str]]
        Optional list of source data names
    kwargs
        DataDescription fields can be explicitly set and will override
        values pulled from DataDescription

    Returns
    -------
    DataDescription
        New DERIVED DataDescription

    Raises
    ------
    ValueError
        If data_level is neither DataLevel.RAW nor DataLevel.DERIVED

    """
    level = data_description.data_level

    # Pick the builder first, then make a single delegating call.
    if level == DataLevel.RAW:
        builder = cls.from_raw
    elif level == DataLevel.DERIVED:
        builder = cls.from_derived
    else:
        raise ValueError(f"Unsupported data_level: {data_description.data_level.value}")

    return builder(data_description, process_name, source_data, **kwargs)
0 commit comments