Skip to content

Commit ae3b22d

Browse files
authored
fix: Patch jobset_uptime_validation scheduling/selector and standardize DAG_ID (#1188)
This change provides critical patches for the `jobset_uptime_validation` DAG to ensure proper scheduling and resource selection. Additionally, it refactors the tpu_observability module to use a unified DAG_ID constant, eliminating hard-coded strings.
1 parent 0ff7f02 commit ae3b22d

13 files changed

+35
-23
lines changed

dags/common/scheduling_helper/scheduling_helper.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,13 @@ class DayOfWeek(enum.Enum):
4545
"jobset_rollback_ttr": dt.timedelta(minutes=90),
4646
"jobset_ttr_node_pool_resize": dt.timedelta(minutes=90),
4747
"jobset_ttr_pod_delete": dt.timedelta(minutes=90),
48-
"multi-host-availability-rollback": dt.timedelta(minutes=30),
48+
"multi_host_nodepool_rollback": dt.timedelta(minutes=30),
4949
"node_pool_ttr_disk_size": dt.timedelta(minutes=90),
5050
"node_pool_ttr_update_label": dt.timedelta(minutes=90),
5151
"tpu_info_format_validation_dag": dt.timedelta(minutes=30),
5252
"tpu_sdk_monitoring_validation": dt.timedelta(minutes=30),
5353
"jobset_ttr_kill_process": dt.timedelta(minutes=90),
54+
"jobset_uptime_validation": dt.timedelta(minutes=90),
5455
},
5556
}
5657

dags/tpu_observability/jobset_ttr_kill_process.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,15 @@ def kill_tpu_pod_workload(info: node_pool.Info, pod_name: str) -> None:
131131

132132
jobset_config = jobset.build_jobset_from_gcs_yaml(
133133
gcs_path=GCS_JOBSET_CONFIG_PATH,
134-
dag_name="jobset_ttr_kill_process",
134+
dag_name=DAG_ID,
135135
node_pool_selector=selector,
136136
)
137137

