@@ -1006,6 +1006,138 @@ def process(self):
10061006 logger .debug (f"Failed files: { self .failed_files } " )
10071007
10081008
class ListToEntries(BaseParallelProcessor):
    """
    A dataset processor that transforms a single entry containing a list of items into multiple entries,
    one for each item in the list.

    This is useful when a manifest field (e.g., "segments") contains a list of sub-entries, and you want
    to flatten these into individual records for further processing.

    Every output entry keeps all fields of the input entry except the list field itself. If a list item
    is a dictionary, its keys are merged into the output entry and override same-named fields of the
    input entry. Any other item is stored under ``output_field``. An empty list produces no output entries.

    Args:
        field_with_list (str): The name of the field in the input entry that contains a list.
        output_field (str, optional): The name of the output field to assign to items in the list
            if they are not dictionaries. Required if the list contains primitive types (e.g., strings).
        **kwargs: Additional arguments passed to the BaseParallelProcessor.

    Raises:
        TypeError: If the specified list field is not of type list.
        ValueError: If a list item is not a dictionary and `output_field` is not provided.

    Returns:
        A manifest where each entry corresponds to one item in the original list from the input entry.
        This effectively transforms a single input entry containing a list of items into multiple standalone
        entries, each suitable for further dataset processing.

    .. admonition:: Example 1 (list of dicts)

        .. code-block:: yaml

            - _target_: sdp.processors.ListToEntries
              input_manifest_file: ${workspace_dir}/input_manifest.json
              output_manifest_file: ${workspace_dir}/output_manifest.json
              field_with_list: "segments"

        Input::

            {
                "audio_filepath": "sample.wav",
                "segments": [
                    {"start": 0.0, "end": 1.5, "text": "Hello"},
                    {"start": 1.6, "end": 3.0, "text": "World"}
                ]
            }

        Output::

            [
                {
                    "audio_filepath": "sample.wav",
                    "start": 0.0,
                    "end": 1.5,
                    "text": "Hello"
                },
                {
                    "audio_filepath": "sample.wav",
                    "start": 1.6,
                    "end": 3.0,
                    "text": "World"
                }
            ]

    .. admonition:: Example 2 (list of primitives)

        .. code-block:: yaml

            - _target_: sdp.processors.ListToEntries
              input_manifest_file: ${workspace_dir}/input_manifest.json
              output_manifest_file: ${workspace_dir}/output_manifest.json
              field_with_list: "text_chunks"
              output_field: "text"

        Input::

            {
                "audio_filepath": "sample.wav",
                "text_chunks": [
                    "Hello",
                    "World"
                ]
            }

        Output::

            [
                {
                    "audio_filepath": "sample.wav",
                    "text": "Hello"
                },
                {
                    "audio_filepath": "sample.wav",
                    "text": "World"
                }
            ]

    """

    def __init__(
        self,
        field_with_list: str,
        output_field: str = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.field_with_list = field_with_list
        self.output_field = output_field

    def process_dataset_entry(self, data_entry):
        """
        Expand one manifest entry into one ``DataEntry`` per item of its list field.

        Args:
            data_entry (dict): Input entry; must contain the ``field_with_list`` key.
                The entry itself is left unmodified.

        Returns:
            list: One ``DataEntry`` per list item, in the original order. The list is
            empty when the list field is empty.

        Raises:
            TypeError: If the value of the list field is not a list.
            ValueError: If an item is not a dict and ``output_field`` is not set.
        """
        items_list = data_entry[self.field_with_list]

        # Check that the target field is actually a list
        if not isinstance(items_list, list):
            raise TypeError(f'Values of {self.field_with_list} field should be list type only: {data_entry}')

        # Fields shared by every produced entry: everything except the list field.
        # Built as a new dict so the caller's entry is not mutated.
        shared_fields = {key: value for key, value in data_entry.items() if key != self.field_with_list}

        # Expand the list into multiple entries. An empty list simply yields no
        # entries instead of failing on a first-item lookup.
        entries = []
        for item in items_list:
            if isinstance(item, dict):
                # Item keys are merged last, so they win over same-named shared fields.
                entry = {**shared_fields, **item}
            elif self.output_field:
                entry = {**shared_fields, self.output_field: item}
            else:
                # Validate every item (not only the first one) so that a non-dict item
                # is never silently stored under a `None` key.
                raise ValueError(
                    f'Type of items in items list `{self.field_with_list}` is not dict ({type(item)}). '
                    f'In this case `output_field` should be provided.'
                )

            entries.append(DataEntry(entry))

        return entries
1139+
1140+
10091141class LambdaExpression (BaseParallelProcessor ):
10101142 """
10111143 A dataset processor that evaluates a Python expression on each data entry and either stores
0 commit comments