Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Jun 1, 2025

📄 36% (0.36x) speedup for DDPMScheduler.get_velocity in src/diffusers/schedulers/scheduling_ddpm.py

⏱️ Runtime : 922 microseconds 679 microseconds (best of 534 runs)

📝 Explanation and details


Optimization summary:

  • Avoid unnecessary .to() conversions of alphas_cumprod and timesteps unless their device or dtype differs, saving copy time.
  • Avoid .flatten() and dynamic unsqueezing; use .reshape(target_shape) to broadcast in as few allocations as possible.
  • Use index_select instead of direct indexing for efficiency and type/device consistency, as timesteps may be on CUDA.
  • sqrt() computation is performed directly after index selection for efficiency.
  • This rewrite significantly reduces temporary tensor creation and redundant device/cast operations in the inner loop. The result tensor matches the previous implementation exactly.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 35 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Generated Regression Tests Details
import math
from typing import List, Optional, Union

import numpy as np
# imports
import pytest  # used for our unit tests
import torch
from src.diffusers.schedulers.scheduling_ddpm import DDPMScheduler

# function to test
# Copyright 2024 UC Berkeley Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# DISCLAIMER: This file is strongly influenced by https://github.com/ermongroup/ddim


def betas_for_alpha_bar(
    num_diffusion_timesteps,
    max_beta=0.999,
    alpha_transform_type="cosine",
):
    if alpha_transform_type == "cosine":
        def alpha_bar_fn(t):
            return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
    elif alpha_transform_type == "exp":
        def alpha_bar_fn(t):
            return math.exp(t * -12.0)
    else:
        raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")

    betas = []
    for i in range(num_diffusion_timesteps):
        t1 = i / num_diffusion_timesteps
        t2 = (i + 1) / num_diffusion_timesteps
        betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
    return torch.tensor(betas, dtype=torch.float32)

def rescale_zero_terminal_snr(betas):
    alphas = 1.0 - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)
    alphas_bar_sqrt = alphas_cumprod.sqrt()
    alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone()
    alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone()
    alphas_bar_sqrt -= alphas_bar_sqrt_T
    alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T)
    alphas_bar = alphas_bar_sqrt**2
    alphas = alphas_bar[1:] / alphas_bar[:-1]
    alphas = torch.cat([alphas_bar[0:1], alphas])
    betas = 1 - alphas
    return betas

class DummyConfig:
    def __init__(self, num_train_timesteps):
        self.num_train_timesteps = num_train_timesteps
from src.diffusers.schedulers.scheduling_ddpm import DDPMScheduler

# ------------------------
# Unit tests for get_velocity
# ------------------------

# Helper function for manual velocity computation
def manual_velocity(sample, noise, sqrt_alpha, sqrt_one_minus_alpha):
    return sqrt_alpha * noise - sqrt_one_minus_alpha * sample

# ------------------------
# 1. Basic Test Cases
# ------------------------

