Skip to content

Commit e029f69

Browse files
Test DAG for qwen3-Next tests
Add qwen3-next to MaxText_moe DAG Run pylinter add separate test owner logic
1 parent fb46ce6 commit e029f69

File tree

2 files changed

+70
-3
lines changed

2 files changed

+70
-3
lines changed

dags/sparsity_diffusion_devx/maxtext_moe_tpu_e2e.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,19 +73,27 @@
7373
"cluster": XpkClusters.TPU_V5P_8_CLUSTER,
7474
"time_out_in_min": 90,
7575
},
76+
"qwen3-next-80b": {
77+
"script_name": "tpu/qwen/next/qwen3-next-80b-a3b/2_test_qwen3_next_80b_a3b",
78+
"cluster": XpkClusters.TPU_V5P_128_CLUSTER,
79+
"time_out_in_min": 90,
80+
"owner": test_owner.ROHAN_B,
81+
},
7682
}
7783

7884
unchained_tests = []
7985
for model, test_scripts_details in test_models_tpu.items():
8086
for image in docker_image.keys():
87+
current_owner = test_scripts_details.get("owner", test_owner.SHUNING_J)
88+
8189
training_tpu = gke_config.get_gke_config(
8290
time_out_in_min=test_scripts_details["time_out_in_min"],
8391
test_name=f"{test_name_prefix}_{image}_{model}",
8492
run_model_cmds=(
8593
f"export HF_TOKEN={HF_TOKEN}; export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/{test_scripts_details['script_name']}.sh",
8694
),
8795
docker_image=docker_image[image],
88-
test_owner=test_owner.SHUNING_J,
96+
test_owner=current_owner,
8997
cluster=test_scripts_details["cluster"],
9098
).run_with_quarantine(quarantine_task_group)
9199
unchained_tests.append(training_tpu)
@@ -129,6 +137,7 @@ def convert_checkpoint_and_run_training(
129137
docker_image,
130138
model,
131139
test_scripts_details,
140+
current_owner,
132141
):
133142
with TaskGroup(group_id=test_group_id, prefix_group_id=False) as group:
134143
test_name = f"{test_name_prefix}_{image}_{model}"
@@ -145,7 +154,7 @@ def convert_checkpoint_and_run_training(
145154
f"export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/{test_scripts_details[0]['script_name']}.sh",
146155
),
147156
docker_image=docker_image,
148-
test_owner=test_owner.SHUNING_J,
157+
test_owner=current_owner,
149158
cluster=test_scripts_details[0]["cluster"],
150159
).run(gcs_location=shared_gcs_location)
151160
training_tpu = gke_config.get_gke_config(
@@ -155,7 +164,7 @@ def convert_checkpoint_and_run_training(
155164
f"export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/{test_scripts_details[1]['script_name']}.sh",
156165
),
157166
docker_image=docker_image,
158-
test_owner=test_owner.SHUNING_J,
167+
test_owner=current_owner,
159168
cluster=test_scripts_details[1]["cluster"],
160169
).run(gcs_location=shared_gcs_location)
161170
return conversion_cpu, training_tpu
@@ -165,6 +174,7 @@ def convert_checkpoint_and_run_training(
165174
gcs_subfolder = (
166175
f"{test_owner.Team.JAX_MODELS_AND_PERFORMANCE.value}/maxtext"
167176
)
177+
current_owner = test_scripts_details[0].get("owner", test_owner.SHUNING_J)
168178
for image in docker_image.keys():
169179
test_group_id = "chained_tests" + "_" + model + "_" + image
170180
if QuarantineTests.is_quarantined(test_group_id):
@@ -176,6 +186,7 @@ def convert_checkpoint_and_run_training(
176186
docker_image[image],
177187
model,
178188
test_scripts_details,
189+
current_owner,
179190
)
180191
else:
181192
mode_cpu, mode_tpu = convert_checkpoint_and_run_training(
@@ -185,6 +196,7 @@ def convert_checkpoint_and_run_training(
185196
docker_image[image],
186197
model,
187198
test_scripts_details,
199+
current_owner,
188200
)
189201
tests.append(mode_cpu)
190202
tests.append(mode_tpu)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""A standalone DAG to test the Qwen3-Next 80B script with a custom Docker image."""
16+
17+
import datetime
18+
from airflow import models
19+
from dags import composer_env
20+
from dags.common import test_owner
21+
from dags.common.vm_resource import XpkClusters
22+
from dags.multipod.configs import gke_config
23+
24+
# Retrieve the HF_TOKEN from Airflow variables
25+
HF_TOKEN = models.Variable.get("HF_TOKEN", None)
26+
27+
with models.DAG(
28+
dag_id="qwen3_next_custom_image_test",
29+
schedule=None, # Set to None so it only runs when manually triggered
30+
tags=[
31+
"maxtext",
32+
"tpu",
33+
"qwen3",
34+
"v5p-128",
35+
],
36+
start_date=datetime.datetime(2024, 1, 1),
37+
catchup=False,
38+
) as dag:
39+
# Your specified custom docker image
40+
custom_docker_image = "gcr.io/tpu-prod-env-multipod/maxtext_stable_stack_candidate:rbierneni-qwen-test"
41+
42+
# Single unchained test configuration
43+
test_qwen3_next = gke_config.get_gke_config(
44+
time_out_in_min=90,
45+
test_name="maxtext_qwen3_next_80b_test",
46+
run_model_cmds=(
47+
f"export HF_TOKEN={HF_TOKEN}; export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/tpu/qwen/next/qwen3-next-80b-a3b/2_test_qwen3_next_80b_a3b.sh",
48+
),
49+
docker_image=custom_docker_image,
50+
test_owner=test_owner.ROHAN_B, # Update the owner if necessary
51+
cluster=XpkClusters.TPU_V5P_128_CLUSTER,
52+
).run()
53+
54+
# No chained dependencies required since there is only one task
55+
test_qwen3_next

0 commit comments

Comments
 (0)