@@ -8431,3 +8431,96 @@ def test_run_workflow_use_cached_job_format_source_pick_param(self):
84318431 ).strip ()
84328432 == "2"
84338433 )
8434+
8435+ def test_run_workflow_use_cached_job_implicit_conversion_send_to_new_history (self ):
8436+ wf = """class: GalaxyWorkflow
8437+ inputs:
8438+ fastq_input:
8439+ type: data
8440+ steps:
8441+ grep:
8442+ # Grep1 requires fastqsanger, so fastqsanger.gz will be implicitly converted
8443+ tool_id: Grep1
8444+ in:
8445+ input: fastq_input
8446+ """
8447+ with self .dataset_populator .test_history () as history_id :
8448+ # Create a fastqsanger.gz dataset
8449+ compressed_path = self .test_data_resolver .get_filename ("1.fastqsanger.gz" )
8450+ with open (compressed_path , "rb" ) as fh :
8451+ dataset = self .dataset_populator .new_dataset (
8452+ history_id , content = fh , file_type = "fastqsanger.gz" , wait = True
8453+ )
8454+
8455+ # Upload workflow
8456+ workflow_id = self .workflow_populator .upload_yaml_workflow (wf )
8457+
8458+ # Run workflow first time
8459+ workflow_request : Dict [str , Any ] = {
8460+ "inputs" : json .dumps ({"fastq_input" : self ._ds_entry (dataset )}),
8461+ "history" : f"hist_id={ history_id } " ,
8462+ "inputs_by" : "name" ,
8463+ }
8464+ first_invocation_summary = self .workflow_populator .invoke_workflow_and_wait (
8465+ workflow_id , request = workflow_request
8466+ ).json ()
8467+ self .workflow_populator .wait_for_invocation_and_jobs (
8468+ history_id = first_invocation_summary ["history_id" ],
8469+ workflow_id = workflow_id ,
8470+ invocation_id = first_invocation_summary ["id" ],
8471+ assert_ok = True ,
8472+ )
8473+ first_invocation = self .workflow_populator .get_invocation (first_invocation_summary ["id" ], step_details = True )
8474+ first_job_id = first_invocation ["steps" ][1 ]["jobs" ][0 ]["id" ]
8475+ first_job_details = self .dataset_populator .get_job_details (first_job_id , full = True ).json ()
8476+ assert first_job_details ["state" ] == "ok"
8477+ assert not first_job_details ["copied_from_job_id" ]
8478+
8479+ # Verify implicit conversion happened (input to Grep1 should be fastqsanger, not fastqsanger.gz)
8480+ grep_input_id = first_job_details ["inputs" ]["input" ]["id" ]
8481+ grep_input = self .dataset_populator .get_history_dataset_details (
8482+ history_id = first_job_details ["history_id" ], content_id = grep_input_id
8483+ )
8484+ assert grep_input ["extension" ] == "fastqsanger" , "Expected implicit conversion to fastqsanger"
8485+ assert grep_input_id != dataset ["id" ], "Input should be implicitly converted dataset"
8486+
8487+ # Run workflow second time with use_cached_job and new_history_name
8488+ # Remove history parameter since we're specifying new_history_name
8489+ workflow_request .pop ("history" , None )
8490+ workflow_request ["use_cached_job" ] = True
8491+ workflow_request ["new_history_name" ] = self .dataset_populator .get_random_name ()
8492+ second_invocation_response = self .workflow_populator .invoke_workflow (workflow_id , request = workflow_request )
8493+ second_invocation_summary = second_invocation_response .json ()
8494+ second_history_id = second_invocation_summary ["history_id" ]
8495+ # Wait for the workflow to complete
8496+ self .workflow_populator .wait_for_invocation_and_jobs (
8497+ history_id = second_history_id ,
8498+ workflow_id = workflow_id ,
8499+ invocation_id = second_invocation_summary ["id" ],
8500+ assert_ok = True ,
8501+ )
8502+ second_invocation = self .workflow_populator .get_invocation (
8503+ second_invocation_summary ["id" ], step_details = True
8504+ )
8505+ second_job_id = second_invocation ["steps" ][1 ]["jobs" ][0 ]["id" ]
8506+ second_job_details = self .dataset_populator .get_job_details (second_job_id , full = True ).json ()
8507+
8508+ # Verify job was cached
8509+ assert second_job_details ["state" ] == "ok"
8510+ assert second_job_details ["copied_from_job_id" ] == first_job_id , "Second job should be cached from first"
8511+
8512+ # Verify the second invocation is in a different history
8513+ assert (
8514+ second_job_details ["history_id" ] != first_job_details ["history_id" ]
8515+ ), "Second invocation should be in a new history"
8516+
8517+ # Verify implicit conversion dataset was copied to the new history
8518+ cached_grep_input_id = second_job_details ["inputs" ]["input" ]["id" ]
8519+ cached_grep_input = self .dataset_populator .get_history_dataset_details (
8520+ history_id = second_job_details ["history_id" ], content_id = cached_grep_input_id
8521+ )
8522+ assert cached_grep_input ["extension" ] == "fastqsanger"
8523+ # The implicitly converted dataset should have a different HDA ID but same underlying dataset
8524+ assert (
8525+ cached_grep_input_id != grep_input_id
8526+ ), "Cached run should have copied the implicitly converted dataset to the new history"
0 commit comments