From 31ab2d844bd0cef36ffe9720f14291bf7eb06368 Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Sat, 5 Mar 2022 21:11:29 +0100 Subject: [PATCH 01/16] Start CEM --- sb3_contrib/__init__.py | 1 + sb3_contrib/cem/__init__.py | 2 + sb3_contrib/cem/cem.py | 349 ++++++++++++++++++++++++++++++++++++ sb3_contrib/cem/policies.py | 10 ++ setup.cfg | 1 + tests/test_run.py | 20 ++- 6 files changed, 379 insertions(+), 4 deletions(-) create mode 100644 sb3_contrib/cem/__init__.py create mode 100644 sb3_contrib/cem/cem.py create mode 100644 sb3_contrib/cem/policies.py diff --git a/sb3_contrib/__init__.py b/sb3_contrib/__init__.py index 2ff218d9..52ba74e8 100644 --- a/sb3_contrib/__init__.py +++ b/sb3_contrib/__init__.py @@ -1,6 +1,7 @@ import os from sb3_contrib.ars import ARS +from sb3_contrib.cem import CEM from sb3_contrib.ppo_mask import MaskablePPO from sb3_contrib.qrdqn import QRDQN from sb3_contrib.tqc import TQC diff --git a/sb3_contrib/cem/__init__.py b/sb3_contrib/cem/__init__.py new file mode 100644 index 00000000..8f48e7f5 --- /dev/null +++ b/sb3_contrib/cem/__init__.py @@ -0,0 +1,2 @@ +from sb3_contrib.cem.cem import CEM +from sb3_contrib.cem.policies import LinearPolicy, MlpPolicy diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py new file mode 100644 index 00000000..d8b27193 --- /dev/null +++ b/sb3_contrib/cem/cem.py @@ -0,0 +1,349 @@ +import copy +import time +import warnings +from functools import partial +from typing import Any, Dict, Optional, Type, Union + +import gym +import numpy as np +import torch as th +import torch.nn.utils +from stable_baselines3.common.base_class import BaseAlgorithm +from stable_baselines3.common.callbacks import BaseCallback +from stable_baselines3.common.evaluation import evaluate_policy +from stable_baselines3.common.policies import BasePolicy +from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule +from stable_baselines3.common.utils import get_schedule_fn, safe_mean + +from sb3_contrib.ars.policies import ARSPolicy +from sb3_contrib.common.vec_env.async_eval import AsyncEval + + +class CEM(BaseAlgorithm): + """ + Noisy Cross Entropy Method: http://dx.doi.org/10.1162/neco.2006.18.12.2936 + http://ie.technion.ac.il/CE/files/papers/Learning%20Tetris%20Using%20the%20Noisy%20Cross-Entropy%20Method.pdf + + John Schulman's implementation: https://github.com/joschu/modular_rl/blob/master/modular_rl/cem.py + + :param policy: The policy to train, can be an instance of ``ARSPolicy``, or a string from ["LinearPolicy", "MlpPolicy"] + :param env: The environment to train on, may be a string if registered with gym + :param pop_size: Population size (number of individuals) + :param n_top: How many of the top individuals to use in each update step. Default is pop_size + :param initial_std: Initial standard deviation for the exploration noise + :param noise_multiplier: Noise decay. We add noise to the standard deviation + to avoid early collapse. + :param zero_policy: Boolean determining if the passed policy should have it's weights zeroed before training. + :param alive_bonus_offset: Constant added to the reward at each step, used to cancel out alive bonuses. + :param n_eval_episodes: Number of episodes to evaluate each candidate. 
+ :param policy_kwargs: Keyword arguments to pass to the policy on creation + :param policy_base: Base class to use for the policy + :param tensorboard_log: String with the directory to put tensorboard logs: + :param seed: Random seed for the training + :param verbose: Verbosity level: 0 no output, 1 info, 2 debug + :param device: Torch device to use for training, defaults to "cpu" + :param _init_setup_model: Whether or not to build the network at the creation of the instance + """ + + def __init__( + self, + policy: Union[str, Type[ARSPolicy]], + env: Union[GymEnv, str], + pop_size: int = 16, + n_top: Optional[int] = None, + initial_std: Union[float, Schedule] = 0.05, + # extra_noise_std: Union[float, Schedule] = 0.05, # TODO: implement schedule + extra_noise_std: float = 0.0, + noise_multiplier: float = 0.999, + zero_policy: bool = True, + alive_bonus_offset: float = 0, + n_eval_episodes: int = 1, + policy_kwargs: Optional[Dict[str, Any]] = None, + policy_base: Type[BasePolicy] = ARSPolicy, + tensorboard_log: Optional[str] = None, + seed: Optional[int] = None, + verbose: int = 0, + device: Union[th.device, str] = "cpu", + _init_setup_model: bool = True, + ): + + super().__init__( + policy, + env, + learning_rate=0.0, + tensorboard_log=tensorboard_log, + policy_base=policy_base, + policy_kwargs=policy_kwargs, + verbose=verbose, + device=device, + supported_action_spaces=(gym.spaces.Box, gym.spaces.Discrete), + support_multi_env=True, + seed=seed, + ) + + self.pop_size = pop_size + self.initial_std = initial_std + # TODO: replace with extra std schedule + self.extra_noise_std = extra_noise_std + self.noise_multiplier = noise_multiplier + self.n_eval_episodes = n_eval_episodes + + if n_top is None: + n_top = self.pop_size + + # Make sure our hyper parameters are valid and auto correct them if they are not + if n_top > self.pop_size: + warnings.warn(f"n_top = {n_top} > pop_size = {self.pop_size}, setting n_top = pop_size") + n_top = self.pop_size + + self.n_top = n_top + + self.alive_bonus_offset = alive_bonus_offset + self.zero_policy = zero_policy + self.weights = None # Need to call init model to initialize weight + self.centroid_std = None + self.processes = None + # Keep track of how many steps where elapsed before a new rollout + # Important for syncing observation normalization between workers + self.old_count = 0 + + if _init_setup_model: + self._setup_model() + + def _setup_model(self) -> None: + self._setup_lr_schedule() + self.set_random_seed(self.seed) + + self.policy = self.policy_class(self.observation_space, self.action_space, **self.policy_kwargs) + self.policy = self.policy.to(self.device) + self.weights = th.nn.utils.parameters_to_vector(self.policy.parameters()).detach() + self.n_params = len(self.weights) + + if self.zero_policy: + self.weights = th.zeros_like(self.weights, requires_grad=False) + self.policy.load_from_vector(self.weights.cpu()) + + # TODO: implement covariance matrix + self.centroid_std = th.ones_like(self.weights, requires_grad=False) * self.initial_std + + def _mimic_monitor_wrapper(self, episode_rewards: np.ndarray, episode_lengths: np.ndarray) -> None: + """ + Helper to mimic Monitor wrapper and report episode statistics (mean reward, mean episode length). 
+ + :param episode_rewards: List containing per-episode rewards + :param episode_lengths: List containing per-episode lengths (in number of steps) + """ + # Mimic Monitor Wrapper + infos = [ + {"episode": {"r": episode_reward, "l": episode_length}} + for episode_reward, episode_length in zip(episode_rewards, episode_lengths) + ] + + self._update_info_buffer(infos) + + def _trigger_callback( + self, + _locals: Dict[str, Any], + _globals: Dict[str, Any], + callback: BaseCallback, + n_envs: int, + ) -> None: + """ + Callback passed to the ``evaluate_policy()`` helper + in order to increment the number of timesteps + and trigger events in the single process version. + + :param _locals: + :param _globals: + :param callback: Callback that will be called at every step + :param n_envs: Number of environments + """ + self.num_timesteps += n_envs + callback.on_step() + + def evaluate_candidates( + self, candidate_weights: th.Tensor, callback: BaseCallback, async_eval: Optional[AsyncEval] + ) -> th.Tensor: + """ + Evaluate each candidate. + + :param candidate_weights: The candidate weights to be evaluated. + :param callback: Callback that will be called at each step + (or after evaluation in the multiprocess version) + :param async_eval: The object for asynchronous evaluation of candidates. + :return: The episodic return for each candidate. + """ + + batch_steps = 0 + # returns == sum of rewards + candidate_returns = th.zeros(self.pop_size, device=self.device) + train_policy = copy.deepcopy(self.policy) + # Empty buffer to show only mean over one iteration (one set of candidates) in the logs + self.ep_info_buffer = [] + callback.on_rollout_start() + + if async_eval is not None: + # Multiprocess asynchronous version + async_eval.send_jobs(candidate_weights, self.pop_size) + results = async_eval.get_results() + + for weights_idx, (episode_rewards, episode_lengths) in results: + + # Update reward to cancel out alive bonus if needed + candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) + batch_steps += np.sum(episode_lengths) + self._mimic_monitor_wrapper(episode_rewards, episode_lengths) + + # Combine the filter stats of each process for normalization + for worker_obs_rms in async_eval.get_obs_rms(): + if self._vec_normalize_env is not None: + # worker_obs_rms.count -= self.old_count + self._vec_normalize_env.obs_rms.combine(worker_obs_rms) + # Hack: don't count timesteps twice (between the two are synced) + # otherwise it will lead to overflow, + # in practice we would need two RunningMeanStats + self._vec_normalize_env.obs_rms.count -= self.old_count + + # Synchronise VecNormalize if needed + if self._vec_normalize_env is not None: + async_eval.sync_obs_rms(self._vec_normalize_env.obs_rms.copy()) + self.old_count = self._vec_normalize_env.obs_rms.count + + # Hack to have Callback events + for _ in range(batch_steps // len(async_eval.remotes)): + self.num_timesteps += len(async_eval.remotes) + callback.on_step() + else: + # Single process, synchronous version + for weights_idx in range(self.pop_size): + + # Load current candidate weights + train_policy.load_from_vector(candidate_weights[weights_idx].cpu()) + # Evaluate the candidate + episode_rewards, episode_lengths = evaluate_policy( + train_policy, + self.env, + n_eval_episodes=self.n_eval_episodes, + return_episode_rewards=True, + # Increment num_timesteps too (slight mismatch with multi envs) + callback=partial(self._trigger_callback, callback=callback, n_envs=self.env.num_envs), + warn=False, + ) 
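                # Example: for an environment that grants a +1 "alive bonus" at every step,
                # setting alive_bonus_offset=-1 makes the return computed below equal to
                # sum(episode_rewards) - sum(episode_lengths), cancelling the bonus so that
                # candidates are not rewarded for merely surviving.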
+ # Update reward to cancel out alive bonus if needed + candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) + batch_steps += sum(episode_lengths) + self._mimic_monitor_wrapper(episode_rewards, episode_lengths) + + # Note: we increment the num_timesteps inside the evaluate_policy() + # however when using multiple environments, there will be a slight + # mismatch between the number of timesteps used and the number + # of calls to the step() method (cf. implementation of evaluate_policy()) + # self.num_timesteps += batch_steps + + callback.on_rollout_end() + + return candidate_returns + + def _log_and_dump(self) -> None: + """ + Dump information to the logger. + """ + time_elapsed = time.time() - self.start_time + fps = int((self.num_timesteps - self._num_timesteps_at_start) / (time_elapsed + 1e-8)) + if len(self.ep_info_buffer) > 0 and len(self.ep_info_buffer[0]) > 0: + self.logger.record("rollout/ep_rew_mean", safe_mean([ep_info["r"] for ep_info in self.ep_info_buffer])) + self.logger.record("rollout/ep_len_mean", safe_mean([ep_info["l"] for ep_info in self.ep_info_buffer])) + self.logger.record("time/fps", fps) + self.logger.record("time/time_elapsed", int(time_elapsed), exclude="tensorboard") + self.logger.record("time/total_timesteps", self.num_timesteps, exclude="tensorboard") + self.logger.dump(step=self.num_timesteps) + + def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval]) -> None: + """ + Sample new candidates, evaluate them and then update current policy. + + :param callback: callback(s) called at every step with state of the algorithm. + :param async_eval: The object for asynchronous evaluation of candidates. + """ + # Retrieve current parameter noise standard deviation + # delta_std = self.delta_std_schedule(self._current_progress_remaining) + + # TODO: replace with correct update, we cannot just add stds together + # (we can add variances?) 
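        # For independent Gaussian noise sources, variances add but standard deviations do not:
        # Var(X + Y) = Var(X) + Var(Y), so the combined scale is
        # sqrt(centroid_std**2 + extra_noise_std**2) rather than centroid_std + extra_noise_std.
        # A corrected combination would look like (sketch only):
        #   delta_std = th.sqrt(self.centroid_std**2 + self.extra_noise_std**2)
        # The follow-up commit moves to a covariance formulation where the extra noise
        # enters as a variance term (extra_noise_std**2) for exactly this reason.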
+ delta_std = self.centroid_std + self.extra_noise_std + # Sample the parameter noise, it will be scaled by delta_std + # deltas = th.normal(mean=0.0, std=1.0, size=(self.n_delta, self.n_params), device=self.device) + # Generate 2 * n_delta candidate policies by adding noise to the current weights + # candidate_weights = th.cat([self.weights + policy_deltas, self.weights - policy_deltas]) + # candidate_weights = th.normal(mean=self.weights, std=self.delta_std, size=(self.pop_size, self.n_params), + # device=self.device) + policy_deltas = th.normal(mean=0.0, std=1.0, size=(self.pop_size, self.n_params), device=self.device) + candidate_weights = self.weights + policy_deltas * delta_std + + with th.no_grad(): + candidate_returns = self.evaluate_candidates(candidate_weights, callback, async_eval) + + # Keep only the top performing candidates for update + top_idx = th.argsort(candidate_returns, descending=True)[: self.n_top] + + # Update mean policy + self.weights = candidate_weights[top_idx].mean(dim=0) + self.centroid_std = candidate_weights[top_idx].std(dim=0) + self.extra_noise_std = self.extra_noise_std * self.noise_multiplier + + self.policy.load_from_vector(self.weights.cpu()) + + self.logger.record("train/iterations", self._n_updates, exclude="tensorboard") + self.logger.record("train/delta_std", delta_std.mean().item()) + # self.logger.record("train/step_size", step_size.item()) + self.logger.record("rollout/return_std", candidate_returns.std().item()) + + self._n_updates += 1 + + def learn( + self, + total_timesteps: int, + callback: MaybeCallback = None, + log_interval: int = 1, + tb_log_name: str = "CEM", + eval_env: Optional[GymEnv] = None, + eval_freq: int = -1, + n_eval_episodes: int = 5, + eval_log_path: Optional[str] = None, + reset_num_timesteps: bool = True, + async_eval: Optional[AsyncEval] = None, + ) -> "CEM": + """ + Return a trained model. + + :param total_timesteps: The total number of samples (env steps) to train on + :param callback: callback(s) called at every step with state of the algorithm. + :param log_interval: The number of timesteps before logging. + :param tb_log_name: the name of the run for TensorBoard logging + :param eval_env: Environment that will be used to evaluate the agent + :param eval_freq: Evaluate the agent every ``eval_freq`` timesteps (this may vary a little) + :param n_eval_episodes: Number of episode to evaluate the agent + :param eval_log_path: Path to a folder where the evaluations will be saved + :param reset_num_timesteps: whether or not to reset the current timestep number (used in logging) + :param async_eval: The object for asynchronous evaluation of candidates. 
+ :return: the trained model + """ + + total_steps, callback = self._setup_learn( + total_timesteps, eval_env, callback, eval_freq, n_eval_episodes, eval_log_path, reset_num_timesteps, tb_log_name + ) + + callback.on_training_start(locals(), globals()) + + while self.num_timesteps < total_steps: + self._update_current_progress_remaining(self.num_timesteps, total_timesteps) + self._do_one_update(callback, async_eval) + if log_interval is not None and self._n_updates % log_interval == 0: + self._log_and_dump() + + if async_eval is not None: + async_eval.close() + + callback.on_training_end() + + return self diff --git a/sb3_contrib/cem/policies.py b/sb3_contrib/cem/policies.py new file mode 100644 index 00000000..9fe63532 --- /dev/null +++ b/sb3_contrib/cem/policies.py @@ -0,0 +1,10 @@ +from stable_baselines3.common.policies import register_policy + +from sb3_contrib.ars.policies import ARSLinearPolicy, ARSPolicy + +MlpPolicy = ARSPolicy +LinearPolicy = ARSLinearPolicy + + +register_policy("LinearPolicy", LinearPolicy) +register_policy("MlpPolicy", MlpPolicy) diff --git a/setup.cfg b/setup.cfg index 11009480..ad9c84af 100644 --- a/setup.cfg +++ b/setup.cfg @@ -23,6 +23,7 @@ ignore = W503,W504,E203,E231 # line breaks before and after binary operators per-file-ignores = ./sb3_contrib/__init__.py:F401 ./sb3_contrib/ars/__init__.py:F401 + ./sb3_contrib/cem/__init__.py:F401 ./sb3_contrib/ppo_mask/__init__.py:F401 ./sb3_contrib/qrdqn/__init__.py:F401 ./sb3_contrib/tqc/__init__.py:F401 diff --git a/tests/test_run.py b/tests/test_run.py index c9c85847..e55891dd 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -3,7 +3,7 @@ from stable_baselines3.common.env_util import make_vec_env from stable_baselines3.common.vec_env import VecNormalize -from sb3_contrib import ARS, QRDQN, TQC, TRPO, MaskablePPO +from sb3_contrib import ARS, CEM, QRDQN, TQC, TRPO, MaskablePPO from sb3_contrib.common.envs import InvalidActionEnvDiscrete from sb3_contrib.common.vec_env import AsyncEval @@ -92,13 +92,25 @@ def test_ars(policy_str, env_id): model.learn(total_timesteps=500, log_interval=1, eval_freq=250) -def test_ars_multi_env(): +@pytest.mark.parametrize("env_id", ["CartPole-v1", "Pendulum-v1"]) +@pytest.mark.parametrize("policy_str", ["LinearPolicy", "MlpPolicy"]) +def test_cem(policy_str, env_id): + model = CEM(policy_str, env_id, pop_size=2, verbose=1, seed=0) + model.learn(total_timesteps=500, log_interval=1, eval_freq=250) + + +@pytest.mark.parametrize("model_class", [ARS, CEM]) +def test_es_multi_env(model_class): env = make_vec_env("Pendulum-v1", n_envs=2) - model = ARS("MlpPolicy", env, n_delta=1) + kwargs = dict(n_delta=1) if model_class == ARS else dict(pop_size=2) + + model = model_class("MlpPolicy", env, **kwargs) model.learn(total_timesteps=250) + kwargs = dict(n_delta=2) if model_class == ARS else dict(pop_size=3) + env = VecNormalize(make_vec_env("Pendulum-v1", n_envs=1)) - model = ARS("MlpPolicy", env, n_delta=2, seed=0) + model = model_class("MlpPolicy", env, seed=0, **kwargs) # with parallelism async_eval = AsyncEval([lambda: VecNormalize(make_vec_env("Pendulum-v1", n_envs=1)) for _ in range(2)], model.policy) async_eval.seed(0) From e2f0694597d8c5eb468c3ae81de483b97c400b4a Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Sun, 6 Mar 2022 14:47:33 +0100 Subject: [PATCH 02/16] Add covariance and extra noise --- sb3_contrib/cem/cem.py | 58 ++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py 
index d8b27193..fbaab01f 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -14,6 +14,7 @@ from stable_baselines3.common.policies import BasePolicy from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule from stable_baselines3.common.utils import get_schedule_fn, safe_mean +from torch.distributions.multivariate_normal import MultivariateNormal from sb3_contrib.ars.policies import ARSPolicy from sb3_contrib.common.vec_env.async_eval import AsyncEval @@ -53,9 +54,10 @@ def __init__( n_top: Optional[int] = None, initial_std: Union[float, Schedule] = 0.05, # extra_noise_std: Union[float, Schedule] = 0.05, # TODO: implement schedule - extra_noise_std: float = 0.0, + extra_noise_std: float = 0.2, noise_multiplier: float = 0.999, - zero_policy: bool = True, + zero_policy: bool = False, + diag_cov: bool = False, alive_bonus_offset: float = 0, n_eval_episodes: int = 1, policy_kwargs: Optional[Dict[str, Any]] = None, @@ -101,7 +103,9 @@ def __init__( self.alive_bonus_offset = alive_bonus_offset self.zero_policy = zero_policy self.weights = None # Need to call init model to initialize weight - self.centroid_std = None + self.centroid_cov = None + self.diag_cov = diag_cov + self.extra_noise_matrix = None self.processes = None # Keep track of how many steps where elapsed before a new rollout # Important for syncing observation normalization between workers @@ -119,13 +123,17 @@ def _setup_model(self) -> None: self.weights = th.nn.utils.parameters_to_vector(self.policy.parameters()).detach() self.n_params = len(self.weights) + self.extra_noise_matrix = th.diag(th.ones_like(self.weights, requires_grad=False) * self.extra_noise_std**2) + # TODO: replace with initial_std if needed + initial_var = self.weights.var().item() + self.centroid_cov = th.diag(th.ones_like(self.weights, requires_grad=False) * initial_var) + # Note: only add it at sample time + # self.centroid_cov += self.extra_noise_matrix + if self.zero_policy: self.weights = th.zeros_like(self.weights, requires_grad=False) self.policy.load_from_vector(self.weights.cpu()) - # TODO: implement covariance matrix - self.centroid_std = th.ones_like(self.weights, requires_grad=False) * self.initial_std - def _mimic_monitor_wrapper(self, episode_rewards: np.ndarray, episode_lengths: np.ndarray) -> None: """ Helper to mimic Monitor wrapper and report episode statistics (mean reward, mean episode length). @@ -265,20 +273,19 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] :param callback: callback(s) called at every step with state of the algorithm. :param async_eval: The object for asynchronous evaluation of candidates. """ - # Retrieve current parameter noise standard deviation - # delta_std = self.delta_std_schedule(self._current_progress_remaining) - - # TODO: replace with correct update, we cannot just add stds together - # (we can add variances?) 
- delta_std = self.centroid_std + self.extra_noise_std - # Sample the parameter noise, it will be scaled by delta_std - # deltas = th.normal(mean=0.0, std=1.0, size=(self.n_delta, self.n_params), device=self.device) - # Generate 2 * n_delta candidate policies by adding noise to the current weights - # candidate_weights = th.cat([self.weights + policy_deltas, self.weights - policy_deltas]) - # candidate_weights = th.normal(mean=self.weights, std=self.delta_std, size=(self.pop_size, self.n_params), - # device=self.device) - policy_deltas = th.normal(mean=0.0, std=1.0, size=(self.pop_size, self.n_params), device=self.device) - candidate_weights = self.weights + policy_deltas * delta_std + + if self.diag_cov: + param_noise = th.normal(mean=0.0, std=1.0, size=(self.pop_size, self.n_params), device=self.device) + if len(self.centroid_cov.shape) > 1: + policy_deltas = param_noise * th.sqrt(th.diag(self.centroid_cov) + th.diag(self.extra_noise_matrix)) + else: + policy_deltas = param_noise * th.sqrt(self.centroid_cov + th.diag(self.extra_noise_matrix)) + else: + sample_cov = self.centroid_cov + self.extra_noise_matrix + param_noise_distribution = MultivariateNormal(th.zeros_like(self.weights), covariance_matrix=sample_cov) + policy_deltas = param_noise_distribution.sample((self.pop_size,)) + + candidate_weights = self.weights + policy_deltas with th.no_grad(): candidate_returns = self.evaluate_candidates(candidate_weights, callback, async_eval) @@ -288,13 +295,20 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] # Update mean policy self.weights = candidate_weights[top_idx].mean(dim=0) - self.centroid_std = candidate_weights[top_idx].std(dim=0) + if self.diag_cov: + # Do not compute full cov matrix when diag_cov=True + self.centroid_cov = candidate_weights[top_idx].var(dim=0) + else: + # rowvar=False in np.cov + self.centroid_cov = th.cov(candidate_weights[top_idx].transpose(-1, -2)) + self.extra_noise_std = self.extra_noise_std * self.noise_multiplier + self.extra_noise_matrix = th.diag(th.ones_like(self.weights, requires_grad=False) * self.extra_noise_std) self.policy.load_from_vector(self.weights.cpu()) self.logger.record("train/iterations", self._n_updates, exclude="tensorboard") - self.logger.record("train/delta_std", delta_std.mean().item()) + # self.logger.record("train/delta_std", delta_std.mean().item()) # self.logger.record("train/step_size", step_size.item()) self.logger.record("rollout/return_std", candidate_returns.std().item()) From dd43da3816b38137040b6a264edc919869e14926 Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Sun, 6 Mar 2022 19:09:49 +0100 Subject: [PATCH 03/16] Update default net arch for ARS/CEM --- docs/misc/changelog.rst | 5 +++-- sb3_contrib/ars/policies.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/misc/changelog.rst b/docs/misc/changelog.rst index 621043ce..46e5c518 100644 --- a/docs/misc/changelog.rst +++ b/docs/misc/changelog.rst @@ -3,18 +3,19 @@ Changelog ========== -Release 1.4.1a1 (WIP) +Release 1.4.1a3 (WIP) ------------------------------- - Breaking Changes: ^^^^^^^^^^^^^^^^^ - Switched minimum Gym version to 0.21.0. 
- Upgraded to Stable-Baselines3 >= 1.4.1a1 +- Changed default policy architecture for ARS/CEM to ``[32]`` instead of ``[64, 64]`` New Features: ^^^^^^^^^^^^^ - Allow PPO to turn of advantage normalization (see `PR #61 `_) @vwxyzjn +- Added noisy Cross Entropy Method (CEM) Bug Fixes: ^^^^^^^^^^ diff --git a/sb3_contrib/ars/policies.py b/sb3_contrib/ars/policies.py index e90927d2..7e4a5659 100644 --- a/sb3_contrib/ars/policies.py +++ b/sb3_contrib/ars/policies.py @@ -36,7 +36,7 @@ def __init__( ) if net_arch is None: - net_arch = [64, 64] + net_arch = [32] self.net_arch = net_arch self.features_extractor = self.make_features_extractor() From 61518d2314bc1a0f6270be86f3e131f404646ad8 Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Sun, 6 Mar 2022 19:10:12 +0100 Subject: [PATCH 04/16] Cleanup code --- sb3_contrib/cem/cem.py | 60 +++++++++++++++++++++-------------------- sb3_contrib/version.txt | 2 +- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py index fbaab01f..4080ebeb 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -13,7 +13,7 @@ from stable_baselines3.common.evaluation import evaluate_policy from stable_baselines3.common.policies import BasePolicy from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule -from stable_baselines3.common.utils import get_schedule_fn, safe_mean +from stable_baselines3.common.utils import safe_mean from torch.distributions.multivariate_normal import MultivariateNormal from sb3_contrib.ars.policies import ARSPolicy @@ -22,8 +22,8 @@ class CEM(BaseAlgorithm): """ - Noisy Cross Entropy Method: http://dx.doi.org/10.1162/neco.2006.18.12.2936 - http://ie.technion.ac.il/CE/files/papers/Learning%20Tetris%20Using%20the%20Noisy%20Cross-Entropy%20Method.pdf + Noisy Cross Entropy Method: http://dx.doi.org/10.1162/neco.2006.18.12.2936 + "Learning Tetris Using the Noisy Cross-Entropy Method" John Schulman's implementation: https://github.com/joschu/modular_rl/blob/master/modular_rl/cem.py @@ -52,12 +52,11 @@ def __init__( env: Union[GymEnv, str], pop_size: int = 16, n_top: Optional[int] = None, - initial_std: Union[float, Schedule] = 0.05, - # extra_noise_std: Union[float, Schedule] = 0.05, # TODO: implement schedule + initial_std: Union[float, Schedule] = 0.05, # Note: currently not used extra_noise_std: float = 0.2, noise_multiplier: float = 0.999, zero_policy: bool = False, - diag_cov: bool = False, + use_diagonal_covariance: bool = False, alive_bonus_offset: float = 0, n_eval_episodes: int = 1, policy_kwargs: Optional[Dict[str, Any]] = None, @@ -85,7 +84,6 @@ def __init__( self.pop_size = pop_size self.initial_std = initial_std - # TODO: replace with extra std schedule self.extra_noise_std = extra_noise_std self.noise_multiplier = noise_multiplier self.n_eval_episodes = n_eval_episodes @@ -102,10 +100,10 @@ def __init__( self.alive_bonus_offset = alive_bonus_offset self.zero_policy = zero_policy - self.weights = None # Need to call init model to initialize weight + self.weights = None # Need to call init model to initialize weights self.centroid_cov = None - self.diag_cov = diag_cov - self.extra_noise_matrix = None + self.use_diagonal_covariance = use_diagonal_covariance + self.extra_variance = None self.processes = None # Keep track of how many steps where elapsed before a new rollout # Important for syncing observation normalization between workers @@ -123,12 +121,13 @@ def _setup_model(self) -> None: self.weights = 
th.nn.utils.parameters_to_vector(self.policy.parameters()).detach() self.n_params = len(self.weights) - self.extra_noise_matrix = th.diag(th.ones_like(self.weights, requires_grad=False) * self.extra_noise_std**2) # TODO: replace with initial_std if needed - initial_var = self.weights.var().item() - self.centroid_cov = th.diag(th.ones_like(self.weights, requires_grad=False) * initial_var) - # Note: only add it at sample time - # self.centroid_cov += self.extra_noise_matrix + initial_variance = self.weights.var().item() + self.centroid_cov = th.ones_like(self.weights, requires_grad=False) * initial_variance + if not self.use_diagonal_covariance: + self.centroid_cov = th.diag(self.centroid_cov) + # Initial extra noise vector (extra variance) + self.extra_variance = th.ones_like(self.weights, requires_grad=False) * self.extra_noise_std**2 if self.zero_policy: self.weights = th.zeros_like(self.weights, requires_grad=False) @@ -274,14 +273,13 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] :param async_eval: The object for asynchronous evaluation of candidates. """ - if self.diag_cov: + if self.use_diagonal_covariance: + # Sample using only the diagonal of the covariance matrix (+ extra noise) param_noise = th.normal(mean=0.0, std=1.0, size=(self.pop_size, self.n_params), device=self.device) - if len(self.centroid_cov.shape) > 1: - policy_deltas = param_noise * th.sqrt(th.diag(self.centroid_cov) + th.diag(self.extra_noise_matrix)) - else: - policy_deltas = param_noise * th.sqrt(self.centroid_cov + th.diag(self.extra_noise_matrix)) + policy_deltas = param_noise * th.sqrt(self.centroid_cov + self.extra_variance) else: - sample_cov = self.centroid_cov + self.extra_noise_matrix + # Sample using the full covariance matrix (+ extra noise) + sample_cov = self.centroid_cov + th.diag(self.extra_variance) param_noise_distribution = MultivariateNormal(th.zeros_like(self.weights), covariance_matrix=sample_cov) policy_deltas = param_noise_distribution.sample((self.pop_size,)) @@ -293,24 +291,28 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] # Keep only the top performing candidates for update top_idx = th.argsort(candidate_returns, descending=True)[: self.n_top] - # Update mean policy + # Update centroid: barycenter of the best candidates self.weights = candidate_weights[top_idx].mean(dim=0) - if self.diag_cov: - # Do not compute full cov matrix when diag_cov=True + if self.use_diagonal_covariance: + # Do not compute full cov matrix when use_diagonal_covariance=True self.centroid_cov = candidate_weights[top_idx].var(dim=0) else: - # rowvar=False in np.cov + # transpose to mimic rowvar=False in np.cov() self.centroid_cov = th.cov(candidate_weights[top_idx].transpose(-1, -2)) + # Update extra variance (prevents early converge) self.extra_noise_std = self.extra_noise_std * self.noise_multiplier - self.extra_noise_matrix = th.diag(th.ones_like(self.weights, requires_grad=False) * self.extra_noise_std) + self.extra_variance = th.ones_like(self.weights, requires_grad=False) * self.extra_noise_std**2 self.policy.load_from_vector(self.weights.cpu()) - self.logger.record("train/iterations", self._n_updates, exclude="tensorboard") - # self.logger.record("train/delta_std", delta_std.mean().item()) - # self.logger.record("train/step_size", step_size.item()) self.logger.record("rollout/return_std", candidate_returns.std().item()) + self.logger.record("train/iterations", self._n_updates, exclude="tensorboard") + if self.use_diagonal_covariance: + 
self.logger.record("train/diag_std", th.mean(th.sqrt(self.centroid_cov)).item()) + else: + self.logger.record("train/diag_std", th.mean(th.sqrt(th.diagonal(self.centroid_cov))).item()) + self.logger.record("train/extra_noise_std", self.extra_noise_std) self._n_updates += 1 diff --git a/sb3_contrib/version.txt b/sb3_contrib/version.txt index d012e1c6..540ed038 100644 --- a/sb3_contrib/version.txt +++ b/sb3_contrib/version.txt @@ -1 +1 @@ -1.4.1a1 +1.4.1a3 From 9cf25174909339193feffae631eda16d4473e0b3 Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Sun, 13 Mar 2022 15:07:16 +0100 Subject: [PATCH 05/16] Test with momentum --- sb3_contrib/cem/cem.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py index 4080ebeb..9067f2b2 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -34,6 +34,7 @@ class CEM(BaseAlgorithm): :param initial_std: Initial standard deviation for the exploration noise :param noise_multiplier: Noise decay. We add noise to the standard deviation to avoid early collapse. + :param momentum: :param zero_policy: Boolean determining if the passed policy should have it's weights zeroed before training. :param alive_bonus_offset: Constant added to the reward at each step, used to cancel out alive bonuses. :param n_eval_episodes: Number of episodes to evaluate each candidate. @@ -55,6 +56,7 @@ def __init__( initial_std: Union[float, Schedule] = 0.05, # Note: currently not used extra_noise_std: float = 0.2, noise_multiplier: float = 0.999, + momentum: float = 0.0, zero_policy: bool = False, use_diagonal_covariance: bool = False, alive_bonus_offset: float = 0, @@ -87,6 +89,7 @@ def __init__( self.extra_noise_std = extra_noise_std self.noise_multiplier = noise_multiplier self.n_eval_episodes = n_eval_episodes + self.momentum = momentum if n_top is None: n_top = self.pop_size @@ -291,14 +294,17 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] # Keep only the top performing candidates for update top_idx = th.argsort(candidate_returns, descending=True)[: self.n_top] - # Update centroid: barycenter of the best candidates - self.weights = candidate_weights[top_idx].mean(dim=0) + new_centroid_weights = candidate_weights[top_idx].mean(dim=0) if self.use_diagonal_covariance: # Do not compute full cov matrix when use_diagonal_covariance=True - self.centroid_cov = candidate_weights[top_idx].var(dim=0) + new_centroid_cov = candidate_weights[top_idx].var(dim=0) else: # transpose to mimic rowvar=False in np.cov() - self.centroid_cov = th.cov(candidate_weights[top_idx].transpose(-1, -2)) + new_centroid_cov = th.cov(candidate_weights[top_idx].transpose(-1, -2)) + + # Update centroid: barycenter of the best candidates, with momentum if needed + self.weights = self.momentum * self.weights + (1 - self.momentum) * new_centroid_weights + self.centroid_cov = self.momentum * self.centroid_cov + (1 - self.momentum) * new_centroid_cov # Update extra variance (prevents early converge) self.extra_noise_std = self.extra_noise_std * self.noise_multiplier From 1a6d767660876b82255a984e2169305d8f0e5468 Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Sun, 13 Mar 2022 15:07:29 +0100 Subject: [PATCH 06/16] Update docstring --- sb3_contrib/cem/cem.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py index 9067f2b2..6dc70cde 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -32,6 +32,7 @@ class CEM(BaseAlgorithm): :param 
pop_size: Population size (number of individuals) :param n_top: How many of the top individuals to use in each update step. Default is pop_size :param initial_std: Initial standard deviation for the exploration noise + :param extra_noise_std: Initial standard deviation for the extra noise added to the covariance matrix :param noise_multiplier: Noise decay. We add noise to the standard deviation to avoid early collapse. :param momentum: From 8a975687ff37af4fcf0e26b07992375b7a01bb6e Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Sun, 13 Mar 2022 15:09:12 +0100 Subject: [PATCH 07/16] Revert "Test with momentum" This reverts commit 9cf25174909339193feffae631eda16d4473e0b3. --- sb3_contrib/cem/cem.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py index 6dc70cde..29d18a43 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -35,7 +35,6 @@ class CEM(BaseAlgorithm): :param extra_noise_std: Initial standard deviation for the extra noise added to the covariance matrix :param noise_multiplier: Noise decay. We add noise to the standard deviation to avoid early collapse. - :param momentum: :param zero_policy: Boolean determining if the passed policy should have it's weights zeroed before training. :param alive_bonus_offset: Constant added to the reward at each step, used to cancel out alive bonuses. :param n_eval_episodes: Number of episodes to evaluate each candidate. @@ -57,7 +56,6 @@ def __init__( initial_std: Union[float, Schedule] = 0.05, # Note: currently not used extra_noise_std: float = 0.2, noise_multiplier: float = 0.999, - momentum: float = 0.0, zero_policy: bool = False, use_diagonal_covariance: bool = False, alive_bonus_offset: float = 0, @@ -90,7 +88,6 @@ def __init__( self.extra_noise_std = extra_noise_std self.noise_multiplier = noise_multiplier self.n_eval_episodes = n_eval_episodes - self.momentum = momentum if n_top is None: n_top = self.pop_size @@ -295,17 +292,14 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] # Keep only the top performing candidates for update top_idx = th.argsort(candidate_returns, descending=True)[: self.n_top] - new_centroid_weights = candidate_weights[top_idx].mean(dim=0) + # Update centroid: barycenter of the best candidates + self.weights = candidate_weights[top_idx].mean(dim=0) if self.use_diagonal_covariance: # Do not compute full cov matrix when use_diagonal_covariance=True - new_centroid_cov = candidate_weights[top_idx].var(dim=0) + self.centroid_cov = candidate_weights[top_idx].var(dim=0) else: # transpose to mimic rowvar=False in np.cov() - new_centroid_cov = th.cov(candidate_weights[top_idx].transpose(-1, -2)) - - # Update centroid: barycenter of the best candidates, with momentum if needed - self.weights = self.momentum * self.weights + (1 - self.momentum) * new_centroid_weights - self.centroid_cov = self.momentum * self.centroid_cov + (1 - self.momentum) * new_centroid_cov + self.centroid_cov = th.cov(candidate_weights[top_idx].transpose(-1, -2)) # Update extra variance (prevents early converge) self.extra_noise_std = self.extra_noise_std * self.noise_multiplier From dc610676809920e791fc88a97984ed3e15a709dd Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Sun, 13 Mar 2022 15:11:16 +0100 Subject: [PATCH 08/16] Update min pytorch version --- .github/workflows/ci.yml | 2 +- setup.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 
b981df64..8747363a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: run: | python -m pip install --upgrade pip # cpu version of pytorch - pip install torch==1.8.1+cpu -f https://download.pytorch.org/whl/torch_stable.html + pip install torch==1.11.0+cpu -f https://download.pytorch.org/whl/torch_stable.html # Install dependencies for docs and tests pip install stable_baselines3[extra,tests,docs] # Install master version diff --git a/setup.py b/setup.py index e3051b5e..711b6d4b 100644 --- a/setup.py +++ b/setup.py @@ -64,6 +64,7 @@ package_data={"sb3_contrib": ["py.typed", "version.txt"]}, install_requires=[ "stable_baselines3>=1.4.1a1", + "torch>=1.11", ], description="Contrib package of Stable Baselines3, experimental code.", author="Antonin Raffin", From 855222fb787c44f83fa1e01e560b8ccc9b09be1d Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Tue, 22 Mar 2022 12:36:05 +0100 Subject: [PATCH 09/16] Update doc and add initial std param --- sb3_contrib/cem/cem.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py index 29d18a43..3154d272 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -12,7 +12,7 @@ from stable_baselines3.common.callbacks import BaseCallback from stable_baselines3.common.evaluation import evaluate_policy from stable_baselines3.common.policies import BasePolicy -from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule +from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback from stable_baselines3.common.utils import safe_mean from torch.distributions.multivariate_normal import MultivariateNormal @@ -25,13 +25,18 @@ class CEM(BaseAlgorithm): Noisy Cross Entropy Method: http://dx.doi.org/10.1162/neco.2006.18.12.2936 "Learning Tetris Using the Noisy Cross-Entropy Method" + CEM is part of the Evolution Strategies (ES), which does black-box optimization + by sampling and evaluating a population of candidates: + https://blog.otoro.net/2017/10/29/visual-evolution-strategies/ + John Schulman's implementation: https://github.com/joschu/modular_rl/blob/master/modular_rl/cem.py :param policy: The policy to train, can be an instance of ``ARSPolicy``, or a string from ["LinearPolicy", "MlpPolicy"] :param env: The environment to train on, may be a string if registered with gym :param pop_size: Population size (number of individuals) :param n_top: How many of the top individuals to use in each update step. Default is pop_size - :param initial_std: Initial standard deviation for the exploration noise + :param initial_std: Initial standard deviation for the exploration noise, + by default using Pytorch default variance at initialization. :param extra_noise_std: Initial standard deviation for the extra noise added to the covariance matrix :param noise_multiplier: Noise decay. We add noise to the standard deviation to avoid early collapse. 
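For orientation, the update implemented by ``_do_one_update()`` amounts to the following minimal sketch (illustrative only: ``evaluate`` stands in for ``evaluate_candidates()``, and the diagonal-covariance and numerical-stability handling of the real code is omitted):

import torch as th
from torch.distributions import MultivariateNormal

def cem_update(weights, cov, extra_var, pop_size, n_top, noise_multiplier, evaluate):
    # Sample a population around the current centroid, inflated by the extra noise
    dist = MultivariateNormal(weights, covariance_matrix=cov + th.diag(extra_var))
    candidates = dist.sample((pop_size,))
    # One episodic return per candidate (evaluate_candidates() in the real code)
    returns = evaluate(candidates)
    elite = candidates[th.argsort(returns, descending=True)[:n_top]]
    # Refit the search distribution on the elite candidates
    weights = elite.mean(dim=0)
    cov = th.cov(elite.T)
    # Decay the extra exploration noise, which prevents premature collapse of the search
    extra_var = extra_var * noise_multiplier**2
    return weights, cov, extra_var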
@@ -53,7 +58,7 @@ def __init__( env: Union[GymEnv, str], pop_size: int = 16, n_top: Optional[int] = None, - initial_std: Union[float, Schedule] = 0.05, # Note: currently not used + initial_std: Optional[float] = None, extra_noise_std: float = 0.2, noise_multiplier: float = 0.999, zero_policy: bool = False, @@ -122,8 +127,12 @@ def _setup_model(self) -> None: self.weights = th.nn.utils.parameters_to_vector(self.policy.parameters()).detach() self.n_params = len(self.weights) - # TODO: replace with initial_std if needed - initial_variance = self.weights.var().item() + if self.initial_std is None: + # Use weights variance from Pytorch initialization by default + initial_variance = self.weights.var().item() + print(np.sqrt(initial_variance)) + else: + initial_variance = self.initial_std**2 self.centroid_cov = th.ones_like(self.weights, requires_grad=False) * initial_variance if not self.use_diagonal_covariance: self.centroid_cov = th.diag(self.centroid_cov) @@ -305,6 +314,7 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] self.extra_noise_std = self.extra_noise_std * self.noise_multiplier self.extra_variance = th.ones_like(self.weights, requires_grad=False) * self.extra_noise_std**2 + # Current policy is the centroid of the best candidates self.policy.load_from_vector(self.weights.cpu()) self.logger.record("rollout/return_std", candidate_returns.std().item()) From 29ebb720f5babd30a574696cba66e6ca757ae6f6 Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Fri, 25 Mar 2022 14:17:04 +0100 Subject: [PATCH 10/16] Remove print --- sb3_contrib/cem/cem.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py index 3154d272..832fbb13 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -130,7 +130,6 @@ def _setup_model(self) -> None: if self.initial_std is None: # Use weights variance from Pytorch initialization by default initial_variance = self.weights.var().item() - print(np.sqrt(initial_variance)) else: initial_variance = self.initial_std**2 self.centroid_cov = th.ones_like(self.weights, requires_grad=False) * initial_variance From e19f223dcb85d2c992ae01ef1c7830578ada7ba7 Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Fri, 25 Mar 2022 14:46:48 +0100 Subject: [PATCH 11/16] Refactor CEM --- sb3_contrib/ars/policies.py | 117 +------- sb3_contrib/cem/cem.py | 191 ++----------- sb3_contrib/cem/policies.py | 6 +- sb3_contrib/common/policies.py | 111 ++++++++ .../common/population_based_algorithm.py | 268 ++++++++++++++++++ 5 files changed, 408 insertions(+), 285 deletions(-) create mode 100644 sb3_contrib/common/policies.py create mode 100644 sb3_contrib/common/population_based_algorithm.py diff --git a/sb3_contrib/ars/policies.py b/sb3_contrib/ars/policies.py index 7e4a5659..3f197c01 100644 --- a/sb3_contrib/ars/policies.py +++ b/sb3_contrib/ars/policies.py @@ -1,116 +1,11 @@ -from typing import Any, Dict, List, Optional, Type - -import gym -import torch as th -from stable_baselines3.common.policies import BasePolicy, register_policy -from stable_baselines3.common.preprocessing import get_action_dim -from stable_baselines3.common.torch_layers import create_mlp -from torch import nn - - -class ARSPolicy(BasePolicy): - """ - Policy network for ARS. - - :param observation_space: The observation space of the environment - :param action_space: The action space of the environment - :param net_arch: Network architecture, defaults to a 2 layers MLP with 64 hidden nodes. 
- :param activation_fn: Activation function - :param squash_output: For continuous actions, whether the output is squashed - or not using a ``tanh()`` function. If not squashed with tanh the output will instead be clipped. - """ - - def __init__( - self, - observation_space: gym.spaces.Space, - action_space: gym.spaces.Space, - net_arch: Optional[List[int]] = None, - activation_fn: Type[nn.Module] = nn.ReLU, - squash_output: bool = True, - ): - - super().__init__( - observation_space, - action_space, - squash_output=isinstance(action_space, gym.spaces.Box) and squash_output, - ) - - if net_arch is None: - net_arch = [32] - - self.net_arch = net_arch - self.features_extractor = self.make_features_extractor() - self.features_dim = self.features_extractor.features_dim - self.activation_fn = activation_fn - - if isinstance(action_space, gym.spaces.Box): - action_dim = get_action_dim(action_space) - actor_net = create_mlp(self.features_dim, action_dim, net_arch, activation_fn, squash_output=True) - elif isinstance(action_space, gym.spaces.Discrete): - actor_net = create_mlp(self.features_dim, action_space.n, net_arch, activation_fn) - else: - raise NotImplementedError(f"Error: ARS policy not implemented for action space of type {type(action_space)}.") - - self.action_net = nn.Sequential(*actor_net) - - def _get_constructor_parameters(self) -> Dict[str, Any]: - # data = super()._get_constructor_parameters() this adds normalize_images, which we don't support... - data = dict( - observation_space=self.observation_space, - action_space=self.action_space, - net_arch=self.net_arch, - activation_fn=self.activation_fn, - ) - return data - - def forward(self, obs: th.Tensor) -> th.Tensor: - - features = self.extract_features(obs) - if isinstance(self.action_space, gym.spaces.Box): - return self.action_net(features) - elif isinstance(self.action_space, gym.spaces.Discrete): - logits = self.action_net(features) - return th.argmax(logits, dim=1) - else: - raise NotImplementedError() - - def _predict(self, observation: th.Tensor, deterministic: bool = True) -> th.Tensor: - # Non deterministic action does not really make sense for ARS, we ignore this parameter for now.. - return self(observation) - - -class ARSLinearPolicy(ARSPolicy): - """ - Linear policy network for ARS. - - :param observation_space: The observation space of the environment - :param action_space: The action space of the environment - :param with_bias: With or without bias on the output - :param squash_output: For continuous actions, whether the output is squashed - or not using a ``tanh()`` function. If not squashed with tanh the output will instead be clipped. 
- """ - - def __init__( - self, - observation_space: gym.spaces.Space, - action_space: gym.spaces.Space, - with_bias: bool = False, - squash_output: bool = False, - ): - - super().__init__(observation_space, action_space, squash_output=squash_output) - - if isinstance(action_space, gym.spaces.Box): - action_dim = get_action_dim(action_space) - self.action_net = nn.Linear(self.features_dim, action_dim, bias=with_bias) - if squash_output: - self.action_net = nn.Sequential(self.action_net, nn.Tanh()) - elif isinstance(action_space, gym.spaces.Discrete): - self.action_net = nn.Linear(self.features_dim, action_space.n, bias=with_bias) - else: - raise NotImplementedError(f"Error: ARS policy not implemented for action space of type {type(action_space)}.") +from stable_baselines3.common.policies import register_policy +from sb3_contrib.common.policies import ESLinearPolicy, ESPolicy +# Backward compat +ARSLinearPolicy = ESLinearPolicy +ARSPolicy = ESPolicy +# Aliases MlpPolicy = ARSPolicy LinearPolicy = ARSLinearPolicy diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py index 832fbb13..a61c0cae 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -1,26 +1,20 @@ -import copy -import time import warnings -from functools import partial from typing import Any, Dict, Optional, Type, Union import gym -import numpy as np import torch as th import torch.nn.utils -from stable_baselines3.common.base_class import BaseAlgorithm from stable_baselines3.common.callbacks import BaseCallback -from stable_baselines3.common.evaluation import evaluate_policy from stable_baselines3.common.policies import BasePolicy from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback -from stable_baselines3.common.utils import safe_mean from torch.distributions.multivariate_normal import MultivariateNormal -from sb3_contrib.ars.policies import ARSPolicy +from sb3_contrib.common.policies import ESPolicy +from sb3_contrib.common.population_based_algorithm import PopulationBasedAlgorithm from sb3_contrib.common.vec_env.async_eval import AsyncEval -class CEM(BaseAlgorithm): +class CEM(PopulationBasedAlgorithm): """ Noisy Cross Entropy Method: http://dx.doi.org/10.1162/neco.2006.18.12.2936 "Learning Tetris Using the Noisy Cross-Entropy Method" @@ -31,7 +25,7 @@ class CEM(BaseAlgorithm): John Schulman's implementation: https://github.com/joschu/modular_rl/blob/master/modular_rl/cem.py - :param policy: The policy to train, can be an instance of ``ARSPolicy``, or a string from ["LinearPolicy", "MlpPolicy"] + :param policy: The policy to train, can be an instance of ``ESPolicy``, or a string from ["LinearPolicy", "MlpPolicy"] :param env: The environment to train on, may be a string if registered with gym :param pop_size: Population size (number of individuals) :param n_top: How many of the top individuals to use in each update step. 
Default is pop_size @@ -54,7 +48,7 @@ class CEM(BaseAlgorithm): def __init__( self, - policy: Union[str, Type[ARSPolicy]], + policy: Union[str, Type[ESPolicy]], env: Union[GymEnv, str], pop_size: int = 16, n_top: Optional[int] = None, @@ -66,7 +60,7 @@ def __init__( alive_bonus_offset: float = 0, n_eval_episodes: int = 1, policy_kwargs: Optional[Dict[str, Any]] = None, - policy_base: Type[BasePolicy] = ARSPolicy, + policy_base: Type[BasePolicy] = ESPolicy, tensorboard_log: Optional[str] = None, seed: Optional[int] = None, verbose: int = 0, @@ -78,21 +72,21 @@ def __init__( policy, env, learning_rate=0.0, + pop_size=pop_size, + alive_bonus_offset=alive_bonus_offset, + n_eval_episodes=n_eval_episodes, tensorboard_log=tensorboard_log, policy_base=policy_base, policy_kwargs=policy_kwargs, verbose=verbose, device=device, supported_action_spaces=(gym.spaces.Box, gym.spaces.Discrete), - support_multi_env=True, seed=seed, ) - self.pop_size = pop_size self.initial_std = initial_std self.extra_noise_std = extra_noise_std self.noise_multiplier = noise_multiplier - self.n_eval_episodes = n_eval_episodes if n_top is None: n_top = self.pop_size @@ -103,17 +97,11 @@ def __init__( n_top = self.pop_size self.n_top = n_top - - self.alive_bonus_offset = alive_bonus_offset self.zero_policy = zero_policy self.weights = None # Need to call init model to initialize weights self.centroid_cov = None self.use_diagonal_covariance = use_diagonal_covariance self.extra_variance = None - self.processes = None - # Keep track of how many steps where elapsed before a new rollout - # Important for syncing observation normalization between workers - self.old_count = 0 if _init_setup_model: self._setup_model() @@ -142,138 +130,6 @@ def _setup_model(self) -> None: self.weights = th.zeros_like(self.weights, requires_grad=False) self.policy.load_from_vector(self.weights.cpu()) - def _mimic_monitor_wrapper(self, episode_rewards: np.ndarray, episode_lengths: np.ndarray) -> None: - """ - Helper to mimic Monitor wrapper and report episode statistics (mean reward, mean episode length). - - :param episode_rewards: List containing per-episode rewards - :param episode_lengths: List containing per-episode lengths (in number of steps) - """ - # Mimic Monitor Wrapper - infos = [ - {"episode": {"r": episode_reward, "l": episode_length}} - for episode_reward, episode_length in zip(episode_rewards, episode_lengths) - ] - - self._update_info_buffer(infos) - - def _trigger_callback( - self, - _locals: Dict[str, Any], - _globals: Dict[str, Any], - callback: BaseCallback, - n_envs: int, - ) -> None: - """ - Callback passed to the ``evaluate_policy()`` helper - in order to increment the number of timesteps - and trigger events in the single process version. - - :param _locals: - :param _globals: - :param callback: Callback that will be called at every step - :param n_envs: Number of environments - """ - self.num_timesteps += n_envs - callback.on_step() - - def evaluate_candidates( - self, candidate_weights: th.Tensor, callback: BaseCallback, async_eval: Optional[AsyncEval] - ) -> th.Tensor: - """ - Evaluate each candidate. - - :param candidate_weights: The candidate weights to be evaluated. - :param callback: Callback that will be called at each step - (or after evaluation in the multiprocess version) - :param async_eval: The object for asynchronous evaluation of candidates. - :return: The episodic return for each candidate. 
- """ - - batch_steps = 0 - # returns == sum of rewards - candidate_returns = th.zeros(self.pop_size, device=self.device) - train_policy = copy.deepcopy(self.policy) - # Empty buffer to show only mean over one iteration (one set of candidates) in the logs - self.ep_info_buffer = [] - callback.on_rollout_start() - - if async_eval is not None: - # Multiprocess asynchronous version - async_eval.send_jobs(candidate_weights, self.pop_size) - results = async_eval.get_results() - - for weights_idx, (episode_rewards, episode_lengths) in results: - - # Update reward to cancel out alive bonus if needed - candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) - batch_steps += np.sum(episode_lengths) - self._mimic_monitor_wrapper(episode_rewards, episode_lengths) - - # Combine the filter stats of each process for normalization - for worker_obs_rms in async_eval.get_obs_rms(): - if self._vec_normalize_env is not None: - # worker_obs_rms.count -= self.old_count - self._vec_normalize_env.obs_rms.combine(worker_obs_rms) - # Hack: don't count timesteps twice (between the two are synced) - # otherwise it will lead to overflow, - # in practice we would need two RunningMeanStats - self._vec_normalize_env.obs_rms.count -= self.old_count - - # Synchronise VecNormalize if needed - if self._vec_normalize_env is not None: - async_eval.sync_obs_rms(self._vec_normalize_env.obs_rms.copy()) - self.old_count = self._vec_normalize_env.obs_rms.count - - # Hack to have Callback events - for _ in range(batch_steps // len(async_eval.remotes)): - self.num_timesteps += len(async_eval.remotes) - callback.on_step() - else: - # Single process, synchronous version - for weights_idx in range(self.pop_size): - - # Load current candidate weights - train_policy.load_from_vector(candidate_weights[weights_idx].cpu()) - # Evaluate the candidate - episode_rewards, episode_lengths = evaluate_policy( - train_policy, - self.env, - n_eval_episodes=self.n_eval_episodes, - return_episode_rewards=True, - # Increment num_timesteps too (slight mismatch with multi envs) - callback=partial(self._trigger_callback, callback=callback, n_envs=self.env.num_envs), - warn=False, - ) - # Update reward to cancel out alive bonus if needed - candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) - batch_steps += sum(episode_lengths) - self._mimic_monitor_wrapper(episode_rewards, episode_lengths) - - # Note: we increment the num_timesteps inside the evaluate_policy() - # however when using multiple environments, there will be a slight - # mismatch between the number of timesteps used and the number - # of calls to the step() method (cf. implementation of evaluate_policy()) - # self.num_timesteps += batch_steps - - callback.on_rollout_end() - - return candidate_returns - - def _log_and_dump(self) -> None: - """ - Dump information to the logger. 
- """ - time_elapsed = time.time() - self.start_time - fps = int((self.num_timesteps - self._num_timesteps_at_start) / (time_elapsed + 1e-8)) - if len(self.ep_info_buffer) > 0 and len(self.ep_info_buffer[0]) > 0: - self.logger.record("rollout/ep_rew_mean", safe_mean([ep_info["r"] for ep_info in self.ep_info_buffer])) - self.logger.record("rollout/ep_len_mean", safe_mean([ep_info["l"] for ep_info in self.ep_info_buffer])) - self.logger.record("time/fps", fps) - self.logger.record("time/time_elapsed", int(time_elapsed), exclude="tensorboard") - self.logger.record("time/total_timesteps", self.num_timesteps, exclude="tensorboard") - self.logger.dump(step=self.num_timesteps) - def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval]) -> None: """ Sample new candidates, evaluate them and then update current policy. @@ -354,22 +210,15 @@ def learn( :param async_eval: The object for asynchronous evaluation of candidates. :return: the trained model """ - - total_steps, callback = self._setup_learn( - total_timesteps, eval_env, callback, eval_freq, n_eval_episodes, eval_log_path, reset_num_timesteps, tb_log_name + return super().learn( + total_timesteps=total_timesteps, + callback=callback, + log_interval=log_interval, + eval_env=eval_env, + eval_freq=eval_freq, + n_eval_episodes=n_eval_episodes, + tb_log_name=tb_log_name, + eval_log_path=eval_log_path, + reset_num_timesteps=reset_num_timesteps, + async_eval=async_eval, ) - - callback.on_training_start(locals(), globals()) - - while self.num_timesteps < total_steps: - self._update_current_progress_remaining(self.num_timesteps, total_timesteps) - self._do_one_update(callback, async_eval) - if log_interval is not None and self._n_updates % log_interval == 0: - self._log_and_dump() - - if async_eval is not None: - async_eval.close() - - callback.on_training_end() - - return self diff --git a/sb3_contrib/cem/policies.py b/sb3_contrib/cem/policies.py index 9fe63532..a8d651fc 100644 --- a/sb3_contrib/cem/policies.py +++ b/sb3_contrib/cem/policies.py @@ -1,9 +1,9 @@ from stable_baselines3.common.policies import register_policy -from sb3_contrib.ars.policies import ARSLinearPolicy, ARSPolicy +from sb3_contrib.common.policies import ESLinearPolicy, ESPolicy -MlpPolicy = ARSPolicy -LinearPolicy = ARSLinearPolicy +MlpPolicy = ESPolicy +LinearPolicy = ESLinearPolicy register_policy("LinearPolicy", LinearPolicy) diff --git a/sb3_contrib/common/policies.py b/sb3_contrib/common/policies.py new file mode 100644 index 00000000..1ee3fe91 --- /dev/null +++ b/sb3_contrib/common/policies.py @@ -0,0 +1,111 @@ +from typing import Any, Dict, List, Optional, Type + +import gym +import torch as th +from stable_baselines3.common.policies import BasePolicy +from stable_baselines3.common.preprocessing import get_action_dim +from stable_baselines3.common.torch_layers import create_mlp +from torch import nn + + +class ESPolicy(BasePolicy): + """ + Policy network for population based algorithms (evolution strategies). + + :param observation_space: The observation space of the environment + :param action_space: The action space of the environment + :param net_arch: Network architecture, defaults to a 2 layers MLP with 64 hidden nodes. + :param activation_fn: Activation function + :param squash_output: For continuous actions, whether the output is squashed + or not using a ``tanh()`` function. If not squashed with tanh the output will instead be clipped. 
+ """ + + def __init__( + self, + observation_space: gym.spaces.Space, + action_space: gym.spaces.Space, + net_arch: Optional[List[int]] = None, + activation_fn: Type[nn.Module] = nn.ReLU, + squash_output: bool = True, + ): + + super().__init__( + observation_space, + action_space, + squash_output=isinstance(action_space, gym.spaces.Box) and squash_output, + ) + + if net_arch is None: + net_arch = [32] + + self.net_arch = net_arch + self.features_extractor = self.make_features_extractor() + self.features_dim = self.features_extractor.features_dim + self.activation_fn = activation_fn + + if isinstance(action_space, gym.spaces.Box): + action_dim = get_action_dim(action_space) + actor_net = create_mlp(self.features_dim, action_dim, net_arch, activation_fn, squash_output=True) + elif isinstance(action_space, gym.spaces.Discrete): + actor_net = create_mlp(self.features_dim, action_space.n, net_arch, activation_fn) + else: + raise NotImplementedError(f"Error: ES policy not implemented for action space of type {type(action_space)}.") + + self.action_net = nn.Sequential(*actor_net) + + def _get_constructor_parameters(self) -> Dict[str, Any]: + # data = super()._get_constructor_parameters() this adds normalize_images, which we don't support... + data = dict( + observation_space=self.observation_space, + action_space=self.action_space, + net_arch=self.net_arch, + activation_fn=self.activation_fn, + ) + return data + + def forward(self, obs: th.Tensor) -> th.Tensor: + + features = self.extract_features(obs) + if isinstance(self.action_space, gym.spaces.Box): + return self.action_net(features) + elif isinstance(self.action_space, gym.spaces.Discrete): + logits = self.action_net(features) + return th.argmax(logits, dim=1) + else: + raise NotImplementedError() + + def _predict(self, observation: th.Tensor, deterministic: bool = True) -> th.Tensor: + # Non deterministic action does not really make sense for ARS, we ignore this parameter for now.. + return self(observation) + + +class ESLinearPolicy(ESPolicy): + """ + Linear policy network for evolution strategies (ES). + + :param observation_space: The observation space of the environment + :param action_space: The action space of the environment + :param with_bias: With or without bias on the output + :param squash_output: For continuous actions, whether the output is squashed + or not using a ``tanh()`` function. If not squashed with tanh the output will instead be clipped. 
+ """ + + def __init__( + self, + observation_space: gym.spaces.Space, + action_space: gym.spaces.Space, + with_bias: bool = False, + squash_output: bool = False, + ): + + super().__init__(observation_space, action_space, squash_output=squash_output) + + if isinstance(action_space, gym.spaces.Box): + action_dim = get_action_dim(action_space) + self.action_net = nn.Linear(self.features_dim, action_dim, bias=with_bias) + if squash_output: + self.action_net = nn.Sequential(self.action_net, nn.Tanh()) + elif isinstance(action_space, gym.spaces.Discrete): + self.action_net = nn.Linear(self.features_dim, action_space.n, bias=with_bias) + else: + raise NotImplementedError(f"Error: ES policy not implemented for action space of type {type(action_space)}.") diff --git a/sb3_contrib/common/population_based_algorithm.py b/sb3_contrib/common/population_based_algorithm.py new file mode 100644 index 00000000..497e3807 --- /dev/null +++ b/sb3_contrib/common/population_based_algorithm.py @@ -0,0 +1,268 @@ +import copy +import time +from functools import partial +from typing import Any, Dict, Optional, Tuple, Type, Union + +import gym +import numpy as np +import torch as th +import torch.nn.utils +from stable_baselines3.common.base_class import BaseAlgorithm +from stable_baselines3.common.callbacks import BaseCallback +from stable_baselines3.common.evaluation import evaluate_policy +from stable_baselines3.common.policies import BasePolicy +from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule +from stable_baselines3.common.utils import safe_mean + +from sb3_contrib.common.vec_env.async_eval import AsyncEval + + +class PopulationBasedAlgorithm(BaseAlgorithm): + """ + Base class for population based algorithms like + Evolution Strategies (ES), which does black-box optimization + by sampling and evaluating a population of candidates: + https://blog.otoro.net/2017/10/29/visual-evolution-strategies/ + + :param policy: The policy to train, can be an instance of ``ESPolicy``, or a string from ["LinearPolicy", "MlpPolicy"] + :param env: The environment to train on, may be a string if registered with gym + :param pop_size: Population size (number of individuals) + :param alive_bonus_offset: Constant added to the reward at each step, used to cancel out alive bonuses. + :param n_eval_episodes: Number of episodes to evaluate each candidate. 
+ :param policy_kwargs: Keyword arguments to pass to the policy on creation + :param policy_base: Base class to use for the policy + :param tensorboard_log: String with the directory to put tensorboard logs: + :param seed: Random seed for the training + :param verbose: Verbosity level: 0 no output, 1 info, 2 debug + :param device: Torch device to use for training, defaults to "cpu" + :param _init_setup_model: Whether or not to build the network at the creation of the instance + """ + + def __init__( + self, + policy: Union[str, Type[BasePolicy]], + env: Union[GymEnv, str], + policy_base: Type[BasePolicy], + learning_rate: Union[float, Schedule], + pop_size: int = 16, + alive_bonus_offset: float = 0, + n_eval_episodes: int = 1, + policy_kwargs: Optional[Dict[str, Any]] = None, + tensorboard_log: Optional[str] = None, + seed: Optional[int] = None, + verbose: int = 0, + device: Union[th.device, str] = "cpu", + supported_action_spaces: Optional[Tuple[gym.spaces.Space, ...]] = None, + ): + + super().__init__( + policy, + env, + learning_rate=learning_rate, + tensorboard_log=tensorboard_log, + policy_base=policy_base, + policy_kwargs=policy_kwargs, + verbose=verbose, + device=device, + supported_action_spaces=supported_action_spaces, + support_multi_env=True, + seed=seed, + ) + + self.pop_size = pop_size + self.n_eval_episodes = n_eval_episodes + self.alive_bonus_offset = alive_bonus_offset + + self.processes = None + # Keep track of how many steps where elapsed before a new rollout + # Important for syncing observation normalization between workers + self.old_count = 0 + + def _mimic_monitor_wrapper(self, episode_rewards: np.ndarray, episode_lengths: np.ndarray) -> None: + """ + Helper to mimic Monitor wrapper and report episode statistics (mean reward, mean episode length). + + :param episode_rewards: List containing per-episode rewards + :param episode_lengths: List containing per-episode lengths (in number of steps) + """ + # Mimic Monitor Wrapper + infos = [ + {"episode": {"r": episode_reward, "l": episode_length}} + for episode_reward, episode_length in zip(episode_rewards, episode_lengths) + ] + + self._update_info_buffer(infos) + + def _trigger_callback( + self, + _locals: Dict[str, Any], + _globals: Dict[str, Any], + callback: BaseCallback, + n_envs: int, + ) -> None: + """ + Callback passed to the ``evaluate_policy()`` helper + in order to increment the number of timesteps + and trigger events in the single process version. + + :param _locals: + :param _globals: + :param callback: Callback that will be called at every step + :param n_envs: Number of environments + """ + self.num_timesteps += n_envs + callback.on_step() + + def evaluate_candidates( + self, candidate_weights: th.Tensor, callback: BaseCallback, async_eval: Optional[AsyncEval] + ) -> th.Tensor: + """ + Evaluate each candidate. + + :param candidate_weights: The candidate weights to be evaluated. + :param callback: Callback that will be called at each step + (or after evaluation in the multiprocess version) + :param async_eval: The object for asynchronous evaluation of candidates. + :return: The episodic return for each candidate. 
+ """ + + batch_steps = 0 + # returns == sum of rewards + candidate_returns = th.zeros(self.pop_size, device=self.device) + train_policy = copy.deepcopy(self.policy) + # Empty buffer to show only mean over one iteration (one set of candidates) in the logs + self.ep_info_buffer = [] + callback.on_rollout_start() + + if async_eval is not None: + # Multiprocess asynchronous version + async_eval.send_jobs(candidate_weights, self.pop_size) + results = async_eval.get_results() + + for weights_idx, (episode_rewards, episode_lengths) in results: + + # Update reward to cancel out alive bonus if needed + candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) + batch_steps += np.sum(episode_lengths) + self._mimic_monitor_wrapper(episode_rewards, episode_lengths) + + # Combine the filter stats of each process for normalization + for worker_obs_rms in async_eval.get_obs_rms(): + if self._vec_normalize_env is not None: + # worker_obs_rms.count -= self.old_count + self._vec_normalize_env.obs_rms.combine(worker_obs_rms) + # Hack: don't count timesteps twice (between the two are synced) + # otherwise it will lead to overflow, + # in practice we would need two RunningMeanStats + self._vec_normalize_env.obs_rms.count -= self.old_count + + # Synchronise VecNormalize if needed + if self._vec_normalize_env is not None: + async_eval.sync_obs_rms(self._vec_normalize_env.obs_rms.copy()) + self.old_count = self._vec_normalize_env.obs_rms.count + + # Hack to have Callback events + for _ in range(batch_steps // len(async_eval.remotes)): + self.num_timesteps += len(async_eval.remotes) + callback.on_step() + else: + # Single process, synchronous version + for weights_idx in range(self.pop_size): + + # Load current candidate weights + train_policy.load_from_vector(candidate_weights[weights_idx].cpu()) + # Evaluate the candidate + episode_rewards, episode_lengths = evaluate_policy( + train_policy, + self.env, + n_eval_episodes=self.n_eval_episodes, + return_episode_rewards=True, + # Increment num_timesteps too (slight mismatch with multi envs) + callback=partial(self._trigger_callback, callback=callback, n_envs=self.env.num_envs), + warn=False, + ) + # Update reward to cancel out alive bonus if needed + candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) + batch_steps += sum(episode_lengths) + self._mimic_monitor_wrapper(episode_rewards, episode_lengths) + + # Note: we increment the num_timesteps inside the evaluate_policy() + # however when using multiple environments, there will be a slight + # mismatch between the number of timesteps used and the number + # of calls to the step() method (cf. implementation of evaluate_policy()) + # self.num_timesteps += batch_steps + + callback.on_rollout_end() + + return candidate_returns + + def _log_and_dump(self) -> None: + """ + Dump information to the logger. 
+ """
+ time_elapsed = time.time() - self.start_time
+ fps = int((self.num_timesteps - self._num_timesteps_at_start) / (time_elapsed + 1e-8))
+ if len(self.ep_info_buffer) > 0 and len(self.ep_info_buffer[0]) > 0:
+ self.logger.record("rollout/ep_rew_mean", safe_mean([ep_info["r"] for ep_info in self.ep_info_buffer]))
+ self.logger.record("rollout/ep_len_mean", safe_mean([ep_info["l"] for ep_info in self.ep_info_buffer]))
+ self.logger.record("time/fps", fps)
+ self.logger.record("time/time_elapsed", int(time_elapsed), exclude="tensorboard")
+ self.logger.record("time/total_timesteps", self.num_timesteps, exclude="tensorboard")
+ self.logger.dump(step=self.num_timesteps)
+
+ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval]) -> None:
+ """
+ Sample new candidates, evaluate them and then update current policy.
+
+ :param callback: callback(s) called at every step with state of the algorithm.
+ :param async_eval: The object for asynchronous evaluation of candidates.
+ """
+ raise NotImplementedError()
+
+ def learn(
+ self,
+ total_timesteps: int,
+ callback: MaybeCallback = None,
+ log_interval: int = 1,
+ tb_log_name: str = "ES",
+ eval_env: Optional[GymEnv] = None,
+ eval_freq: int = -1,
+ n_eval_episodes: int = 5,
+ eval_log_path: Optional[str] = None,
+ reset_num_timesteps: bool = True,
+ async_eval: Optional[AsyncEval] = None,
+ ) -> "PopulationBasedAlgorithm":
+ """
+ Return a trained model.
+
+ :param total_timesteps: The total number of samples (env steps) to train on
+ :param callback: callback(s) called at every step with state of the algorithm.
+ :param log_interval: The number of iterations (policy updates) between two log dumps.
+ :param tb_log_name: the name of the run for TensorBoard logging
+ :param eval_env: Environment that will be used to evaluate the agent
+ :param eval_freq: Evaluate the agent every ``eval_freq`` timesteps (this may vary a little)
+ :param n_eval_episodes: Number of episodes to evaluate the agent
+ :param eval_log_path: Path to a folder where the evaluations will be saved
+ :param reset_num_timesteps: whether or not to reset the current timestep number (used in logging)
+ :param async_eval: The object for asynchronous evaluation of candidates.
+ :return: the trained model + """ + + total_steps, callback = self._setup_learn( + total_timesteps, eval_env, callback, eval_freq, n_eval_episodes, eval_log_path, reset_num_timesteps, tb_log_name + ) + + callback.on_training_start(locals(), globals()) + + while self.num_timesteps < total_steps: + self._update_current_progress_remaining(self.num_timesteps, total_timesteps) + self._do_one_update(callback, async_eval) + if log_interval is not None and self._n_updates % log_interval == 0: + self._log_and_dump() + + if async_eval is not None: + async_eval.close() + + callback.on_training_end() + + return self From 38852183cb7becb2fc76d9154459d79b7b353b06 Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Fri, 25 Mar 2022 14:52:32 +0100 Subject: [PATCH 12/16] Refactor ARS --- sb3_contrib/ars/ars.py | 188 +++++------------------------------------ 1 file changed, 20 insertions(+), 168 deletions(-) diff --git a/sb3_contrib/ars/ars.py b/sb3_contrib/ars/ars.py index 8fed745b..0337de13 100644 --- a/sb3_contrib/ars/ars.py +++ b/sb3_contrib/ars/ars.py @@ -1,25 +1,20 @@ -import copy -import time import warnings -from functools import partial from typing import Any, Dict, Optional, Type, Union import gym -import numpy as np import torch as th import torch.nn.utils -from stable_baselines3.common.base_class import BaseAlgorithm from stable_baselines3.common.callbacks import BaseCallback -from stable_baselines3.common.evaluation import evaluate_policy from stable_baselines3.common.policies import BasePolicy from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule -from stable_baselines3.common.utils import get_schedule_fn, safe_mean +from stable_baselines3.common.utils import get_schedule_fn -from sb3_contrib.ars.policies import ARSPolicy +from sb3_contrib.common.policies import ESPolicy +from sb3_contrib.common.population_based_algorithm import PopulationBasedAlgorithm from sb3_contrib.common.vec_env.async_eval import AsyncEval -class ARS(BaseAlgorithm): +class ARS(PopulationBasedAlgorithm): """ Augmented Random Search: https://arxiv.org/abs/1803.07055 @@ -47,7 +42,7 @@ class ARS(BaseAlgorithm): def __init__( self, - policy: Union[str, Type[ARSPolicy]], + policy: Union[str, Type[ESPolicy]], env: Union[GymEnv, str], n_delta: int = 8, n_top: Optional[int] = None, @@ -57,7 +52,7 @@ def __init__( alive_bonus_offset: float = 0, n_eval_episodes: int = 1, policy_kwargs: Optional[Dict[str, Any]] = None, - policy_base: Type[BasePolicy] = ARSPolicy, + policy_base: Type[BasePolicy] = ESPolicy, tensorboard_log: Optional[str] = None, seed: Optional[int] = None, verbose: int = 0, @@ -69,20 +64,20 @@ def __init__( policy, env, learning_rate=learning_rate, + pop_size=2 * n_delta, + alive_bonus_offset=alive_bonus_offset, + n_eval_episodes=n_eval_episodes, tensorboard_log=tensorboard_log, policy_base=policy_base, policy_kwargs=policy_kwargs, verbose=verbose, device=device, supported_action_spaces=(gym.spaces.Box, gym.spaces.Discrete), - support_multi_env=True, seed=seed, ) self.n_delta = n_delta - self.pop_size = 2 * n_delta self.delta_std_schedule = get_schedule_fn(delta_std) - self.n_eval_episodes = n_eval_episodes if n_top is None: n_top = n_delta @@ -94,13 +89,8 @@ def __init__( self.n_top = n_top - self.alive_bonus_offset = alive_bonus_offset self.zero_policy = zero_policy self.weights = None # Need to call init model to initialize weight - self.processes = None - # Keep track of how many steps where elapsed before a new rollout - # Important for syncing observation normalization between 
workers - self.old_count = 0 if _init_setup_model: self._setup_model() @@ -118,138 +108,6 @@ def _setup_model(self) -> None: self.weights = th.zeros_like(self.weights, requires_grad=False) self.policy.load_from_vector(self.weights.cpu()) - def _mimic_monitor_wrapper(self, episode_rewards: np.ndarray, episode_lengths: np.ndarray) -> None: - """ - Helper to mimic Monitor wrapper and report episode statistics (mean reward, mean episode length). - - :param episode_rewards: List containing per-episode rewards - :param episode_lengths: List containing per-episode lengths (in number of steps) - """ - # Mimic Monitor Wrapper - infos = [ - {"episode": {"r": episode_reward, "l": episode_length}} - for episode_reward, episode_length in zip(episode_rewards, episode_lengths) - ] - - self._update_info_buffer(infos) - - def _trigger_callback( - self, - _locals: Dict[str, Any], - _globals: Dict[str, Any], - callback: BaseCallback, - n_envs: int, - ) -> None: - """ - Callback passed to the ``evaluate_policy()`` helper - in order to increment the number of timesteps - and trigger events in the single process version. - - :param _locals: - :param _globals: - :param callback: Callback that will be called at every step - :param n_envs: Number of environments - """ - self.num_timesteps += n_envs - callback.on_step() - - def evaluate_candidates( - self, candidate_weights: th.Tensor, callback: BaseCallback, async_eval: Optional[AsyncEval] - ) -> th.Tensor: - """ - Evaluate each candidate. - - :param candidate_weights: The candidate weights to be evaluated. - :param callback: Callback that will be called at each step - (or after evaluation in the multiprocess version) - :param async_eval: The object for asynchronous evaluation of candidates. - :return: The episodic return for each candidate. 
- """ - - batch_steps = 0 - # returns == sum of rewards - candidate_returns = th.zeros(self.pop_size, device=self.device) - train_policy = copy.deepcopy(self.policy) - # Empty buffer to show only mean over one iteration (one set of candidates) in the logs - self.ep_info_buffer = [] - callback.on_rollout_start() - - if async_eval is not None: - # Multiprocess asynchronous version - async_eval.send_jobs(candidate_weights, self.pop_size) - results = async_eval.get_results() - - for weights_idx, (episode_rewards, episode_lengths) in results: - - # Update reward to cancel out alive bonus if needed - candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) - batch_steps += np.sum(episode_lengths) - self._mimic_monitor_wrapper(episode_rewards, episode_lengths) - - # Combine the filter stats of each process for normalization - for worker_obs_rms in async_eval.get_obs_rms(): - if self._vec_normalize_env is not None: - # worker_obs_rms.count -= self.old_count - self._vec_normalize_env.obs_rms.combine(worker_obs_rms) - # Hack: don't count timesteps twice (between the two are synced) - # otherwise it will lead to overflow, - # in practice we would need two RunningMeanStats - self._vec_normalize_env.obs_rms.count -= self.old_count - - # Synchronise VecNormalize if needed - if self._vec_normalize_env is not None: - async_eval.sync_obs_rms(self._vec_normalize_env.obs_rms.copy()) - self.old_count = self._vec_normalize_env.obs_rms.count - - # Hack to have Callback events - for _ in range(batch_steps // len(async_eval.remotes)): - self.num_timesteps += len(async_eval.remotes) - callback.on_step() - else: - # Single process, synchronous version - for weights_idx in range(self.pop_size): - - # Load current candidate weights - train_policy.load_from_vector(candidate_weights[weights_idx].cpu()) - # Evaluate the candidate - episode_rewards, episode_lengths = evaluate_policy( - train_policy, - self.env, - n_eval_episodes=self.n_eval_episodes, - return_episode_rewards=True, - # Increment num_timesteps too (slight mismatch with multi envs) - callback=partial(self._trigger_callback, callback=callback, n_envs=self.env.num_envs), - warn=False, - ) - # Update reward to cancel out alive bonus if needed - candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) - batch_steps += sum(episode_lengths) - self._mimic_monitor_wrapper(episode_rewards, episode_lengths) - - # Note: we increment the num_timesteps inside the evaluate_policy() - # however when using multiple environments, there will be a slight - # mismatch between the number of timesteps used and the number - # of calls to the step() method (cf. implementation of evaluate_policy()) - # self.num_timesteps += batch_steps - - callback.on_rollout_end() - - return candidate_returns - - def _log_and_dump(self) -> None: - """ - Dump information to the logger. 
- """ - time_elapsed = time.time() - self.start_time - fps = int((self.num_timesteps - self._num_timesteps_at_start) / (time_elapsed + 1e-8)) - if len(self.ep_info_buffer) > 0 and len(self.ep_info_buffer[0]) > 0: - self.logger.record("rollout/ep_rew_mean", safe_mean([ep_info["r"] for ep_info in self.ep_info_buffer])) - self.logger.record("rollout/ep_len_mean", safe_mean([ep_info["l"] for ep_info in self.ep_info_buffer])) - self.logger.record("time/fps", fps) - self.logger.record("time/time_elapsed", int(time_elapsed), exclude="tensorboard") - self.logger.record("time/total_timesteps", self.num_timesteps, exclude="tensorboard") - self.logger.dump(step=self.num_timesteps) - def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval]) -> None: """ Sample new candidates, evaluate them and then update current policy. @@ -327,21 +185,15 @@ def learn( :return: the trained model """ - total_steps, callback = self._setup_learn( - total_timesteps, eval_env, callback, eval_freq, n_eval_episodes, eval_log_path, reset_num_timesteps, tb_log_name + return super().learn( + total_timesteps=total_timesteps, + callback=callback, + log_interval=log_interval, + eval_env=eval_env, + eval_freq=eval_freq, + n_eval_episodes=n_eval_episodes, + tb_log_name=tb_log_name, + eval_log_path=eval_log_path, + reset_num_timesteps=reset_num_timesteps, + async_eval=async_eval, ) - - callback.on_training_start(locals(), globals()) - - while self.num_timesteps < total_steps: - self._update_current_progress_remaining(self.num_timesteps, total_timesteps) - self._do_one_update(callback, async_eval) - if log_interval is not None and self._n_updates % log_interval == 0: - self._log_and_dump() - - if async_eval is not None: - async_eval.close() - - callback.on_training_end() - - return self From 3beb0021a65f323cee9e28ae743fc7c831551d4e Mon Sep 17 00:00:00 2001 From: Antonin Raffin Date: Tue, 25 Oct 2022 15:05:05 +0200 Subject: [PATCH 13/16] Fix elasped time --- sb3_contrib/cem/cem.py | 2 +- sb3_contrib/common/population_based_algorithm.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py index 8130f305..7a3b3637 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -181,7 +181,7 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] self._n_updates += 1 def learn( - self, + self: CEMSelf, total_timesteps: int, callback: MaybeCallback = None, log_interval: int = 1, diff --git a/sb3_contrib/common/population_based_algorithm.py b/sb3_contrib/common/population_based_algorithm.py index 018525d3..47154ef2 100644 --- a/sb3_contrib/common/population_based_algorithm.py +++ b/sb3_contrib/common/population_based_algorithm.py @@ -1,4 +1,5 @@ import copy +import sys import time from functools import partial from typing import Any, Dict, Optional, Tuple, Type, TypeVar, Union @@ -204,7 +205,7 @@ def _log_and_dump(self) -> None: """ Dump information to the logger. 
""" - time_elapsed = time.time() - self.start_time + time_elapsed = max((time.time_ns() - self.start_time) / 1e9, sys.float_info.epsilon) fps = int((self.num_timesteps - self._num_timesteps_at_start) / (time_elapsed + 1e-8)) if len(self.ep_info_buffer) > 0 and len(self.ep_info_buffer[0]) > 0: self.logger.record("rollout/ep_rew_mean", safe_mean([ep_info["r"] for ep_info in self.ep_info_buffer])) From 28448a0b656827b484a7dbeca0d848bca10454af Mon Sep 17 00:00:00 2001 From: Antonin RAFFIN Date: Mon, 11 Aug 2025 10:32:43 +0200 Subject: [PATCH 14/16] Upgrade to latest SB3 version --- sb3_contrib/__init__.py | 1 + sb3_contrib/ars/ars.py | 21 ++--- sb3_contrib/cem/__init__.py | 2 + sb3_contrib/cem/cem.py | 41 ++++++---- sb3_contrib/common/policies.py | 57 ++++++------- .../common/population_based_algorithm.py | 80 ++++++++++++++----- tests/test_run.py | 4 +- 7 files changed, 122 insertions(+), 84 deletions(-) diff --git a/sb3_contrib/__init__.py b/sb3_contrib/__init__.py index 620a6a6a..ab86a3a7 100644 --- a/sb3_contrib/__init__.py +++ b/sb3_contrib/__init__.py @@ -16,6 +16,7 @@ __all__ = [ "ARS", + "CEM", "QRDQN", "TQC", "TRPO", diff --git a/sb3_contrib/ars/ars.py b/sb3_contrib/ars/ars.py index b6360166..efb76d6e 100644 --- a/sb3_contrib/ars/ars.py +++ b/sb3_contrib/ars/ars.py @@ -1,20 +1,14 @@ import warnings -from functools import partial from typing import Any, ClassVar, Optional, TypeVar, Union -import numpy as np import torch as th -import torch.nn.utils from gymnasium import spaces -from stable_baselines3.common.base_class import BaseAlgorithm from stable_baselines3.common.callbacks import BaseCallback -from stable_baselines3.common.evaluation import evaluate_policy from stable_baselines3.common.policies import BasePolicy -from stable_baselines3.common.save_util import load_from_zip_file from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule -from stable_baselines3.common.utils import FloatSchedule, safe_mean +from stable_baselines3.common.utils import FloatSchedule -from sb3_contrib.common.policies import ESPolicy +from sb3_contrib.common.policies import ESLinearPolicy, ESPolicy from sb3_contrib.common.population_based_algorithm import PopulationBasedAlgorithm from sb3_contrib.common.vec_env.async_eval import AsyncEval @@ -49,8 +43,8 @@ class ARS(PopulationBasedAlgorithm): """ policy_aliases: ClassVar[dict[str, type[BasePolicy]]] = { - "MlpPolicy": MlpPolicy, - "LinearPolicy": LinearPolicy, + "MlpPolicy": ESPolicy, + "LinearPolicy": ESLinearPolicy, } def __init__( @@ -85,7 +79,6 @@ def __init__( verbose=verbose, device=device, supported_action_spaces=(spaces.Box, spaces.Discrete), - support_multi_env=True, seed=seed, ) @@ -119,7 +112,7 @@ def _setup_model(self) -> None: if self.zero_policy: self.weights = th.zeros_like(self.weights, requires_grad=False) - self.policy.load_from_vector(self.weights.cpu()) + self.policy.load_from_vector(self.weights.cpu().numpy()) def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval]) -> None: """ @@ -159,7 +152,7 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] step_size = learning_rate / (self.n_top * return_std + 1e-6) # Approximate gradient step self.weights = self.weights + step_size * ((plus_returns - minus_returns) @ deltas) - self.policy.load_from_vector(self.weights.cpu()) + self.policy.load_from_vector(self.weights.cpu().numpy()) self.logger.record("train/iterations", self._n_updates, exclude="tensorboard") self.logger.record("train/delta_std", delta_std) @@ 
-169,7 +162,7 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] self._n_updates += 1 - def learn( + def learn( # type: ignore[override] self: SelfARS, total_timesteps: int, callback: MaybeCallback = None, diff --git a/sb3_contrib/cem/__init__.py b/sb3_contrib/cem/__init__.py index 8f48e7f5..2d8ee25e 100644 --- a/sb3_contrib/cem/__init__.py +++ b/sb3_contrib/cem/__init__.py @@ -1,2 +1,4 @@ from sb3_contrib.cem.cem import CEM from sb3_contrib.cem.policies import LinearPolicy, MlpPolicy + +__all__ = ["CEM", "LinearPolicy", "MlpPolicy"] diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py index 7a3b3637..cab96cf2 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -1,18 +1,18 @@ import warnings -from typing import Any, Dict, Optional, Type, TypeVar, Union +from typing import Any, ClassVar, Optional, TypeVar, Union -import gym import torch as th -import torch.nn.utils +from gymnasium import spaces from stable_baselines3.common.callbacks import BaseCallback +from stable_baselines3.common.policies import BasePolicy from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback from torch.distributions.multivariate_normal import MultivariateNormal -from sb3_contrib.common.policies import ESPolicy +from sb3_contrib.common.policies import ESLinearPolicy, ESPolicy from sb3_contrib.common.population_based_algorithm import PopulationBasedAlgorithm from sb3_contrib.common.vec_env.async_eval import AsyncEval -CEMSelf = TypeVar("CEMSelf", bound="CEM") +SelfCEM = TypeVar("SelfCEM", bound="CEM") class CEM(PopulationBasedAlgorithm): @@ -39,6 +39,8 @@ class CEM(PopulationBasedAlgorithm): :param alive_bonus_offset: Constant added to the reward at each step, used to cancel out alive bonuses. :param n_eval_episodes: Number of episodes to evaluate each candidate. 
:param policy_kwargs: Keyword arguments to pass to the policy on creation + :param stats_window_size: Window size for the rollout logging, specifying the number of episodes to average + the reported success rate, mean episode length, and mean reward over :param tensorboard_log: String with the directory to put tensorboard logs: :param seed: Random seed for the training :param verbose: Verbosity level: 0 no output, 1 info, 2 debug @@ -46,9 +48,17 @@ class CEM(PopulationBasedAlgorithm): :param _init_setup_model: Whether or not to build the network at the creation of the instance """ + policy_aliases: ClassVar[dict[str, type[BasePolicy]]] = { + "MlpPolicy": ESPolicy, + "LinearPolicy": ESLinearPolicy, + } + weights: th.Tensor # Need to call init model to initialize weights + centroid_cov: th.Tensor + extra_variance: th.Tensor + def __init__( self, - policy: Union[str, Type[ESPolicy]], + policy: Union[str, type[ESPolicy]], env: Union[GymEnv, str], pop_size: int = 16, n_top: Optional[int] = None, @@ -59,7 +69,8 @@ def __init__( use_diagonal_covariance: bool = False, alive_bonus_offset: float = 0, n_eval_episodes: int = 1, - policy_kwargs: Optional[Dict[str, Any]] = None, + policy_kwargs: Optional[dict[str, Any]] = None, + stats_window_size: int = 100, tensorboard_log: Optional[str] = None, seed: Optional[int] = None, verbose: int = 0, @@ -74,11 +85,12 @@ def __init__( pop_size=pop_size, alive_bonus_offset=alive_bonus_offset, n_eval_episodes=n_eval_episodes, + stats_window_size=stats_window_size, tensorboard_log=tensorboard_log, policy_kwargs=policy_kwargs, verbose=verbose, device=device, - supported_action_spaces=(gym.spaces.Box, gym.spaces.Discrete), + supported_action_spaces=(spaces.Box, spaces.Discrete), seed=seed, ) @@ -96,10 +108,7 @@ def __init__( self.n_top = n_top self.zero_policy = zero_policy - self.weights = None # Need to call init model to initialize weights - self.centroid_cov = None self.use_diagonal_covariance = use_diagonal_covariance - self.extra_variance = None if _init_setup_model: self._setup_model() @@ -126,7 +135,7 @@ def _setup_model(self) -> None: if self.zero_policy: self.weights = th.zeros_like(self.weights, requires_grad=False) - self.policy.load_from_vector(self.weights.cpu()) + self.policy.load_from_vector(self.weights.cpu().numpy()) def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval]) -> None: """ @@ -168,7 +177,7 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] self.extra_variance = th.ones_like(self.weights, requires_grad=False) * self.extra_noise_std**2 # Current policy is the centroid of the best candidates - self.policy.load_from_vector(self.weights.cpu()) + self.policy.load_from_vector(self.weights.cpu().numpy()) self.logger.record("rollout/return_std", candidate_returns.std().item()) self.logger.record("train/iterations", self._n_updates, exclude="tensorboard") @@ -180,8 +189,8 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] self._n_updates += 1 - def learn( - self: CEMSelf, + def learn( # type: ignore[override] + self: SelfCEM, total_timesteps: int, callback: MaybeCallback = None, log_interval: int = 1, @@ -189,7 +198,7 @@ def learn( reset_num_timesteps: bool = True, async_eval: Optional[AsyncEval] = None, progress_bar: bool = False, - ) -> CEMSelf: + ) -> SelfCEM: """ Return a trained model. 
diff --git a/sb3_contrib/common/policies.py b/sb3_contrib/common/policies.py index 1ee3fe91..6a285a57 100644 --- a/sb3_contrib/common/policies.py +++ b/sb3_contrib/common/policies.py @@ -1,10 +1,11 @@ -from typing import Any, Dict, List, Optional, Type +from typing import Any, Optional -import gym import torch as th +from gymnasium import spaces from stable_baselines3.common.policies import BasePolicy from stable_baselines3.common.preprocessing import get_action_dim from stable_baselines3.common.torch_layers import create_mlp +from stable_baselines3.common.type_aliases import PyTorchObs from torch import nn @@ -16,23 +17,25 @@ class ESPolicy(BasePolicy): :param action_space: The action space of the environment :param net_arch: Network architecture, defaults to a 2 layers MLP with 64 hidden nodes. :param activation_fn: Activation function + :param with_bias: If set to False, the layers will not learn an additive bias :param squash_output: For continuous actions, whether the output is squashed or not using a ``tanh()`` function. If not squashed with tanh the output will instead be clipped. """ def __init__( self, - observation_space: gym.spaces.Space, - action_space: gym.spaces.Space, - net_arch: Optional[List[int]] = None, - activation_fn: Type[nn.Module] = nn.ReLU, + observation_space: spaces.Space, + action_space: spaces.Space, + net_arch: Optional[list[int]] = None, + activation_fn: type[nn.Module] = nn.ReLU, + with_bias: bool = True, squash_output: bool = True, ): super().__init__( observation_space, action_space, - squash_output=isinstance(action_space, gym.spaces.Box) and squash_output, + squash_output=isinstance(action_space, spaces.Box) and squash_output, ) if net_arch is None: @@ -43,17 +46,19 @@ def __init__( self.features_dim = self.features_extractor.features_dim self.activation_fn = activation_fn - if isinstance(action_space, gym.spaces.Box): + if isinstance(action_space, spaces.Box): action_dim = get_action_dim(action_space) - actor_net = create_mlp(self.features_dim, action_dim, net_arch, activation_fn, squash_output=True) - elif isinstance(action_space, gym.spaces.Discrete): - actor_net = create_mlp(self.features_dim, action_space.n, net_arch, activation_fn) + actor_net = create_mlp( + self.features_dim, action_dim, net_arch, activation_fn, with_bias=with_bias, squash_output=squash_output + ) + elif isinstance(action_space, spaces.Discrete): + actor_net = create_mlp(self.features_dim, int(action_space.n), net_arch, activation_fn, with_bias=with_bias) else: raise NotImplementedError(f"Error: ES policy not implemented for action space of type {type(action_space)}.") self.action_net = nn.Sequential(*actor_net) - def _get_constructor_parameters(self) -> Dict[str, Any]: + def _get_constructor_parameters(self) -> dict[str, Any]: # data = super()._get_constructor_parameters() this adds normalize_images, which we don't support... 
data = dict( observation_space=self.observation_space, @@ -63,18 +68,17 @@ def _get_constructor_parameters(self) -> Dict[str, Any]: ) return data - def forward(self, obs: th.Tensor) -> th.Tensor: - - features = self.extract_features(obs) - if isinstance(self.action_space, gym.spaces.Box): + def forward(self, obs: PyTorchObs) -> th.Tensor: + features = self.extract_features(obs, self.features_extractor) + if isinstance(self.action_space, spaces.Box): return self.action_net(features) - elif isinstance(self.action_space, gym.spaces.Discrete): + elif isinstance(self.action_space, spaces.Discrete): logits = self.action_net(features) return th.argmax(logits, dim=1) else: raise NotImplementedError() - def _predict(self, observation: th.Tensor, deterministic: bool = True) -> th.Tensor: + def _predict(self, observation: PyTorchObs, deterministic: bool = True) -> th.Tensor: # Non deterministic action does not really make sense for ARS, we ignore this parameter for now.. return self(observation) @@ -92,20 +96,9 @@ class ESLinearPolicy(ESPolicy): def __init__( self, - observation_space: gym.spaces.Space, - action_space: gym.spaces.Space, + observation_space: spaces.Space, + action_space: spaces.Space, with_bias: bool = False, squash_output: bool = False, ): - - super().__init__(observation_space, action_space, squash_output=squash_output) - - if isinstance(action_space, gym.spaces.Box): - action_dim = get_action_dim(action_space) - self.action_net = nn.Linear(self.features_dim, action_dim, bias=with_bias) - if squash_output: - self.action_net = nn.Sequential(self.action_net, nn.Tanh()) - elif isinstance(action_space, gym.spaces.Discrete): - self.action_net = nn.Linear(self.features_dim, action_space.n, bias=with_bias) - else: - raise NotImplementedError(f"Error: ES policy not implemented for action space of type {type(action_space)}.") + super().__init__(observation_space, action_space, net_arch=[], with_bias=with_bias, squash_output=squash_output) diff --git a/sb3_contrib/common/population_based_algorithm.py b/sb3_contrib/common/population_based_algorithm.py index 47154ef2..02462043 100644 --- a/sb3_contrib/common/population_based_algorithm.py +++ b/sb3_contrib/common/population_based_algorithm.py @@ -1,23 +1,26 @@ import copy import sys import time +from collections import deque from functools import partial -from typing import Any, Dict, Optional, Tuple, Type, TypeVar, Union +from typing import Any, ClassVar, Optional, TypeVar, Union -import gym import numpy as np import torch as th -import torch.nn.utils +from gymnasium import spaces from stable_baselines3.common.base_class import BaseAlgorithm from stable_baselines3.common.callbacks import BaseCallback from stable_baselines3.common.evaluation import evaluate_policy -from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule +from stable_baselines3.common.policies import BasePolicy +from stable_baselines3.common.running_mean_std import RunningMeanStd +from stable_baselines3.common.save_util import load_from_zip_file +from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule, TensorDict from stable_baselines3.common.utils import safe_mean from sb3_contrib.common.policies import ESLinearPolicy, ESPolicy from sb3_contrib.common.vec_env.async_eval import AsyncEval -PopulationBasedSelf = TypeVar("PopulationBasedSelf", bound="PopulationBasedAlgorithm") +SelfPopulationBased = TypeVar("SelfPopulationBased", bound="PopulationBasedAlgorithm") class PopulationBasedAlgorithm(BaseAlgorithm): @@ -33,6 +36,8 @@ 
class PopulationBasedAlgorithm(BaseAlgorithm): :param alive_bonus_offset: Constant added to the reward at each step, used to cancel out alive bonuses. :param n_eval_episodes: Number of episodes to evaluate each candidate. :param policy_kwargs: Keyword arguments to pass to the policy on creation + :param stats_window_size: Window size for the rollout logging, specifying the number of episodes to average + the reported success rate, mean episode length, and mean reward over :param tensorboard_log: String with the directory to put tensorboard logs: :param seed: Random seed for the training :param verbose: Verbosity level: 0 no output, 1 info, 2 debug @@ -40,31 +45,33 @@ class PopulationBasedAlgorithm(BaseAlgorithm): :param _init_setup_model: Whether or not to build the network at the creation of the instance """ - policy_aliases: Dict[str, Type[ESPolicy]] = { + policy_aliases: ClassVar[dict[str, type[BasePolicy]]] = { "MlpPolicy": ESPolicy, "LinearPolicy": ESLinearPolicy, } def __init__( self, - policy: Union[str, Type[ESPolicy]], + policy: Union[str, type[ESPolicy]], env: Union[GymEnv, str], learning_rate: Union[float, Schedule], pop_size: int = 16, alive_bonus_offset: float = 0, n_eval_episodes: int = 1, - policy_kwargs: Optional[Dict[str, Any]] = None, + policy_kwargs: Optional[dict[str, Any]] = None, + stats_window_size: int = 100, tensorboard_log: Optional[str] = None, seed: Optional[int] = None, verbose: int = 0, device: Union[th.device, str] = "cpu", - supported_action_spaces: Optional[Tuple[gym.spaces.Space, ...]] = None, + supported_action_spaces: Optional[tuple[type[spaces.Space], ...]] = None, ): super().__init__( policy, env, learning_rate=learning_rate, + stats_window_size=stats_window_size, tensorboard_log=tensorboard_log, policy_kwargs=policy_kwargs, verbose=verbose, @@ -78,7 +85,6 @@ def __init__( self.n_eval_episodes = n_eval_episodes self.alive_bonus_offset = alive_bonus_offset - self.processes = None # Keep track of how many steps where elapsed before a new rollout # Important for syncing observation normalization between workers self.old_count = 0 @@ -100,8 +106,8 @@ def _mimic_monitor_wrapper(self, episode_rewards: np.ndarray, episode_lengths: n def _trigger_callback( self, - _locals: Dict[str, Any], - _globals: Dict[str, Any], + _locals: dict[str, Any], + _globals: dict[str, Any], callback: BaseCallback, n_envs: int, ) -> None: @@ -136,7 +142,7 @@ def evaluate_candidates( candidate_returns = th.zeros(self.pop_size, device=self.device) train_policy = copy.deepcopy(self.policy) # Empty buffer to show only mean over one iteration (one set of candidates) in the logs - self.ep_info_buffer = [] + self.ep_info_buffer = deque(maxlen=self._stats_window_size) callback.on_rollout_start() if async_eval is not None: @@ -154,6 +160,9 @@ def evaluate_candidates( # Combine the filter stats of each process for normalization for worker_obs_rms in async_eval.get_obs_rms(): if self._vec_normalize_env is not None: + assert isinstance( + self._vec_normalize_env.obs_rms, RunningMeanStd + ), "ES Algorithms don't support dict obs with normalization yet" # worker_obs_rms.count -= self.old_count self._vec_normalize_env.obs_rms.combine(worker_obs_rms) # Hack: don't count timesteps twice (between the two are synced) @@ -163,6 +172,9 @@ def evaluate_candidates( # Synchronise VecNormalize if needed if self._vec_normalize_env is not None: + assert isinstance( + self._vec_normalize_env.obs_rms, RunningMeanStd + ), "ES Algorithms don't support dict obs with normalization yet" 
async_eval.sync_obs_rms(self._vec_normalize_env.obs_rms.copy()) self.old_count = self._vec_normalize_env.obs_rms.count @@ -171,13 +183,14 @@ def evaluate_candidates( self.num_timesteps += len(async_eval.remotes) callback.on_step() else: + assert self.env is not None # Single process, synchronous version for weights_idx in range(self.pop_size): # Load current candidate weights - train_policy.load_from_vector(candidate_weights[weights_idx].cpu()) + train_policy.load_from_vector(candidate_weights[weights_idx].cpu().numpy()) # Evaluate the candidate - episode_rewards, episode_lengths = evaluate_policy( + episode_rewards, episode_lengths = evaluate_policy( # type: ignore[assignment] train_policy, self.env, n_eval_episodes=self.n_eval_episodes, @@ -201,10 +214,13 @@ def evaluate_candidates( return candidate_returns - def _log_and_dump(self) -> None: + def dump_logs(self) -> None: """ Dump information to the logger. """ + assert self.ep_info_buffer is not None + assert self.ep_success_buffer is not None + time_elapsed = max((time.time_ns() - self.start_time) / 1e9, sys.float_info.epsilon) fps = int((self.num_timesteps - self._num_timesteps_at_start) / (time_elapsed + 1e-8)) if len(self.ep_info_buffer) > 0 and len(self.ep_info_buffer[0]) > 0: @@ -213,6 +229,8 @@ def _log_and_dump(self) -> None: self.logger.record("time/fps", fps) self.logger.record("time/time_elapsed", int(time_elapsed), exclude="tensorboard") self.logger.record("time/total_timesteps", self.num_timesteps, exclude="tensorboard") + if len(self.ep_success_buffer) > 0: + self.logger.record("rollout/success_rate", safe_mean(self.ep_success_buffer)) self.logger.dump(step=self.num_timesteps) def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval]) -> None: @@ -224,8 +242,8 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] """ raise NotImplementedError() - def learn( - self: PopulationBasedSelf, + def learn( # type: ignore[override] + self: SelfPopulationBased, total_timesteps: int, callback: MaybeCallback = None, log_interval: int = 1, @@ -233,7 +251,7 @@ def learn( reset_num_timesteps: bool = True, async_eval: Optional[AsyncEval] = None, progress_bar: bool = False, - ) -> PopulationBasedSelf: + ) -> SelfPopulationBased: """ Return a trained model. 
@@ -260,7 +278,7 @@ def learn( self._update_current_progress_remaining(self.num_timesteps, total_timesteps) self._do_one_update(callback, async_eval) if log_interval is not None and self._n_updates % log_interval == 0: - self._log_and_dump() + self.dump_logs() if async_eval is not None: async_eval.close() @@ -268,3 +286,25 @@ def learn( callback.on_training_end() return self + + def set_parameters( + self, + load_path_or_dict: Union[str, TensorDict], + exact_match: bool = True, + device: Union[th.device, str] = "auto", + ) -> None: + # Patched set_parameters() to handle ARS linear policy saved with sb3-contrib < 1.7.0 + params = None + if isinstance(load_path_or_dict, dict): + params = load_path_or_dict + else: + _, params, _ = load_from_zip_file(load_path_or_dict, device=device) + + # Patch to load LinearPolicy saved using sb3-contrib < 1.7.0 + # See https://github.com/Stable-Baselines-Team/stable-baselines3-contrib/pull/122#issuecomment-1331981230 + for name in {"weight", "bias"}: + if f"action_net.{name}" in params.get("policy", {}): + params["policy"][f"action_net.0.{name}"] = params["policy"][f"action_net.{name}"] # type: ignore[index] + del params["policy"][f"action_net.{name}"] # type: ignore[attr-defined] + + super().set_parameters(params, exact_match=exact_match) diff --git a/tests/test_run.py b/tests/test_run.py index 88263835..55ff2c47 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -3,7 +3,7 @@ from stable_baselines3.common.env_util import make_vec_env from stable_baselines3.common.vec_env import VecNormalize -from sb3_contrib import ARS, CEM, QRDQN, TQC, TRPO, MaskablePPO, CrossQ +from sb3_contrib import ARS, CEM, QRDQN, TQC, TRPO, CrossQ, MaskablePPO from sb3_contrib.common.envs import InvalidActionEnvDiscrete from sb3_contrib.common.vec_env import AsyncEval @@ -107,7 +107,7 @@ def test_ars(policy_str, env_id): @pytest.mark.parametrize("policy_str", ["LinearPolicy", "MlpPolicy"]) def test_cem(policy_str, env_id): model = CEM(policy_str, env_id, pop_size=2, verbose=1, seed=0) - model.learn(total_timesteps=500, log_interval=1, eval_freq=250) + model.learn(total_timesteps=500, log_interval=1) @pytest.mark.parametrize("model_class", [ARS, CEM]) From 717918a605868fc43bd9f027e38b112bb962531f Mon Sep 17 00:00:00 2001 From: Antonin RAFFIN Date: Mon, 11 Aug 2025 10:36:56 +0200 Subject: [PATCH 15/16] Fix type checking --- docs/misc/changelog.rst | 2 +- sb3_contrib/cem/cem.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/misc/changelog.rst b/docs/misc/changelog.rst index c6e201e4..2497e4ab 100644 --- a/docs/misc/changelog.rst +++ b/docs/misc/changelog.rst @@ -14,6 +14,7 @@ Breaking Changes: New Features: ^^^^^^^^^^^^^ - Added support for n-step returns for off-policy algorithms via the `n_steps` parameter +- Added noisy Cross Entropy Method (CEM) Bug Fixes: ^^^^^^^^^^ @@ -312,7 +313,6 @@ Breaking Changes: New Features: ^^^^^^^^^^^^^ -- Added noisy Cross Entropy Method (CEM) Bug Fixes: ^^^^^^^^^^ diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py index cab96cf2..eb38fb9f 100644 --- a/sb3_contrib/cem/cem.py +++ b/sb3_contrib/cem/cem.py @@ -153,7 +153,7 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] # Sample using the full covariance matrix (+ extra noise) sample_cov = self.centroid_cov + th.diag(self.extra_variance) param_noise_distribution = MultivariateNormal(th.zeros_like(self.weights), covariance_matrix=sample_cov) - policy_deltas = param_noise_distribution.sample((self.pop_size,)) + policy_deltas = 
param_noise_distribution.sample((self.pop_size,)) # type: ignore[arg-type] candidate_weights = self.weights + policy_deltas From 9ab7063b1e0922c44a9a207e1c55696acc73b8ce Mon Sep 17 00:00:00 2001 From: Antonin RAFFIN Date: Mon, 11 Aug 2025 10:38:05 +0200 Subject: [PATCH 16/16] Fix ARS types --- pyproject.toml | 1 - sb3_contrib/ars/ars.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2f19c2c0..f622232f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,6 @@ follow_imports = "silent" show_error_codes = true exclude = """(?x)( sb3_contrib/ppo_recurrent/ppo_recurrent.py$ - | sb3_contrib/ars/ars.py$ | sb3_contrib/common/recurrent/policies.py$ | sb3_contrib/common/recurrent/buffers.py$ | tests/test_train_eval_mode.py$ diff --git a/sb3_contrib/ars/ars.py b/sb3_contrib/ars/ars.py index efb76d6e..4d27cdc1 100644 --- a/sb3_contrib/ars/ars.py +++ b/sb3_contrib/ars/ars.py @@ -46,6 +46,7 @@ class ARS(PopulationBasedAlgorithm): "MlpPolicy": ESPolicy, "LinearPolicy": ESLinearPolicy, } + weights: th.Tensor # Need to call init model to initialize weights def __init__( self, @@ -96,7 +97,6 @@ def __init__( self.n_top = n_top self.zero_policy = zero_policy - self.weights = None # Need to call init model to initialize weight if _init_setup_model: self._setup_model()
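
For quick reference, a minimal usage sketch of the two population-based algorithms after this series, based on the tests/test_run.py changes above. The AsyncEval construction (a list of VecEnv factories plus the current policy) follows the existing sb3-contrib pattern and is an assumption here, not something these patches define; the environment id and timestep budgets are illustrative.

    from stable_baselines3.common.env_util import make_vec_env

    from sb3_contrib import ARS, CEM
    from sb3_contrib.common.vec_env import AsyncEval

    env_id = "Pendulum-v1"

    # Single-process training: candidates are evaluated sequentially inside learn()
    model = CEM("LinearPolicy", env_id, pop_size=2, verbose=1, seed=0)
    model.learn(total_timesteps=500, log_interval=1)

    # Optional multi-process candidate evaluation through the async_eval argument
    # (assumed AsyncEval signature: list of VecEnv factories + current policy)
    model = ARS("MlpPolicy", env_id, n_delta=2, n_top=1, verbose=1, seed=0)
    async_eval = AsyncEval([lambda: make_vec_env(env_id) for _ in range(2)], model.policy)
    model.learn(total_timesteps=2_000, async_eval=async_eval)

Both classes inherit evaluate_candidates() and learn() from PopulationBasedAlgorithm; only _do_one_update() differs between ARS and CEM.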
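
Both algorithms treat the policy as a single flat parameter vector: candidates are perturbations of that vector, and evaluate_candidates() loads each one back into a copy of the policy. A self-contained sketch of that round trip using plain PyTorch utilities; the patches use th.nn.utils.parameters_to_vector and the SB3 helper policy.load_from_vector(), which wrap the same idea.

    import torch as th
    from torch import nn
    from torch.nn.utils import parameters_to_vector, vector_to_parameters

    # Stand-in for an ES policy: any nn.Module whose parameters we flatten
    policy = nn.Sequential(nn.Linear(3, 32), nn.ReLU(), nn.Linear(32, 1))

    # Current centroid as a flat vector (what self.weights stores)
    weights = parameters_to_vector(policy.parameters()).detach()

    # One candidate = centroid + noise, loaded back into the network for evaluation
    candidate = weights + 0.05 * th.randn_like(weights)
    vector_to_parameters(candidate, policy.parameters())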
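
The CEM update itself reduces to: sample a population around the current centroid (with extra noise added to the covariance so the search does not collapse too early), keep the n_top candidates by episodic return, and refit the sampling distribution on the elite. A NumPy sketch of the diagonal-covariance variant on a toy objective; cem.py samples from a full-covariance MultivariateNormal by default and uses real episodic returns instead of this synthetic score.

    import numpy as np

    rng = np.random.default_rng(0)
    target = rng.normal(size=5)

    def episodic_return(weights: np.ndarray) -> float:
        # Toy stand-in for evaluate_candidates(): higher is better
        return -float(np.sum((weights - target) ** 2))

    pop_size, n_top, n_iterations = 16, 8, 50
    extra_noise_std, noise_multiplier = 0.2, 0.999

    mean = np.zeros(5)              # centroid (current policy weights)
    var = np.ones(5) * 0.05**2      # diagonal covariance (cf. use_diagonal_covariance)

    for _ in range(n_iterations):
        std = np.sqrt(var + extra_noise_std**2)
        candidates = mean + rng.normal(size=(pop_size, 5)) * std
        returns = np.array([episodic_return(c) for c in candidates])
        elite = candidates[np.argsort(returns)[-n_top:]]
        # Refit the sampling distribution on the elite and decay the extra noise
        mean, var = elite.mean(axis=0), elite.var(axis=0)
        extra_noise_std *= noise_multiplier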
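
For comparison, the ARS step kept in ars.py estimates a gradient from antithetic (+delta / -delta) perturbations and scales the step by the return standard deviation, as in the hunk above. A NumPy paraphrase on the same kind of toy objective; hyperparameter values are illustrative, not the library defaults.

    import numpy as np

    rng = np.random.default_rng(0)
    target = rng.normal(size=5)

    def episodic_return(weights: np.ndarray) -> float:
        return -float(np.sum((weights - target) ** 2))

    n_delta, n_top, delta_std, learning_rate = 8, 4, 0.05, 0.02
    weights = np.zeros(5)

    for _ in range(200):
        deltas = rng.normal(size=(n_delta, 5))
        plus_returns = np.array([episodic_return(weights + delta_std * d) for d in deltas])
        minus_returns = np.array([episodic_return(weights - delta_std * d) for d in deltas])
        # Keep the n_top directions with the best return in either direction
        top = np.argsort(np.maximum(plus_returns, minus_returns))[-n_top:]
        plus_returns, minus_returns, deltas = plus_returns[top], minus_returns[top], deltas[top]
        return_std = np.concatenate([plus_returns, minus_returns]).std()
        # Approximate gradient step (cf. _do_one_update in ars.py)
        step_size = learning_rate / (n_top * return_std + 1e-6)
        weights = weights + step_size * (plus_returns - minus_returns) @ deltas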