@@ -304,10 +304,11 @@ def to_yaml(self) -> str:
304304 "version" : self .version ,
305305 }
306306
307- if self .docker_image :
308- data ["docker_image" ] = self .docker_image
309- if self .docker_platform :
310- data ["docker_platform" ] = self .docker_platform
307+ # Docker image and platform are hardcoded in workflow generation, not exposed in YAML
308+ # if self.docker_image:
309+ # data["docker_image"] = self.docker_image
310+ # if self.docker_platform:
311+ # data["docker_platform"] = self.docker_platform
311312
312313 if self .assets :
313314 data ["assets" ] = self .assets
@@ -440,6 +441,113 @@ def set_docker_image(
440441 self .docker_platform = platform
441442 return self
442443
444+ def _generate_participant_workflow_nf (self , entrypoint : Optional [str ] = None ) -> str :
445+ """Generate workflow for List[GenotypeRecord] with participant iteration and aggregation."""
446+
447+ primary_process = self .processes [0 ]
448+ container_image = primary_process .container or self .docker_image or _default_docker_image ()
449+ workflow_script_asset = entrypoint or self ._entrypoint or primary_process .script
450+
451+ # Determine output pattern from outputs
452+ individual_pattern = None
453+ aggregated_path = None
454+ classifier_name = None
455+
456+ for output_spec in self .outputs :
457+ if output_spec .path :
458+ if "{participant_id}" in output_spec .path :
459+ individual_pattern = output_spec .path .replace ("{participant_id}" , "*" )
460+ else :
461+ aggregated_path = output_spec .path
462+ # Extract classifier name from aggregated path (e.g., result_HERC2.tsv -> HERC2)
463+ if aggregated_path .startswith ("result_" ) and aggregated_path .endswith (".tsv" ):
464+ classifier_name = aggregated_path [7 :- 4 ] # Remove "result_" and ".tsv"
465+
466+ if not classifier_name :
467+ classifier_name = self .name .upper ().replace ("-" , "_" ).replace (" " , "_" )
468+
469+ if not individual_pattern :
470+ individual_pattern = f"result_{ classifier_name } _*.tsv"
471+ if not aggregated_path :
472+ aggregated_path = f"result_{ classifier_name } .tsv"
473+
474+ # Generate workflow
475+ workflow = f'''nextflow.enable.dsl=2
476+
477+ workflow USER {{
478+ take:
479+ context
480+ participants // Channel emitting GenotypeRecord maps
481+
482+ main:
483+ def assetsDir = file(context.params.assets_dir)
484+ def workflowScript = file("${{assetsDir}}/{ workflow_script_asset } ")
485+
486+ // Extract (participant_id, genotype_file) tuples from the records channel
487+ def participant_tuples = participants.map {{ record ->
488+ tuple(
489+ record.participant_id,
490+ file(record.genotype_file)
491+ )
492+ }}
493+
494+ // Process each participant
495+ def per_participant_results = { primary_process .name } (
496+ workflowScript,
497+ participant_tuples
498+ )
499+
500+ // Aggregate all results into single file
501+ def aggregated = aggregate_results(
502+ per_participant_results.collect()
503+ )
504+
505+ emit:
506+ { self .outputs [0 ].name if self .outputs else "classification_result" } = aggregated
507+ }}
508+
509+ process { primary_process .name } {{
510+ container '{ container_image } '
511+ publishDir params.results_dir, mode: 'copy', overwrite: true, pattern: '{ individual_pattern } '
512+ tag {{ participant_id }}
513+
514+ input:
515+ path script
516+ tuple val(participant_id), path(genotype_file)
517+
518+ output:
519+ path "result_{ classifier_name } _${{participant_id}}.tsv"
520+
521+ script:
522+ """
523+ bioscript classify "${{script}}" --file "${{genotype_file}}" --participant_id "${{participant_id}}"
524+ """
525+ }}
526+
527+ process aggregate_results {{
528+ container '{ container_image } '
529+ publishDir params.results_dir, mode: 'copy', overwrite: true
530+
531+ input:
532+ path individual_results
533+
534+ output:
535+ path "{ aggregated_path } "
536+
537+ script:
538+ """
539+ # Extract header from first file
540+ head -n 1 ${{individual_results[0]}} > { aggregated_path }
541+
542+ # Append all data rows (skip headers)
543+ for file in ${{individual_results}}; do
544+ tail -n +2 "\\ $file" >> { aggregated_path }
545+ done
546+ """
547+ }}
548+ '''
549+ return workflow
550+
443551 def generate_workflow_nf (self , entrypoint : Optional [str ] = None ) -> str :
444552 """Generate a Nextflow workflow file for this workflow."""
445553
@@ -463,6 +571,14 @@ def generate_workflow_nf(self, entrypoint: Optional[str] = None) -> str:
463571 if not self ._entrypoint :
464572 self ._entrypoint = script_candidate
465573
574+ # Check if using List[GenotypeRecord] - requires different workflow pattern
575+ uses_genotype_list = any (
576+ inp .type .startswith ("List[GenotypeRecord" ) for inp in self .inputs
577+ )
578+
579+ if uses_genotype_list :
580+ return self ._generate_participant_workflow_nf (entrypoint )
581+
466582 if len (self .processes ) > 1 :
467583 raise NotImplementedError ("Multiple processes per workflow are not supported yet" )
468584
0 commit comments