Skip to content

Commit f9d487c

Browse files
authored
fix(sdk): Add SDK support for setting resource limits on older KFP versions (kubeflow#11839)
For context, the commit 70aaf8a removed support for the old fields (without a resource_ prefix). This was added back in commit 6ebf4aa but done in a way that broke any usage of pipeline input parameters but was to support the current KFP backend which did not yet support the new fields. In commit 7c931ae, the old fields were removed again but added support for the new field in KFP backend. This commit addresses the case where a user is using a new SDK but with a KFP backend prior to 2.4. Signed-off-by: mprahl <[email protected]>
1 parent 05581cc commit f9d487c

File tree

4 files changed

+160
-9
lines changed

4 files changed

+160
-9
lines changed

sdk/python/kfp/compiler/compiler_test.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3569,29 +3569,82 @@ def simple_pipeline():
35693569
self.assertEqual(
35703570
'5', dict_format['deploymentSpec']['executors']['exec-return-1-2']
35713571
['container']['resources']['resourceCpuLimit'])
3572+
self.assertEqual(
3573+
5.0, dict_format['deploymentSpec']['executors']['exec-return-1-2']
3574+
['container']['resources']['cpuLimit'])
35723575
self.assertNotIn(
35733576
'memoryLimit', dict_format['deploymentSpec']['executors']
35743577
['exec-return-1-2']['container']['resources'])
35753578

35763579
self.assertEqual(
35773580
'50G', dict_format['deploymentSpec']['executors']['exec-return-1-3']
35783581
['container']['resources']['resourceMemoryLimit'])
3582+
self.assertEqual(
3583+
50.0, dict_format['deploymentSpec']['executors']['exec-return-1-3']
3584+
['container']['resources']['memoryLimit'])
35793585
self.assertNotIn(
35803586
'cpuLimit', dict_format['deploymentSpec']['executors']
35813587
['exec-return-1-3']['container']['resources'])
35823588

35833589
self.assertEqual(
35843590
'2', dict_format['deploymentSpec']['executors']['exec-return-1-4']
35853591
['container']['resources']['resourceCpuRequest'])
3592+
self.assertEqual(
3593+
2.0, dict_format['deploymentSpec']['executors']['exec-return-1-4']
3594+
['container']['resources']['cpuRequest'])
35863595
self.assertEqual(
35873596
'5', dict_format['deploymentSpec']['executors']['exec-return-1-4']
35883597
['container']['resources']['resourceCpuLimit'])
3598+
self.assertEqual(
3599+
5.0, dict_format['deploymentSpec']['executors']['exec-return-1-4']
3600+
['container']['resources']['cpuLimit'])
35893601
self.assertEqual(
35903602
'4G', dict_format['deploymentSpec']['executors']['exec-return-1-4']
35913603
['container']['resources']['resourceMemoryRequest'])
3604+
self.assertEqual(
3605+
4.0, dict_format['deploymentSpec']['executors']['exec-return-1-4']
3606+
['container']['resources']['memoryRequest'])
35923607
self.assertEqual(
35933608
'50G', dict_format['deploymentSpec']['executors']['exec-return-1-4']
35943609
['container']['resources']['resourceMemoryLimit'])
3610+
self.assertEqual(
3611+
50.0, dict_format['deploymentSpec']['executors']['exec-return-1-4']
3612+
['container']['resources']['memoryLimit'])
3613+
3614+
def test_cpu_memory_input_parameter(self):
3615+
3616+
@dsl.pipeline
3617+
def simple_pipeline(
3618+
cpu_request: str,
3619+
cpu_limt: str,
3620+
memory_request: str,
3621+
memory_limit: str,
3622+
ac_type: str,
3623+
ac_count: int,
3624+
):
3625+
return_1().set_cpu_request(cpu_request)\
3626+
.set_cpu_limit(cpu_limt)\
3627+
.set_memory_request(memory_request)\
3628+
.set_memory_limit(memory_limit)\
3629+
.set_accelerator_limit(ac_count)\
3630+
.set_accelerator_type(ac_type)
3631+
3632+
dict_format = json_format.MessageToDict(simple_pipeline.pipeline_spec)
3633+
resources = dict_format['deploymentSpec']['executors']['exec-return-1'][
3634+
'container']['resources']
3635+
3636+
self.assertIn('resourceCpuRequest', resources)
3637+
self.assertNotIn('cpuRequest', resources)
3638+
self.assertIn('resourceCpuLimit', resources)
3639+
self.assertNotIn('cpuLimit', resources)
3640+
self.assertIn('resourceMemoryRequest', resources)
3641+
self.assertNotIn('memoryRequest', resources)
3642+
self.assertIn('resourceMemoryLimit', resources)
3643+
self.assertNotIn('memoryLimit', resources)
3644+
self.assertIn('resourceType', resources['accelerator'])
3645+
self.assertNotIn('type', resources['accelerator'])
3646+
self.assertIn('resourceCount', resources['accelerator'])
3647+
self.assertNotIn('count', resources['accelerator'])
35953648

35963649

35973650
class TestPlatformConfig(unittest.TestCase):

sdk/python/kfp/compiler/compiler_utils.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,3 +804,60 @@ def recursive_replace_placeholders(data: Union[Dict, List], old_value: str,
804804
if isinstance(data, pipeline_channel.PipelineChannel):
805805
data = str(data)
806806
return new_value if data == old_value else data
807+
808+
809+
# Note that cpu_to_float assumes the string has already been validated by the _validate_cpu_request_limit method.
810+
def _cpu_to_float(cpu: str) -> float:
811+
"""Converts the validated CPU request/limit string and to its numeric float
812+
value.
813+
814+
Args:
815+
cpu: CPU requests or limits. This string should be a number or a
816+
number followed by an "m" to indicate millicores (1/1000). For
817+
more information, see `Specify a CPU Request and a CPU Limit
818+
Returns:
819+
The numeric value (float) of the cpu request/limit.
820+
"""
821+
return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu)
822+
823+
824+
# Note that memory_to_float assumes the string has already been validated by the _validate_memory_request_limit method.
825+
def _memory_to_float(memory: str) -> float:
826+
"""Converts the validated memory request/limit string to its numeric value.
827+
828+
Args:
829+
memory: Memory requests or limits. This string should be a number or
830+
a number followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G",
831+
"Gi", "M", "Mi", "K", or "Ki".
832+
Returns:
833+
The numeric value (float) of the memory request/limit.
834+
"""
835+
if memory.endswith('E'):
836+
memory = float(memory[:-1]) * constants._E / constants._G
837+
elif memory.endswith('Ei'):
838+
memory = float(memory[:-2]) * constants._EI / constants._G
839+
elif memory.endswith('P'):
840+
memory = float(memory[:-1]) * constants._P / constants._G
841+
elif memory.endswith('Pi'):
842+
memory = float(memory[:-2]) * constants._PI / constants._G
843+
elif memory.endswith('T'):
844+
memory = float(memory[:-1]) * constants._T / constants._G
845+
elif memory.endswith('Ti'):
846+
memory = float(memory[:-2]) * constants._TI / constants._G
847+
elif memory.endswith('G'):
848+
memory = float(memory[:-1])
849+
elif memory.endswith('Gi'):
850+
memory = float(memory[:-2]) * constants._GI / constants._G
851+
elif memory.endswith('M'):
852+
memory = float(memory[:-1]) * constants._M / constants._G
853+
elif memory.endswith('Mi'):
854+
memory = float(memory[:-2]) * constants._MI / constants._G
855+
elif memory.endswith('K'):
856+
memory = float(memory[:-1]) * constants._K / constants._G
857+
elif memory.endswith('Ki'):
858+
memory = float(memory[:-2]) * constants._KI / constants._G
859+
else:
860+
# By default interpret as a plain integer, in the unit of Bytes.
861+
memory = float(memory) / constants._G
862+
863+
return memory

sdk/python/kfp/compiler/pipeline_spec_builder.py

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -651,27 +651,62 @@ def convert_to_placeholder(input_value: str) -> str:
651651
for name, value in (task.container_spec.env or {}).items()
652652
]))
653653

