Skip to content

Commit b605d0a

Browse files
authored
Yancao/add path on compute (#34377)
* add path_on_compute * fix parsing error * output parsing error * fix blank * fix import inputoutputbase * add literal input convert * fix comments * fix conflict * fix conflict * Update output.py
1 parent 0c2f524 commit b605d0a

File tree

4 files changed

+45
-4
lines changed

4 files changed

+45
-4
lines changed

sdk/ml/azure-ai-ml/azure/ai/ml/_schema/job/input_output_entry.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ def generate_path_property(azureml_type):
5959
)
6060

6161

62+
def generate_path_on_compute_property(azureml_type):
63+
return UnionField(
64+
[
65+
LocalPathField(pattern=r"^file:.*"),
66+
],
67+
is_strict=True,
68+
)
69+
70+
6271
def generate_datastore_property():
6372
metadata = {
6473
"description": "Name of the datastore to upload local paths to.",
@@ -103,6 +112,7 @@ class DataInputSchema(InputSchema):
103112
]
104113
)
105114
path = generate_path_property(azureml_type=AzureMLResourceType.DATA)
115+
path_on_compute = generate_path_on_compute_property(azureml_type=AzureMLResourceType.DATA)
106116
datastore = generate_datastore_property()
107117

108118

@@ -119,6 +129,7 @@ class MLTableInputSchema(InputSchema):
119129
)
120130
type = StringTransformedEnum(allowed_values=[AssetTypes.MLTABLE])
121131
path = generate_path_property(azureml_type=AzureMLResourceType.DATA)
132+
path_on_compute = generate_path_on_compute_property(azureml_type=AzureMLResourceType.DATA)
122133
datastore = generate_datastore_property()
123134

