Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions dags/sparsity_diffusion_devx/maxtext_moe_tpu_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,27 @@
"cluster": XpkClusters.TPU_V5P_8_CLUSTER,
"time_out_in_min": 90,
},
"qwen3-next-80b": {
Copy link
Collaborator

@shuningjin shuningjin Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add yourself as the owner for qwen3-next-80b?

We can do something like the maxtext_end_to_end DAG, where the owner is different across tests:

"gemma-2b": {
"owner": test_owner.MOHIT_K,
"commands": ["bash tests/end_to_end/tpu/gemma/2b/test_gemma.sh"],
},
"gemma2-2b": {
"owner": test_owner.HENGTAO_G,
"commands": [
"bash tests/end_to_end/tpu/gemma2/2b/test_gemma2_to_mt.sh",
"bash tests/end_to_end/tpu/gemma2/2b/test_gemma2_to_hf.sh",
],
},

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I have added myself for the qwen3 next test. For the other tests, I let it default to you.

"script_name": "tpu/qwen/next/qwen3-next-80b-a3b/2_test_qwen3_next_80b_a3b",
"cluster": XpkClusters.TPU_V5P_128_CLUSTER,
"time_out_in_min": 90,
"owner": test_owner.ROHAN_B,
},
}

unchained_tests = []
for model, test_scripts_details in test_models_tpu.items():
for image in docker_image.keys():
current_owner = test_scripts_details.get("owner", test_owner.SHUNING_J)

training_tpu = gke_config.get_gke_config(
time_out_in_min=test_scripts_details["time_out_in_min"],
test_name=f"{test_name_prefix}_{image}_{model}",
run_model_cmds=(
f"export HF_TOKEN={HF_TOKEN}; export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/{test_scripts_details['script_name']}.sh",
),
docker_image=docker_image[image],
test_owner=test_owner.SHUNING_J,
test_owner=current_owner,
cluster=test_scripts_details["cluster"],
).run_with_quarantine(quarantine_task_group)
unchained_tests.append(training_tpu)
Expand Down Expand Up @@ -129,6 +137,7 @@ def convert_checkpoint_and_run_training(
docker_image,
model,
test_scripts_details,
current_owner,
):
with TaskGroup(group_id=test_group_id, prefix_group_id=False) as group:
test_name = f"{test_name_prefix}_{image}_{model}"
Expand All @@ -145,7 +154,7 @@ def convert_checkpoint_and_run_training(
f"export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/{test_scripts_details[0]['script_name']}.sh",
),
docker_image=docker_image,
test_owner=test_owner.SHUNING_J,
test_owner=current_owner,
cluster=test_scripts_details[0]["cluster"],
).run(gcs_location=shared_gcs_location)
training_tpu = gke_config.get_gke_config(
Expand All @@ -155,7 +164,7 @@ def convert_checkpoint_and_run_training(
f"export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/{test_scripts_details[1]['script_name']}.sh",
),
docker_image=docker_image,
test_owner=test_owner.SHUNING_J,
test_owner=current_owner,
cluster=test_scripts_details[1]["cluster"],
).run(gcs_location=shared_gcs_location)
return conversion_cpu, training_tpu
Expand All @@ -165,6 +174,7 @@ def convert_checkpoint_and_run_training(
gcs_subfolder = (
f"{test_owner.Team.JAX_MODELS_AND_PERFORMANCE.value}/maxtext"
)
current_owner = test_scripts_details[0].get("owner", test_owner.SHUNING_J)
for image in docker_image.keys():
test_group_id = "chained_tests" + "_" + model + "_" + image
if QuarantineTests.is_quarantined(test_group_id):
Expand All @@ -176,6 +186,7 @@ def convert_checkpoint_and_run_training(
docker_image[image],
model,
test_scripts_details,
current_owner,
)
else:
mode_cpu, mode_tpu = convert_checkpoint_and_run_training(
Expand All @@ -185,6 +196,7 @@ def convert_checkpoint_and_run_training(
docker_image[image],
model,
test_scripts_details,
current_owner,
)
tests.append(mode_cpu)
tests.append(mode_tpu)
Expand Down
55 changes: 55 additions & 0 deletions dags/sparsity_diffusion_devx/qwen3_next_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A standalone DAG to test the Qwen3-Next 80B script with a custom Docker image."""

import datetime
from airflow import models
from dags import composer_env
from dags.common import test_owner
from dags.common.vm_resource import XpkClusters
from dags.multipod.configs import gke_config

# Retrieve the HF_TOKEN from Airflow variables
HF_TOKEN = models.Variable.get("HF_TOKEN", None)

with models.DAG(
dag_id="qwen3_next_custom_image_test",
schedule=None, # Set to None so it only runs when manually triggered
tags=[
"maxtext",
"tpu",
"qwen3",
"v5p-128",
],
start_date=datetime.datetime(2024, 1, 1),
catchup=False,
) as dag:
# Your specified custom docker image
custom_docker_image = "gcr.io/tpu-prod-env-multipod/maxtext_stable_stack_candidate:rbierneni-qwen-test"

# Single unchained test configuration
test_qwen3_next = gke_config.get_gke_config(
time_out_in_min=90,
test_name="maxtext_qwen3_next_80b_test",
run_model_cmds=(
f"export HF_TOKEN={HF_TOKEN}; export BASE_OUTPUT_PATH=$GCS_OUTPUT; bash tests/end_to_end/tpu/qwen/next/qwen3-next-80b-a3b/2_test_qwen3_next_80b_a3b.sh",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To confirm, is this test only checking for runtime errors? Do you also plan on adding tests for logits now or in the future?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is running this script: https://github.com/AI-Hypercomputer/maxtext/blob/040c71b73616d768b141da07292fb0417164846c/tests/end_to_end/tpu/qwen/next/qwen3-next-80b-a3b/2_test_qwen3_next_80b_a3b.sh

It does:

  • Forward pass logit check
  • train workload
  • finetuning workload
  • decoding workload

It should cover logit comparison, runtime errors, config checks, train/decoding support, etc. Pretty much all end-to-end model checks.

),
docker_image=custom_docker_image,
test_owner=test_owner.ROHAN_B, # Update the owner if necessary
cluster=XpkClusters.TPU_V5P_128_CLUSTER,
).run()

# No chained dependencies required since there is only one task
test_qwen3_next
Loading