diff --git a/hyperparams/human.yml b/hyperparams/human.yml new file mode 100644 index 000000000..5c1d1c473 --- /dev/null +++ b/hyperparams/human.yml @@ -0,0 +1,7 @@ +# Space Engineers envs +SE-WalkingTest-v1: + env_wrapper: + - utils.wrappers.HistoryWrapper: + horizon: 2 + n_timesteps: !!float 2e6 + policy: 'MlpPolicy' diff --git a/hyperparams/tqc.yml b/hyperparams/tqc.yml index f10dc4aff..68e5b70d9 100644 --- a/hyperparams/tqc.yml +++ b/hyperparams/tqc.yml @@ -254,3 +254,97 @@ parking-v0: n_sampled_goal=4, max_episode_length=100 )" + +# Space Engineers envs +SE-Forward-v1: &defaults + env_wrapper: + - utils.wrappers.HistoryWrapper: + horizon: 2 + vec_env_wrapper: + - utils.wrappers.VecForceResetWrapper + callback: + - utils.callbacks.ParallelTrainCallback: + gradient_steps: 400 + # - utils.callbacks.StopTrainingOnMeanRewardThreshold: + # reward_threshold: 250 + # verbose: 1 + n_timesteps: !!float 5e6 + policy: 'MlpPolicy' + learning_rate: !!float 7.3e-4 + buffer_size: 100000 + batch_size: 256 + ent_coef: 'auto' + gamma: 0.98 + tau: 0.05 + # train_freq: [1, "episode"] + train_freq: 100 + n_envs: 4 + gradient_steps: -1 + learning_starts: 800 + use_sde: False + top_quantiles_to_drop_per_net: 2 + policy_kwargs: "dict(net_arch=[256, 256], n_critics=2)" + +SE-Symmetric-v1: + <<: *defaults + +SE-Corrections-v1: + <<: *defaults + +SE-Generic-v1: + <<: *defaults + callback: + - utils.callbacks.ParallelTrainCallback: + gradient_steps: 400 + # - utils.callbacks.StopTrainingOnMeanRewardThreshold: + # reward_threshold: 250 + # verbose: 1 + +SE-TurnLeft-v1: + <<: *defaults + callback: + - utils.callbacks.ParallelTrainCallback: + gradient_steps: 400 + - utils.callbacks.StopTrainingOnMeanRewardThreshold: + reward_threshold: 250 + verbose: 1 + +SE-MultiTask-v1: + <<: *defaults + # policy: 'MixtureMlpPolicy' + learning_rate: !!float 7.3e-4 + # gamma: 0.99 + # tau: 0.005 + buffer_size: 200000 + callback: + - utils.callbacks.ParallelTrainCallback: + gradient_steps: 400 + # policy_kwargs: "dict(net_arch=[400, 300], n_critics=2, n_additional_experts=2)" + policy_kwargs: "dict(net_arch=[256, 256], n_critics=5)" + + +# ======== Real Robot envs ============ + +WalkingBertSim-v1: + env_wrapper: + - utils.wrappers.HistoryWrapper: + horizon: 2 + callback: + - utils.callbacks.ParallelTrainCallback: + gradient_steps: 400 + n_timesteps: !!float 2e6 + policy: 'MlpPolicy' + learning_rate: !!float 7.3e-4 + buffer_size: 300000 + batch_size: 256 + ent_coef: 'auto' + gamma: 0.98 + tau: 0.02 + train_freq: [1, "episode"] + gradient_steps: -1 + learning_starts: 1200 + use_sde_at_warmup: True + use_sde: True + sde_sample_freq: 4 + top_quantiles_to_drop_per_net: 2 + policy_kwargs: "dict(log_std_init=-3, net_arch=[256, 256], n_critics=2)" diff --git a/requirements.txt b/requirements.txt index f4ea8a8ae..d5ea73db9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ gym>=0.17,<0.20 -stable-baselines3[extra,tests,docs]>=1.3.1a8 -sb3-contrib>=1.3.1a7 -box2d-py==2.3.8 -pybullet +# stable-baselines3[extra,tests,docs]>=1.3.0 +sb3-contrib>=1.3.0 +# box2d-py==2.3.8 +# pybullet gym-minigrid scikit-optimize optuna @@ -11,7 +11,9 @@ seaborn pyyaml>=5.1 cloudpickle>=1.5.0 # tmp fix: ROM missing in newest release -atari-py==0.2.6 +# atari-py==0.2.6 plotly -panda-gym==1.1.1 # tmp fix: until compatibility with panda-gym v2 -rliable>=1.0.5 +pygame +# panda-gym>=1.1.1 +# rliable requires python 3.7+ +# rliable>=1.0.5 diff --git a/setup.cfg b/setup.cfg index c74b8320e..6841b0fcb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,6 +20,7 
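A note on the new Space Engineers entries in `hyperparams/tqc.yml`: `&defaults` / `<<: *defaults` are plain YAML anchors and merge keys, so every `SE-*` entry expands to the full TQC hyperparameter dict from `SE-Forward-v1`, with locally declared keys taking precedence. A quick sanity check (a sketch that assumes the patched file is on disk and PyYAML's merge-key support):

```python
import yaml

with open("hyperparams/tqc.yml") as f:
    hyperparams = yaml.safe_load(f)

# Inherited unchanged from the SE-Forward-v1 anchor
assert hyperparams["SE-Symmetric-v1"]["tau"] == 0.05
# Overridden locally in SE-MultiTask-v1
assert hyperparams["SE-MultiTask-v1"]["buffer_size"] == 200000
```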
@@ per-file-ignores = ./scripts/all_plots.py:E501 ./scripts/plot_train.py:E501 ./scripts/plot_training_success.py:E501 + ./utils/teleop.py:F405 exclude = # No need to traverse our git directory diff --git a/utils/callbacks.py b/utils/callbacks.py index f156f0e5d..394c9611c 100644 --- a/utils/callbacks.py +++ b/utils/callbacks.py @@ -1,4 +1,5 @@ import os +import pickle import tempfile import time from copy import deepcopy @@ -10,7 +11,8 @@ from sb3_contrib import TQC from stable_baselines3 import SAC from stable_baselines3.common.callbacks import BaseCallback, EvalCallback -from stable_baselines3.common.vec_env import VecEnv +from stable_baselines3.common.utils import safe_mean +from stable_baselines3.common.vec_env import VecEnv, sync_envs_normalization class TrialEvalCallback(EvalCallback): @@ -129,6 +131,12 @@ def _init_callback(self) -> None: self.model.save(temp_file) + if self.model.get_vec_normalize_env() is not None: + temp_file_norm = os.path.join("logs", "vec_normalize.pkl") + + with open(temp_file_norm, "wb") as file_handler: + pickle.dump(self.model.get_vec_normalize_env(), file_handler) + # TODO: add support for other algorithms for model_class in [SAC, TQC]: if isinstance(self.model, model_class): @@ -138,6 +146,11 @@ def _init_callback(self) -> None: assert self.model_class is not None, f"{self.model} is not supported for parallel training" self._model = self.model_class.load(temp_file) + if self.model.get_vec_normalize_env() is not None: + with open(temp_file_norm, "rb") as file_handler: + self._model._vec_normalize_env = pickle.load(file_handler) + self._model._vec_normalize_env.training = False + self.batch_size = self._model.batch_size # Disable train method @@ -182,6 +195,10 @@ def _on_rollout_end(self) -> None: self._model.replay_buffer = deepcopy(self.model.replay_buffer) self.model.set_parameters(deepcopy(self._model.get_parameters())) self.model.actor = self.model.policy.actor + # Sync VecNormalize + if self.model.get_vec_normalize_env() is not None: + sync_envs_normalization(self.model.get_vec_normalize_env(), self._model._vec_normalize_env) + if self.num_timesteps >= self._model.learning_starts: self.train() # Do not wait for the training loop to finish @@ -193,3 +210,31 @@ def _on_training_end(self) -> None: if self.verbose > 0: print("Waiting for training thread to terminate") self.process.join() + + +class StopTrainingOnMeanRewardThreshold(BaseCallback): + """ + Stop the training once a threshold in mean episodic reward + has been reached (i.e. when the model is good enough). + + :param reward_threshold: Minimum expected reward per episode + to stop training. 
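For context on the `ParallelTrainCallback` changes above: the `VecNormalize` statistics are pickled next to the model checkpoint, reloaded into the background training copy with updates disabled, and re-synced after every rollout. A minimal sketch of that pattern outside the callback (`clone_normalization` and `resync_normalization` are hypothetical helpers, not part of this diff):

```python
import pickle

from stable_baselines3.common.vec_env import sync_envs_normalization


def clone_normalization(model, worker_model, path="logs/vec_normalize.pkl"):
    vec_normalize = model.get_vec_normalize_env()
    if vec_normalize is None:
        return
    # Snapshot the running obs/return statistics next to the model checkpoint
    with open(path, "wb") as f:
        pickle.dump(vec_normalize, f)
    with open(path, "rb") as f:
        worker_model._vec_normalize_env = pickle.load(f)
    # The background copy must not update the statistics itself
    worker_model._vec_normalize_env.training = False


def resync_normalization(model, worker_model):
    # Called after each rollout: push the latest running stats to the copy
    if model.get_vec_normalize_env() is not None:
        sync_envs_normalization(model.get_vec_normalize_env(), worker_model._vec_normalize_env)
```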
+ :param verbose: + """ + + def __init__(self, reward_threshold: float, verbose: int = 0): + super().__init__(verbose=verbose) + self.reward_threshold = reward_threshold + + def _on_step(self) -> bool: + continue_training = True + if len(self.model.ep_info_buffer) > 0 and len(self.model.ep_info_buffer[0]) > 0: + mean_reward = safe_mean([ep_info["r"] for ep_info in self.model.ep_info_buffer]) + # Convert np.bool_ to bool, otherwise callback() is False won't work + continue_training = bool(mean_reward < self.reward_threshold) + if self.verbose > 0 and not continue_training: + print( + f"Stopping training because the mean reward {mean_reward:.2f} " + f" is above the threshold {self.reward_threshold}" + ) + return continue_training diff --git a/utils/exp_manager.py b/utils/exp_manager.py index 53a9d620e..ee9abe6af 100644 --- a/utils/exp_manager.py +++ b/utils/exp_manager.py @@ -11,6 +11,7 @@ import numpy as np import optuna import yaml +import zmq from optuna.integration.skopt import SkoptSampler from optuna.pruners import BasePruner, MedianPruner, SuccessiveHalvingPruner from optuna.samplers import BaseSampler, RandomSampler, TPESampler @@ -29,6 +30,7 @@ DummyVecEnv, SubprocVecEnv, VecEnv, + VecEnvWrapper, VecFrameStack, VecNormalize, VecTransposeImage, @@ -99,6 +101,7 @@ def __init__( self.env_wrapper = None self.frame_stack = None self.seed = seed + self.vec_env_wrapper = None self.optimization_log_path = optimization_log_path self.vec_env_class = {"dummy": DummyVecEnv, "subproc": SubprocVecEnv}[vec_env_type] @@ -160,7 +163,7 @@ def setup_experiment(self) -> Optional[BaseAlgorithm]: :return: the initialized RL model """ hyperparams, saved_hyperparams = self.read_hyperparameters() - hyperparams, self.env_wrapper, self.callbacks = self._preprocess_hyperparams(hyperparams) + hyperparams, self.env_wrapper, self.callbacks, self.vec_env_wrapper = self._preprocess_hyperparams(hyperparams) self.create_log_folder() self.create_callbacks() @@ -200,12 +203,18 @@ def learn(self, model: BaseAlgorithm) -> None: try: model.learn(self.n_timesteps, **kwargs) - except KeyboardInterrupt: + except (KeyboardInterrupt, zmq.error.ZMQError): # this allows to save the model when interrupting training pass finally: # Release resources try: + # Hack for zmq on Windows to allow early termination + env_tmp = model.env + while isinstance(env_tmp, VecEnvWrapper): + env_tmp = env_tmp.venv + env_tmp.waiting = False + model.env.close() except EOFError: pass @@ -310,7 +319,7 @@ def _preprocess_normalization(self, hyperparams: Dict[str, Any]) -> Dict[str, An def _preprocess_hyperparams( self, hyperparams: Dict[str, Any] - ) -> Tuple[Dict[str, Any], Optional[Callable], List[BaseCallback]]: + ) -> Tuple[Dict[str, Any], Optional[Callable], List[BaseCallback], Optional[Callable]]: self.n_envs = hyperparams.get("n_envs", 1) if self.verbose > 0: @@ -354,12 +363,16 @@ def _preprocess_hyperparams( if "env_wrapper" in hyperparams.keys(): del hyperparams["env_wrapper"] + vec_env_wrapper = get_wrapper_class(hyperparams, "vec_env_wrapper") + if "vec_env_wrapper" in hyperparams.keys(): + del hyperparams["vec_env_wrapper"] + callbacks = get_callback_list(hyperparams) if "callback" in hyperparams.keys(): self.specified_callbacks = hyperparams["callback"] del hyperparams["callback"] - return hyperparams, env_wrapper, callbacks + return hyperparams, env_wrapper, callbacks, vec_env_wrapper def _preprocess_action_noise( self, hyperparams: Dict[str, Any], saved_hyperparams: Dict[str, Any], env: VecEnv @@ -517,6 +530,9 @@ def create_envs(self, 
n_envs: int, eval_env: bool = False, no_log: bool = False) monitor_kwargs=monitor_kwargs, ) + if self.vec_env_wrapper is not None: + env = self.vec_env_wrapper(env) + # Wrap the env into a VecNormalize wrapper if needed # and load saved statistics when present env = self._maybe_normalize(env, eval_env) @@ -653,9 +669,30 @@ def objective(self, trial: optuna.Trial) -> float: try: model.learn(self.n_timesteps, callback=callbacks) # Free memory + env_tmp = model.env + while isinstance(env_tmp, VecEnvWrapper): + env_tmp = env_tmp.venv + env_tmp.waiting = False + + env_tmp = eval_env + while isinstance(env_tmp, VecEnvWrapper): + env_tmp = env_tmp.venv + env_tmp.waiting = False + model.env.close() eval_env.close() except (AssertionError, ValueError) as e: + # Hack for zmq on Windows to allow early termination + env_tmp = model.env + while isinstance(env_tmp, VecEnvWrapper): + env_tmp = env_tmp.venv + env_tmp.waiting = False + + env_tmp = eval_env + while isinstance(env_tmp, VecEnvWrapper): + env_tmp = env_tmp.venv + env_tmp.waiting = False + # Sometimes, random hyperparams can generate NaN # Free memory model.env.close() diff --git a/utils/hyperparams_opt.py b/utils/hyperparams_opt.py index e37714503..32fbd8f17 100644 --- a/utils/hyperparams_opt.py +++ b/utils/hyperparams_opt.py @@ -218,21 +218,22 @@ def sample_sac_params(trial: optuna.Trial) -> Dict[str, Any]: gamma = trial.suggest_categorical("gamma", [0.9, 0.95, 0.98, 0.99, 0.995, 0.999, 0.9999]) learning_rate = trial.suggest_loguniform("learning_rate", 1e-5, 1) batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128, 256, 512, 1024, 2048]) - buffer_size = trial.suggest_categorical("buffer_size", [int(1e4), int(1e5), int(1e6)]) - learning_starts = trial.suggest_categorical("learning_starts", [0, 1000, 10000, 20000]) + buffer_size = trial.suggest_categorical("buffer_size", [int(1e4), int(5e4), int(1e5)]) + learning_starts = trial.suggest_categorical("learning_starts", [0, 1000]) # train_freq = trial.suggest_categorical('train_freq', [1, 10, 100, 300]) - train_freq = trial.suggest_categorical("train_freq", [1, 4, 8, 16, 32, 64, 128, 256, 512]) + # train_freq = trial.suggest_categorical("train_freq", [1, 4, 8, 16, 32, 64, 128, 256, 512]) + train_freq = 100 # Polyak coeff - tau = trial.suggest_categorical("tau", [0.001, 0.005, 0.01, 0.02, 0.05, 0.08]) + tau = trial.suggest_categorical("tau", [0.001, 0.005, 0.008, 0.01, 0.02, 0.05, 0.08]) # gradient_steps takes too much time # gradient_steps = trial.suggest_categorical('gradient_steps', [1, 100, 300]) gradient_steps = train_freq # ent_coef = trial.suggest_categorical('ent_coef', ['auto', 0.5, 0.1, 0.05, 0.01, 0.0001]) ent_coef = "auto" # You can comment that out when not using gSDE - log_std_init = trial.suggest_uniform("log_std_init", -4, 1) + # log_std_init = trial.suggest_uniform("log_std_init", -4, 1) # NOTE: Add "verybig" to net_arch when tuning HER - net_arch = trial.suggest_categorical("net_arch", ["small", "medium", "big"]) + net_arch = trial.suggest_categorical("net_arch", ["small", "medium", "big", "large", "verybig"]) # activation_fn = trial.suggest_categorical('activation_fn', [nn.Tanh, nn.ReLU, nn.ELU, nn.LeakyReLU]) net_arch = { @@ -240,8 +241,8 @@ def sample_sac_params(trial: optuna.Trial) -> Dict[str, Any]: "medium": [256, 256], "big": [400, 300], # Uncomment for tuning HER - # "large": [256, 256, 256], - # "verybig": [512, 512, 512], + "large": [256, 256, 256], + "verybig": [512, 512, 512], }[net_arch] target_entropy = "auto" @@ -249,6 +250,8 @@ def 
sample_sac_params(trial: optuna.Trial) -> Dict[str, Any]: # # target_entropy = trial.suggest_categorical('target_entropy', ['auto', 5, 1, 0, -1, -5, -10, -20, -50]) # target_entropy = trial.suggest_uniform('target_entropy', -10, 10) + # log_std_init=log_std_init, + hyperparams = { "gamma": gamma, "learning_rate": learning_rate, @@ -260,7 +263,7 @@ def sample_sac_params(trial: optuna.Trial) -> Dict[str, Any]: "ent_coef": ent_coef, "tau": tau, "target_entropy": target_entropy, - "policy_kwargs": dict(log_std_init=log_std_init, net_arch=net_arch), + "policy_kwargs": dict(net_arch=net_arch), } if trial.using_her_replay_buffer: @@ -455,10 +458,12 @@ def sample_tqc_params(trial: optuna.Trial) -> Dict[str, Any]: # TQC is SAC + Distributional RL hyperparams = sample_sac_params(trial) - n_quantiles = trial.suggest_int("n_quantiles", 5, 50) - top_quantiles_to_drop_per_net = trial.suggest_int("top_quantiles_to_drop_per_net", 0, n_quantiles - 1) + # n_quantiles = trial.suggest_int("n_quantiles", 5, 50) + n_quantiles = trial.suggest_categorical("n_quantiles", [15, 25, 50]) + top_quantiles_to_drop_per_net = trial.suggest_int("top_quantiles_to_drop_per_net", 0, 10) + n_critics = trial.suggest_int("n_critics", 1, 2) - hyperparams["policy_kwargs"].update({"n_quantiles": n_quantiles}) + hyperparams["policy_kwargs"].update({"n_quantiles": n_quantiles, "n_critics": n_critics}) hyperparams["top_quantiles_to_drop_per_net"] = top_quantiles_to_drop_per_net return hyperparams diff --git a/utils/import_envs.py b/utils/import_envs.py index fbe0370e3..fc80707a0 100644 --- a/utils/import_envs.py +++ b/utils/import_envs.py @@ -28,6 +28,11 @@ except ImportError: gym_donkeycar = None +try: + import gym_space_engineers # pytype: disable=import-error +except ImportError: + gym_space_engineers = None + try: import panda_gym # pytype: disable=import-error except ImportError: diff --git a/utils/networks.py b/utils/networks.py new file mode 100644 index 000000000..126e53ea1 --- /dev/null +++ b/utils/networks.py @@ -0,0 +1,195 @@ +import os +from typing import Dict, List, Optional, Tuple, Type + +import gym +import torch as th +from sb3_contrib import TQC +from sb3_contrib.tqc.policies import TQCPolicy +from stable_baselines3.common.policies import BasePolicy, register_policy +from stable_baselines3.common.preprocessing import get_action_dim +from stable_baselines3.common.torch_layers import BaseFeaturesExtractor, create_mlp +from torch import nn + +# CAP the standard deviation of the actor +LOG_STD_MAX = 2 +LOG_STD_MIN = -20 + + +class MixtureActor(BasePolicy): + """ + Actor network (policy) for SAC. + + :param observation_space: Obervation space + :param action_space: Action space + :param net_arch: Network architecture + :param features_extractor: Network to extract features + (a CNN when using images, a nn.Flatten() layer otherwise) + :param features_dim: Number of features + :param activation_fn: Activation function + :param use_sde: Whether to use State Dependent Exploration or not + :param log_std_init: Initial value for the log standard deviation + :param full_std: Whether to use (n_features x n_actions) parameters + for the std instead of only (n_features,) when using gSDE. + :param sde_net_arch: Network architecture for extracting features + when using gSDE. If None, the latent features from the policy will be used. + Pass an empty list to use the states as features. + :param use_expln: Use ``expln()`` function instead of ``exp()`` when using gSDE to ensure + a positive standard deviation (cf paper). 
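To make the narrowed TQC search space above concrete, this is roughly what one sampled configuration looks like once handed to the algorithm (illustrative values for a single Optuna draw, not tuned defaults):

```python
from sb3_contrib import TQC

# One possible draw from the updated search space
policy_kwargs = dict(net_arch=[256, 256], n_quantiles=25, n_critics=2)

model = TQC(
    "MlpPolicy",
    "Pendulum-v0",
    train_freq=100,      # now fixed in the sampler instead of being sampled
    gradient_steps=100,  # the sampler ties gradient_steps to train_freq
    top_quantiles_to_drop_per_net=5,
    policy_kwargs=policy_kwargs,
)
```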
It allows to keep variance + above zero and prevent it from growing too fast. In practice, ``exp()`` is usually enough. + :param clip_mean: Clip the mean output when using gSDE to avoid numerical instability. + :param normalize_images: Whether to normalize images or not, + dividing by 255.0 (True by default) + """ + + def __init__( + self, + observation_space: gym.spaces.Space, + action_space: gym.spaces.Space, + net_arch: List[int], + features_extractor: nn.Module, + features_dim: int, + activation_fn: Type[nn.Module] = nn.ReLU, + normalize_images: bool = True, + n_additional_experts: int = 0, + # ignore + *_args, + **_kwargs, + ): + super().__init__( + observation_space, + action_space, + features_extractor=features_extractor, + normalize_images=normalize_images, + squash_output=True, + ) + + # Pretrained model + # set BACKWARD_CONTROLLER_PATH=logs\pretrained-tqc\SE-Symmetric-v1_2\SE-Symmetric-v1.zip + # set FORWARD_CONTROLLER_PATH=logs\pretrained-tqc\SE-Symmetric-v1_1\SE-Symmetric-v1.zip + # set TURN_LEFT_CONTROLLER_PATH=logs\pretrained-tqc\SE-TurnLeft-v1_1\SE-TurnLeft-v1.zip + # set TURN_RIGHT_CONTROLLER_PATH=logs\pretrained-tqc\SE-TurnLeft-v1_2\SE-TurnLeft-v1.zip + # set RANDOM_CONTROLLER_PATH=logs\pretrained-tqc\SE-Random-small\SE-TurnLeft-v1.zip + + expert_paths = ["FORWARD", "BACKWARD", "TURN_LEFT", "TURN_RIGHT"] + # Uncomment to start without experts + # expert_paths = [] + self.num_experts = len(expert_paths) + self.n_additional_experts = n_additional_experts + print(f"{n_additional_experts} additional experts") + self.num_experts += self.n_additional_experts + + self.experts = [] + for path in expert_paths: + actor = TQC.load(os.environ[f"{path}_CONTROLLER_PATH"]).actor + self.experts.append(actor) + + # Add additional experts + for _ in range(self.n_additional_experts): + actor = TQC.load(os.environ["RANDOM_CONTROLLER_PATH"]).actor + self.experts.append(actor) + + features_dim = self.experts[0].features_dim + self.experts = nn.ModuleList(self.experts) + # TODO: replace with MLP? + # self.w_gate = nn.Parameter(th.zeros(features_dim, num_experts), requires_grad=True) + gating_net_arch = [64, 64] + gating_net = create_mlp(features_dim, self.num_experts, gating_net_arch, activation_fn) + gating_net += [nn.Softmax(1)] + self.gating_net = nn.Sequential(*gating_net) + self.action_dim = get_action_dim(self.action_space) + self.action_dist = self.experts[0].action_dist + + def get_action_dist_params(self, obs: th.Tensor) -> Tuple[th.Tensor, th.Tensor, Dict[str, th.Tensor]]: + """ + Get the parameters for the action distribution. + + :param obs: + :return: + Mean, standard deviation and optional keyword arguments. + """ + features = self.extract_features(obs) + expert_means = th.zeros(obs.shape[0], self.num_experts, self.action_dim).to(obs.device) + expert_stds = th.zeros(obs.shape[0], self.num_experts, self.action_dim).to(obs.device) + + for i in range(self.num_experts): + # Allow grad for one expert only + with th.set_grad_enabled(i >= self.num_experts - self.n_additional_experts): + latent_pi = self.experts[i].latent_pi(features) + expert_means[:, i, :] = self.experts[i].mu(latent_pi) + # Unstructured exploration (Original implementation) + log_std = self.experts[i].log_std(latent_pi) + # Original Implementation to cap the standard deviation + expert_stds[:, i, :] = th.clamp(log_std, LOG_STD_MIN, LOG_STD_MAX) + + # gates: [batch_size, num_experts] + input_commands = features.clone() + # TODO: extract task features only? 
+ # input_commands[:-2] = 0.0 + gates = self.gating_net(input_commands).unsqueeze(-1) + + # expert_means: [batch_size, num_experts, action_dim] + # mean_actions: [batch_size, action_dim] + mean_actions = (gates * expert_means).sum(dim=1) + log_std = (gates * expert_stds).sum(dim=1) + + return mean_actions, log_std, {} + + def forward(self, obs: th.Tensor, deterministic: bool = False) -> th.Tensor: + mean_actions, log_std, kwargs = self.get_action_dist_params(obs) + # Note: the action is squashed + return self.action_dist.actions_from_params(mean_actions, log_std, deterministic=deterministic, **kwargs) + + def action_log_prob(self, obs: th.Tensor) -> Tuple[th.Tensor, th.Tensor]: + mean_actions, log_std, kwargs = self.get_action_dist_params(obs) + # return action and associated log prob + return self.action_dist.log_prob_from_params(mean_actions, log_std, **kwargs) + + def _predict(self, observation: th.Tensor, deterministic: bool = False) -> th.Tensor: + return self.forward(observation, deterministic) + + +class MixtureMlpPolicy(TQCPolicy): + """ + Policy class (with both actor and critic) for TQC. + + :param observation_space: Observation space + :param action_space: Action space + :param lr_schedule: Learning rate schedule (could be constant) + :param net_arch: The specification of the policy and value networks. + :param activation_fn: Activation function + :param use_sde: Whether to use State Dependent Exploration or not + :param log_std_init: Initial value for the log standard deviation + :param sde_net_arch: Network architecture for extracting features + when using gSDE. If None, the latent features from the policy will be used. + Pass an empty list to use the states as features. + :param use_expln: Use ``expln()`` function instead of ``exp()`` when using gSDE to ensure + a positive standard deviation (cf paper). It allows to keep variance + above zero and prevent it from growing too fast. In practice, ``exp()`` is usually enough. + :param clip_mean: Clip the mean output when using gSDE to avoid numerical instability. + :param features_extractor_class: Features extractor to use. + :param features_extractor_kwargs: Keyword arguments + to pass to the features extractor. + :param normalize_images: Whether to normalize images or not, + dividing by 255.0 (True by default) + :param optimizer_class: The optimizer to use, + ``th.optim.Adam`` by default + :param optimizer_kwargs: Additional keyword arguments, + excluding the learning rate, to pass to the optimizer + :param n_critics: Number of critic networks to create. 
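The heart of `MixtureActor.get_action_dist_params` above is a softmax gate over the experts: the actor returns a convex combination of each expert's mean and clamped log-std, and only the additional (non-pretrained) experts receive gradients thanks to the `th.set_grad_enabled` guard. A shape sketch with made-up dimensions:

```python
import torch as th
from torch import nn

batch_size, num_experts, action_dim, features_dim = 32, 4, 12, 20

# Per-expert Gaussian parameters, as filled in by the expert loop
expert_means = th.randn(batch_size, num_experts, action_dim)
expert_stds = th.randn(batch_size, num_experts, action_dim)

# Gating network: features -> softmax weights over the experts
gating_net = nn.Sequential(nn.Linear(features_dim, num_experts), nn.Softmax(dim=1))
gates = gating_net(th.randn(batch_size, features_dim)).unsqueeze(-1)  # [batch, experts, 1]

# Convex combination of the experts' means and log stds
mean_actions = (gates * expert_means).sum(dim=1)  # [batch, action_dim]
log_std = (gates * expert_stds).sum(dim=1)        # [batch, action_dim]
```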
+ :param share_features_extractor: Whether to share or not the features extractor + between the actor and the critic (this saves computation time) + """ + + def __init__(self, *args, n_additional_experts: int = 0, **kwargs): + self.n_additional_experts = n_additional_experts + super(MixtureMlpPolicy, self).__init__( + *args, + **kwargs, + ) + + def make_actor(self, features_extractor: Optional[BaseFeaturesExtractor] = None) -> MixtureActor: + actor_kwargs = self._update_features_extractor(self.actor_kwargs, features_extractor) + return MixtureActor(n_additional_experts=self.n_additional_experts, **actor_kwargs).to(self.device) + + +register_policy("MixtureMlpPolicy", MixtureMlpPolicy) diff --git a/utils/teleop.py b/utils/teleop.py new file mode 100644 index 000000000..b855c3891 --- /dev/null +++ b/utils/teleop.py @@ -0,0 +1,298 @@ +import os +import time +from typing import List, Optional, Tuple + +import numpy as np +import pygame +from gym_space_engineers.envs.walking_robot_ik import Task +from pygame.locals import * # noqa: F403 +from sb3_contrib import TQC +from stable_baselines3.common.base_class import BaseAlgorithm + +# TELEOP_RATE = 1 / 60 + +UP = (1, 0) +LEFT = (0, 1) +RIGHT = (0, -1) +DOWN = (-1, 0) +STOP = (0, 0) +KEY_CODE_SPACE = 32 + +# MAX_TURN = 1 +# # Smoothing constants +# STEP_THROTTLE = 0.8 +# STEP_TURN = 0.8 + +GREEN = (72, 205, 40) +RED = (205, 39, 46) +GREY = (187, 179, 179) +BLACK = (36, 36, 36) +WHITE = (230, 230, 230) +ORANGE = (200, 110, 0) + +# pytype: disable=name-error +moveBindingsGame = {K_UP: UP, K_LEFT: LEFT, K_RIGHT: RIGHT, K_DOWN: DOWN} # noqa: F405 +# pytype: enable=name-error +pygame.font.init() +FONT = pygame.font.SysFont("Open Sans", 25) +SMALL_FONT = pygame.font.SysFont("Open Sans", 20) +KEY_MIN_DELAY = 0.1 + + +class HumanTeleop(BaseAlgorithm): + def __init__( + self, + policy, + env, + tensorboard_log=None, + verbose=0, + seed=None, + device=None, + _init_setup_model: bool = False, + forward_controller_path: str = os.environ.get("FORWARD_CONTROLLER_PATH"), # noqa: B008 + backward_controller_path: str = os.environ.get("BACKWARD_CONTROLLER_PATH"), # noqa: B008 + turn_left_controller_path: str = os.environ.get("TURN_LEFT_CONTROLLER_PATH"), # noqa: B008 + turn_right_controller_path: str = os.environ.get("TURN_RIGHT_CONTROLLER_PATH"), # noqa: B008 + multi_controller_path: str = os.environ.get("MULTI_CONTROLLER_PATH"), # noqa: B008 + deterministic: bool = True, + ): + self.multi_controller_path = multi_controller_path + if multi_controller_path is None: + assert forward_controller_path is not None + assert backward_controller_path is not None + assert turn_left_controller_path is not None + assert turn_right_controller_path is not None + # Pretrained model + # set BACKWARD_CONTROLLER_PATH=logs\pretrained-tqc\SE-Symmetric-v1_2\SE-Symmetric-v1.zip + # set FORWARD_CONTROLLER_PATH=logs\pretrained-tqc\SE-Symmetric-v1_1\SE-Symmetric-v1.zip + # set TURN_LEFT_CONTROLLER_PATH=logs\pretrained-tqc\SE-TurnLeft-v1_1\SE-TurnLeft-v1.zip + # set TURN_RIGHT_CONTROLLER_PATH=logs\pretrained-tqc\SE-TurnLeft-v1_2\SE-TurnLeft-v1.zip + self.forward_controller = TQC.load(forward_controller_path) + self.backward_controller = TQC.load(backward_controller_path) + self.turn_left_controller = TQC.load(turn_left_controller_path) + self.turn_right_controller = TQC.load(turn_right_controller_path) + else: + # set MULTI_CONTROLLER_PATH=logs\multi-task-save\SE-MultiTask-v1_9/rl_model_250000_steps.zip + # set 
MULTI_CONTROLLER_PATH=logs\multi-task-save\SE-MultiTask-v1_10/rl_model_749925_steps.zip + self.forward_controller = TQC.load(multi_controller_path) + self.backward_controller = self.forward_controller + self.turn_left_controller = self.forward_controller + self.turn_right_controller = self.forward_controller + + super(HumanTeleop, self).__init__( + policy=None, env=env, policy_base=None, learning_rate=0.0, verbose=verbose, seed=seed + ) + + # Used to prevent from multiple successive key press + self.last_time_pressed = {} + self.event_buttons = None + self.exit_thread = False + self.process = None + self.window = None + self.max_speed = 0.0 + self._last_task = None + + self.deterministic = deterministic + + def _excluded_save_params(self) -> List[str]: + """ + Returns the names of the parameters that should be excluded by default + when saving the model. + + :return: (List[str]) List of parameters that should be excluded from save + """ + # Exclude aliases + return super()._excluded_save_params() + ["process", "window", "forward_controller", "turn_controller", "exit_thread"] + + def _setup_model(self): + self.exit_thread = False + + def init_buttons(self): + """ + Initialize the last_time_pressed timers that prevent + successive key press. + """ + self.event_buttons = [] + for key in self.event_buttons: + self.last_time_pressed[key] = 0 + + def check_key(self, keys, key): + """ + Check if a key was pressed and update associated timer. + + :param keys: (dict) + :param key: (any hashable type) + :return: (bool) Returns true when a given key was pressed, False otherwise + """ + if key is None: + return False + if keys[key] and (time.time() - self.last_time_pressed[key]) > KEY_MIN_DELAY: + # avoid multiple key press + self.last_time_pressed[key] = time.time() + return True + return False + + def handle_keys_event(self, keys): + """ + Handle the events induced by key press: + e.g. change of mode, toggling recording, ... + """ + + # Switch from "MANUAL" to "AUTONOMOUS" mode + # if self.check_key(keys, self.button_switch_mode) or self.check_key(keys, self.button_pause): + # self.is_manual = not self.is_manual + + def main_loop(self, total_timesteps=-1): + """ + Pygame loop that listens to keyboard events. + """ + pygame.init() + # Create a pygame window + self.window = pygame.display.set_mode((200, 200), RESIZABLE) # pytype: disable=name-error + + # Init values and fill the screen + move, task = "stay", None + # TODO: implement "stay" + self.update_screen(move) + + n_steps = 0 + action = np.array([self.env.action_space.sample()]) * 0.0 + self.max_speed = self.env.get_attr("max_speed") + + while not self.exit_thread: + x, theta = 0, 0 + # Record pressed keys + keys = pygame.key.get_pressed() + for keycode in moveBindingsGame.keys(): + if keys[keycode]: + x_tmp, th_tmp = moveBindingsGame[keycode] + x += x_tmp + theta += th_tmp + + self.handle_keys_event(keys) + # For now only handle one model at once + if x > 0: + move = "forward" + elif x < 0: + move = "backward" + elif theta < 0: + move = "turn_right" + elif theta > 0: + move = "turn_left" + else: + move = "stay" + + if move != "stay": + task = Task(move) + # Check if the task has changed + if task != self._last_task: + self.env.env_method("change_task", task) + self._last_task = task + # Re-enable joints movement + self.env.set_attr("max_speed", self.max_speed) + # TODO: update for the frame stack by stepping fast in the env? 
+ # self._last_obs = self.env.env_method("change_task", task) + + controller = { + Task.FORWARD: self.forward_controller, + Task.BACKWARD: self.backward_controller, + Task.TURN_LEFT: self.turn_left_controller, + Task.TURN_RIGHT: self.turn_right_controller, + }[task] + + action, _ = controller.predict(self._last_obs, deterministic=self.deterministic) + # TODO for multi policy: display proba for each expert + else: + task = None + # Keep the joints at the current position + self.env.set_attr("max_speed", 0.0) + + self._last_obs, reward, done, infos = self.env.step(action) + + self.update_screen(move) + + n_steps += 1 + if total_timesteps > 0: + self.exit_thread = n_steps >= total_timesteps + + for event in pygame.event.get(): + if (event.type == QUIT or event.type == KEYDOWN) and event.key in [ # pytype: disable=name-error + K_ESCAPE, # pytype: disable=name-error + K_q, # pytype: disable=name-error + ]: + self.exit_thread = True + pygame.display.flip() + # Limit FPS + # pygame.time.Clock().tick(1 / TELEOP_RATE) + + def write_text(self, text, x, y, font, color=GREY): + """ + :param text: (str) + :param x: (int) + :param y: (int) + :param font: (str) + :param color: (tuple) + """ + text = str(text) + text = font.render(text, True, color) + self.window.blit(text, (x, y)) + + def clear(self) -> None: + self.window.fill((0, 0, 0)) + + def update_screen(self, move: str) -> None: + """ + Update pygame window. + + :param action: + """ + self.clear() + self.write_text(f"Task: {move}", 50, 50, FONT, WHITE) + + def _get_torch_save_params(self) -> Tuple[List[str], List[str]]: + """ + Get the name of the torch variables that will be saved. + ``th.save`` and ``th.load`` will be used with the right device + instead of the default pickling strategy. + + :return: (Tuple[List[str], List[str]]) + name of the variables with state dicts to save, name of additional torch tensors, + """ + return [], [] + + def learn( + self, + total_timesteps, + callback=None, + log_interval=100, + tb_log_name="run", + eval_env=None, + eval_freq=-1, + n_eval_episodes=5, + eval_log_path=None, + reset_num_timesteps=True, + ) -> "HumanTeleop": + self._last_obs = self.env.reset() + self.main_loop(total_timesteps) + + return self + + def predict( + self, + observation: np.ndarray, + state: Optional[np.ndarray] = None, + mask: Optional[np.ndarray] = None, + deterministic: bool = False, + ) -> Tuple[np.ndarray, Optional[np.ndarray]]: + """ + Get the model's action(s) from an observation + + :param observation: (np.ndarray) the input observation + :param state: (Optional[np.ndarray]) The last states (can be None, used in recurrent policies) + :param mask: (Optional[np.ndarray]) The last masks (can be None, used in recurrent policies) + :param deterministic: (bool) Whether or not to return deterministic actions. 
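The task switching in `main_loop` above relies only on the generic `VecEnv` plumbing (`env_method`, `get_attr`, `set_attr`); nothing is Space Engineers specific. A self-contained toy illustration, where `ToyEnv` is a hypothetical stand-in for the walking-robot env:

```python
import gym
import numpy as np
from stable_baselines3.common.vec_env import DummyVecEnv


class ToyEnv(gym.Env):
    observation_space = gym.spaces.Box(-1.0, 1.0, shape=(2,), dtype=np.float32)
    action_space = gym.spaces.Box(-1.0, 1.0, shape=(1,), dtype=np.float32)

    def __init__(self):
        self.max_speed = 1.0
        self.task = None

    def change_task(self, task):
        self.task = task

    def reset(self):
        return self.observation_space.sample()

    def step(self, action):
        return self.observation_space.sample(), 0.0, False, {}


venv = DummyVecEnv([ToyEnv for _ in range(2)])
venv.reset()
venv.env_method("change_task", "forward")  # forwarded to every sub-env
saved_speeds = venv.get_attr("max_speed")  # -> [1.0, 1.0]
venv.set_attr("max_speed", 0.0)            # freeze the joints, as done for "stay"
```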
+ :return: (Tuple[np.ndarray, Optional[np.ndarray]]) the model's action and the next state + (used in recurrent policies) + """ + # TODO: launch separate thread to handle user keyboard events + return self.model.predict(observation, deterministic) diff --git a/utils/utils.py b/utils/utils.py index bf42d3b39..1bd259e74 100644 --- a/utils/utils.py +++ b/utils/utils.py @@ -18,6 +18,10 @@ # For custom activation fn from torch import nn as nn # noqa: F401 pylint: disable=unused-import +# Register Additional policies +import utils.networks # noqa: F401 +from utils.teleop import HumanTeleop + ALGOS = { "a2c": A2C, "ddpg": DDPG, @@ -28,6 +32,7 @@ # SB3 Contrib, "qrdqn": QRDQN, "tqc": TQC, + "human": HumanTeleop, "trpo": TRPO, } @@ -41,7 +46,7 @@ def flatten_dict_observations(env: gym.Env) -> gym.Env: return gym.wrappers.FlattenDictWrapper(env, dict_keys=list(keys)) -def get_wrapper_class(hyperparams: Dict[str, Any]) -> Optional[Callable[[gym.Env], gym.Env]]: +def get_wrapper_class(hyperparams: Dict[str, Any], key: str = "env_wrapper") -> Optional[Callable[[gym.Env], gym.Env]]: """ Get one or more Gym environment wrapper class specified as a hyper parameter "env_wrapper". @@ -66,8 +71,8 @@ def get_module_name(wrapper_name): def get_class_name(wrapper_name): return wrapper_name.split(".")[-1] - if "env_wrapper" in hyperparams.keys(): - wrapper_name = hyperparams.get("env_wrapper") + if key in hyperparams.keys(): + wrapper_name = hyperparams.get(key) if wrapper_name is None: return None @@ -203,6 +208,11 @@ def create_test_env( if "env_wrapper" in hyperparams.keys(): del hyperparams["env_wrapper"] + # Ignore for now + # TODO: handle it properly + if "vec_env_wrapper" in hyperparams.keys(): + del hyperparams["vec_env_wrapper"] + vec_env_kwargs = {} vec_env_cls = DummyVecEnv if n_envs > 1 or (ExperimentManager.is_bullet(env_id) and should_render): diff --git a/utils/wrappers.py b/utils/wrappers.py index 9cdaf783f..03a9508bc 100644 --- a/utils/wrappers.py +++ b/utils/wrappers.py @@ -1,7 +1,75 @@ +from copy import deepcopy +from typing import Union + import gym import numpy as np from sb3_contrib.common.wrappers import TimeFeatureWrapper # noqa: F401 (backward compatibility) from scipy.signal import iirfilter, sosfilt, zpk2sos +from stable_baselines3.common.type_aliases import GymObs, GymStepReturn +from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvObs, VecEnvStepReturn, VecEnvWrapper +from stable_baselines3.common.vec_env.subproc_vec_env import SubprocVecEnv, _flatten_obs + + +class VecForceResetWrapper(VecEnvWrapper): + """ + For all environments to reset at once, + and tell the agent the trajectory was truncated. 
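The `vec_env_wrapper` entry used by the SE configs is resolved with the same helper as `env_wrapper`, just looked up under a different key, and is then applied on top of the vectorized env in `create_envs`. A sketch of the lookup, assuming the patched `utils/utils.py` is importable from the repository root:

```python
from utils.utils import get_wrapper_class

hyperparams = {"vec_env_wrapper": ["utils.wrappers.VecForceResetWrapper"]}
vec_env_wrapper = get_wrapper_class(hyperparams, "vec_env_wrapper")

# Later, in ExperimentManager.create_envs:
# env = ...                          # DummyVecEnv / SubprocVecEnv instance
# if vec_env_wrapper is not None:
#     env = vec_env_wrapper(env)     # -> VecForceResetWrapper(env)
```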
+ + :param venv: The vectorized environment + """ + + def __init__(self, venv: VecEnv): + super().__init__(venv=venv) + self.use_subproc = isinstance(venv, SubprocVecEnv) + + def reset(self) -> VecEnvObs: + return self.venv.reset() + + def step_wait(self) -> VecEnvStepReturn: + if self.use_subproc: + return self._subproc_step_wait() + + for env_idx in range(self.num_envs): + obs, self.buf_rews[env_idx], self.buf_dones[env_idx], self.buf_infos[env_idx] = self.envs[env_idx].step( + self.actions[env_idx] + ) + self._save_obs(env_idx, obs) + + if self.buf_dones.any(): + for env_idx in range(self.num_envs): + self.buf_infos[env_idx]["terminal_observation"] = self.buf_obs[None][env_idx] + if not self.buf_dones[env_idx]: + self.buf_infos[env_idx]["TimeLimit.truncated"] = True + self.buf_dones[env_idx] = True + obs = self.envs[env_idx].reset() + self._save_obs(env_idx, obs) + + return ( + self._obs_from_buf(), + np.copy(self.buf_rews), + np.copy(self.buf_dones), + deepcopy(self.buf_infos), + ) + + def _subproc_step_wait(self) -> VecEnvStepReturn: + results = [remote.recv() for remote in self.remotes] + self.waiting = False + obs, rewards, dones, infos = zip(*results) + dones = np.stack(dones) + obs = list(obs) + updated_remotes = [] + if np.array(dones).any(): + for idx, remote in enumerate(self.remotes): + if not dones[idx]: + infos[idx]["terminal_observation"] = obs[idx] + infos[idx]["TimeLimit.truncated"] = True + dones[idx] = True + remote.send(("reset", None)) + updated_remotes.append((idx, remote)) + + for idx, remote in updated_remotes: + obs[idx] = remote.recv() + return _flatten_obs(obs, self.observation_space), np.stack(rewards), dones, infos class DoneOnSuccessWrapper(gym.Wrapper): @@ -305,3 +373,50 @@ def step(self, action): obs_dict["observation"] = self._create_obs_from_history() return obs_dict, reward, done, info + + +class PhaseWrapper(gym.Wrapper): + """Add phase as input""" + + def __init__(self, env: gym.Env, period: int = 40, n_components: int = 4, phase_only: bool = False): + obs_space = env.observation_space + + assert len(obs_space.shape) == 1, "Only 1D observation spaces are supported" + + low, high = obs_space.low, obs_space.high + + if phase_only: + low, high = [], [] + low, high = np.concatenate((low, [-1.0] * 2 * n_components)), np.concatenate((high, [1.0] * 2 * n_components)) + + env.observation_space = gym.spaces.Box(low=low, high=high, dtype=np.float32) + + super(PhaseWrapper, self).__init__(env) + self._current_step = 0 + self._n_components = n_components + self._period = period + self._phase_only = phase_only + + def reset(self) -> GymObs: + self._current_step = 0 + return self._get_obs(self.env.reset()) + + def step(self, action: Union[int, np.ndarray]) -> GymStepReturn: + self._current_step += 1 + obs, reward, done, info = self.env.step(action) + return self._get_obs(obs), reward, done, info + + def _get_obs(self, obs: np.ndarray) -> np.ndarray: + """ + Concatenate the phase feature to the current observation. + """ + k = 2 * np.pi / self._period + phase_feature = [] + for i in range(1, self._n_components + 1): + phase_feature.append(np.cos(i * k * self._current_step)) + phase_feature.append(np.sin(i * k * self._current_step)) + + if self._phase_only: + return np.array(phase_feature) + + return np.append(obs, phase_feature)
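For intuition on the observation features `PhaseWrapper` appends: they are the first `n_components` cosine/sine harmonics of a clock with the given period, so the extra inputs repeat every `period` steps (and with `phase_only=True` they replace the observation entirely). A quick numeric check with arbitrarily chosen values:

```python
import numpy as np

period, n_components, current_step = 4, 2, 1
k = 2 * np.pi / period

phase_feature = []
for i in range(1, n_components + 1):
    phase_feature.append(np.cos(i * k * current_step))
    phase_feature.append(np.sin(i * k * current_step))

# Step 1 of a period-4 cycle -> [cos(pi/2), sin(pi/2), cos(pi), sin(pi)]
print(np.round(phase_feature, 3))  # [ 0.  1. -1.  0.]
```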