import numpy as np
import numpy.core.multiarray as multiarray
import torch
import torch.nn as nn
from huggingface_hub import hf_hub_download
from torch.serialization import add_safe_globals

from diffusers import DDPMScheduler, UNet1DModel

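# torch.load(..., weights_only=True) uses a restricted unpickler, so the NumPy globals that
# appear inside the checkpoint (ndarray reconstruction helpers and dtype objects) must be
# allow-listed before the pretrained weights can be loaded.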
add_safe_globals(
    [
        multiarray._reconstruct,
        np.ndarray,
        np.dtype,
        np.dtype(np.float32).type,
        np.dtype(np.float64).type,
        np.dtype(np.int32).type,
        np.dtype(np.int64).type,
        type(np.dtype(np.float32)),
        type(np.dtype(np.float64)),
        type(np.dtype(np.int32)),
        type(np.dtype(np.int64)),
    ]
)

"""
An example of using Hugging Face's diffusers library to implement a diffusion policy
that generates smooth movement trajectories.

This implements a robot control model for pushing a T-shaped block into a target area.
The model takes in the robot arm position, block position, and block angle,
then outputs a sequence of 16 (x, y) positions for the robot arm to follow.
"""


class ObservationEncoder(nn.Module):
    """
    Converts raw robot observations (positions/angles) into a more compact representation

    state_dim (int): Dimension of the input state vector (default: 5)
        [robot_x, robot_y, block_x, block_y, block_angle]

    - Input shape: (batch_size, state_dim)
    - Output shape: (batch_size, 256)
    """

    def __init__(self, state_dim):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(state_dim, 512), nn.ReLU(), nn.Linear(512, 256))

    def forward(self, x):
        return self.net(x)

class ObservationProjection(nn.Module):
    """
    Takes the encoded observation and transforms it into 32 values that represent the current robot/block situation.
    These values are used as additional contextual information during the diffusion model's trajectory generation.

    - Input: 256-dim vector (padded to 512)
            Shape: (batch_size, 256)
    - Output: 32 contextual information values for the diffusion model
            Shape: (batch_size, 32)
    """

    def __init__(self):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(32, 512))
        self.bias = nn.Parameter(torch.zeros(32))

    def forward(self, x):  # zero-pad the 256-dim input to 512 dims to match the layer's (32, 512) weight
        if x.size(-1) == 256:
            x = torch.cat([x, torch.zeros(*x.shape[:-1], 256, device=x.device)], dim=-1)
        return nn.functional.linear(x, self.weight, self.bias)

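# Shape check (illustrative only): a batch of one 5-dim observation becomes a 32-dim context vector.
#   enc = ObservationEncoder(state_dim=5)
#   proj = ObservationProjection()
#   proj(enc(torch.zeros(1, 5))).shape  # torch.Size([1, 32])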

class DiffusionPolicy:
    """
    Implements diffusion policy for generating robot arm trajectories.
    Uses diffusion to generate sequences of positions for a robot arm, conditioned on
    the current state of the robot and the block it needs to push.

    The model expects observations in pixel coordinates (0-512 range) and block angle in radians.
    It generates trajectories as sequences of (x,y) coordinates also in the 0-512 range.
    """

    def __init__(self, state_dim=5, device="cpu"):
        self.device = device

        # define valid ranges for inputs/outputs (kept on the same device as the model)
        self.stats = {
            "obs": {
                "min": torch.zeros(5, device=device),
                "max": torch.tensor([512.0, 512.0, 512.0, 512.0, 2 * np.pi], device=device),
            },
            "action": {"min": torch.zeros(2, device=device), "max": torch.full((2,), 512.0, device=device)},
        }

        self.obs_encoder = ObservationEncoder(state_dim).to(device)
        self.obs_projection = ObservationProjection().to(device)

        # UNet model that performs the denoising process
        # takes in concatenated action (2 channels) and context (32 channels) = 34 channels
        # outputs predicted action (2 channels for x,y coordinates)
        self.model = UNet1DModel(
            sample_size=16,  # length of trajectory sequence
            in_channels=34,
            out_channels=2,
            layers_per_block=2,  # number of layers per UNet block
            block_out_channels=(128,),  # number of output channels for each UNet block
            down_block_types=("DownBlock1D",),  # reduce the resolution of data
            up_block_types=("UpBlock1D",),  # increase the resolution of data
        ).to(device)

        # noise scheduler that controls the denoising process
        self.noise_scheduler = DDPMScheduler(
            num_train_timesteps=100,  # number of denoising steps
            beta_schedule="squaredcos_cap_v2",  # type of noise schedule
        )

        # load pre-trained weights from HuggingFace
        checkpoint = torch.load(
            hf_hub_download("dorsar/diffusion_policy", "push_tblock.pt"), weights_only=True, map_location=device
        )
        self.model.load_state_dict(checkpoint["model_state_dict"])
        self.obs_encoder.load_state_dict(checkpoint["encoder_state_dict"])
        self.obs_projection.load_state_dict(checkpoint["projection_state_dict"])

    # scales data to [-1, 1] range for neural network processing
    def normalize_data(self, data, stats):
        return ((data - stats["min"]) / (stats["max"] - stats["min"])) * 2 - 1

    # converts normalized data back to original range
    def unnormalize_data(self, ndata, stats):
        return ((ndata + 1) / 2) * (stats["max"] - stats["min"]) + stats["min"]
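    # Worked example: an x coordinate of 256 normalizes to ((256 - 0) / 512) * 2 - 1 = 0.0,
    # and unnormalize_data maps 0.0 back to ((0.0 + 1) / 2) * 512 + 0 = 256.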

    @torch.no_grad()
    def predict(self, observation):
        """
        Generates a trajectory of robot arm positions given the current state.

        Args:
            observation (torch.Tensor): Current state [robot_x, robot_y, block_x, block_y, block_angle]
                                    Shape: (batch_size, 5)

        Returns:
            torch.Tensor: Sequence of (x,y) positions for the robot arm to follow
                        Shape: (batch_size, 16, 2) where:
                        - 16 is the number of steps in the trajectory
                        - 2 is the (x,y) coordinates in pixel space (0-512)

        The function first encodes the observation, then uses it to condition a diffusion
        process that gradually denoises random trajectories into smooth, purposeful movements.
        """
        observation = observation.to(self.device)
        normalized_obs = self.normalize_data(observation, self.stats["obs"])

        # encode the observation into context values for the diffusion model
        cond = self.obs_projection(self.obs_encoder(normalized_obs))
        # reshape (batch, 32) to (batch, 32, 1), then repeat the context at each of the 16 trajectory steps
        cond = cond.view(normalized_obs.shape[0], -1, 1).expand(-1, -1, 16)

        # initialize action with noise - random noise that will be refined into a trajectory
        action = torch.randn((observation.shape[0], 2, 16), device=self.device)

        # denoise
        # at each step `t`, the current noisy trajectory (`action`) and the conditioning info (context) are
        # fed into the model; self.noise_scheduler.step then applies the model's prediction to produce a
        # slightly less noisy version of `action`

        self.noise_scheduler.set_timesteps(100)
        for t in self.noise_scheduler.timesteps:
            model_output = self.model(torch.cat([action, cond], dim=1), t)
            action = self.noise_scheduler.step(model_output.sample, t, action).prev_sample

        action = action.transpose(1, 2)  # reshape to [batch, 16, 2]
        action = self.unnormalize_data(action, self.stats["action"])  # scale back to coordinates
        return action


if __name__ == "__main__":
    policy = DiffusionPolicy()

    # sample of a single observation
    # robot arm starts in center, block is slightly left and up, rotated 90 degrees
    obs = torch.tensor(
        [
            [
                256.0,  # robot arm x position (middle of screen)
                256.0,  # robot arm y position (middle of screen)
                200.0,  # block x position
                300.0,  # block y position
                np.pi / 2,  # block angle (90 degrees)
            ]
        ]
    )

    action = policy.predict(obs)

    print("Action shape:", action.shape)  # should be [1, 16, 2] - one trajectory of 16 x,y positions
    print("\nPredicted trajectory:")
    for i, (x, y) in enumerate(action[0]):
        print(f"Step {i:2d}: x={x:6.1f}, y={y:6.1f}")
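
    # Optional: visualize the predicted trajectory (sketch only; assumes matplotlib is
    # installed, which this script does not otherwise require).
    # import matplotlib.pyplot as plt
    #
    # xs, ys = action[0, :, 0].numpy(), action[0, :, 1].numpy()
    # plt.plot(xs, ys, "o-", label="predicted arm positions")
    # plt.scatter([float(obs[0, 2])], [float(obs[0, 3])], marker="s", label="block")
    # plt.xlim(0, 512)
    # plt.ylim(0, 512)
    # plt.legend()
    # plt.show()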