diff --git a/README.md b/README.md index 5b0503ba..7f53c727 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ See documentation for the full list of included features. **RL Algorithms**: - [Augmented Random Search (ARS)](https://arxiv.org/abs/1803.07055) +- [Noisy Cross Entropy Method (CEM)](http://dx.doi.org/10.1162/neco.2006.18.12.2936) - [Quantile Regression DQN (QR-DQN)](https://arxiv.org/abs/1710.10044) - [PPO with invalid action masking (MaskablePPO)](https://arxiv.org/abs/2006.14171) - [PPO with recurrent policy (RecurrentPPO aka PPO LSTM)](https://ppo-details.cleanrl.dev//2021/11/05/ppo-implementation-details/) diff --git a/docs/misc/changelog.rst b/docs/misc/changelog.rst index d1e11c43..2497e4ab 100644 --- a/docs/misc/changelog.rst +++ b/docs/misc/changelog.rst @@ -9,10 +9,12 @@ Release 2.7.0 (2025-07-25) Breaking Changes: ^^^^^^^^^^^^^^^^^ - Upgraded to Stable-Baselines3 >= 2.7.0 +- Changed default policy architecture for ARS/CEM to ``[32]`` instead of ``[64, 64]`` New Features: ^^^^^^^^^^^^^ - Added support for n-step returns for off-policy algorithms via the `n_steps` parameter +- Added noisy Cross Entropy Method (CEM) Bug Fixes: ^^^^^^^^^^ @@ -357,6 +359,9 @@ Bug Fixes: Deprecations: ^^^^^^^^^^^^^ +Others: +^^^^^^^ + Release 1.5.0 (2022-03-25) ------------------------------- diff --git a/pyproject.toml b/pyproject.toml index 2f19c2c0..f622232f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,6 @@ follow_imports = "silent" show_error_codes = true exclude = """(?x)( sb3_contrib/ppo_recurrent/ppo_recurrent.py$ - | sb3_contrib/ars/ars.py$ | sb3_contrib/common/recurrent/policies.py$ | sb3_contrib/common/recurrent/buffers.py$ | tests/test_train_eval_mode.py$ diff --git a/sb3_contrib/__init__.py b/sb3_contrib/__init__.py index 2aa7a19b..ab86a3a7 100644 --- a/sb3_contrib/__init__.py +++ b/sb3_contrib/__init__.py @@ -1,6 +1,7 @@ import os from sb3_contrib.ars import ARS +from sb3_contrib.cem import CEM from sb3_contrib.crossq import CrossQ from sb3_contrib.ppo_mask import MaskablePPO from sb3_contrib.ppo_recurrent import RecurrentPPO @@ -15,6 +16,7 @@ __all__ = [ "ARS", + "CEM", "QRDQN", "TQC", "TRPO", diff --git a/sb3_contrib/ars/ars.py b/sb3_contrib/ars/ars.py index d06ad724..4d27cdc1 100644 --- a/sb3_contrib/ars/ars.py +++ b/sb3_contrib/ars/ars.py @@ -1,29 +1,21 @@ -import copy -import sys -import time import warnings -from functools import partial from typing import Any, ClassVar, Optional, TypeVar, Union -import numpy as np import torch as th -import torch.nn.utils from gymnasium import spaces -from stable_baselines3.common.base_class import BaseAlgorithm from stable_baselines3.common.callbacks import BaseCallback -from stable_baselines3.common.evaluation import evaluate_policy from stable_baselines3.common.policies import BasePolicy -from stable_baselines3.common.save_util import load_from_zip_file from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule -from stable_baselines3.common.utils import FloatSchedule, safe_mean +from stable_baselines3.common.utils import FloatSchedule -from sb3_contrib.ars.policies import ARSPolicy, LinearPolicy, MlpPolicy +from sb3_contrib.common.policies import ESLinearPolicy, ESPolicy +from sb3_contrib.common.population_based_algorithm import PopulationBasedAlgorithm from sb3_contrib.common.vec_env.async_eval import AsyncEval SelfARS = TypeVar("SelfARS", bound="ARS") -class ARS(BaseAlgorithm): +class ARS(PopulationBasedAlgorithm): """ Augmented Random Search: https://arxiv.org/abs/1803.07055 @@ -51,13 
+43,14 @@ class ARS(BaseAlgorithm): """ policy_aliases: ClassVar[dict[str, type[BasePolicy]]] = { - "MlpPolicy": MlpPolicy, - "LinearPolicy": LinearPolicy, + "MlpPolicy": ESPolicy, + "LinearPolicy": ESLinearPolicy, } + weights: th.Tensor # Need to call init model to initialize weights def __init__( self, - policy: Union[str, type[ARSPolicy]], + policy: Union[str, type[ESPolicy]], env: Union[GymEnv, str], n_delta: int = 8, n_top: Optional[int] = None, @@ -78,20 +71,20 @@ def __init__( policy, env, learning_rate=learning_rate, + pop_size=2 * n_delta, + alive_bonus_offset=alive_bonus_offset, + n_eval_episodes=n_eval_episodes, stats_window_size=stats_window_size, tensorboard_log=tensorboard_log, policy_kwargs=policy_kwargs, verbose=verbose, device=device, supported_action_spaces=(spaces.Box, spaces.Discrete), - support_multi_env=True, seed=seed, ) self.n_delta = n_delta - self.pop_size = 2 * n_delta self.delta_std_schedule = FloatSchedule(delta_std) - self.n_eval_episodes = n_eval_episodes if n_top is None: n_top = n_delta @@ -103,13 +96,7 @@ def __init__( self.n_top = n_top - self.alive_bonus_offset = alive_bonus_offset self.zero_policy = zero_policy - self.weights = None # Need to call init model to initialize weight - self.processes = None - # Keep track of how many steps where elapsed before a new rollout - # Important for syncing observation normalization between workers - self.old_count = 0 if _init_setup_model: self._setup_model() @@ -125,137 +112,7 @@ def _setup_model(self) -> None: if self.zero_policy: self.weights = th.zeros_like(self.weights, requires_grad=False) - self.policy.load_from_vector(self.weights.cpu()) - - def _mimic_monitor_wrapper(self, episode_rewards: np.ndarray, episode_lengths: np.ndarray) -> None: - """ - Helper to mimic Monitor wrapper and report episode statistics (mean reward, mean episode length). - - :param episode_rewards: List containing per-episode rewards - :param episode_lengths: List containing per-episode lengths (in number of steps) - """ - # Mimic Monitor Wrapper - infos = [ - {"episode": {"r": episode_reward, "l": episode_length}} - for episode_reward, episode_length in zip(episode_rewards, episode_lengths) - ] - - self._update_info_buffer(infos) - - def _trigger_callback( - self, - _locals: dict[str, Any], - _globals: dict[str, Any], - callback: BaseCallback, - n_envs: int, - ) -> None: - """ - Callback passed to the ``evaluate_policy()`` helper - in order to increment the number of timesteps - and trigger events in the single process version. - - :param _locals: - :param _globals: - :param callback: Callback that will be called at every step - :param n_envs: Number of environments - """ - self.num_timesteps += n_envs - callback.on_step() - - def evaluate_candidates( - self, candidate_weights: th.Tensor, callback: BaseCallback, async_eval: Optional[AsyncEval] - ) -> th.Tensor: - """ - Evaluate each candidate. - - :param candidate_weights: The candidate weights to be evaluated. - :param callback: Callback that will be called at each step - (or after evaluation in the multiprocess version) - :param async_eval: The object for asynchronous evaluation of candidates. - :return: The episodic return for each candidate. 
- """ - - batch_steps = 0 - # returns == sum of rewards - candidate_returns = th.zeros(self.pop_size, device=self.device) - train_policy = copy.deepcopy(self.policy) - # Empty buffer to show only mean over one iteration (one set of candidates) in the logs - self.ep_info_buffer = [] - callback.on_rollout_start() - - if async_eval is not None: - # Multiprocess asynchronous version - async_eval.send_jobs(candidate_weights, self.pop_size) - results = async_eval.get_results() - - for weights_idx, (episode_rewards, episode_lengths) in results: - # Update reward to cancel out alive bonus if needed - candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) - batch_steps += np.sum(episode_lengths) - self._mimic_monitor_wrapper(episode_rewards, episode_lengths) - - # Combine the filter stats of each process for normalization - for worker_obs_rms in async_eval.get_obs_rms(): - if self._vec_normalize_env is not None: - # worker_obs_rms.count -= self.old_count - self._vec_normalize_env.obs_rms.combine(worker_obs_rms) - # Hack: don't count timesteps twice (between the two are synced) - # otherwise it will lead to overflow, - # in practice we would need two RunningMeanStats - self._vec_normalize_env.obs_rms.count -= self.old_count - - # Synchronise VecNormalize if needed - if self._vec_normalize_env is not None: - async_eval.sync_obs_rms(self._vec_normalize_env.obs_rms.copy()) - self.old_count = self._vec_normalize_env.obs_rms.count - - # Hack to have Callback events - for _ in range(batch_steps // len(async_eval.remotes)): - self.num_timesteps += len(async_eval.remotes) - callback.on_step() - else: - # Single process, synchronous version - for weights_idx in range(self.pop_size): - # Load current candidate weights - train_policy.load_from_vector(candidate_weights[weights_idx].cpu()) - # Evaluate the candidate - episode_rewards, episode_lengths = evaluate_policy( - train_policy, - self.env, - n_eval_episodes=self.n_eval_episodes, - return_episode_rewards=True, - # Increment num_timesteps too (slight mismatch with multi envs) - callback=partial(self._trigger_callback, callback=callback, n_envs=self.env.num_envs), - warn=False, - ) - # Update reward to cancel out alive bonus if needed - candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) - batch_steps += sum(episode_lengths) - self._mimic_monitor_wrapper(episode_rewards, episode_lengths) - - # Note: we increment the num_timesteps inside the evaluate_policy() - # however when using multiple environments, there will be a slight - # mismatch between the number of timesteps used and the number - # of calls to the step() method (cf. implementation of evaluate_policy()) - # self.num_timesteps += batch_steps - - callback.on_rollout_end() - - return candidate_returns - - def dump_logs(self) -> None: - """ - Dump information to the logger. 
- """ - time_elapsed = max((time.time_ns() - self.start_time) / 1e9, sys.float_info.epsilon) - fps = int((self.num_timesteps - self._num_timesteps_at_start) / time_elapsed) - if len(self.ep_info_buffer) > 0 and len(self.ep_info_buffer[0]) > 0: - self.logger.record("rollout/ep_rew_mean", safe_mean([ep_info["r"] for ep_info in self.ep_info_buffer])) - self.logger.record("rollout/ep_len_mean", safe_mean([ep_info["l"] for ep_info in self.ep_info_buffer])) - self.logger.record("time/fps", fps) - self.logger.record("time/time_elapsed", int(time_elapsed), exclude="tensorboard") - self.logger.record("time/total_timesteps", self.num_timesteps, exclude="tensorboard") - self.logger.dump(step=self.num_timesteps) + self.policy.load_from_vector(self.weights.cpu().numpy()) def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval]) -> None: """ @@ -295,7 +152,7 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] step_size = learning_rate / (self.n_top * return_std + 1e-6) # Approximate gradient step self.weights = self.weights + step_size * ((plus_returns - minus_returns) @ deltas) - self.policy.load_from_vector(self.weights.cpu()) + self.policy.load_from_vector(self.weights.cpu().numpy()) self.logger.record("train/iterations", self._n_updates, exclude="tensorboard") self.logger.record("train/delta_std", delta_std) @@ -305,7 +162,7 @@ def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval] self._n_updates += 1 - def learn( + def learn( # type: ignore[override] self: SelfARS, total_timesteps: int, callback: MaybeCallback = None, @@ -328,47 +185,12 @@ def learn( :return: the trained model """ - total_steps, callback = self._setup_learn( - total_timesteps, - callback, - reset_num_timesteps, - tb_log_name, - progress_bar, + return super().learn( + total_timesteps=total_timesteps, + callback=callback, + log_interval=log_interval, + tb_log_name=tb_log_name, + reset_num_timesteps=reset_num_timesteps, + async_eval=async_eval, + progress_bar=progress_bar, ) - - callback.on_training_start(locals(), globals()) - - while self.num_timesteps < total_steps: - self._update_current_progress_remaining(self.num_timesteps, total_timesteps) - self._do_one_update(callback, async_eval) - if log_interval is not None and self._n_updates % log_interval == 0: - self.dump_logs() - - if async_eval is not None: - async_eval.close() - - callback.on_training_end() - - return self - - def set_parameters( - self, - load_path_or_dict: Union[str, dict[str, dict]], - exact_match: bool = True, - device: Union[th.device, str] = "auto", - ) -> None: - # Patched set_parameters() to handle ARS linear policy saved with sb3-contrib < 1.7.0 - params = None - if isinstance(load_path_or_dict, dict): - params = load_path_or_dict - else: - _, params, _ = load_from_zip_file(load_path_or_dict, device=device) - - # Patch to load LinearPolicy saved using sb3-contrib < 1.7.0 - # See https://github.com/Stable-Baselines-Team/stable-baselines3-contrib/pull/122#issuecomment-1331981230 - for name in {"weight", "bias"}: - if f"action_net.{name}" in params.get("policy", {}): - params["policy"][f"action_net.0.{name}"] = params["policy"][f"action_net.{name}"] - del params["policy"][f"action_net.{name}"] - - super().set_parameters(params, exact_match=exact_match) diff --git a/sb3_contrib/ars/policies.py b/sb3_contrib/ars/policies.py index 454265fa..0c7d0379 100644 --- a/sb3_contrib/ars/policies.py +++ b/sb3_contrib/ars/policies.py @@ -1,107 +1,8 @@ -from typing import Any, Optional - 
-import torch as th -from gymnasium import spaces -from stable_baselines3.common.policies import BasePolicy -from stable_baselines3.common.preprocessing import get_action_dim -from stable_baselines3.common.torch_layers import create_mlp -from stable_baselines3.common.type_aliases import PyTorchObs -from torch import nn - - -class ARSPolicy(BasePolicy): - """ - Policy network for ARS. - - :param observation_space: The observation space of the environment - :param action_space: The action space of the environment - :param net_arch: Network architecture, defaults to a 2 layers MLP with 64 hidden nodes. - :param activation_fn: Activation function - :param with_bias: If set to False, the layers will not learn an additive bias - :param squash_output: For continuous actions, whether the output is squashed - or not using a ``tanh()`` function. If not squashed with tanh the output will instead be clipped. - """ - - def __init__( - self, - observation_space: spaces.Space, - action_space: spaces.Space, - net_arch: Optional[list[int]] = None, - activation_fn: type[nn.Module] = nn.ReLU, - with_bias: bool = True, - squash_output: bool = True, - ): - super().__init__( - observation_space, - action_space, - squash_output=isinstance(action_space, spaces.Box) and squash_output, - ) - - if net_arch is None: - net_arch = [64, 64] - - self.net_arch = net_arch - self.features_extractor = self.make_features_extractor() - self.features_dim = self.features_extractor.features_dim - self.activation_fn = activation_fn - - if isinstance(action_space, spaces.Box): - action_dim = get_action_dim(action_space) - actor_net = create_mlp( - self.features_dim, action_dim, net_arch, activation_fn, with_bias=with_bias, squash_output=squash_output - ) - elif isinstance(action_space, spaces.Discrete): - actor_net = create_mlp(self.features_dim, int(action_space.n), net_arch, activation_fn, with_bias=with_bias) - else: - raise NotImplementedError(f"Error: ARS policy not implemented for action space of type {type(action_space)}.") - - self.action_net = nn.Sequential(*actor_net) - - def _get_constructor_parameters(self) -> dict[str, Any]: - # data = super()._get_constructor_parameters() this adds normalize_images, which we don't support... - data = dict( - observation_space=self.observation_space, - action_space=self.action_space, - net_arch=self.net_arch, - activation_fn=self.activation_fn, - ) - return data - - def forward(self, obs: PyTorchObs) -> th.Tensor: - features = self.extract_features(obs, self.features_extractor) - if isinstance(self.action_space, spaces.Box): - return self.action_net(features) - elif isinstance(self.action_space, spaces.Discrete): - logits = self.action_net(features) - return th.argmax(logits, dim=1) - else: - raise NotImplementedError() - - def _predict(self, observation: PyTorchObs, deterministic: bool = True) -> th.Tensor: - # Non deterministic action does not really make sense for ARS, we ignore this parameter for now.. - return self(observation) - - -class ARSLinearPolicy(ARSPolicy): - """ - Linear policy network for ARS. - - :param observation_space: The observation space of the environment - :param action_space: The action space of the environment - :param with_bias: With or without bias on the output - :param squash_output: For continuous actions, whether the output is squashed - or not using a ``tanh()`` function. If not squashed with tanh the output will instead be clipped. 
-    """
-
-    def __init__(
-        self,
-        observation_space: spaces.Space,
-        action_space: spaces.Space,
-        with_bias: bool = False,
-        squash_output: bool = False,
-    ):
-        super().__init__(observation_space, action_space, net_arch=[], with_bias=with_bias, squash_output=squash_output)
-
+from sb3_contrib.common.policies import ESLinearPolicy, ESPolicy
+# Backward compat
+ARSLinearPolicy = ESLinearPolicy
+ARSPolicy = ESPolicy
+# Aliases
 MlpPolicy = ARSPolicy
 LinearPolicy = ARSLinearPolicy
diff --git a/sb3_contrib/cem/__init__.py b/sb3_contrib/cem/__init__.py
new file mode 100644
index 00000000..2d8ee25e
--- /dev/null
+++ b/sb3_contrib/cem/__init__.py
@@ -0,0 +1,4 @@
+from sb3_contrib.cem.cem import CEM
+from sb3_contrib.cem.policies import LinearPolicy, MlpPolicy
+
+__all__ = ["CEM", "LinearPolicy", "MlpPolicy"]
diff --git a/sb3_contrib/cem/cem.py b/sb3_contrib/cem/cem.py
new file mode 100644
index 00000000..eb38fb9f
--- /dev/null
+++ b/sb3_contrib/cem/cem.py
@@ -0,0 +1,221 @@
+import warnings
+from typing import Any, ClassVar, Optional, TypeVar, Union
+
+import torch as th
+from gymnasium import spaces
+from stable_baselines3.common.callbacks import BaseCallback
+from stable_baselines3.common.policies import BasePolicy
+from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback
+from torch.distributions.multivariate_normal import MultivariateNormal
+
+from sb3_contrib.common.policies import ESLinearPolicy, ESPolicy
+from sb3_contrib.common.population_based_algorithm import PopulationBasedAlgorithm
+from sb3_contrib.common.vec_env.async_eval import AsyncEval
+
+SelfCEM = TypeVar("SelfCEM", bound="CEM")
+
+
+class CEM(PopulationBasedAlgorithm):
+    """
+    Noisy Cross Entropy Method: http://dx.doi.org/10.1162/neco.2006.18.12.2936
+    "Learning Tetris Using the Noisy Cross-Entropy Method"
+
+    CEM is part of the Evolution Strategies (ES) family, which performs black-box optimization
+    by sampling and evaluating a population of candidates:
+    https://blog.otoro.net/2017/10/29/visual-evolution-strategies/
+
+    John Schulman's implementation: https://github.com/joschu/modular_rl/blob/master/modular_rl/cem.py
+
+    :param policy: The policy to train, can be an instance of ``ESPolicy``, or a string from ["LinearPolicy", "MlpPolicy"]
+    :param env: The environment to train on, may be a string if registered with gym
+    :param pop_size: Population size (number of individuals)
+    :param n_top: How many of the top individuals to use in each update step. Default is pop_size
+    :param initial_std: Initial standard deviation for the exploration noise,
+        by default using Pytorch default variance at initialization.
+    :param extra_noise_std: Initial standard deviation for the extra noise added to the covariance matrix
+    :param noise_multiplier: Noise decay. We add noise to the standard deviation
+        to avoid early collapse.
+    :param zero_policy: Boolean determining if the passed policy should have its weights zeroed before training.
+    :param use_diagonal_covariance: If True, sample candidates using only the diagonal of the covariance matrix
+        (cheaper for policies with many parameters) instead of the full covariance matrix.
+    :param alive_bonus_offset: Constant added to the reward at each step, used to cancel out alive bonuses.
+    :param n_eval_episodes: Number of episodes to evaluate each candidate.
+    :param policy_kwargs: Keyword arguments to pass to the policy on creation
+    :param stats_window_size: Window size for the rollout logging, specifying the number of episodes to average
+        the reported success rate, mean episode length, and mean reward over
+    :param tensorboard_log: String with the directory to put tensorboard logs
+    :param seed: Random seed for the training
+    :param verbose: Verbosity level: 0 no output, 1 info, 2 debug
+    :param device: Torch device to use for training, defaults to "cpu"
+    :param _init_setup_model: Whether or not to build the network at the creation of the instance
+    """
+
+    policy_aliases: ClassVar[dict[str, type[BasePolicy]]] = {
+        "MlpPolicy": ESPolicy,
+        "LinearPolicy": ESLinearPolicy,
+    }
+    weights: th.Tensor  # Need to call init model to initialize weights
+    centroid_cov: th.Tensor
+    extra_variance: th.Tensor
+
+    def __init__(
+        self,
+        policy: Union[str, type[ESPolicy]],
+        env: Union[GymEnv, str],
+        pop_size: int = 16,
+        n_top: Optional[int] = None,
+        initial_std: Optional[float] = None,
+        extra_noise_std: float = 0.2,
+        noise_multiplier: float = 0.999,
+        zero_policy: bool = False,
+        use_diagonal_covariance: bool = False,
+        alive_bonus_offset: float = 0,
+        n_eval_episodes: int = 1,
+        policy_kwargs: Optional[dict[str, Any]] = None,
+        stats_window_size: int = 100,
+        tensorboard_log: Optional[str] = None,
+        seed: Optional[int] = None,
+        verbose: int = 0,
+        device: Union[th.device, str] = "cpu",
+        _init_setup_model: bool = True,
+    ):
+
+        super().__init__(
+            policy,
+            env,
+            learning_rate=0.0,
+            pop_size=pop_size,
+            alive_bonus_offset=alive_bonus_offset,
+            n_eval_episodes=n_eval_episodes,
+            stats_window_size=stats_window_size,
+            tensorboard_log=tensorboard_log,
+            policy_kwargs=policy_kwargs,
+            verbose=verbose,
+            device=device,
+            supported_action_spaces=(spaces.Box, spaces.Discrete),
+            seed=seed,
+        )
+
+        self.initial_std = initial_std
+        self.extra_noise_std = extra_noise_std
+        self.noise_multiplier = noise_multiplier
+
+        if n_top is None:
+            n_top = self.pop_size
+
+        # Make sure our hyperparameters are valid and auto-correct them if they are not
+        if n_top > self.pop_size:
+            warnings.warn(f"n_top = {n_top} > pop_size = {self.pop_size}, setting n_top = pop_size")
+            n_top = self.pop_size
+
+        self.n_top = n_top
+        self.zero_policy = zero_policy
+        self.use_diagonal_covariance = use_diagonal_covariance
+
+        if _init_setup_model:
+            self._setup_model()
+
+    def _setup_model(self) -> None:
+        self._setup_lr_schedule()
+        self.set_random_seed(self.seed)
+
+        self.policy = self.policy_class(self.observation_space, self.action_space, **self.policy_kwargs)
+        self.policy = self.policy.to(self.device)
+        self.weights = th.nn.utils.parameters_to_vector(self.policy.parameters()).detach()
+        self.n_params = len(self.weights)
+
+        if self.initial_std is None:
+            # Use weights variance from Pytorch initialization by default
+            initial_variance = self.weights.var().item()
+        else:
+            initial_variance = self.initial_std**2
+        self.centroid_cov = th.ones_like(self.weights, requires_grad=False) * initial_variance
+        if not self.use_diagonal_covariance:
+            self.centroid_cov = th.diag(self.centroid_cov)
+        # Initial extra noise vector (extra variance)
+        self.extra_variance = th.ones_like(self.weights, requires_grad=False) * self.extra_noise_std**2
+
+        if self.zero_policy:
+            self.weights = th.zeros_like(self.weights, requires_grad=False)
+            self.policy.load_from_vector(self.weights.cpu().numpy())
+
+    def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval]) -> None:
+        """
+        Sample new candidates, evaluate them and then update current policy.
+
+        :param callback: callback(s) called at every step with state of the algorithm.
+        :param async_eval: The object for asynchronous evaluation of candidates.
+        """
+
+        if self.use_diagonal_covariance:
+            # Sample using only the diagonal of the covariance matrix (+ extra noise)
+            param_noise = th.normal(mean=0.0, std=1.0, size=(self.pop_size, self.n_params), device=self.device)
+            policy_deltas = param_noise * th.sqrt(self.centroid_cov + self.extra_variance)
+        else:
+            # Sample using the full covariance matrix (+ extra noise)
+            sample_cov = self.centroid_cov + th.diag(self.extra_variance)
+            param_noise_distribution = MultivariateNormal(th.zeros_like(self.weights), covariance_matrix=sample_cov)
+            policy_deltas = param_noise_distribution.sample((self.pop_size,))  # type: ignore[arg-type]
+
+        candidate_weights = self.weights + policy_deltas
+
+        with th.no_grad():
+            candidate_returns = self.evaluate_candidates(candidate_weights, callback, async_eval)
+
+        # Keep only the top performing candidates for update
+        top_idx = th.argsort(candidate_returns, descending=True)[: self.n_top]
+
+        # Update centroid: barycenter of the best candidates
+        self.weights = candidate_weights[top_idx].mean(dim=0)
+        if self.use_diagonal_covariance:
+            # Do not compute full cov matrix when use_diagonal_covariance=True
+            self.centroid_cov = candidate_weights[top_idx].var(dim=0)
+        else:
+            # transpose to mimic rowvar=False in np.cov()
+            self.centroid_cov = th.cov(candidate_weights[top_idx].transpose(-1, -2))
+
+        # Update extra variance (prevents early convergence)
+        self.extra_noise_std = self.extra_noise_std * self.noise_multiplier
+        self.extra_variance = th.ones_like(self.weights, requires_grad=False) * self.extra_noise_std**2
+
+        # Current policy is the centroid of the best candidates
+        self.policy.load_from_vector(self.weights.cpu().numpy())
+
+        self.logger.record("rollout/return_std", candidate_returns.std().item())
+        self.logger.record("train/iterations", self._n_updates, exclude="tensorboard")
+        if self.use_diagonal_covariance:
+            self.logger.record("train/diag_std", th.mean(th.sqrt(self.centroid_cov)).item())
+        else:
+            self.logger.record("train/diag_std", th.mean(th.sqrt(th.diagonal(self.centroid_cov))).item())
+        self.logger.record("train/extra_noise_std", self.extra_noise_std)
+
+        self._n_updates += 1
+
+    def learn(  # type: ignore[override]
+        self: SelfCEM,
+        total_timesteps: int,
+        callback: MaybeCallback = None,
+        log_interval: int = 1,
+        tb_log_name: str = "CEM",
+        reset_num_timesteps: bool = True,
+        async_eval: Optional[AsyncEval] = None,
+        progress_bar: bool = False,
+    ) -> SelfCEM:
+        """
+        Return a trained model.
+
+        :param total_timesteps: The total number of samples (env steps) to train on
+        :param callback: callback(s) called at every step with state of the algorithm.
+        :param log_interval: The number of timesteps before logging.
+        :param tb_log_name: the name of the run for TensorBoard logging
+        :param reset_num_timesteps: whether or not to reset the current timestep number (used in logging)
+        :param async_eval: The object for asynchronous evaluation of candidates.
+        :return: the trained model
+        """
+        return super().learn(
+            total_timesteps=total_timesteps,
+            callback=callback,
+            log_interval=log_interval,
+            tb_log_name=tb_log_name,
+            reset_num_timesteps=reset_num_timesteps,
+            async_eval=async_eval,
+            progress_bar=progress_bar,
+        )
diff --git a/sb3_contrib/cem/policies.py b/sb3_contrib/cem/policies.py
new file mode 100644
index 00000000..455b8227
--- /dev/null
+++ b/sb3_contrib/cem/policies.py
@@ -0,0 +1,4 @@
+from sb3_contrib.common.policies import ESLinearPolicy, ESPolicy
+
+MlpPolicy = ESPolicy
+LinearPolicy = ESLinearPolicy
diff --git a/sb3_contrib/common/policies.py b/sb3_contrib/common/policies.py
new file mode 100644
index 00000000..6a285a57
--- /dev/null
+++ b/sb3_contrib/common/policies.py
@@ -0,0 +1,104 @@
+from typing import Any, Optional
+
+import torch as th
+from gymnasium import spaces
+from stable_baselines3.common.policies import BasePolicy
+from stable_baselines3.common.preprocessing import get_action_dim
+from stable_baselines3.common.torch_layers import create_mlp
+from stable_baselines3.common.type_aliases import PyTorchObs
+from torch import nn
+
+
+class ESPolicy(BasePolicy):
+    """
+    Policy network for population-based algorithms (evolution strategies).
+
+    :param observation_space: The observation space of the environment
+    :param action_space: The action space of the environment
+    :param net_arch: Network architecture, defaults to a one-layer MLP with 32 hidden units.
+    :param activation_fn: Activation function
+    :param with_bias: If set to False, the layers will not learn an additive bias
+    :param squash_output: For continuous actions, whether the output is squashed
+        or not using a ``tanh()`` function. If not squashed with tanh the output will instead be clipped.
+    """
+
+    def __init__(
+        self,
+        observation_space: spaces.Space,
+        action_space: spaces.Space,
+        net_arch: Optional[list[int]] = None,
+        activation_fn: type[nn.Module] = nn.ReLU,
+        with_bias: bool = True,
+        squash_output: bool = True,
+    ):
+
+        super().__init__(
+            observation_space,
+            action_space,
+            squash_output=isinstance(action_space, spaces.Box) and squash_output,
+        )
+
+        if net_arch is None:
+            net_arch = [32]
+
+        self.net_arch = net_arch
+        self.features_extractor = self.make_features_extractor()
+        self.features_dim = self.features_extractor.features_dim
+        self.activation_fn = activation_fn
+
+        if isinstance(action_space, spaces.Box):
+            action_dim = get_action_dim(action_space)
+            actor_net = create_mlp(
+                self.features_dim, action_dim, net_arch, activation_fn, with_bias=with_bias, squash_output=squash_output
+            )
+        elif isinstance(action_space, spaces.Discrete):
+            actor_net = create_mlp(self.features_dim, int(action_space.n), net_arch, activation_fn, with_bias=with_bias)
+        else:
+            raise NotImplementedError(f"Error: ES policy not implemented for action space of type {type(action_space)}.")
+
+        self.action_net = nn.Sequential(*actor_net)
+
+    def _get_constructor_parameters(self) -> dict[str, Any]:
+        # Note: we don't call super()._get_constructor_parameters() because it adds normalize_images, which we don't support
+        data = dict(
+            observation_space=self.observation_space,
+            action_space=self.action_space,
+            net_arch=self.net_arch,
+            activation_fn=self.activation_fn,
+        )
+        return data
+
+    def forward(self, obs: PyTorchObs) -> th.Tensor:
+        features = self.extract_features(obs, self.features_extractor)
+        if isinstance(self.action_space, spaces.Box):
+            return self.action_net(features)
+        elif isinstance(self.action_space, spaces.Discrete):
+            logits = self.action_net(features)
+            return th.argmax(logits, dim=1)
+        else:
+            raise NotImplementedError()
+
+    def _predict(self, observation: PyTorchObs, deterministic: bool = True) -> th.Tensor:
+        # Non-deterministic actions do not really make sense for ES algorithms, we ignore this parameter for now.
+        return self(observation)
+
+
+class ESLinearPolicy(ESPolicy):
+    """
+    Linear policy network for evolution strategies (ES).
+
+    :param observation_space: The observation space of the environment
+    :param action_space: The action space of the environment
+    :param with_bias: With or without bias on the output
+    :param squash_output: For continuous actions, whether the output is squashed
+        or not using a ``tanh()`` function. If not squashed with tanh the output will instead be clipped.
+    """
+
+    def __init__(
+        self,
+        observation_space: spaces.Space,
+        action_space: spaces.Space,
+        with_bias: bool = False,
+        squash_output: bool = False,
+    ):
+        super().__init__(observation_space, action_space, net_arch=[], with_bias=with_bias, squash_output=squash_output)
diff --git a/sb3_contrib/common/population_based_algorithm.py b/sb3_contrib/common/population_based_algorithm.py
new file mode 100644
index 00000000..02462043
--- /dev/null
+++ b/sb3_contrib/common/population_based_algorithm.py
@@ -0,0 +1,310 @@
+import copy
+import sys
+import time
+from collections import deque
+from functools import partial
+from typing import Any, ClassVar, Optional, TypeVar, Union
+
+import numpy as np
+import torch as th
+from gymnasium import spaces
+from stable_baselines3.common.base_class import BaseAlgorithm
+from stable_baselines3.common.callbacks import BaseCallback
+from stable_baselines3.common.evaluation import evaluate_policy
+from stable_baselines3.common.policies import BasePolicy
+from stable_baselines3.common.running_mean_std import RunningMeanStd
+from stable_baselines3.common.save_util import load_from_zip_file
+from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule, TensorDict
+from stable_baselines3.common.utils import safe_mean
+
+from sb3_contrib.common.policies import ESLinearPolicy, ESPolicy
+from sb3_contrib.common.vec_env.async_eval import AsyncEval
+
+SelfPopulationBased = TypeVar("SelfPopulationBased", bound="PopulationBasedAlgorithm")
+
+
+class PopulationBasedAlgorithm(BaseAlgorithm):
+    """
+    Base class for population-based algorithms like
+    Evolution Strategies (ES), which perform black-box optimization
+    by sampling and evaluating a population of candidates:
+    https://blog.otoro.net/2017/10/29/visual-evolution-strategies/
+
+    :param policy: The policy to train, can be an instance of ``ESPolicy``, or a string from ["LinearPolicy", "MlpPolicy"]
+    :param env: The environment to train on, may be a string if registered with gym
+    :param pop_size: Population size (number of individuals)
+    :param alive_bonus_offset: Constant added to the reward at each step, used to cancel out alive bonuses.
+    :param n_eval_episodes: Number of episodes to evaluate each candidate.
+ :param policy_kwargs: Keyword arguments to pass to the policy on creation + :param stats_window_size: Window size for the rollout logging, specifying the number of episodes to average + the reported success rate, mean episode length, and mean reward over + :param tensorboard_log: String with the directory to put tensorboard logs: + :param seed: Random seed for the training + :param verbose: Verbosity level: 0 no output, 1 info, 2 debug + :param device: Torch device to use for training, defaults to "cpu" + :param _init_setup_model: Whether or not to build the network at the creation of the instance + """ + + policy_aliases: ClassVar[dict[str, type[BasePolicy]]] = { + "MlpPolicy": ESPolicy, + "LinearPolicy": ESLinearPolicy, + } + + def __init__( + self, + policy: Union[str, type[ESPolicy]], + env: Union[GymEnv, str], + learning_rate: Union[float, Schedule], + pop_size: int = 16, + alive_bonus_offset: float = 0, + n_eval_episodes: int = 1, + policy_kwargs: Optional[dict[str, Any]] = None, + stats_window_size: int = 100, + tensorboard_log: Optional[str] = None, + seed: Optional[int] = None, + verbose: int = 0, + device: Union[th.device, str] = "cpu", + supported_action_spaces: Optional[tuple[type[spaces.Space], ...]] = None, + ): + + super().__init__( + policy, + env, + learning_rate=learning_rate, + stats_window_size=stats_window_size, + tensorboard_log=tensorboard_log, + policy_kwargs=policy_kwargs, + verbose=verbose, + device=device, + supported_action_spaces=supported_action_spaces, + support_multi_env=True, + seed=seed, + ) + + self.pop_size = pop_size + self.n_eval_episodes = n_eval_episodes + self.alive_bonus_offset = alive_bonus_offset + + # Keep track of how many steps where elapsed before a new rollout + # Important for syncing observation normalization between workers + self.old_count = 0 + + def _mimic_monitor_wrapper(self, episode_rewards: np.ndarray, episode_lengths: np.ndarray) -> None: + """ + Helper to mimic Monitor wrapper and report episode statistics (mean reward, mean episode length). + + :param episode_rewards: List containing per-episode rewards + :param episode_lengths: List containing per-episode lengths (in number of steps) + """ + # Mimic Monitor Wrapper + infos = [ + {"episode": {"r": episode_reward, "l": episode_length}} + for episode_reward, episode_length in zip(episode_rewards, episode_lengths) + ] + + self._update_info_buffer(infos) + + def _trigger_callback( + self, + _locals: dict[str, Any], + _globals: dict[str, Any], + callback: BaseCallback, + n_envs: int, + ) -> None: + """ + Callback passed to the ``evaluate_policy()`` helper + in order to increment the number of timesteps + and trigger events in the single process version. + + :param _locals: + :param _globals: + :param callback: Callback that will be called at every step + :param n_envs: Number of environments + """ + self.num_timesteps += n_envs + callback.on_step() + + def evaluate_candidates( + self, candidate_weights: th.Tensor, callback: BaseCallback, async_eval: Optional[AsyncEval] + ) -> th.Tensor: + """ + Evaluate each candidate. + + :param candidate_weights: The candidate weights to be evaluated. + :param callback: Callback that will be called at each step + (or after evaluation in the multiprocess version) + :param async_eval: The object for asynchronous evaluation of candidates. + :return: The episodic return for each candidate. 
+ """ + + batch_steps = 0 + # returns == sum of rewards + candidate_returns = th.zeros(self.pop_size, device=self.device) + train_policy = copy.deepcopy(self.policy) + # Empty buffer to show only mean over one iteration (one set of candidates) in the logs + self.ep_info_buffer = deque(maxlen=self._stats_window_size) + callback.on_rollout_start() + + if async_eval is not None: + # Multiprocess asynchronous version + async_eval.send_jobs(candidate_weights, self.pop_size) + results = async_eval.get_results() + + for weights_idx, (episode_rewards, episode_lengths) in results: + + # Update reward to cancel out alive bonus if needed + candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) + batch_steps += np.sum(episode_lengths) + self._mimic_monitor_wrapper(episode_rewards, episode_lengths) + + # Combine the filter stats of each process for normalization + for worker_obs_rms in async_eval.get_obs_rms(): + if self._vec_normalize_env is not None: + assert isinstance( + self._vec_normalize_env.obs_rms, RunningMeanStd + ), "ES Algorithms don't support dict obs with normalization yet" + # worker_obs_rms.count -= self.old_count + self._vec_normalize_env.obs_rms.combine(worker_obs_rms) + # Hack: don't count timesteps twice (between the two are synced) + # otherwise it will lead to overflow, + # in practice we would need two RunningMeanStats + self._vec_normalize_env.obs_rms.count -= self.old_count + + # Synchronise VecNormalize if needed + if self._vec_normalize_env is not None: + assert isinstance( + self._vec_normalize_env.obs_rms, RunningMeanStd + ), "ES Algorithms don't support dict obs with normalization yet" + async_eval.sync_obs_rms(self._vec_normalize_env.obs_rms.copy()) + self.old_count = self._vec_normalize_env.obs_rms.count + + # Hack to have Callback events + for _ in range(batch_steps // len(async_eval.remotes)): + self.num_timesteps += len(async_eval.remotes) + callback.on_step() + else: + assert self.env is not None + # Single process, synchronous version + for weights_idx in range(self.pop_size): + + # Load current candidate weights + train_policy.load_from_vector(candidate_weights[weights_idx].cpu().numpy()) + # Evaluate the candidate + episode_rewards, episode_lengths = evaluate_policy( # type: ignore[assignment] + train_policy, + self.env, + n_eval_episodes=self.n_eval_episodes, + return_episode_rewards=True, + # Increment num_timesteps too (slight mismatch with multi envs) + callback=partial(self._trigger_callback, callback=callback, n_envs=self.env.num_envs), + warn=False, + ) + # Update reward to cancel out alive bonus if needed + candidate_returns[weights_idx] = sum(episode_rewards) + self.alive_bonus_offset * sum(episode_lengths) + batch_steps += sum(episode_lengths) + self._mimic_monitor_wrapper(episode_rewards, episode_lengths) + + # Note: we increment the num_timesteps inside the evaluate_policy() + # however when using multiple environments, there will be a slight + # mismatch between the number of timesteps used and the number + # of calls to the step() method (cf. implementation of evaluate_policy()) + # self.num_timesteps += batch_steps + + callback.on_rollout_end() + + return candidate_returns + + def dump_logs(self) -> None: + """ + Dump information to the logger. 
+ """ + assert self.ep_info_buffer is not None + assert self.ep_success_buffer is not None + + time_elapsed = max((time.time_ns() - self.start_time) / 1e9, sys.float_info.epsilon) + fps = int((self.num_timesteps - self._num_timesteps_at_start) / (time_elapsed + 1e-8)) + if len(self.ep_info_buffer) > 0 and len(self.ep_info_buffer[0]) > 0: + self.logger.record("rollout/ep_rew_mean", safe_mean([ep_info["r"] for ep_info in self.ep_info_buffer])) + self.logger.record("rollout/ep_len_mean", safe_mean([ep_info["l"] for ep_info in self.ep_info_buffer])) + self.logger.record("time/fps", fps) + self.logger.record("time/time_elapsed", int(time_elapsed), exclude="tensorboard") + self.logger.record("time/total_timesteps", self.num_timesteps, exclude="tensorboard") + if len(self.ep_success_buffer) > 0: + self.logger.record("rollout/success_rate", safe_mean(self.ep_success_buffer)) + self.logger.dump(step=self.num_timesteps) + + def _do_one_update(self, callback: BaseCallback, async_eval: Optional[AsyncEval]) -> None: + """ + Sample new candidates, evaluate them and then update current policy. + + :param callback: callback(s) called at every step with state of the algorithm. + :param async_eval: The object for asynchronous evaluation of candidates. + """ + raise NotImplementedError() + + def learn( # type: ignore[override] + self: SelfPopulationBased, + total_timesteps: int, + callback: MaybeCallback = None, + log_interval: int = 1, + tb_log_name: str = "ES", + reset_num_timesteps: bool = True, + async_eval: Optional[AsyncEval] = None, + progress_bar: bool = False, + ) -> SelfPopulationBased: + """ + Return a trained model. + + :param total_timesteps: The total number of samples (env steps) to train on + :param callback: callback(s) called at every step with state of the algorithm. + :param log_interval: The number of timesteps before logging. + :param tb_log_name: the name of the run for TensorBoard logging + :param reset_num_timesteps: whether or not to reset the current timestep number (used in logging) + :param async_eval: The object for asynchronous evaluation of candidates. 
+ :return: the trained model + """ + + total_steps, callback = self._setup_learn( + total_timesteps, + callback, + reset_num_timesteps, + tb_log_name, + progress_bar, + ) + + callback.on_training_start(locals(), globals()) + + while self.num_timesteps < total_steps: + self._update_current_progress_remaining(self.num_timesteps, total_timesteps) + self._do_one_update(callback, async_eval) + if log_interval is not None and self._n_updates % log_interval == 0: + self.dump_logs() + + if async_eval is not None: + async_eval.close() + + callback.on_training_end() + + return self + + def set_parameters( + self, + load_path_or_dict: Union[str, TensorDict], + exact_match: bool = True, + device: Union[th.device, str] = "auto", + ) -> None: + # Patched set_parameters() to handle ARS linear policy saved with sb3-contrib < 1.7.0 + params = None + if isinstance(load_path_or_dict, dict): + params = load_path_or_dict + else: + _, params, _ = load_from_zip_file(load_path_or_dict, device=device) + + # Patch to load LinearPolicy saved using sb3-contrib < 1.7.0 + # See https://github.com/Stable-Baselines-Team/stable-baselines3-contrib/pull/122#issuecomment-1331981230 + for name in {"weight", "bias"}: + if f"action_net.{name}" in params.get("policy", {}): + params["policy"][f"action_net.0.{name}"] = params["policy"][f"action_net.{name}"] # type: ignore[index] + del params["policy"][f"action_net.{name}"] # type: ignore[attr-defined] + + super().set_parameters(params, exact_match=exact_match) diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..80fe629b --- /dev/null +++ b/setup.cfg @@ -0,0 +1,57 @@ +[metadata] +# This includes the license file in the wheel. +license_files = LICENSE + +[tool:pytest] +# Deterministic ordering for tests; useful for pytest-xdist. 
+env = + PYTHONHASHSEED=0 +filterwarnings = + # Tensorboard warnings + ignore::DeprecationWarning:tensorboard + # Gym warnings + ignore:Parameters to load are deprecated.:DeprecationWarning + ignore:the imp module is deprecated in favour of importlib:PendingDeprecationWarning + ignore::UserWarning:gym +markers = + slow: marks tests as slow (deselect with '-m "not slow"') + +[pytype] +inputs = sb3_contrib + +[flake8] +ignore = W503,W504,E203,E231 # line breaks before and after binary operators +# Ignore import not used when aliases are defined +per-file-ignores = + ./sb3_contrib/__init__.py:F401 + ./sb3_contrib/ars/__init__.py:F401 + ./sb3_contrib/cem/__init__.py:F401 + ./sb3_contrib/ppo_mask/__init__.py:F401 + ./sb3_contrib/ppo_recurrent/__init__.py:F401 + ./sb3_contrib/qrdqn/__init__.py:F401 + ./sb3_contrib/tqc/__init__.py:F401 + ./sb3_contrib/trpo/__init__.py:F401 + ./sb3_contrib/common/wrappers/__init__.py:F401 + ./sb3_contrib/common/envs/__init__.py:F401 + ./sb3_contrib/common/vec_env/__init__.py:F401 + +exclude = + # No need to traverse our git directory + .git, + # There's no value in checking cache directories + __pycache__, + # Don't check the doc + docs/ + # This contains our built documentation + build, + # This contains builds of flake8 that we don't want to check + dist + *.egg-info +max-complexity = 15 +# The GitHub editor is 127 chars wide +max-line-length = 127 + +[isort] +profile = black +line_length = 127 +src_paths = sb3_contrib diff --git a/tests/test_run.py b/tests/test_run.py index fd4adf64..55ff2c47 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -3,7 +3,7 @@ from stable_baselines3.common.env_util import make_vec_env from stable_baselines3.common.vec_env import VecNormalize -from sb3_contrib import ARS, QRDQN, TQC, TRPO, CrossQ, MaskablePPO +from sb3_contrib import ARS, CEM, QRDQN, TQC, TRPO, CrossQ, MaskablePPO from sb3_contrib.common.envs import InvalidActionEnvDiscrete from sb3_contrib.common.vec_env import AsyncEval @@ -103,13 +103,25 @@ def test_ars(policy_str, env_id): model.learn(total_timesteps=500, log_interval=1) -def test_ars_multi_env(): +@pytest.mark.parametrize("env_id", ["CartPole-v1", "Pendulum-v1"]) +@pytest.mark.parametrize("policy_str", ["LinearPolicy", "MlpPolicy"]) +def test_cem(policy_str, env_id): + model = CEM(policy_str, env_id, pop_size=2, verbose=1, seed=0) + model.learn(total_timesteps=500, log_interval=1) + + +@pytest.mark.parametrize("model_class", [ARS, CEM]) +def test_es_multi_env(model_class): env = make_vec_env("Pendulum-v1", n_envs=2) - model = ARS("MlpPolicy", env, n_delta=1) + kwargs = dict(n_delta=1) if model_class == ARS else dict(pop_size=2) + + model = model_class("MlpPolicy", env, **kwargs) model.learn(total_timesteps=250) + kwargs = dict(n_delta=2) if model_class == ARS else dict(pop_size=3) + env = VecNormalize(make_vec_env("Pendulum-v1", n_envs=1)) - model = ARS("MlpPolicy", env, n_delta=2, seed=0) + model = model_class("MlpPolicy", env, seed=0, **kwargs) # with parallelism async_eval = AsyncEval([lambda: VecNormalize(make_vec_env("Pendulum-v1", n_envs=1)) for _ in range(2)], model.policy) async_eval.seed(0)
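For reference, a minimal usage sketch of the new CEM class; the environment and hyperparameter values below are illustrative only (in the spirit of the tests above), not tuned settings:

from sb3_contrib import CEM

# Illustrative settings: sample 16 candidates per iteration, keep the best 8
# for the centroid update, and use the diagonal covariance approximation,
# which is cheaper for policies with many parameters.
model = CEM(
    "MlpPolicy",
    "Pendulum-v1",
    pop_size=16,
    n_top=8,
    use_diagonal_covariance=True,
    verbose=1,
    seed=0,
)
model.learn(total_timesteps=10_000, log_interval=1)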