Skip to content

Commit 6253ad6

Browse files
authored
Add Private Service Connect interface support to VertexAI operators (#54170)
1 parent ec754f8 commit 6253ad6

File tree

5 files changed

+62
-2
lines changed

5 files changed

+62
-2
lines changed

providers/google/docs/operators/cloud/vertex_ai.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,19 @@ To delete experiment run you can use
861861
:start-after: [START how_to_cloud_vertex_ai_delete_experiment_run_operator]
862862
:end-before: [END how_to_cloud_vertex_ai_delete_experiment_run_operator]
863863

864+
Use Private Service Connect interface
865+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
866+
867+
You can configure Private Service Connect interface connections for
868+
:class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomContainerTrainingJobOperator`,
869+
:class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomPythonPackageTrainingJobOperator`,
870+
:class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomTrainingJobOperator` and
871+
:class:`~airflow.providers.google.cloud.operators.vertex_ai.ray.CreateRayClusterOperator`
872+
operators in Vertex AI. For doing it you must first configure the PSC interface by following the provided
873+
`documentation <https://cloud.google.com/vertex-ai/docs/general/vpc-psc-i-setup>`__.
874+
Then, specify the PSC configuration in the ``psc_interface_config`` parameter.
875+
876+
864877
Reference
865878
^^^^^^^^^
866879

providers/google/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ dependencies = [
7878
# google-cloud-aiplatform doesn't install ray for python 3.12 (issue: https://github.com/googleapis/python-aiplatform/issues/5252).
7979
# Temporarily lock in ray 2.42.0 which is compatible with python 3.12 until linked issue is solved.
8080
# Remove the ray dependency as well as google-cloud-bigquery-storage once linked issue is fixed
81-
"google-cloud-aiplatform[evaluation]>=1.73.0",
81+
"google-cloud-aiplatform[evaluation]>=1.98.0",
8282
"ray[default]>=2.42.0 ; python_version < '3.13'",
8383
"google-cloud-bigquery-storage>=2.31.0 ; python_version < '3.13'",
8484
"google-cloud-alloydb>=0.4.0",

providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
from google.cloud.aiplatform_v1.services.pipeline_service.pagers import (
5656
ListTrainingPipelinesPager,
5757
)
58-
from google.cloud.aiplatform_v1.types import CustomJob, TrainingPipeline
58+
from google.cloud.aiplatform_v1.types import CustomJob, PscInterfaceConfig, TrainingPipeline
5959

6060

6161
class CustomJobHook(GoogleBaseHook, OperationHelper):
@@ -317,6 +317,7 @@ def _run_job(
317317
is_default_version: bool | None = None,
318318
model_version_aliases: list[str] | None = None,
319319
model_version_description: str | None = None,
320+
psc_interface_config: PscInterfaceConfig | None = None,
320321
) -> tuple[models.Model | None, str, str]:
321322
"""Run a training pipeline job and wait until its completion."""
322323
model = job.run(
@@ -350,6 +351,7 @@ def _run_job(
350351
is_default_version=is_default_version,
351352
model_version_aliases=model_version_aliases,
352353
model_version_description=model_version_description,
354+
psc_interface_config=psc_interface_config,
353355
)
354356
training_id = self.extract_training_id(job.resource_name)
355357
custom_job_id = self.extract_custom_job_id(
@@ -574,6 +576,7 @@ def create_custom_container_training_job(
574576
timestamp_split_column_name: str | None = None,
575577
tensorboard: str | None = None,
576578
sync=True,
579+
psc_interface_config: PscInterfaceConfig | None = None,
577580
) -> tuple[models.Model | None, str, str]:
578581
"""
579582
Create Custom Container Training Job.
@@ -837,6 +840,8 @@ def create_custom_container_training_job(
837840
:param sync: Whether to execute the AI Platform job synchronously. If False, this method
838841
will be executed in concurrent Future and any downstream object will
839842
be immediately returned and synced when the Future has completed.
843+
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
844+
training.
840845
"""
841846
self._job = self.get_custom_container_training_job(
842847
project=project_id,
@@ -896,6 +901,7 @@ def create_custom_container_training_job(
896901
is_default_version=is_default_version,
897902
model_version_aliases=model_version_aliases,
898903
model_version_description=model_version_description,
904+
psc_interface_config=psc_interface_config,
899905
)
900906

901907
return model, training_id, custom_job_id
@@ -958,6 +964,7 @@ def create_custom_python_package_training_job(
958964
model_version_aliases: list[str] | None = None,
959965
model_version_description: str | None = None,
960966
sync=True,
967+
psc_interface_config: PscInterfaceConfig | None = None,
961968
) -> tuple[models.Model | None, str, str]:
962969
"""
963970
Create Custom Python Package Training Job.
@@ -1220,6 +1227,8 @@ def create_custom_python_package_training_job(
12201227
:param sync: Whether to execute the AI Platform job synchronously. If False, this method
12211228
will be executed in concurrent Future and any downstream object will
12221229
be immediately returned and synced when the Future has completed.
1230+
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
1231+
training.
12231232
"""
12241233
self._job = self.get_custom_python_package_training_job(
12251234
project=project_id,
@@ -1280,6 +1289,7 @@ def create_custom_python_package_training_job(
12801289
is_default_version=is_default_version,
12811290
model_version_aliases=model_version_aliases,
12821291
model_version_description=model_version_description,
1292+
psc_interface_config=psc_interface_config,
12831293
)
12841294

12851295
return model, training_id, custom_job_id
@@ -1342,6 +1352,7 @@ def create_custom_training_job(
13421352
timestamp_split_column_name: str | None = None,
13431353
tensorboard: str | None = None,
13441354
sync=True,
1355+
psc_interface_config: PscInterfaceConfig | None = None,
13451356
) -> tuple[models.Model | None, str, str]:
13461357
"""
13471358
Create Custom Training Job.
@@ -1604,6 +1615,8 @@ def create_custom_training_job(
16041615
:param sync: Whether to execute the AI Platform job synchronously. If False, this method
16051616
will be executed in concurrent Future and any downstream object will
16061617
be immediately returned and synced when the Future has completed.
1618+
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
1619+
training.
16071620
"""
16081621
self._job = self.get_custom_training_job(
16091622
project=project_id,
@@ -1664,6 +1677,7 @@ def create_custom_training_job(
16641677
is_default_version=is_default_version,
16651678
model_version_aliases=model_version_aliases,
16661679
model_version_description=model_version_description,
1680+
psc_interface_config=psc_interface_config,
16671681
)
16681682

16691683
return model, training_id, custom_job_id
@@ -1725,6 +1739,7 @@ def submit_custom_container_training_job(
17251739
predefined_split_column_name: str | None = None,
17261740
timestamp_split_column_name: str | None = None,
17271741
tensorboard: str | None = None,
1742+
psc_interface_config: PscInterfaceConfig | None = None,
17281743
) -> CustomContainerTrainingJob:
17291744
"""
17301745
Create and submit a Custom Container Training Job pipeline, then exit without waiting for it to complete.
@@ -1985,6 +2000,8 @@ def submit_custom_container_training_job(
19852000
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
19862001
For more information on configuring your service account please visit:
19872002
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
2003+
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
2004+
training.
19882005
"""
19892006
self._job = self.get_custom_container_training_job(
19902007
project=project_id,
@@ -2043,6 +2060,7 @@ def submit_custom_container_training_job(
20432060
model_version_aliases=model_version_aliases,
20442061
model_version_description=model_version_description,
20452062
sync=False,
2063+
psc_interface_config=psc_interface_config,
20462064
)
20472065
return self._job
20482066

@@ -2104,6 +2122,7 @@ def submit_custom_python_package_training_job(
21042122
is_default_version: bool | None = None,
21052123
model_version_aliases: list[str] | None = None,
21062124
model_version_description: str | None = None,
2125+
psc_interface_config: PscInterfaceConfig | None = None,
21072126
) -> CustomPythonPackageTrainingJob:
21082127
"""
21092128
Create and submit a Custom Python Package Training Job pipeline, then exit without waiting for it to complete.
@@ -2363,6 +2382,8 @@ def submit_custom_python_package_training_job(
23632382
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
23642383
For more information on configuring your service account please visit:
23652384
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
2385+
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
2386+
training.
23662387
"""
23672388
self._job = self.get_custom_python_package_training_job(
23682389
project=project_id,
@@ -2422,6 +2443,7 @@ def submit_custom_python_package_training_job(
24222443
model_version_aliases=model_version_aliases,
24232444
model_version_description=model_version_description,
24242445
sync=False,
2446+
psc_interface_config=psc_interface_config,
24252447
)
24262448

24272449
return self._job
@@ -2484,6 +2506,7 @@ def submit_custom_training_job(
24842506
predefined_split_column_name: str | None = None,
24852507
timestamp_split_column_name: str | None = None,
24862508
tensorboard: str | None = None,
2509+
psc_interface_config: PscInterfaceConfig | None = None,
24872510
) -> CustomTrainingJob:
24882511
"""
24892512
Create and submit a Custom Training Job pipeline, then exit without waiting for it to complete.
@@ -2747,6 +2770,8 @@ def submit_custom_training_job(
27472770
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
27482771
For more information on configuring your service account please visit:
27492772
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
2773+
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
2774+
training.
27502775
"""
27512776
self._job = self.get_custom_training_job(
27522777
project=project_id,
@@ -2806,6 +2831,7 @@ def submit_custom_training_job(
28062831
model_version_aliases=model_version_aliases,
28072832
model_version_description=model_version_description,
28082833
sync=False,
2834+
psc_interface_config=psc_interface_config,
28092835
)
28102836
return self._job
28112837

providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/custom_job.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
CustomPythonPackageTrainingJob,
5252
CustomTrainingJob,
5353
)
54+
from google.cloud.aiplatform_v1.types import PscInterfaceConfig
5455

5556
from airflow.utils.context import Context
5657

@@ -110,6 +111,7 @@ def __init__(
110111
predefined_split_column_name: str | None = None,
111112
timestamp_split_column_name: str | None = None,
112113
tensorboard: str | None = None,
114+
psc_interface_config: PscInterfaceConfig | None = None,
113115
gcp_conn_id: str = "google_cloud_default",
114116
impersonation_chain: str | Sequence[str] | None = None,
115117
**kwargs,
@@ -166,6 +168,7 @@ def __init__(
166168
self.predefined_split_column_name = predefined_split_column_name
167169
self.timestamp_split_column_name = timestamp_split_column_name
168170
self.tensorboard = tensorboard
171+
self.psc_interface_config = psc_interface_config
169172
# END Run param
170173
self.gcp_conn_id = gcp_conn_id
171174
self.impersonation_chain = impersonation_chain
@@ -473,6 +476,8 @@ class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
473476
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
474477
For more information on configuring your service account please visit:
475478
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
479+
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
480+
training.
476481
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
477482
:param impersonation_chain: Optional service account to impersonate using short-term
478483
credentials, or chained list of accounts required to get the access_token
@@ -586,6 +591,7 @@ def execute(self, context: Context):
586591
timestamp_split_column_name=self.timestamp_split_column_name,
587592
tensorboard=self.tensorboard,
588593
sync=True,
594+
psc_interface_config=self.psc_interface_config,
589595
)
590596

591597
if model:
@@ -652,6 +658,7 @@ def invoke_defer(self, context: Context) -> None:
652658
predefined_split_column_name=self.predefined_split_column_name,
653659
timestamp_split_column_name=self.timestamp_split_column_name,
654660
tensorboard=self.tensorboard,
661+
psc_interface_config=self.psc_interface_config,
655662
)
656663
custom_container_training_job_obj.wait_for_resource_creation()
657664
training_pipeline_id: str = custom_container_training_job_obj.name
@@ -931,6 +938,8 @@ class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator
931938
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
932939
For more information on configuring your service account please visit:
933940
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
941+
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
942+
training.
934943
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
935944
:param impersonation_chain: Optional service account to impersonate using short-term
936945
credentials, or chained list of accounts required to get the access_token
@@ -1043,6 +1052,7 @@ def execute(self, context: Context):
10431052
timestamp_split_column_name=self.timestamp_split_column_name,
10441053
tensorboard=self.tensorboard,
10451054
sync=True,
1055+
psc_interface_config=self.psc_interface_config,
10461056
)
10471057

10481058
if model:
@@ -1110,6 +1120,7 @@ def invoke_defer(self, context: Context) -> None:
11101120
predefined_split_column_name=self.predefined_split_column_name,
11111121
timestamp_split_column_name=self.timestamp_split_column_name,
11121122
tensorboard=self.tensorboard,
1123+
psc_interface_config=self.psc_interface_config,
11131124
)
11141125
custom_python_training_job_obj.wait_for_resource_creation()
11151126
training_pipeline_id: str = custom_python_training_job_obj.name
@@ -1389,6 +1400,8 @@ class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
13891400
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
13901401
For more information on configuring your service account please visit:
13911402
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
1403+
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
1404+
training.
13921405
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
13931406
:param impersonation_chain: Optional service account to impersonate using short-term
13941407
credentials, or chained list of accounts required to get the access_token
@@ -1506,6 +1519,7 @@ def execute(self, context: Context):
15061519
timestamp_split_column_name=self.timestamp_split_column_name,
15071520
tensorboard=self.tensorboard,
15081521
sync=True,
1522+
psc_interface_config=None,
15091523
)
15101524

15111525
if model:
@@ -1573,6 +1587,7 @@ def invoke_defer(self, context: Context) -> None:
15731587
predefined_split_column_name=self.predefined_split_column_name,
15741588
timestamp_split_column_name=self.timestamp_split_column_name,
15751589
tensorboard=self.tensorboard,
1590+
psc_interface_config=self.psc_interface_config,
15761591
)
15771592
custom_training_job_obj.wait_for_resource_creation()
15781593
training_pipeline_id: str = custom_training_job_obj.name

providers/google/tests/unit/google/cloud/operators/test_vertex_ai.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ def test_execute(self, mock_hook, mock_dataset):
321321
is_default_version=None,
322322
model_version_aliases=None,
323323
model_version_description=None,
324+
psc_interface_config=None,
324325
)
325326

326327
@mock.patch(VERTEX_AI_PATH.format("custom_job.Dataset"))
@@ -407,6 +408,7 @@ def test_execute__parent_model_version_index_is_removed(self, mock_hook, mock_da
407408
is_default_version=None,
408409
model_version_aliases=None,
409410
model_version_description=None,
411+
psc_interface_config=None,
410412
)
411413

412414
@mock.patch(VERTEX_AI_PATH.format("custom_job.CreateCustomContainerTrainingJobOperator.hook"))
@@ -648,6 +650,7 @@ def test_execute(self, mock_hook, mock_dataset):
648650
timestamp_split_column_name=None,
649651
tensorboard=None,
650652
sync=True,
653+
psc_interface_config=None,
651654
)
652655

653656
@mock.patch(VERTEX_AI_PATH.format("custom_job.Dataset"))
@@ -736,6 +739,7 @@ def test_execute__parent_model_version_index_is_removed(self, mock_hook, mock_da
736739
timestamp_split_column_name=None,
737740
tensorboard=None,
738741
sync=True,
742+
psc_interface_config=None,
739743
)
740744

741745
@mock.patch(VERTEX_AI_PATH.format("custom_job.CreateCustomPythonPackageTrainingJobOperator.hook"))
@@ -976,6 +980,7 @@ def test_execute(self, mock_hook, mock_dataset):
976980
is_default_version=None,
977981
model_version_aliases=None,
978982
model_version_description=None,
983+
psc_interface_config=None,
979984
)
980985

981986
@mock.patch(VERTEX_AI_PATH.format("custom_job.Dataset"))
@@ -1057,6 +1062,7 @@ def test_execute__parent_model_version_index_is_removed(self, mock_hook, mock_da
10571062
is_default_version=None,
10581063
model_version_aliases=None,
10591064
model_version_description=None,
1065+
psc_interface_config=None,
10601066
)
10611067

10621068
@mock.patch(VERTEX_AI_PATH.format("custom_job.CreateCustomTrainingJobOperator.hook"))

0 commit comments

Comments
 (0)