654+
# All the fields with the resource_ prefix are newer fields to support pipeline input parameters. The below code
655+
# will check if the value is a placeholder and if not, it will also set the value on the old deprecated fields
656+
# without the resource_ prefix to work on older KFP installations.
654657
if task.container_spec.resources is not None:
655658
if task.container_spec.resources.cpu_request is not None:
656-
container_spec.resources.resource_cpu_request = convert_to_placeholder(
659+
placeholder = convert_to_placeholder(
657660
task.container_spec.resources.cpu_request)
661+
container_spec.resources.resource_cpu_request = placeholder
662+
663+
if task.container_spec.resources.cpu_request == placeholder:
664+
container_spec.resources.cpu_request = compiler_utils._cpu_to_float(
665+
task.container_spec.resources.cpu_request)
658666
if task.container_spec.resources.cpu_limit is not None:
659-
container_spec.resources.resource_cpu_limit = convert_to_placeholder(
667+
placeholder = convert_to_placeholder(
660668
task.container_spec.resources.cpu_limit)
669+
container_spec.resources.resource_cpu_limit = placeholder
670+
671+
if task.container_spec.resources.cpu_limit == placeholder:
672+
container_spec.resources.cpu_limit = compiler_utils._cpu_to_float(
673+
task.container_spec.resources.cpu_limit)
661674
if task.container_spec.resources.memory_request is not None:
662-
container_spec.resources.resource_memory_request = convert_to_placeholder(
675+
placeholder = convert_to_placeholder(
663676
task.container_spec.resources.memory_request)
677+
container_spec.resources.resource_memory_request = placeholder
678+
679+
if task.container_spec.resources.memory_request == placeholder:
680+
container_spec.resources.memory_request = compiler_utils._memory_to_float(
681+
task.container_spec.resources.memory_request)
664682
if task.container_spec.resources.memory_limit is not None:
665-
container_spec.resources.resource_memory_limit = convert_to_placeholder(
683+
placeholder = convert_to_placeholder(
666684
task.container_spec.resources.memory_limit)
685+
container_spec.resources.resource_memory_limit = placeholder
686+
687+
if task.container_spec.resources.memory_limit == placeholder:
688+
container_spec.resources.memory_limit = compiler_utils._memory_to_float(
689+
task.container_spec.resources.memory_limit)
667690
if task.container_spec.resources.accelerator_count is not None:
691+
ac_type = None
692+
ac_type_placholder = convert_to_placeholder(
693+
task.container_spec.resources.accelerator_type)
694+
if task.container_spec.resources.accelerator_type == ac_type_placholder:
695+
ac_type = task.container_spec.resources.accelerator_type
696+
697+
ac_count = None
698+
ac_count_placeholder = convert_to_placeholder(
699+
task.container_spec.resources.accelerator_count)
700+
if task.container_spec.resources.accelerator_count == ac_count_placeholder:
701+
ac_count = int(task.container_spec.resources.accelerator_count)
702+
668703
container_spec.resources.accelerator.CopyFrom(
669704
pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec
670705
.ResourceSpec.AcceleratorConfig(
671-
resource_type=convert_to_placeholder(
672-
task.container_spec.resources.accelerator_type),
673-
resource_count=convert_to_placeholder(
674-
task.container_spec.resources.accelerator_count),
706+
resource_type=ac_type_placholder,
707+
resource_count=ac_count_placeholder,
708+
type=ac_type,
709+
count=ac_count,
675710
))
676711

677712
return container_spec

sdk/python/test_data/pipelines/pipeline_with_resource_spec.yaml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,14 @@ deploymentSpec:
6161
image: gcr.io/my-project/my-fancy-trainer
6262
resources:
6363
accelerator:
64+
count: '1'
6465
resourceCount: '1'
6566
resourceType: tpu-v3
67+
type: tpu-v3
68+
cpuLimit: 4.0
69+
cpuRequest: 2.0
70+
memoryLimit: 15.032385536
71+
memoryRequest: 4.294967296
6672
resourceCpuLimit: '4'
6773
resourceCpuRequest: '2'
6874
resourceMemoryLimit: 14Gi
@@ -119,4 +125,4 @@ root:
119125
isOptional: true
120126
parameterType: STRING
121127
schemaVersion: 2.1.0
122-
sdkVersion: kfp-2.11.0
128+
sdkVersion: kfp-2.12.1

0 commit comments

Comments
 (0)