1
1
import json
2
+ import os .path
2
3
import re
3
4
import time
4
5
from datetime import datetime
5
6
from pathlib import Path
6
- from typing import Any , Callable , Dict , Optional
7
+ from typing import Any , Callable , Dict
7
8
8
- from devtools_testutils import AzureRecordedTestCase , is_live , set_bodiless_matcher
9
+ from devtools_testutils import AzureRecordedTestCase , set_bodiless_matcher
9
10
import pydash
10
11
import pytest
11
12
from marshmallow import ValidationError
27
28
from azure .core .exceptions import HttpResponseError , ResourceNotFoundError
28
29
from azure .core .polling import LROPoller
29
30
30
- from .._util import _PIPELINE_JOB_TIMEOUT_SECOND , DATABINDING_EXPRESSION_TEST_CASES
31
+ from .._util import _PIPELINE_JOB_TIMEOUT_SECOND , DATABINDING_EXPRESSION_TEST_CASES , \
32
+ DATABINDING_EXPRESSION_TEST_CASE_ENUMERATE
31
33
32
34
33
35
def assert_job_input_output_types (job : PipelineJob ):
@@ -188,35 +190,28 @@ def test_pipeline_job_get_child_run(self, client: MLClient, generate_weekly_fixe
188
190
assert isinstance (retrieved_child_run , Job )
189
191
assert retrieved_child_run .name == child_job .name
190
192
191
- @pytest .mark .skipif (condition = not is_live (), reason = "Recording file names are too long and need to be shortened" )
192
193
@pytest .mark .parametrize (
193
- "pipeline_job_path, expected_error_type " ,
194
+ "pipeline_job_path" ,
194
195
[
195
196
# flaky parameterization
196
- # ("./tests/test_configs/pipeline_jobs/invalid/non_existent_remote_component.yml", Exception),
197
- (
198
- "tests/test_configs/pipeline_jobs/invalid/non_existent_remote_version.yml" ,
199
- Exception ,
200
- ),
201
- (
202
- "tests/test_configs/pipeline_jobs/invalid/non_existent_compute.yml" ,
203
- Exception ,
204
- ),
197
+ # "non_existent_remote_component.yml",
198
+ "non_existent_remote_version.yml" ,
199
+ "non_existent_compute.yml" ,
205
200
],
206
201
)
207
202
def test_pipeline_job_validation_remote (
208
203
self ,
209
204
client : MLClient ,
210
205
randstr : Callable [[str ], str ],
211
206
pipeline_job_path : str ,
212
- expected_error_type ,
213
207
) -> None :
208
+ base_dir = "./tests/test_configs/pipeline_jobs/invalid/"
214
209
pipeline_job : PipelineJob = load_job (
215
- source = pipeline_job_path ,
210
+ source = os . path . join ( base_dir , pipeline_job_path ) ,
216
211
params_override = [{"name" : randstr ("name" )}],
217
212
)
218
213
with pytest .raises (
219
- expected_error_type ,
214
+ Exception ,
220
215
# hide this as server side error message is not consistent
221
216
# match=str(expected_error),
222
217
):
@@ -415,10 +410,22 @@ def test_pipeline_job_default_datastore_compute(self, client: MLClient, randstr:
415
410
else :
416
411
assert job .compute in pipeline_job .jobs [job_name ].compute
417
412
418
- @pytest .mark .skipif (condition = not is_live (), reason = "Recording file names are too long and need to be shortened" )
419
413
@pytest .mark .parametrize (
420
- "pipeline_job_path, converted_jobs, expected_dict, fields_to_omit " ,
414
+ "test_case_i,test_case_name " ,
421
415
[
416
+ # TODO: enable this after identity support released to canary
417
+ # (0, "helloworld_pipeline_job_with_component_output"),
418
+ (1 , "helloworld_pipeline_job_with_paths" ),
419
+ ]
420
+ )
421
+ def test_pipeline_job_with_command_job (
422
+ self ,
423
+ client : MLClient ,
424
+ randstr : Callable [[str ], str ],
425
+ test_case_i ,
426
+ test_case_name ,
427
+ ) -> None :
428
+ params = [
422
429
(
423
430
"tests/test_configs/pipeline_jobs/helloworld_pipeline_job_defaults_with_command_job_e2e.yml" ,
424
431
2 ,
@@ -587,17 +594,9 @@ def test_pipeline_job_default_datastore_compute(self, client: MLClient, randstr:
587
594
"source_job_id" ,
588
595
],
589
596
),
590
- ],
591
- )
592
- def test_pipeline_job_with_command_job (
593
- self ,
594
- client : MLClient ,
595
- randstr : Callable [[str ], str ],
596
- pipeline_job_path : str ,
597
- converted_jobs ,
598
- expected_dict ,
599
- fields_to_omit ,
600
- ) -> None :
597
+ ]
598
+ pipeline_job_path , converted_jobs , expected_dict , fields_to_omit = params [test_case_i ]
599
+
601
600
params_override = [{"name" : randstr ("name" )}]
602
601
pipeline_job = load_job (
603
602
source = pipeline_job_path ,
@@ -616,21 +615,21 @@ def test_pipeline_job_with_command_job(
616
615
actual_dict = pydash .omit (pipeline_dict ["properties" ], * fields_to_omit )
617
616
assert actual_dict == expected_dict
618
617
619
- @pytest .mark .skipif (condition = not is_live (), reason = "Recording file names are too long and need to be shortened" )
620
618
@pytest .mark .parametrize (
621
619
"pipeline_job_path" ,
622
620
[
623
- "tests/test_configs/pipeline_jobs/helloworld_pipeline_job_defaults_with_parallel_job_file_component_input_e2e .yml" ,
624
- "tests/test_configs/pipeline_jobs/helloworld_pipeline_job_defaults_with_parallel_job_file_input_e2e .yml" ,
625
- "tests/test_configs/pipeline_jobs/helloworld_pipeline_job_defaults_with_parallel_job_tabular_input_e2e .yml" ,
621
+ "file_component_input_e2e .yml" ,
622
+ "file_input_e2e .yml" ,
623
+ "tabular_input_e2e .yml" ,
626
624
],
627
625
)
628
626
def test_pipeline_job_with_parallel_job (
629
627
self , client : MLClient , randstr : Callable [[str ], str ], pipeline_job_path : str
630
628
) -> None :
629
+ base_file_name = "./tests/test_configs/pipeline_jobs/helloworld_pipeline_job_defaults_with_parallel_job_"
631
630
params_override = [{"name" : randstr ("name" )}]
632
631
pipeline_job = load_job (
633
- source = pipeline_job_path ,
632
+ source = base_file_name + pipeline_job_path ,
634
633
params_override = params_override ,
635
634
)
636
635
created_job = client .jobs .create_or_update (pipeline_job )
@@ -942,18 +941,19 @@ def test_pipeline_job_with_sweep_node_early_termination_policy(
942
941
created_pipeline_dict = created_pipeline ._to_dict ()
943
942
assert pydash .get (created_pipeline_dict , "jobs.hello_sweep_inline_trial.early_termination" ) == policy_yaml_dict
944
943
945
- @pytest .mark .skipif (condition = not is_live (), reason = "Recording file names are too long and need to be shortened" )
946
944
@pytest .mark .parametrize (
947
- "pipeline_job_path, expected_error " ,
948
- DATABINDING_EXPRESSION_TEST_CASES ,
945
+ "test_case_i, test_case_name " ,
946
+ DATABINDING_EXPRESSION_TEST_CASE_ENUMERATE ,
949
947
)
950
948
def test_pipeline_job_with_data_binding_expression (
951
949
self ,
952
950
client : MLClient ,
953
951
randstr : Callable [[str ], str ],
954
- pipeline_job_path : str ,
955
- expected_error : Optional [ Exception ] ,
952
+ test_case_i : int ,
953
+ test_case_name : str ,
956
954
):
955
+ pipeline_job_path , expected_error = DATABINDING_EXPRESSION_TEST_CASES [test_case_i ]
956
+
957
957
pipeline : PipelineJob = load_job (source = pipeline_job_path , params_override = [{"name" : randstr ("name" )}])
958
958
if expected_error is None :
959
959
assert_job_cancel (pipeline , client )
0 commit comments