diff --git a/dags/sparsity_diffusion_devx/maxtext_moe_tpu_e2e.py b/dags/sparsity_diffusion_devx/maxtext_moe_tpu_e2e.py index 0a364bdba..b5446b37b 100644 --- a/dags/sparsity_diffusion_devx/maxtext_moe_tpu_e2e.py +++ b/dags/sparsity_diffusion_devx/maxtext_moe_tpu_e2e.py @@ -73,11 +73,19 @@ "cluster": XpkClusters.TPU_V5P_8_CLUSTER, "time_out_in_min": 90, }, + "qwen3-next-80b": { + "script_name": "tpu/qwen/next/qwen3-next-80b-a3b/2_test_qwen3_next_80b_a3b", + "cluster": XpkClusters.TPU_V5P_128_CLUSTER, + "time_out_in_min": 90, + "owner": test_owner.ROHAN_B, + }, } unchained_tests = [] for model, test_scripts_details in test_models_tpu.items(): for image in docker_image.keys(): + current_owner = test_scripts_details.get("owner", test_owner.SHUNING_J) + training_tpu = gke_config.get_gke_config( time_out_in_min=test_scripts_details["time_out_in_min"], test_name=f"{test_name_prefix}_{image}_{model}", @@ -85,7 +93,7 @@ f"export HF_TOKEN={HF_TOKEN}; export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/{test_scripts_details['script_name']}.sh", ), docker_image=docker_image[image], - test_owner=test_owner.SHUNING_J, + test_owner=current_owner, cluster=test_scripts_details["cluster"], ).run_with_quarantine(quarantine_task_group) unchained_tests.append(training_tpu) @@ -129,6 +137,7 @@ def convert_checkpoint_and_run_training( docker_image, model, test_scripts_details, + current_owner, ): with TaskGroup(group_id=test_group_id, prefix_group_id=False) as group: test_name = f"{test_name_prefix}_{image}_{model}" @@ -145,7 +154,7 @@ def convert_checkpoint_and_run_training( f"export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/{test_scripts_details[0]['script_name']}.sh", ), docker_image=docker_image, - test_owner=test_owner.SHUNING_J, + test_owner=current_owner, cluster=test_scripts_details[0]["cluster"], ).run(gcs_location=shared_gcs_location) training_tpu = gke_config.get_gke_config( @@ -155,7 +164,7 @@ def convert_checkpoint_and_run_training( f"export 
BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/{test_scripts_details[1]['script_name']}.sh", ), docker_image=docker_image, - test_owner=test_owner.SHUNING_J, + test_owner=current_owner, cluster=test_scripts_details[1]["cluster"], ).run(gcs_location=shared_gcs_location) return conversion_cpu, training_tpu @@ -165,6 +174,7 @@ def convert_checkpoint_and_run_training( gcs_subfolder = ( f"{test_owner.Team.JAX_MODELS_AND_PERFORMANCE.value}/maxtext" ) + current_owner = test_scripts_details[0].get("owner", test_owner.SHUNING_J) for image in docker_image.keys(): test_group_id = "chained_tests" + "_" + model + "_" + image if QuarantineTests.is_quarantined(test_group_id): @@ -176,6 +186,7 @@ def convert_checkpoint_and_run_training( docker_image[image], model, test_scripts_details, + current_owner, ) else: mode_cpu, mode_tpu = convert_checkpoint_and_run_training( @@ -185,6 +196,7 @@ def convert_checkpoint_and_run_training( docker_image[image], model, test_scripts_details, + current_owner, ) tests.append(mode_cpu) tests.append(mode_tpu) diff --git a/dags/sparsity_diffusion_devx/qwen3_next_test.py b/dags/sparsity_diffusion_devx/qwen3_next_test.py new file mode 100644 index 000000000..796065d96 --- /dev/null +++ b/dags/sparsity_diffusion_devx/qwen3_next_test.py @@ -0,0 +1,55 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+"""A standalone DAG to test the Qwen3-Next 80B script with a custom Docker image."""
+
+import datetime
+from airflow import models
+from dags import composer_env
+from dags.common import test_owner
+from dags.common.vm_resource import XpkClusters
+from dags.multipod.configs import gke_config
+
+# Hugging Face token read from the Airflow variable; None when it is not set.
+HF_TOKEN = models.Variable.get("HF_TOKEN", None)
+# Export the token only when one is configured: interpolating None would hand
+# the test script the literal string "None" as its credential.
+HF_TOKEN_EXPORT = f"export HF_TOKEN={HF_TOKEN}; " if HF_TOKEN else ""
+
+with models.DAG(
+    dag_id="qwen3_next_custom_image_test",
+    schedule=None,  # No schedule: the DAG only runs when triggered manually.
+    tags=[
+        "maxtext",
+        "tpu",
+        "qwen3",
+        "v5p-128",
+    ],
+    start_date=datetime.datetime(2024, 1, 1),
+    catchup=False,
+) as dag:
+  # Custom MaxText image the Qwen3-Next end-to-end script is run against.
+  custom_docker_image = "gcr.io/tpu-prod-env-multipod/maxtext_stable_stack_candidate:rbierneni-qwen-test"
+
+  # Single unchained test; it has no upstream or downstream dependencies.
+  test_qwen3_next = gke_config.get_gke_config(
+      time_out_in_min=90,
+      test_name="maxtext_qwen3_next_80b_test",
+      run_model_cmds=(
+          f"{HF_TOKEN_EXPORT}export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/tpu/qwen/next/qwen3-next-80b-a3b/2_test_qwen3_next_80b_a3b.sh",
+      ),
+      docker_image=custom_docker_image,
+      test_owner=test_owner.ROHAN_B,
+      cluster=XpkClusters.TPU_V5P_128_CLUSTER,
+  ).run()