
Commit fba7989

Add Multi-GPU implementation for PPO (#2288)
Add a MultiGpuPPOPolicy class and a command-line option to run multi-GPU training
1 parent 861278a · commit fba7989

File tree

13 files changed: +379 additions, −51 deletions


ml-agents/mlagents/trainers/learn.py

Lines changed: 3 additions & 0 deletions
@@ -54,6 +54,7 @@ def run_training(
     lesson = int(run_options["--lesson"])
     fast_simulation = not bool(run_options["--slow"])
     no_graphics = run_options["--no-graphics"]
+    multi_gpu = run_options["--multi-gpu"]
     trainer_config_path = run_options["<trainer-config-path>"]
     sampler_file_path = (
         run_options["--sampler"] if run_options["--sampler"] != "None" else None
@@ -107,6 +108,7 @@ def run_training(
         lesson,
         run_seed,
         fast_simulation,
+        multi_gpu,
         sampler_manager,
         resampling_interval,
     )
@@ -292,6 +294,7 @@ def main():
       --docker-target-name=<dt>  Docker volume to store training-specific files [default: None].
       --no-graphics              Whether to run the environment in no-graphics mode [default: False].
       --debug                    Whether to run ML-Agents in debug mode with detailed logging [default: False].
+      --multi-gpu                Whether to use multiple GPU training [default: False].
     """
 
     options = docopt(_USAGE)
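Note: the new flag is parsed by docopt alongside the existing options, so enabling multi-GPU training is purely a command-line switch. A usage sketch (the config path and run ID here are illustrative, not part of this diff):

mlagents-learn config/trainer_config.yaml --run-id=ppo-multi-gpu --train --multi-gpu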

ml-agents/mlagents/trainers/models.py

Lines changed: 3 additions & 1 deletion
@@ -601,6 +601,7 @@ def create_cc_actor_critic(
             hidden_policy,
             self.act_size[0],
             activation=None,
+            name="mu",
             kernel_initializer=c_layers.variance_scaling_initializer(factor=0.01),
         )
 
@@ -684,13 +685,14 @@ def create_dc_actor_critic(
         self.memory_out = tf.identity(memory_out, name="recurrent_out")
 
         policy_branches = []
-        for size in self.act_size:
+        for i, size in enumerate(self.act_size):
             policy_branches.append(
                 tf.layers.dense(
                     hidden,
                     size,
                     activation=None,
                     use_bias=False,
+                    name="policy_branch_" + str(i),
                     kernel_initializer=c_layers.variance_scaling_initializer(
                         factor=0.01
                     ),
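Note: the explicit name= arguments give the policy output layers stable variable names. The multi-GPU policy added below builds every tower inside one tf.variable_scope with reuse=tf.AUTO_REUSE, and consistent layer names are what let the towers resolve to the same underlying variables. A minimal TensorFlow 1.x sketch of that mechanism (standalone illustration, not repo code):

import tensorflow as tf  # TensorFlow 1.x API

x = tf.placeholder(tf.float32, [None, 8])
with tf.variable_scope("tower", reuse=tf.AUTO_REUSE):
    mu_a = tf.layers.dense(x, 2, activation=None, name="mu")  # creates tower/mu/kernel
with tf.variable_scope("tower", reuse=tf.AUTO_REUSE):
    mu_b = tf.layers.dense(x, 2, activation=None, name="mu")  # reuses tower/mu/kernel
print(len(tf.trainable_variables()))  # 2: the shared tower/mu/kernel and tower/mu/bias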

ml-agents/mlagents/trainers/ppo/models.py

Lines changed: 3 additions & 2 deletions
@@ -138,5 +138,6 @@ def create_losses(
         )
 
     def create_ppo_optimizer(self):
-        optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate)
-        self.update_batch = optimizer.minimize(self.loss)
+        self.optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate)
+        self.grads = self.optimizer.compute_gradients(self.loss)
+        self.update_batch = self.optimizer.minimize(self.loss)
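This change keeps minimize() for the single-GPU path but also exposes the raw (gradient, variable) pairs on self.grads, which is what the multi-GPU policy averages across towers. A minimal TensorFlow 1.x sketch of the compute_gradients / apply_gradients split (the variables here are illustrative, not repo code):

import tensorflow as tf  # TensorFlow 1.x API

x = tf.placeholder(tf.float32, [None, 4])
w = tf.get_variable("w", shape=[4, 1])
loss = tf.reduce_mean(tf.square(tf.matmul(x, w)))

optimizer = tf.train.AdamOptimizer(learning_rate=3e-4)
grads_and_vars = optimizer.compute_gradients(loss)        # [(grad, var), ...] usable for averaging
update_batch = optimizer.apply_gradients(grads_and_vars)  # applying them unchanged is equivalent to minimize(loss)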

ml-agents/mlagents/trainers/ppo/multi_gpu_policy.py

Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
+import logging
+import numpy as np
+
+import tensorflow as tf
+from tensorflow.python.client import device_lib
+from mlagents.envs.timers import timed
+from mlagents.trainers.models import EncoderType
+from mlagents.trainers.ppo.policy import PPOPolicy
+from mlagents.trainers.ppo.models import PPOModel
+from mlagents.trainers.components.reward_signals.reward_signal_factory import (
+    create_reward_signal,
+)
+from mlagents.trainers.components.bc.module import BCModule
+
+# Variable scope in which created variables will be placed under
+TOWER_SCOPE_NAME = "tower"
+
+logger = logging.getLogger("mlagents.trainers")
+
+
+class MultiGpuPPOPolicy(PPOPolicy):
+    def __init__(self, seed, brain, trainer_params, is_training, load):
+        """
+        Policy for Proximal Policy Optimization Networks with multi-GPU training
+        :param seed: Random seed.
+        :param brain: Assigned Brain object.
+        :param trainer_params: Defined training parameters.
+        :param is_training: Whether the model should be trained.
+        :param load: Whether a pre-trained model will be loaded or a new one created.
+        """
+        super().__init__(seed, brain, trainer_params, is_training, load)
+
+        with self.graph.as_default():
+            avg_grads = self.average_gradients([t.grads for t in self.towers])
+            self.update_batch = self.model.optimizer.apply_gradients(avg_grads)
+
+            self.update_dict = {"update_batch": self.update_batch}
+            self.update_dict.update(
+                {
+                    "value_loss_" + str(i): self.towers[i].value_loss
+                    for i in range(len(self.towers))
+                }
+            )
+            self.update_dict.update(
+                {
+                    "policy_loss_" + str(i): self.towers[i].policy_loss
+                    for i in range(len(self.towers))
+                }
+            )
+
+    def create_model(self, brain, trainer_params, reward_signal_configs, seed):
+        """
+        Create PPO models, one on each device
+        :param brain: Assigned Brain object.
+        :param trainer_params: Defined training parameters.
+        :param reward_signal_configs: Reward signal config
+        :param seed: Random seed.
+        """
+        self.devices = get_devices()
+        self.towers = []
+        with self.graph.as_default():
+            with tf.variable_scope(TOWER_SCOPE_NAME, reuse=tf.AUTO_REUSE):
+                for device in self.devices:
+                    with tf.device(device):
+                        self.towers.append(
+                            PPOModel(
+                                brain=brain,
+                                lr=float(trainer_params["learning_rate"]),
+                                h_size=int(trainer_params["hidden_units"]),
+                                epsilon=float(trainer_params["epsilon"]),
+                                beta=float(trainer_params["beta"]),
+                                max_step=float(trainer_params["max_steps"]),
+                                normalize=trainer_params["normalize"],
+                                use_recurrent=trainer_params["use_recurrent"],
+                                num_layers=int(trainer_params["num_layers"]),
+                                m_size=self.m_size,
+                                seed=seed,
+                                stream_names=list(reward_signal_configs.keys()),
+                                vis_encode_type=EncoderType(
+                                    trainer_params.get("vis_encode_type", "simple")
+                                ),
+                            )
+                        )
+                        self.towers[-1].create_ppo_optimizer()
+        self.model = self.towers[0]
+
+    @timed
+    def update(self, mini_batch, num_sequences):
+        """
+        Updates model using buffer.
+        :param n_sequences: Number of trajectories in batch.
+        :param mini_batch: Experience batch.
+        :return: Output from update process.
+        """
+        feed_dict = {}
+
+        device_batch_size = num_sequences // len(self.devices)
+        device_batches = []
+        for i in range(len(self.devices)):
+            device_batches.append(
+                {k: v[i : i + device_batch_size] for (k, v) in mini_batch.items()}
+            )
+
+        for batch, tower in zip(device_batches, self.towers):
+            feed_dict.update(self.construct_feed_dict(tower, batch, num_sequences))
+
+        out = self._execute_model(feed_dict, self.update_dict)
+        run_out = {}
+        run_out["value_loss"] = np.mean(
+            [out["value_loss_" + str(i)] for i in range(len(self.towers))]
+        )
+        run_out["policy_loss"] = np.mean(
+            [out["policy_loss_" + str(i)] for i in range(len(self.towers))]
+        )
+        run_out["update_batch"] = out["update_batch"]
+        return run_out
+
+    def average_gradients(self, tower_grads):
+        """
+        Average gradients from all towers
+        :param tower_grads: Gradients from all towers
+        """
+        average_grads = []
+        for grad_and_vars in zip(*tower_grads):
+            grads = [g for g, _ in grad_and_vars if g is not None]
+            if not grads:
+                continue
+            avg_grad = tf.reduce_mean(tf.stack(grads), 0)
+            var = grad_and_vars[0][1]
+            average_grads.append((avg_grad, var))
+        return average_grads
+
+
+def get_devices():
+    """
+    Get all available GPU devices
+    """
+    local_device_protos = device_lib.list_local_devices()
+    devices = [x.name for x in local_device_protos if x.device_type == "GPU"]
+    return devices
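A quick way to sanity-check the averaging step in isolation is to mirror average_gradients with NumPy stand-ins for the (gradient, variable) pairs (illustrative only; the method above operates on TensorFlow tensors):

import numpy as np

def average_tower_grads(tower_grads):
    # Same input shape as average_gradients: one list of (grad, var) pairs per tower.
    averaged = []
    for grad_and_vars in zip(*tower_grads):          # group the same variable across towers
        grads = [g for g, _ in grad_and_vars if g is not None]
        if not grads:
            continue
        avg = np.mean(np.stack(grads), axis=0)
        averaged.append((avg, grad_and_vars[0][1]))  # keep the first tower's variable handle
    return averaged

tower_0 = [(np.array([1.0, 3.0]), "w"), (np.array([0.5]), "b")]
tower_1 = [(np.array([3.0, 1.0]), "w"), (np.array([1.5]), "b")]
print(average_tower_grads([tower_0, tower_1]))
# [(array([2., 2.]), 'w'), (array([1.]), 'b')]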

ml-agents/mlagents/trainers/ppo/policy.py

Lines changed: 59 additions & 44 deletions
@@ -30,27 +30,10 @@ def __init__(self, seed, brain, trainer_params, is_training, load):
 
         reward_signal_configs = trainer_params["reward_signals"]
 
+        self.create_model(brain, trainer_params, reward_signal_configs, seed)
+
         self.reward_signals = {}
         with self.graph.as_default():
-            self.model = PPOModel(
-                brain,
-                lr=float(trainer_params["learning_rate"]),
-                h_size=int(trainer_params["hidden_units"]),
-                epsilon=float(trainer_params["epsilon"]),
-                beta=float(trainer_params["beta"]),
-                max_step=float(trainer_params["max_steps"]),
-                normalize=trainer_params["normalize"],
-                use_recurrent=trainer_params["use_recurrent"],
-                num_layers=int(trainer_params["num_layers"]),
-                m_size=self.m_size,
-                seed=seed,
-                stream_names=list(reward_signal_configs.keys()),
-                vis_encode_type=EncoderType(
-                    trainer_params.get("vis_encode_type", "simple")
-                ),
-            )
-            self.model.create_ppo_optimizer()
-
             # Create reward signals
             for reward_signal, config in reward_signal_configs.items():
                 self.reward_signals[reward_signal] = create_reward_signal(
@@ -102,6 +85,34 @@ def __init__(self, seed, brain, trainer_params, is_training, load):
             "update_batch": self.model.update_batch,
         }
 
+    def create_model(self, brain, trainer_params, reward_signal_configs, seed):
+        """
+        Create PPO model
+        :param brain: Assigned Brain object.
+        :param trainer_params: Defined training parameters.
+        :param reward_signal_configs: Reward signal config
+        :param seed: Random seed.
+        """
+        with self.graph.as_default():
+            self.model = PPOModel(
+                brain=brain,
+                lr=float(trainer_params["learning_rate"]),
+                h_size=int(trainer_params["hidden_units"]),
+                epsilon=float(trainer_params["epsilon"]),
+                beta=float(trainer_params["beta"]),
+                max_step=float(trainer_params["max_steps"]),
+                normalize=trainer_params["normalize"],
+                use_recurrent=trainer_params["use_recurrent"],
+                num_layers=int(trainer_params["num_layers"]),
+                m_size=self.m_size,
+                seed=seed,
+                stream_names=list(reward_signal_configs.keys()),
+                vis_encode_type=EncoderType(
+                    trainer_params.get("vis_encode_type", "simple")
+                ),
+            )
+            self.model.create_ppo_optimizer()
+
     @timed
     def evaluate(self, brain_info):
         """
@@ -143,58 +154,62 @@ def update(self, mini_batch, num_sequences):
         :param mini_batch: Experience batch.
         :return: Output from update process.
         """
+        feed_dict = self.construct_feed_dict(self.model, mini_batch, num_sequences)
+        run_out = self._execute_model(feed_dict, self.update_dict)
+        return run_out
+
+    def construct_feed_dict(self, model, mini_batch, num_sequences):
         feed_dict = {
-            self.model.batch_size: num_sequences,
-            self.model.sequence_length: self.sequence_length,
-            self.model.mask_input: mini_batch["masks"].flatten(),
-            self.model.advantage: mini_batch["advantages"].reshape([-1, 1]),
-            self.model.all_old_log_probs: mini_batch["action_probs"].reshape(
-                [-1, sum(self.model.act_size)]
+            model.batch_size: num_sequences,
+            model.sequence_length: self.sequence_length,
+            model.mask_input: mini_batch["masks"].flatten(),
+            model.advantage: mini_batch["advantages"].reshape([-1, 1]),
+            model.all_old_log_probs: mini_batch["action_probs"].reshape(
+                [-1, sum(model.act_size)]
             ),
         }
         for name in self.reward_signals:
-            feed_dict[self.model.returns_holders[name]] = mini_batch[
+            feed_dict[model.returns_holders[name]] = mini_batch[
                 "{}_returns".format(name)
             ].flatten()
-            feed_dict[self.model.old_values[name]] = mini_batch[
+            feed_dict[model.old_values[name]] = mini_batch[
                 "{}_value_estimates".format(name)
             ].flatten()
 
         if self.use_continuous_act:
-            feed_dict[self.model.output_pre] = mini_batch["actions_pre"].reshape(
-                [-1, self.model.act_size[0]]
+            feed_dict[model.output_pre] = mini_batch["actions_pre"].reshape(
+                [-1, model.act_size[0]]
            )
-            feed_dict[self.model.epsilon] = mini_batch["random_normal_epsilon"].reshape(
-                [-1, self.model.act_size[0]]
+            feed_dict[model.epsilon] = mini_batch["random_normal_epsilon"].reshape(
+                [-1, model.act_size[0]]
            )
        else:
-            feed_dict[self.model.action_holder] = mini_batch["actions"].reshape(
-                [-1, len(self.model.act_size)]
+            feed_dict[model.action_holder] = mini_batch["actions"].reshape(
+                [-1, len(model.act_size)]
            )
            if self.use_recurrent:
-                feed_dict[self.model.prev_action] = mini_batch["prev_action"].reshape(
-                    [-1, len(self.model.act_size)]
+                feed_dict[model.prev_action] = mini_batch["prev_action"].reshape(
+                    [-1, len(model.act_size)]
                )
-            feed_dict[self.model.action_masks] = mini_batch["action_mask"].reshape(
+            feed_dict[model.action_masks] = mini_batch["action_mask"].reshape(
                [-1, sum(self.brain.vector_action_space_size)]
            )
        if self.use_vec_obs:
-            feed_dict[self.model.vector_in] = mini_batch["vector_obs"].reshape(
+            feed_dict[model.vector_in] = mini_batch["vector_obs"].reshape(
                [-1, self.vec_obs_size]
            )
-        if self.model.vis_obs_size > 0:
-            for i, _ in enumerate(self.model.visual_in):
+        if model.vis_obs_size > 0:
+            for i, _ in enumerate(model.visual_in):
                _obs = mini_batch["visual_obs%d" % i]
                if self.sequence_length > 1 and self.use_recurrent:
                    (_batch, _seq, _w, _h, _c) = _obs.shape
-                    feed_dict[self.model.visual_in[i]] = _obs.reshape([-1, _w, _h, _c])
+                    feed_dict[model.visual_in[i]] = _obs.reshape([-1, _w, _h, _c])
                else:
-                    feed_dict[self.model.visual_in[i]] = _obs
+                    feed_dict[model.visual_in[i]] = _obs
        if self.use_recurrent:
            mem_in = mini_batch["memory"][:, 0, :]
-            feed_dict[self.model.memory_in] = mem_in
-        run_out = self._execute_model(feed_dict, self.update_dict)
-        return run_out
+            feed_dict[model.memory_in] = mem_in
+        return feed_dict
 
     def get_value_estimates(
         self, brain_info: BrainInfo, idx: int, done: bool
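The refactor lets the feed-dict builder target an arbitrary model instead of always self.model; MultiGpuPPOPolicy.update() relies on this to build one dict per tower and merge them, which works because each tower owns distinct placeholder objects. A plain-Python illustration of that merge (string keys stand in for TensorFlow placeholders; not repo code):

def build_feed(tower_name, batch):
    # Stand-in for construct_feed_dict(model, mini_batch, num_sequences):
    # keys are unique per tower, so dicts from different towers never collide.
    return {tower_name + "/batch_size": len(batch), tower_name + "/obs": batch}

feed_dict = {}
for name, batch in [("tower_0", [1, 2]), ("tower_1", [3, 4])]:
    feed_dict.update(build_feed(name, batch))
print(sorted(feed_dict))  # one merged dict covering both towers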

ml-agents/mlagents/trainers/ppo/trainer.py

Lines changed: 18 additions & 2 deletions
@@ -11,6 +11,7 @@
 from mlagents.envs import AllBrainInfo, BrainInfo
 from mlagents.trainers.buffer import Buffer
 from mlagents.trainers.ppo.policy import PPOPolicy
+from mlagents.trainers.ppo.multi_gpu_policy import MultiGpuPPOPolicy, get_devices
 from mlagents.trainers.trainer import Trainer, UnityTrainerException
 from mlagents.envs.action_info import ActionInfoOutputs
 
@@ -21,7 +22,15 @@ class PPOTrainer(Trainer):
     """The PPOTrainer is an implementation of the PPO algorithm."""
 
     def __init__(
-        self, brain, reward_buff_cap, trainer_parameters, training, load, seed, run_id
+        self,
+        brain,
+        reward_buff_cap,
+        trainer_parameters,
+        training,
+        load,
+        seed,
+        run_id,
+        multi_gpu,
     ):
         """
         Responsible for collecting experiences and training PPO model.
@@ -65,7 +74,14 @@ def __init__(
         )
 
         self.step = 0
-        self.policy = PPOPolicy(seed, brain, trainer_parameters, self.is_training, load)
+        if multi_gpu and len(get_devices()) > 1:
+            self.policy = MultiGpuPPOPolicy(
+                seed, brain, trainer_parameters, self.is_training, load
+            )
+        else:
+            self.policy = PPOPolicy(
+                seed, brain, trainer_parameters, self.is_training, load
+            )
 
         stats = defaultdict(list)
         # collected_rewards is a dictionary from name of reward signal to a dictionary of agent_id to cumulative reward
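PPOTrainer only switches to MultiGpuPPOPolicy when --multi-gpu is set and more than one GPU is visible; otherwise it falls back to the single-device PPOPolicy. A quick check of what get_devices() will see on a given machine (assumes a local TensorFlow 1.x install; the exact device names, e.g. "/device:GPU:0", depend on the hardware):

from tensorflow.python.client import device_lib

# get_devices() keeps only entries whose device_type is "GPU",
# e.g. ["/device:GPU:0", "/device:GPU:1"] on a two-GPU machine.
for d in device_lib.list_local_devices():
    print(d.device_type, d.name)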
