import numpy as np
import torch
import torch.nn as nn
from huggingface_hub import hf_hub_download

from diffusers import DDPMScheduler, UNet1DModel

79"""
8- An example of using HuggingFace's diffusers library for diffusion policy,
10+ An example of using HuggingFace's diffusers library for diffusion policy,
911generating smooth movement trajectories.
1012
1113This implements a robot control model for pushing a T-shaped block into a target area.
12- The model takes in the robot arm position, block position, and block angle,
14+ The model takes in the robot arm position, block position, and block angle,
1315then outputs a sequence of 16 (x,y) positions for the robot arm to follow.
1416"""
1517
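# Assumed environment for running this example (the original does not pin versions):
#   pip install torch diffusers huggingface_hub numpy
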
class ObservationEncoder(nn.Module):
    """
    Converts raw robot observations (positions/angles) into a more compact representation

    state_dim (int): Dimension of the input state vector (default: 5)
        [robot_x, robot_y, block_x, block_y, block_angle]

    """

    def __init__(self, state_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256)
        )

    def forward(self, x): return self.net(x)

class ObservationProjection(nn.Module):
    """
    Takes the encoded observation and transforms it into 32 values that represent the current robot/block situation.
    These values are used as additional contextual information during the diffusion model's trajectory generation.

    - Input: 256-dim vector (padded to 512)
    - Output: 32-dim context vector
    """

    def __init__(self):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(32, 512))
        self.bias = nn.Parameter(torch.zeros(32))

    def forward(self, x):  # pad 256-dim input to 512-dim with zeros
        if x.size(-1) == 256:
            x = torch.cat([x, torch.zeros(*x.shape[:-1], 256, device=x.device)], dim=-1)
        # affine projection down to 32 context values (reconstructed here; the
        # original return statement falls in an elided span of this excerpt)
        return x @ self.weight.T + self.bias

class DiffusionPolicy:
    """
    Implements diffusion policy for generating robot arm trajectories.
    Uses diffusion to generate sequences of positions for a robot arm, conditioned on
    the current state of the robot and the block it needs to push.

    The model expects observations in pixel coordinates (0-512 range) and block angle in radians.
    """

    def __init__(self, state_dim=5, device="cuda" if torch.cuda.is_available() else "cpu"):
        self.device = device

        # define valid ranges for inputs/outputs
        self.stats = {
            'obs': {'min': torch.zeros(5), 'max': torch.tensor([512, 512, 512, 512, 2 * np.pi])},
            'action': {'min': torch.zeros(2), 'max': torch.full((2,), 512)}
        }

        self.obs_encoder = ObservationEncoder(state_dim).to(device)
        self.obs_projection = ObservationProjection().to(device)

        # UNet model that performs the denoising process
        # takes in concatenated action (2 channels) and context (32 channels) = 34 channels
        # outputs predicted action (2 channels for x,y coordinates)
        self.model = UNet1DModel(
            sample_size=16,  # length of trajectory sequence
            in_channels=34,
            out_channels=2,
            layers_per_block=2,  # number of layers per UNet block
            block_out_channels=(128,),  # number of output channels for each block
            down_block_types=("DownBlock1D",),  # reduce the resolution of the data
            up_block_types=("UpBlock1D",)  # increase the resolution of the data
        ).to(device)

        # noise scheduler that controls the denoising process
        self.noise_scheduler = DDPMScheduler(
            num_train_timesteps=100,  # number of denoising steps
            beta_schedule="squaredcos_cap_v2"  # type of noise schedule
        )

        # load pre-trained weights from HuggingFace
        checkpoint = torch.load(hf_hub_download("dorsar/diffusion_policy", "push_tblock.pt"), map_location=device)
        # (the original goes on to load the encoder, projection, and UNet weights
        # from `checkpoint`; those lines fall in an elided span of this excerpt)

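    # A minimal sketch of the two normalization helpers, which also fall in the
    # elided span. It assumes the standard [-1, 1] min-max scaling common in
    # diffusion policy code; only unnormalize_data's signature is confirmed by
    # the surrounding excerpt.
    def normalize_data(self, data, stats):
        # scale from [min, max] into [-1, 1] for the diffusion model
        ndata = (data - stats['min'].to(data.device)) / (stats['max'].to(data.device) - stats['min'].to(data.device))
        return ndata * 2 - 1

    def unnormalize_data(self, ndata, stats):
        # invert the mapping: [-1, 1] back to [min, max] coordinates
        data = (ndata + 1) / 2
        return data * (stats['max'].to(ndata.device) - stats['min'].to(ndata.device)) + stats['min'].to(ndata.device)
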
    def predict(self, observation):
        """
        Generates a trajectory of robot arm positions given the current state.

        Args:
            observation (torch.Tensor): Current state [robot_x, robot_y, block_x, block_y, block_angle]
                Shape: (batch_size, 5)

        Returns:
            torch.Tensor: Sequence of (x,y) positions for the robot arm to follow
                Shape: (batch_size, 16, 2) where:
                - 16 is the number of steps in the trajectory
                - 2 is the (x,y) coordinates in pixel space (0-512)

        The function first encodes the observation, then uses it to condition a diffusion
        process that gradually denoises random trajectories into smooth, purposeful movements.
        """
        observation = observation.to(self.device)
        normalized_obs = self.normalize_data(observation, self.stats['obs'])

        # encode the observation into context values for the diffusion model
        cond = self.obs_projection(self.obs_encoder(normalized_obs))
        # reshape (batch, 32) to (batch, 32, 1), then repeat the context across all 16 timesteps
        cond = cond.view(normalized_obs.shape[0], -1, 1).expand(-1, -1, 16)

        # initialize `action` with random noise that will be refined into a trajectory
        action = torch.randn((observation.shape[0], 2, 16), device=self.device)

        # denoise: at each step `t`, the current noisy trajectory (`action`) and the
        # conditioning context are fed into the model, and self.noise_scheduler.step
        # uses the model's prediction to strip a little more noise from `action`

        self.noise_scheduler.set_timesteps(100)
        for t in self.noise_scheduler.timesteps:
            model_output = self.model(torch.cat([action, cond], dim=1), t)
            action = self.noise_scheduler.step(
                model_output.sample, t, action
            ).prev_sample

        action = action.transpose(1, 2)  # reshape to [batch, 16, 2]
        action = self.unnormalize_data(action, self.stats['action'])  # scale back to coordinates
        return action

if __name__ == "__main__":
    policy = DiffusionPolicy()

    # sample of a single observation
    # robot arm starts in center, block is slightly left and up, rotated 90 degrees
    obs = torch.tensor([[
        256.0,  # robot arm x position (center of the 0-512 workspace)
        256.0,  # robot arm y position
        200.0,  # block x position (illustrative value; the originals fall in an elided span)
        300.0,  # block y position
        np.pi / 2  # block angle (90 degrees)
    ]])

    action = policy.predict(obs)

    print("Action shape:", action.shape)  # should be [1, 16, 2] - one trajectory of 16 x,y positions
    print("\nPredicted trajectory:")
    for i, (x, y) in enumerate(action[0]):
        print(f"Step {i:2d}: x={x:6.1f}, y={y:6.1f}")