|
17 | 17 | and interpretation on Amazon SageMaker. |
18 | 18 | """ |
19 | 19 | from __future__ import absolute_import |
20 | | - |
| 20 | +import json |
21 | 21 | import logging |
22 | 22 | import os |
23 | 23 | import pathlib |
|
60 | 60 | ) |
61 | 61 | from sagemaker.workflow import is_pipeline_variable |
62 | 62 | from sagemaker.workflow.entities import PipelineVariable |
63 | | -from sagemaker.workflow.execution_variables import ExecutionVariable, ExecutionVariables |
| 63 | +from sagemaker.workflow.execution_variables import ExecutionVariables |
64 | 64 | from sagemaker.workflow.functions import Join |
65 | 65 | from sagemaker.workflow.pipeline_context import runnable_by_pipeline |
66 | | -from sagemaker.workflow.parameters import Parameter |
67 | 66 |
|
68 | 67 | logger = logging.getLogger(__name__) |
69 | 68 |
|
@@ -316,14 +315,14 @@ def _normalize_args( |
316 | 315 | + "rather than a pipeline variable" |
317 | 316 | ) |
318 | 317 | if arguments is not None: |
319 | | - normalized_arguments = [] |
| 318 | + processed_arguments = [] |
320 | 319 | for arg in arguments: |
321 | 320 | if isinstance(arg, PipelineVariable): |
322 | | - normalized_value = self._normalize_pipeline_variable(arg) |
323 | | - normalized_arguments.append(normalized_value) |
| 321 | + processed_value = json.dumps(arg.expr) |
| 322 | + processed_arguments.append(processed_value) |
324 | 323 | else: |
325 | | - normalized_arguments.append(str(arg)) |
326 | | - arguments = normalized_arguments |
| 324 | + processed_arguments.append(str(arg)) |
| 325 | + arguments = processed_arguments |
327 | 326 |
|
328 | 327 | self._current_job_name = self._generate_current_job_name(job_name=job_name) |
329 | 328 |
|
@@ -509,37 +508,6 @@ def _normalize_outputs(self, outputs=None): |
509 | 508 | normalized_outputs.append(output) |
510 | 509 | return normalized_outputs |
511 | 510 |
|
512 | | - def _normalize_pipeline_variable(self, value): |
513 | | - """Helper function to normalize PipelineVariable objects""" |
514 | | - try: |
515 | | - if isinstance(value, Parameter): |
516 | | - return str(value.default_value) if value.default_value is not None else None |
517 | | - |
518 | | - elif isinstance(value, ExecutionVariable): |
519 | | - return f"{value.name}" |
520 | | - |
521 | | - elif isinstance(value, Join): |
522 | | - normalized_values = [ |
523 | | - normalize_pipeline_variable(v) if isinstance(v, PipelineVariable) else str(v) |
524 | | - for v in value.values |
525 | | - ] |
526 | | - return value.on.join(normalized_values) |
527 | | - |
528 | | - elif isinstance(value, PipelineVariable): |
529 | | - if hasattr(value, 'default_value'): |
530 | | - return str(value.default_value) |
531 | | - elif hasattr(value, 'expr'): |
532 | | - return str(value.expr) |
533 | | - |
534 | | - return str(value) |
535 | | - |
536 | | - except AttributeError as e: |
537 | | - raise ValueError(f"Missing required attribute while normalizing {type(value).__name__}: {e}") |
538 | | - except TypeError as e: |
539 | | - raise ValueError(f"Type error while normalizing {type(value).__name__}: {e}") |
540 | | - except Exception as e: |
541 | | - raise ValueError(f"Error normalizing {type(value).__name__}: {e}") |
542 | | - |
543 | 511 |
|
544 | 512 | class ScriptProcessor(Processor): |
545 | 513 | """Handles Amazon SageMaker processing tasks for jobs using a machine learning framework.""" |
|
0 commit comments