124135

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/input.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class Input(_InputOutputBase): # pylint: disable=too-many-instance-attributes
4040
* 'download': Download the data to the compute target,
4141
* 'direct': Pass in the URI as a string to be accessed at runtime
4242
:paramtype mode: Optional[str]
43+
:keyword path_on_compute: The access path of the data input for compute
44+
:paramtype path_on_compute: Optional[str]
4345
:keyword default: The default value of the input. If a default is set, the input data will be optional.
4446
:paramtype default: Union[str, int, float, bool]
4547
:keyword min: The minimum value for the input. If a value smaller than the minimum is passed to the job, the job
@@ -70,7 +72,19 @@ class Input(_InputOutputBase): # pylint: disable=too-many-instance-attributes
7072
"""
7173

7274
_EMPTY = Parameter.empty
73-
_IO_KEYS = ["path", "type", "mode", "description", "default", "min", "max", "enum", "optional", "datastore"]
75+
_IO_KEYS = [
76+
"path",
77+
"type",
78+
"mode",
79+
"path_on_compute",
80+
"description",
81+
"default",
82+
"min",
83+
"max",
84+
"enum",
85+
"optional",
86+
"datastore",
87+
]
7488

7589
@overload
7690
def __init__(
@@ -205,6 +219,7 @@ def __init__(
205219
type: str = "uri_folder",
206220
path: Optional[str] = None,
207221
mode: Optional[str] = None,
222+
path_on_compute: Optional[str] = None,
208223
default: Optional[Union[str, int, float, bool]] = None,
209224
optional: Optional[bool] = None,
210225
min: Optional[Union[int, float]] = None,
@@ -226,6 +241,7 @@ def __init__(
226241
self.path = str(path)
227242
else:
228243
self.path = path
244+
self.path_on_compute = path_on_compute
229245
self.mode = None if self._is_primitive_type else mode
230246
self._update_default(default)
231247
self.optional = optional

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/output.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919

2020
class Output(_InputOutputBase):
21-
_IO_KEYS = ["name", "version", "path", "type", "mode", "description", "early_available"]
21+
_IO_KEYS = ["name", "version", "path", "path_on_compute", "type", "mode", "description", "early_available"]
2222

2323
@overload
2424
def __init__(
@@ -82,6 +82,8 @@ def __init__( # type: ignore[misc]
8282
* 'upload': Upload the data from the compute target
8383
* 'direct': Pass in the URI as a string
8484
:paramtype mode: Optional[str]
85+
:keyword path_on_compute: The access path of the data output for compute
86+
:paramtype path_on_compute: Optional[str]
8587
:keyword description: The description of the output.
8688
:paramtype description: Optional[str]
8789
:keyword name: The name to be used to register the output as a Data or Model asset. A name can be set without
@@ -116,6 +118,7 @@ def __init__( # type: ignore[misc]
116118
self._is_primitive_type = self.type in IOConstants.PRIMITIVE_STR_2_TYPE
117119
self.description = description
118120
self.path = path
121+
self.path_on_compute = kwargs.pop("path_on_compute", None)
119122
self.mode = mode
120123
# use this field to mark Output for early node orchestrate, currently hide in kwargs
121124
self.early_available = kwargs.pop("early_available", None)

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ def to_rest_dataset_literal_inputs(
219219
# set mode attribute manually for binding job input
220220
if input_value.mode:
221221
input_data.mode = INPUT_MOUNT_MAPPING_TO_REST[input_value.mode]
222+
if input_value.path_on_compute:
223+
input_data.pathOnCompute = input_value.path_on_compute
222224
input_data.job_input_type = JobInputType.LITERAL
223225
else:
224226
target_cls_dict = get_input_rest_cls_dict()
@@ -282,11 +284,15 @@ def from_rest_inputs_to_dataset_literal(inputs: Dict[str, RestJobInput]) -> Dict
282284
if input_value.job_input_type in type_transfer_dict:
283285
if input_value.uri:
284286
path = input_value.uri
285-
287+
if getattr(input_value, "pathOnCompute", None) is not None:
288+
sourcePathOnCompute = input_value.pathOnCompute
289+
else:
290+
sourcePathOnCompute = None
286291
input_data = Input(
287292
type=type_transfer_dict[input_value.job_input_type],
288293
path=path,
289294
mode=INPUT_MOUNT_MAPPING_FROM_REST[input_value.mode] if input_value.mode else None,
295+
path_on_compute=sourcePathOnCompute,
290296
)
291297
elif input_value.job_input_type in (JobInputType.LITERAL, JobInputType.LITERAL):
292298
# otherwise, the input is a literal, so just unpack the InputData value field
@@ -331,6 +337,7 @@ def to_rest_data_outputs(outputs: Optional[Dict]) -> Dict[str, RestJobOutput]:
331337
asset_version=output_value.version,
332338
uri=output_value.path,
333339
mode=OUTPUT_MOUNT_MAPPING_TO_REST[output_value.mode.lower()] if output_value.mode else None,
340+
pathOnCompute=output_value.path_on_compute if output_value.path_on_compute else None,
334341
description=output_value.description,
335342
)
336343
else:
@@ -364,12 +371,16 @@ def from_rest_data_outputs(outputs: Dict[str, RestJobOutput]) -> Dict[str, Outpu
364371
# deal with invalid output type submitted by feb api
365372
# todo: backend help convert node level input/output type
366373
normalize_job_input_output_type(output_value)
367-
374+
if getattr(output_value, "pathOnCompute", None) is not None:
375+
sourcePathOnCompute = output_value.pathOnCompute
376+
else:
377+
sourcePathOnCompute = None
368378
if output_value.job_output_type in output_type_mapping:
369379
from_rest_outputs[output_name] = Output(
370380
type=output_type_mapping[output_value.job_output_type],
371381
path=output_value.uri,
372382
mode=OUTPUT_MOUNT_MAPPING_FROM_REST[output_value.mode] if output_value.mode else None,
383+
path_on_compute=sourcePathOnCompute,
373384
description=output_value.description,
374385
name=output_value.asset_name,
375386
version=output_value.asset_version,

0 commit comments

Comments
 (0)