def test_velocity_simple_scalar():
    """
    Basic test: scalar inputs, single timestep
    """
    scheduler = DDPMScheduler(num_train_timesteps=10)
    sample = torch.tensor([2.0])
    noise = torch.tensor([3.0])
    t = torch.tensor([5], dtype=torch.long)
    # Compute expected values
    alpha_cumprod = scheduler.alphas_cumprod[t][0].item()
    sqrt_alpha = alpha_cumprod ** 0.5
    sqrt_one_minus_alpha = (1 - alpha_cumprod) ** 0.5
    expected = manual_velocity(sample, noise, sqrt_alpha, sqrt_one_minus_alpha)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_batch_basic():
    """
    Basic test: batch of samples, batch of noises, batch of timesteps
    """
    scheduler = DDPMScheduler(num_train_timesteps=20)
    sample = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
    noise = torch.tensor([[5.0, 6.0], [7.0, 8.0]])
    t = torch.tensor([0, 19], dtype=torch.long)  # test first and last timestep
    alpha_cumprod = scheduler.alphas_cumprod[t]
    sqrt_alpha = alpha_cumprod.sqrt().unsqueeze(-1)
    sqrt_one_minus_alpha = (1 - alpha_cumprod).sqrt().unsqueeze(-1)
    expected = sqrt_alpha * noise - sqrt_one_minus_alpha * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_different_shapes():
    """
    Basic test: higher-dimensional input (e.g., image shape)
    """
    scheduler = DDPMScheduler(num_train_timesteps=5)
    sample = torch.ones((2, 3, 4))
    noise = torch.full((2, 3, 4), 2.0)
    t = torch.tensor([2, 4], dtype=torch.long)
    alpha_cumprod = scheduler.alphas_cumprod[t]
    sqrt_alpha = alpha_cumprod.sqrt().unsqueeze(-1).unsqueeze(-1)
    sqrt_one_minus_alpha = (1 - alpha_cumprod).sqrt().unsqueeze(-1).unsqueeze(-1)
    expected = sqrt_alpha * noise - sqrt_one_minus_alpha * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_dtype_and_device():
    """
    Basic test: test with float64 and on CUDA if available
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    scheduler = DDPMScheduler(num_train_timesteps=8)
    sample = torch.tensor([1.0, 2.0], dtype=torch.float64, device=device)
    noise = torch.tensor([3.0, 4.0], dtype=torch.float64, device=device)
    t = torch.tensor([0, 7], dtype=torch.long, device=device)
    alpha_cumprod = scheduler.alphas_cumprod[t].to(dtype=sample.dtype, device=device)
    sqrt_alpha = alpha_cumprod.sqrt()
    sqrt_one_minus_alpha = (1 - alpha_cumprod).sqrt()
    expected = sqrt_alpha * noise - sqrt_one_minus_alpha * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

# ------------------------
# 2. Edge Test Cases
# ------------------------

def test_velocity_zero_noise():
    """
    Edge: Zero noise, should be -sqrt(1-alpha)*sample
    """
    scheduler = DDPMScheduler(num_train_timesteps=3)
    sample = torch.tensor([[1.0, -2.0]])
    noise = torch.zeros_like(sample)
    t = torch.tensor([1], dtype=torch.long)
    alpha_cumprod = scheduler.alphas_cumprod[t]
    sqrt_one_minus_alpha = (1 - alpha_cumprod).sqrt().unsqueeze(-1)
    expected = -sqrt_one_minus_alpha * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_zero_sample():
    """
    Edge: Zero sample, should be sqrt(alpha)*noise
    """
    scheduler = DDPMScheduler(num_train_timesteps=4)
    sample = torch.zeros((2, 2))
    noise = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
    t = torch.tensor([0, 3], dtype=torch.long)
    alpha_cumprod = scheduler.alphas_cumprod[t]
    sqrt_alpha = alpha_cumprod.sqrt().unsqueeze(-1)
    expected = sqrt_alpha * noise
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_all_zero():
    """
    Edge: Both sample and noise are zero, velocity should be zero
    """
    scheduler = DDPMScheduler(num_train_timesteps=2)
    sample = torch.zeros((1, 3))
    noise = torch.zeros((1, 3))
    t = torch.tensor([1], dtype=torch.long)
    expected = torch.zeros_like(sample)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_negative_values():
    """
    Edge: Negative values in sample and noise
    """
    scheduler = DDPMScheduler(num_train_timesteps=3)
    sample = torch.tensor([[-1.0, -2.0]])
    noise = torch.tensor([[-3.0, -4.0]])
    t = torch.tensor([2], dtype=torch.long)
    alpha_cumprod = scheduler.alphas_cumprod[t]
    sqrt_alpha = alpha_cumprod.sqrt().unsqueeze(-1)
    sqrt_one_minus_alpha = (1 - alpha_cumprod).sqrt().unsqueeze(-1)
    expected = sqrt_alpha * noise - sqrt_one_minus_alpha * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_first_and_last_timestep():
    """
    Edge: t=0 (first), t=max (last) - check boundary behavior
    """
    scheduler = DDPMScheduler(num_train_timesteps=10)
    sample = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
    noise = torch.tensor([[5.0, 6.0], [7.0, 8.0]])
    t = torch.tensor([0, 9], dtype=torch.long)
    alpha_cumprod = scheduler.alphas_cumprod[t]
    sqrt_alpha = alpha_cumprod.sqrt().unsqueeze(-1)
    sqrt_one_minus_alpha = (1 - alpha_cumprod).sqrt().unsqueeze(-1)
    expected = sqrt_alpha * noise - sqrt_one_minus_alpha * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_broadcasting_timesteps():
    """
    Edge: Scalar timestep with batch sample/noise (should broadcast)
    """
    scheduler = DDPMScheduler(num_train_timesteps=7)
    sample = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
    noise = torch.tensor([[5.0, 6.0], [7.0, 8.0]])
    t = torch.tensor([3], dtype=torch.long)  # single timestep for all
    alpha_cumprod = scheduler.alphas_cumprod[t][0]
    sqrt_alpha = alpha_cumprod.sqrt()
    sqrt_one_minus_alpha = (1 - alpha_cumprod).sqrt()
    expected = sqrt_alpha * noise - sqrt_one_minus_alpha * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_invalid_timestep_raises():
    """
    Edge: Out-of-bounds timestep should raise an error
    """
    scheduler = DDPMScheduler(num_train_timesteps=5)
    sample = torch.ones((1, 2))
    noise = torch.ones((1, 2))
    t = torch.tensor([5], dtype=torch.long)  # invalid, should be in [0,4]
    with pytest.raises(IndexError):
        codeflash_output = scheduler.get_velocity(sample, noise, t); _ = codeflash_output

def test_velocity_mismatched_batch_size_raises():
    """
    Edge: Mismatched batch size between sample/noise and timesteps should raise error
    """
    scheduler = DDPMScheduler(num_train_timesteps=5)
    sample = torch.ones((2, 2))
    noise = torch.ones((2, 2))
    t = torch.tensor([1], dtype=torch.long)  # batch size 1, sample/noise batch size 2
    # Should broadcast, so this is valid
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

    # Now test invalid: batch size mismatch that can't broadcast
    t_invalid = torch.tensor([1, 2, 3], dtype=torch.long)  # batch size 3, sample/noise batch size 2
    with pytest.raises(RuntimeError):
        codeflash_output = scheduler.get_velocity(sample, noise, t_invalid); _ = codeflash_output

def test_velocity_noncontiguous_inputs():
    """
    Edge: Non-contiguous tensors as input
    """
    scheduler = DDPMScheduler(num_train_timesteps=10)
    sample = torch.arange(8).reshape(2, 4).t()[::2].t()  # make non-contiguous
    noise = torch.arange(8, 16).reshape(2, 4).t()[::2].t()
    t = torch.tensor([2, 8], dtype=torch.long)
    # Just check that the function works and output shape is correct
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

# ------------------------
# 3. Large Scale Test Cases
# ------------------------

def test_velocity_large_batch():
    """
    Large scale: Large batch size, but small enough for <100MB
    """
    scheduler = DDPMScheduler(num_train_timesteps=100)
    batch_size = 512
    feature_size = 16
    sample = torch.randn((batch_size, feature_size))
    noise = torch.randn((batch_size, feature_size))
    t = torch.randint(0, 100, (batch_size,), dtype=torch.long)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_large_feature():
    """
    Large scale: Large feature dimension, but small batch
    """
    scheduler = DDPMScheduler(num_train_timesteps=50)
    batch_size = 2
    feature_size = 1000
    sample = torch.randn((batch_size, feature_size))
    noise = torch.randn((batch_size, feature_size))
    t = torch.randint(0, 50, (batch_size,), dtype=torch.long)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_large_3d_tensor():
    """
    Large scale: 3D tensor (e.g. image batch), up to 1000 elements
    """
    scheduler = DDPMScheduler(num_train_timesteps=30)
    batch_size = 8
    channels = 3
    height = 8
    width = 4  # 8*3*8*4 = 768 elements
    sample = torch.randn((batch_size, channels, height, width))
    noise = torch.randn((batch_size, channels, height, width))
    t = torch.randint(0, 30, (batch_size,), dtype=torch.long)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_performance_reasonable_time():
    """
    Large scale: Ensure function runs in reasonable time for large batch
    """
    import time
    scheduler = DDPMScheduler(num_train_timesteps=100)
    batch_size = 1000
    feature_size = 1
    sample = torch.randn((batch_size, feature_size))
    noise = torch.randn((batch_size, feature_size))
    t = torch.randint(0, 100, (batch_size,), dtype=torch.long)
    start = time.time()
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output
    elapsed = time.time() - start

def test_velocity_multiple_timesteps_vectorized():
    """
    Large scale: All possible timesteps in one batch (vectorized)
    """
    scheduler = DDPMScheduler(num_train_timesteps=50)
    sample = torch.randn((50, 10))
    noise = torch.randn((50, 10))
    t = torch.arange(0, 50, dtype=torch.long)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

import math
from typing import List, Optional, Union

import numpy as np
# imports
import pytest  # used for our unit tests
import torch
from src.diffusers.schedulers.scheduling_ddpm import DDPMScheduler

# function to test
# Copyright 2024 UC Berkeley Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# DISCLAIMER: This file is strongly influenced by https://github.com/ermongroup/ddim


def betas_for_alpha_bar(
    num_diffusion_timesteps,
    max_beta=0.999,
    alpha_transform_type="cosine",
):
    if alpha_transform_type == "cosine":
        def alpha_bar_fn(t):
            return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
    elif alpha_transform_type == "exp":
        def alpha_bar_fn(t):
            return math.exp(t * -12.0)
    else:
        raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")

    betas = []
    for i in range(num_diffusion_timesteps):
        t1 = i / num_diffusion_timesteps
        t2 = (i + 1) / num_diffusion_timesteps
        betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
    return torch.tensor(betas, dtype=torch.float32)

def rescale_zero_terminal_snr(betas):
    alphas = 1.0 - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)
    alphas_bar_sqrt = alphas_cumprod.sqrt()
    alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone()
    alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone()
    alphas_bar_sqrt -= alphas_bar_sqrt_T
    alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T)
    alphas_bar = alphas_bar_sqrt**2
    alphas = alphas_bar[1:] / alphas_bar[:-1]
    alphas = torch.cat([alphas_bar[0:1], alphas])
    betas = 1 - alphas
    return betas

class ConfigMixin:
    pass
def register_to_config(fn):
    return fn
class KarrasDiffusionSchedulers:
    name = "dummy"

class SchedulerMixin:
    pass
from src.diffusers.schedulers.scheduling_ddpm import DDPMScheduler

# unit tests

# ----------- BASIC TEST CASES -----------

def test_velocity_scalar_inputs():
    # Test with scalar (0-dim) sample and noise, single timestep
    scheduler = DDPMScheduler(num_train_timesteps=10)
    sample = torch.tensor(2.0)
    noise = torch.tensor(3.0)
    t = torch.tensor([5], dtype=torch.long)
    # Manually compute expected
    acp = scheduler.alphas_cumprod
    sqrt_alpha = acp[5].sqrt()
    sqrt_one_minus_alpha = (1 - acp[5]).sqrt()
    expected = sqrt_alpha * noise - sqrt_one_minus_alpha * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_vector_inputs():
    # Test with 1D vector sample and noise, batch of timesteps
    scheduler = DDPMScheduler(num_train_timesteps=20)
    sample = torch.tensor([1.0, 2.0, 3.0])
    noise = torch.tensor([0.5, -1.0, 2.0])
    t = torch.tensor([0, 10, 19], dtype=torch.long)
    acp = scheduler.alphas_cumprod
    expected = torch.stack([
        acp[0].sqrt() * noise[0] - (1 - acp[0]).sqrt() * sample[0],
        acp[10].sqrt() * noise[1] - (1 - acp[10]).sqrt() * sample[1],
        acp[19].sqrt() * noise[2] - (1 - acp[19]).sqrt() * sample[2],
    ])
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_batched_2d():
    # Test with 2D batch, batch size > 1, same timesteps for all
    scheduler = DDPMScheduler(num_train_timesteps=5)
    sample = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
    noise = torch.tensor([[0.0, 1.0], [2.0, 3.0]])
    t = torch.tensor([4, 4], dtype=torch.long)
    acp = scheduler.alphas_cumprod
    sqrt_alpha = acp[4].sqrt()
    sqrt_one_minus_alpha = (1 - acp[4]).sqrt()
    expected = sqrt_alpha * noise - sqrt_one_minus_alpha * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_batched_diff_timesteps():
    # Test with 2D batch, different timesteps for each batch
    scheduler = DDPMScheduler(num_train_timesteps=8)
    sample = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
    noise = torch.tensor([[0.0, 1.0], [2.0, 3.0]])
    t = torch.tensor([0, 7], dtype=torch.long)
    acp = scheduler.alphas_cumprod
    expected = torch.stack([
        acp[0].sqrt() * noise[0] - (1 - acp[0]).sqrt() * sample[0],
        acp[7].sqrt() * noise[1] - (1 - acp[7]).sqrt() * sample[1],
    ])
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_dtype_consistency():
    # Test with float64 dtype
    scheduler = DDPMScheduler(num_train_timesteps=10)
    sample = torch.tensor([1.0, 2.0], dtype=torch.float64)
    noise = torch.tensor([0.5, -0.5], dtype=torch.float64)
    t = torch.tensor([1, 9], dtype=torch.long)
    acp = scheduler.alphas_cumprod.to(dtype=torch.float64)
    expected = torch.stack([
        acp[1].sqrt() * noise[0] - (1 - acp[1]).sqrt() * sample[0],
        acp[9].sqrt() * noise[1] - (1 - acp[9]).sqrt() * sample[1],
    ])
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_device_consistency():
    # Test with CUDA if available, otherwise skip
    if torch.cuda.is_available():
        scheduler = DDPMScheduler(num_train_timesteps=10)
        sample = torch.tensor([1.0, 2.0], device='cuda')
        noise = torch.tensor([0.5, -0.5], device='cuda')
        t = torch.tensor([1, 9], dtype=torch.long, device='cuda')
        acp = scheduler.alphas_cumprod.to(device='cuda')
        expected = torch.stack([
            acp[1].sqrt() * noise[0] - (1 - acp[1]).sqrt() * sample[0],
            acp[9].sqrt() * noise[1] - (1 - acp[9]).sqrt() * sample[1],
        ])
        codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

# ----------- EDGE TEST CASES -----------

def test_velocity_timestep_zero():
    # Test with timestep 0 (should use first alphas_cumprod)
    scheduler = DDPMScheduler(num_train_timesteps=10)
    sample = torch.tensor([1.0])
    noise = torch.tensor([0.0])
    t = torch.tensor([0], dtype=torch.long)
    acp = scheduler.alphas_cumprod
    expected = acp[0].sqrt() * noise - (1 - acp[0]).sqrt() * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_timestep_last():
    # Test with last possible timestep
    scheduler = DDPMScheduler(num_train_timesteps=10)
    sample = torch.tensor([1.0])
    noise = torch.tensor([1.0])
    t = torch.tensor([9], dtype=torch.long)
    acp = scheduler.alphas_cumprod
    expected = acp[9].sqrt() * noise - (1 - acp[9]).sqrt() * sample
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_negative_sample_noise():
    # Test with negative values in sample and noise
    scheduler = DDPMScheduler(num_train_timesteps=5)
    sample = torch.tensor([-1.0, -2.0])
    noise = torch.tensor([-3.0, -4.0])
    t = torch.tensor([2, 3], dtype=torch.long)
    acp = scheduler.alphas_cumprod
    expected = torch.stack([
        acp[2].sqrt() * noise[0] - (1 - acp[2]).sqrt() * sample[0],
        acp[3].sqrt() * noise[1] - (1 - acp[3]).sqrt() * sample[1],
    ])
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_zero_sample_noise():
    # Test with all zeros in sample and noise
    scheduler = DDPMScheduler(num_train_timesteps=4)
    sample = torch.zeros(3)
    noise = torch.zeros(3)
    t = torch.tensor([0, 1, 2], dtype=torch.long)
    expected = torch.zeros(3)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_mismatched_shapes_raises():
    # Test with mismatched shapes (should raise)
    scheduler = DDPMScheduler(num_train_timesteps=4)
    sample = torch.zeros(3)
    noise = torch.zeros(4)
    t = torch.tensor([0, 1, 2, 3], dtype=torch.long)
    with pytest.raises(RuntimeError):
        scheduler.get_velocity(sample, noise, t)

def test_velocity_timestep_out_of_bounds_raises():
    # Test with out-of-bounds timestep (should raise)
    scheduler = DDPMScheduler(num_train_timesteps=4)
    sample = torch.zeros(1)
    noise = torch.zeros(1)
    t = torch.tensor([4], dtype=torch.long)  # valid: 0-3
    with pytest.raises(IndexError):
        scheduler.get_velocity(sample, noise, t)

def test_velocity_broadcasting():
    # Test with broadcasting: sample shape (batch, C, H, W), noise shape (batch, 1, 1, 1)
    scheduler = DDPMScheduler(num_train_timesteps=10)
    sample = torch.ones((2, 3, 4, 4))
    noise = torch.zeros((2, 1, 1, 1))
    t = torch.tensor([0, 9], dtype=torch.long)
    acp = scheduler.alphas_cumprod
    expected = torch.stack([
        acp[0].sqrt() * noise[0] - (1 - acp[0]).sqrt() * sample[0],
        acp[9].sqrt() * noise[1] - (1 - acp[9]).sqrt() * sample[1],
    ])
    # Broadcasting expected to (2,3,4,4)
    expected = expected.expand_as(sample)
    codeflash_output = scheduler.get_velocity(sample, noise.expand_as(sample), t); result = codeflash_output

# ----------- LARGE SCALE TEST CASES -----------

def test_velocity_large_batch():
    # Test with large batch size
    scheduler = DDPMScheduler(num_train_timesteps=100)
    batch_size = 512
    sample = torch.randn((batch_size, 3))
    noise = torch.randn((batch_size, 3))
    t = torch.randint(0, 100, (batch_size,), dtype=torch.long)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_large_image():
    # Test with large image-like tensors, but under 100MB
    scheduler = DDPMScheduler(num_train_timesteps=50)
    batch = 8
    channels = 3
    H = 64
    W = 64
    sample = torch.randn((batch, channels, H, W))
    noise = torch.randn((batch, channels, H, W))
    t = torch.randint(0, 50, (batch,), dtype=torch.long)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_many_timesteps():
    # Test with many timesteps (max allowed)
    scheduler = DDPMScheduler(num_train_timesteps=999)
    sample = torch.randn((10,))
    noise = torch.randn((10,))
    t = torch.randint(0, 999, (10,), dtype=torch.long)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_large_broadcasting():
    # Test with large batch and broadcasting, shape (batch, 1, 1)
    scheduler = DDPMScheduler(num_train_timesteps=100)
    batch = 256
    sample = torch.randn((batch, 1, 1))
    noise = torch.randn((batch, 1, 1))
    t = torch.randint(0, 100, (batch,), dtype=torch.long)
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output

def test_velocity_performance_under_load():
    # Test that function completes quickly for large but reasonable input
    import time
    scheduler = DDPMScheduler(num_train_timesteps=1000)
    batch = 64
    shape = (batch, 16, 16)
    sample = torch.randn(shape)
    noise = torch.randn(shape)
    t = torch.randint(0, 1000, (batch,), dtype=torch.long)
    start = time.time()
    codeflash_output = scheduler.get_velocity(sample, noise, t); result = codeflash_output
    elapsed = time.time() - start
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-DDPMScheduler.get_velocity-mbdlvknj and push.

Codeflash

---

**Optimization summary:**
- Avoid unnecessary `.to()` conversions of `alphas_cumprod` and `timesteps` unless their device or dtype differs, saving copy time.
- Avoid `.flatten()` and dynamic unsqueezing; use `.reshape(target_shape)` to broadcast in as few allocations as possible.
- Use `index_select` instead of direct indexing for efficiency and type/device consistency, as `timesteps` may be on CUDA.
- `sqrt()` computation is performed directly after index selection for efficiency.
- This rewrite significantly reduces temporary tensor creation and redundant device/cast operations in the inner loop. The result tensor matches the previous implementation exactly.
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash Optimization PR opened by Codeflash AI label Jun 1, 2025
@codeflash-ai codeflash-ai bot requested a review from aseembits93 June 1, 2025 11:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI

Projects

None yet

Development

Successfully merging this pull request may close these issues.

0 participants