138138
cluster_info = node_pool.build_node_pool_info_from_gcs_yaml.override(
139139
task_id="build_node_pool_info_from_gcs_yaml"
140140
)(
141141
gcs_path=GCS_CONFIG_PATH,
142-
dag_name="jobset_ttr_kill_process",
142+
dag_name=DAG_ID,
143143
is_prod=composer_env.is_prod_env(),
144144
machine_type=config.machine_version.value,
145145
tpu_topology=config.tpu_topology,

dags/tpu_observability/jobset_ttr_node_pool_resize.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@
9595

9696
jobset_config = jobset.build_jobset_from_gcs_yaml(
9797
gcs_path=GCS_JOBSET_CONFIG_PATH,
98-
dag_name="jobset_ttr_node_pool_resize",
98+
dag_name=DAG_ID,
9999
node_pool_selector=selector,
100100
)
101101

102102
cluster_info = node_pool.build_node_pool_info_from_gcs_yaml.override(
103103
task_id="build_node_pool_info_from_gcs_yaml"
104104
)(
105105
gcs_path=GCS_CONFIG_PATH,
106-
dag_name="jobset_ttr_node_pool_resize",
106+
dag_name=DAG_ID,
107107
is_prod=composer_env.is_prod_env(),
108108
machine_type=config.machine_version.value,
109109
tpu_topology=config.tpu_topology,

dags/tpu_observability/jobset_ttr_pod_delete.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,15 @@
9090

9191
jobset_config = jobset.build_jobset_from_gcs_yaml(
9292
gcs_path=GCS_JOBSET_CONFIG_PATH,
93-
dag_name="jobset_ttr_pod_delete",
93+
dag_name=DAG_ID,
9494
node_pool_selector=selector,
9595
)
9696

9797
cluster_info = node_pool.build_node_pool_info_from_gcs_yaml.override(
9898
task_id="build_node_pool_info_from_gcs_yaml"
9999
)(
100100
gcs_path=GCS_CONFIG_PATH,
101-
dag_name="jobset_ttr_pod_delete",
101+
dag_name=DAG_ID,
102102
is_prod=composer_env.is_prod_env(),
103103
machine_type=config.machine_version.value,
104104
tpu_topology=config.tpu_topology,

dags/tpu_observability/jobset_ttr_rollback.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,15 @@
9292

9393
jobset_config = jobset.build_jobset_from_gcs_yaml(
9494
gcs_path=GCS_JOBSET_CONFIG_PATH,
95-
dag_name="jobset_rollback_ttr",
95+
dag_name=DAG_ID,
9696
node_pool_selector=selector,
9797
)
9898

9999
cluster_info = node_pool.build_node_pool_info_from_gcs_yaml.override(
100100
task_id="build_node_pool_info_from_gcs_yaml"
101101
)(
102102
gcs_path=GCS_CONFIG_PATH,
103-
dag_name="jobset_rollback_ttr",
103+
dag_name=DAG_ID,
104104
is_prod=composer_env.is_prod_env(),
105105
machine_type=config.machine_version.value,
106106
tpu_topology=config.tpu_topology,

dags/tpu_observability/jobset_uptime_validation.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@
3232
from dags.tpu_observability.utils import node_pool_util as node_pool
3333
from dags.tpu_observability.utils.jobset_util import Workload
3434
from dags.tpu_observability.utils.time_util import TimeUtil
35+
from dags.common.scheduling_helper.scheduling_helper import SchedulingHelper, get_dag_timeout
36+
37+
38+
DAG_ID = "jobset_uptime_validation"
39+
DAGRUN_TIMEOUT = get_dag_timeout(DAG_ID)
40+
SCHEDULE = SchedulingHelper.arrange_schedule_time(DAG_ID)
3541

3642

3743
@task
@@ -44,10 +50,11 @@ def get_current_time() -> TimeUtil:
4450
# Keyword arguments are generated dynamically at runtime (pylint does not
4551
# know this signature).
4652
with models.DAG( # pylint: disable=unexpected-keyword-arg
47-
dag_id="jobset_uptime_validation",
53+
dag_id=DAG_ID,
4854
start_date=datetime.datetime(2025, 8, 15),
4955
default_args={"retries": 0},
50-
schedule="30 16 * * *" if composer_env.is_prod_env() else None,
56+
schedule=SCHEDULE if composer_env.is_prod_env() else None,
57+
dagrun_timeout=DAGRUN_TIMEOUT,
5158
catchup=False,
5259
tags=[
5360
"cloud-ml-auto-solutions",
@@ -93,19 +100,23 @@ def get_current_time() -> TimeUtil:
93100
with TaskGroup( # pylint: disable=unexpected-keyword-arg
94101
group_id=f"v{config.tpu_version.value}"
95102
):
103+
selector = jobset.generate_node_pool_selector("jobset-rollback-ttr")
104+
96105
jobset_config = jobset.build_jobset_from_gcs_yaml(
97106
gcs_path=GCS_JOBSET_CONFIG_PATH,
98-
dag_name="jobset_uptime_validation",
107+
dag_name=DAG_ID,
108+
node_pool_selector=selector,
99109
)
100110

101111
cluster_info = node_pool.build_node_pool_info_from_gcs_yaml.override(
102112
task_id="build_node_pool_info_from_gcs_yaml"
103113
)(
104114
gcs_path=GCS_CONFIG_PATH,
105-
dag_name="jobset_uptime_validation",
115+
dag_name=DAG_ID,
106116
is_prod=composer_env.is_prod_env(),
107117
machine_type=config.machine_version.value,
108118
tpu_topology=config.tpu_topology,
119+
node_pool_selector=selector,
109120
)
110121

111122
create_node_pool = node_pool.create.override(task_id="create_node_pool")(

dags/tpu_observability/multi_host_nodepool_rollback_dag.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from dags.common.scheduling_helper.scheduling_helper import SchedulingHelper, get_dag_timeout
3232

3333

34-
DAG_ID = "multi-host-availability-rollback"
34+
DAG_ID = "multi_host_nodepool_rollback"
3535
DAGRUN_TIMEOUT = get_dag_timeout(DAG_ID)
3636
SCHEDULE = SchedulingHelper.arrange_schedule_time(DAG_ID)
3737

@@ -91,7 +91,7 @@
9191
task_id="build_node_pool_info_from_gcs_yaml"
9292
)(
9393
gcs_path=GCS_CONFIG_PATH,
94-
dag_name="multi_host_nodepool_rollback",
94+
dag_name=DAG_ID,
9595
is_prod=composer_env.is_prod_env(),
9696
machine_type=config.machine_version.value,
9797
tpu_topology=config.tpu_topology,

dags/tpu_observability/node_pool_status.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def generate_problematic_node_location(
9191
task_id="build_node_pool_info_from_gcs_yaml"
9292
)(
9393
gcs_path=GCS_CONFIG_PATH,
94-
dag_name="gke_node_pool_status",
94+
dag_name=DAG_ID,
9595
is_prod=composer_env.is_prod_env(),
9696
machine_type=config.machine_version.value,
9797
tpu_topology=config.tpu_topology,

dags/tpu_observability/node_pool_ttr_disk_size.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
with TaskGroup(group_id=f"v{config.tpu_version.value}"):
7979
node_pool_info = node_pool.build_node_pool_info_from_gcs_yaml(
8080
gcs_path=GCS_CONFIG_PATH,
81-
dag_name="node_pool_ttr_disk_size",
81+
dag_name=DAG_ID,
8282
is_prod=composer_env.is_prod_env(),
8383
machine_type=config.machine_version.value,
8484
tpu_topology=config.tpu_topology,

dags/tpu_observability/node_pool_ttr_update_label.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
task_id="build_node_pool_info_from_gcs_yaml"
7878
)(
7979
gcs_path=GCS_CONFIG_PATH,
80-
dag_name="node_pool_ttr_update_label",
80+
dag_name=DAG_ID,
8181
is_prod=composer_env.is_prod_env(),
8282
machine_type=config.machine_version.value,
8383
tpu_topology=config.tpu_topology,

0 commit comments

Comments
 (0)