@@ -160,9 +160,9 @@ class TextSemanticDeduplicationWorkflow:
160160 input_blocksize: Blocksize for reading files
161161 input_filetype: Type of input files ("jsonl" or "parquet")
162162 input_file_extensions: List of file extensions to process
163- output_filetype: Type of output files ("jsonl" or "parquet")
164- output_file_extension: File extension for output files (None for default)
165- output_fields: List of fields to include in final output (None for all fields)
163+ output_filetype: Type of deduplicated output files ("jsonl" or "parquet")
164+ output_file_extension: File extension for deduplicated output files (None for default)
165+ output_fields: List of fields to include in final deduplicated output (None for all fields)
166166 read_kwargs: Keyword arguments for reading files
167167 cache_kwargs: Keyword arguments for cache operations and storage
168168 write_kwargs: Keyword arguments for writing files
@@ -282,16 +282,11 @@ def _run_embedding_generation(self, executor: BaseExecutor) -> list[Any]:
282282 pipeline .add_stage (embedding_stage )
283283
284284 # Writer stage
285- if self .output_filetype == "parquet" :
286- writer = ParquetWriter (
287- path = self .embeddings_path ,
288- fields = [self .id_field , self .embedding_field ] + (self .metadata_fields or []),
289- write_kwargs = self .cache_kwargs ,
290- )
291- else :
292- msg = f"Output filetype { self .output_filetype } not supported yet"
293- raise NotImplementedError (msg )
294-
285+ writer = ParquetWriter (
286+ path = self .embeddings_path ,
287+ fields = [self .id_field , self .embedding_field ] + (self .metadata_fields or []),
288+ write_kwargs = self .cache_kwargs ,
289+ )
295290 pipeline .add_stage (writer )
296291
297292 return pipeline .run (executor )
0 commit comments