From 84fa84d3224c15209bf2a0c76cb5d63009ebc9a9 Mon Sep 17 00:00:00 2001 From: Denis Grachev Date: Sun, 26 Oct 2025 18:04:56 +0000 Subject: [PATCH 1/9] Remote policy v0.1 --- examples/remote/remote_policy_server.py | 38 +++++ pyproject.toml | 8 + src/lerobot/policies/__init__.py | 2 + src/lerobot/policies/factory.py | 16 ++ src/lerobot/policies/remote/__init__.py | 5 + .../policies/remote/configuration_remote.py | 55 +++++++ .../policies/remote/modeling_remote.py | 100 +++++++++++++ .../policies/remote/processor_remote.py | 65 +++++++++ src/lerobot/utils/messaging.py | 138 ++++++++++++++++++ 9 files changed, 427 insertions(+) create mode 100644 examples/remote/remote_policy_server.py create mode 100644 src/lerobot/policies/remote/__init__.py create mode 100644 src/lerobot/policies/remote/configuration_remote.py create mode 100644 src/lerobot/policies/remote/modeling_remote.py create mode 100644 src/lerobot/policies/remote/processor_remote.py create mode 100644 src/lerobot/utils/messaging.py diff --git a/examples/remote/remote_policy_server.py b/examples/remote/remote_policy_server.py new file mode 100644 index 0000000000..8c3928bebb --- /dev/null +++ b/examples/remote/remote_policy_server.py @@ -0,0 +1,38 @@ +import numpy as np +from fastapi import FastAPI, Request, Response + +from lerobot.utils.messaging import pack_msg, unpack_msg + +app = FastAPI() + + +@app.post("/predict") +async def predict(request: Request): + data = await request.body() + obs_input = unpack_msg(data) + + inf_cfg = obs_input.get("inference_config", {}) + n_action_steps = ( + inf_cfg.get("n_action_steps") + or inf_cfg.get("n_actions") + or inf_cfg.get("chunk_size") + or inf_cfg.get("horizon") + or 1 + ) + + # Try to infer batch size from any array-like input + B = None + for v in obs_input.values(): + if isinstance(v, np.ndarray): + if v.ndim >= 1: + B = int(v.shape[0]) + break + if B is None: + # Fallback to 1 if nothing array-like found + B = 1 + + action_dim = 7 # set to your actual action dimension + actions = np.zeros((B, n_action_steps, action_dim), dtype=np.float32) + + packed = pack_msg({"actions": actions}) + return Response(content=packed, media_type="application/octet-stream") \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 1c71acec07..8a0862fb3a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -134,6 +134,13 @@ pusht = ["gym-pusht>=0.1.5,<0.2.0", "pymunk>=6.6.0,<7.0.0"] # TODO: Fix pymunk v libero = ["lerobot[transformers-dep]", "libero @ git+https://github.com/huggingface/lerobot-libero.git@main#egg=libero"] metaworld = ["metaworld==3.0.0"] +# HTTP server extra +server = [ + "fastapi>=0.115.0,<1.0.0", + "uvicorn[standard]>=0.30.0,<1.0.0", + "msgpack>=1.0.8,<2.0.0", +] + # All all = [ "lerobot[dynamixel]", @@ -155,6 +162,7 @@ all = [ "lerobot[phone]", "lerobot[libero]", "lerobot[metaworld]", + "lerobot[server]", ] [project.scripts] diff --git a/src/lerobot/policies/__init__.py b/src/lerobot/policies/__init__.py index 49f1e0f955..59a0cf01e5 100644 --- a/src/lerobot/policies/__init__.py +++ b/src/lerobot/policies/__init__.py @@ -20,6 +20,7 @@ from .smolvla.processor_smolvla import SmolVLANewLineProcessor from .tdmpc.configuration_tdmpc import TDMPCConfig as TDMPCConfig from .vqbet.configuration_vqbet import VQBeTConfig as VQBeTConfig +from .remote.configuration_remote import RemoteConfig as RemoteConfig __all__ = [ "ACTConfig", @@ -29,4 +30,5 @@ "SmolVLAConfig", "TDMPCConfig", "VQBeTConfig", + "RemoteConfig", ] diff --git a/src/lerobot/policies/factory.py 
b/src/lerobot/policies/factory.py index 6e524f2ab0..42e13cc501 100644 --- a/src/lerobot/policies/factory.py +++ b/src/lerobot/policies/factory.py @@ -38,6 +38,7 @@ from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig +from lerobot.policies.remote.configuration_remote import RemoteConfig from lerobot.processor import PolicyAction, PolicyProcessorPipeline from lerobot.processor.converters import ( batch_to_transition, @@ -101,6 +102,10 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]: from lerobot.policies.smolvla.modeling_smolvla import SmolVLAPolicy return SmolVLAPolicy + elif name == "remote": + from lerobot.policies.remote.modeling_remote import RemotePolicy + + return RemotePolicy else: raise NotImplementedError(f"Policy with name {name} is not implemented.") @@ -142,6 +147,8 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig: return SmolVLAConfig(**kwargs) elif policy_type == "reward_classifier": return RewardClassifierConfig(**kwargs) + elif policy_type == "remote": + return RemoteConfig(**kwargs) else: raise ValueError(f"Policy type '{policy_type}' is not available.") @@ -292,6 +299,15 @@ def make_pre_post_processors( config=policy_cfg, dataset_stats=kwargs.get("dataset_stats"), ) + + elif isinstance(policy_cfg, RemoteConfig): + from lerobot.policies.remote.processor_remote import make_remote_pre_post_processors + + processors = make_remote_pre_post_processors( + config=policy_cfg, + dataset_stats=kwargs.get("dataset_stats"), + rename_map=kwargs.get("preprocessor_overrides").get("rename_observations_processor").get("rename_map"), + ) else: raise NotImplementedError(f"Processor for policy type '{policy_cfg.type}' is not implemented.") diff --git a/src/lerobot/policies/remote/__init__.py b/src/lerobot/policies/remote/__init__.py new file mode 100644 index 0000000000..21d9580c69 --- /dev/null +++ b/src/lerobot/policies/remote/__init__.py @@ -0,0 +1,5 @@ +from .configuration_remote import RemoteConfig +from .modeling_remote import RemotePolicy +from .processor_remote import make_remote_pre_post_processors + +__all__ = ["RemoteConfig", "RemotePolicy", "make_remote_pre_post_processors"] diff --git a/src/lerobot/policies/remote/configuration_remote.py b/src/lerobot/policies/remote/configuration_remote.py new file mode 100644 index 0000000000..c923cc13e6 --- /dev/null +++ b/src/lerobot/policies/remote/configuration_remote.py @@ -0,0 +1,55 @@ +from dataclasses import dataclass, field +from typing import Any + +from lerobot.configs.policies import PreTrainedConfig +from lerobot.optim.optimizers import AdamWConfig + +@PreTrainedConfig.register_subclass("remote") +@dataclass +class RemoteConfig(PreTrainedConfig): + # Identity and device placement + type: str = field(default="remote", metadata={"help": "Policy type name"}) + device: str = field(default="cpu", metadata={"help": "Device used for returned tensors"}) + + # Action execution + # How many environment steps to execute per policy call. Used by the runtime action queue. 
+ n_action_steps: int = field(default=1, metadata={"help": "Number of env steps to execute per call"}) + + # Remote-specific + server_url: str = field(default="http://localhost:8000", metadata={"help": "Remote policy server URL"}) + timeout: float = field(default=30.0, metadata={"help": "HTTP timeout in seconds"}) + attempts: int = field(default=1, metadata={"help": "Number of retry attempts for failed requests"}) + + # Additional arguments to inject directly into the observation dict (e.g. {"inference_config": {...}}) + additional_args: dict[str, Any] = field( + default_factory=dict, + metadata={"help": "Extra observation keys to inject directly into observation"}, + ) + + # --- Abstract API implementations required by PreTrainedConfig --- + def get_optimizer_preset(self) -> AdamWConfig: + """Remote policy is inference-only; return a inert preset for API compatibility.""" + return AdamWConfig(lr=1e-5, weight_decay=0.0, grad_clip_norm=1.0) + + def get_scheduler_preset(self): + # No scheduler needed for inference-only policy + return None + + def validate_features(self) -> None: + # Minimal validation: allow any combination, but require at least one input feature + if not self.input_features: + raise ValueError("RemoteConfig requires at least one input feature to be defined.") + + @property + def observation_delta_indices(self): + # No temporal deltas required for observations by default + return None + + @property + def action_delta_indices(self): + # Minimal behavior: align deltas to n_action_steps + return list(range(self.n_action_steps)) + + @property + def reward_delta_indices(self): + return None \ No newline at end of file diff --git a/src/lerobot/policies/remote/modeling_remote.py b/src/lerobot/policies/remote/modeling_remote.py new file mode 100644 index 0000000000..e4c2abb8a8 --- /dev/null +++ b/src/lerobot/policies/remote/modeling_remote.py @@ -0,0 +1,100 @@ +import numpy as np +import requests +import torch +from collections import deque +from torch import Tensor + +from lerobot.utils.messaging import pack_msg, unpack_msg +from lerobot.policies.pretrained import PreTrainedPolicy +from .configuration_remote import RemoteConfig + + +class RemotePolicy(PreTrainedPolicy): + """ + A policy that proxies inference to a remote HTTP server. 
+ """ + + config_class = RemoteConfig + name = "remote" + + def __init__(self, config: RemoteConfig): + super().__init__(config) + self.server_url = config.server_url.rstrip("/") + self.session = requests.Session() + self.timeout = config.timeout + self.reset() + + def get_optim_params(self) -> dict: + return {} + + def reset(self): + # Queue emits one action per env step; refilled when empty + self._action_queue = deque(maxlen=self.config.n_action_steps) + + def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict] | tuple[Tensor, None]: + raise NotImplementedError("RemotePolicy is inference-only") + + def custom_prepare_batch(self, batch: dict[str, Tensor]) -> dict[str, Tensor]: + batch.pop('action') + batch.pop('next.reward') + batch.pop('next.done') + batch.pop('next.truncated') + batch.pop('info') + + task = batch.pop('task') + batch['observation.task_instr'] = task + + if not hasattr(self, 'previous_state'): + self.previous_state = batch["observation.state"].clone() + + batch["observation.state"] = torch.stack([self.previous_state, batch["observation.state"]], dim=1) + self.previous_state = batch["observation.state"][:, -1].clone() + + return batch + + @torch.no_grad() + def predict_action_chunk(self, batch: dict[str, Tensor], **kwargs) -> Tensor: + # Build payload with raw tensors/arrays; pack_msg handles encoding + + batch = self.custom_prepare_batch(batch) + add_args = self.config.additional_args or {} + payload = batch | add_args + + packed = pack_msg(payload) + + last_exception = None + for _ in range(self.config.attempts): + try: + resp = self.session.post( + f"{self.server_url}/predict", + data=packed, + headers={"Content-Type": "application/octet-stream"}, + timeout=self.timeout, + ) + resp.raise_for_status() + break + except requests.RequestException as e: + last_exception = e + + if last_exception: + raise last_exception + + unpacked = unpack_msg(resp.content) + actions_np = np.asarray(unpacked) + + device = torch.device(self.config.device) + any_tensor = next((v for v in batch.values() if isinstance(v, torch.Tensor)), None) + dtype = any_tensor.dtype if isinstance(any_tensor, torch.Tensor) else torch.float32 + + actions = torch.from_numpy(actions_np).to(device=device, dtype=dtype) + return actions + + @torch.no_grad() + def select_action(self, batch: dict[str, Tensor], **kwargs) -> Tensor: + self.eval() + + if len(self._action_queue) == 0: + actions = self.predict_action_chunk(batch)[:, : self.config.n_action_steps] + self._action_queue.extend(actions.transpose(0, 1)) # [(B, A)] x T + + return self._action_queue.popleft() \ No newline at end of file diff --git a/src/lerobot/policies/remote/processor_remote.py b/src/lerobot/policies/remote/processor_remote.py new file mode 100644 index 0000000000..7329f0dc05 --- /dev/null +++ b/src/lerobot/policies/remote/processor_remote.py @@ -0,0 +1,65 @@ +from dataclasses import dataclass, field +from typing import Any + +import torch + +from lerobot.policies.remote.configuration_remote import RemoteConfig +from lerobot.processor import ( + AddBatchDimensionProcessorStep, + RenameObservationsProcessorStep, + PolicyAction, + PolicyProcessorPipeline, + ProcessorStep, +) +from lerobot.processor.converters import policy_action_to_transition, transition_to_policy_action +from lerobot.processor.core import EnvTransition, TransitionKey +from lerobot.utils.constants import ( + POLICY_POSTPROCESSOR_DEFAULT_NAME, + POLICY_PREPROCESSOR_DEFAULT_NAME, +) + + +def make_remote_pre_post_processors( + config: RemoteConfig, + dataset_stats: 
dict[str, dict[str, torch.Tensor]] | None = None, + rename_map: dict[str, str] = {}, +) -> tuple[ + PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], + PolicyProcessorPipeline[PolicyAction, PolicyAction], +]: + """ + Custom pre/post processors for the Remote policy. + + Pre: + - Normalizer (if stats provided) + - AddBatchDimension + - AppendInferenceConfig (copies config.inference_config into the batch) + - Device placement + + Post: + - Device to CPU + - Unnormalize outputs (if stats provided) + """ + + # Pre: allow renaming features and add batch dim. Rename map can be overridden at runtime + # through preprocessor_overrides with the key "rename_observations_processor". + input_steps: list[ProcessorStep] = [ + RenameObservationsProcessorStep(rename_map=rename_map), + AddBatchDimensionProcessorStep(), + ] + + # Minimal postprocessor: identity (no steps) + output_steps: list[ProcessorStep] = [] + + return ( + PolicyProcessorPipeline[dict[str, Any], dict[str, Any]]( + steps=input_steps, + name=POLICY_PREPROCESSOR_DEFAULT_NAME, + ), + PolicyProcessorPipeline[PolicyAction, PolicyAction]( + steps=output_steps, + name=POLICY_POSTPROCESSOR_DEFAULT_NAME, + to_transition=policy_action_to_transition, + to_output=transition_to_policy_action, + ), + ) \ No newline at end of file diff --git a/src/lerobot/utils/messaging.py b/src/lerobot/utils/messaging.py new file mode 100644 index 0000000000..34d85cc3c7 --- /dev/null +++ b/src/lerobot/utils/messaging.py @@ -0,0 +1,138 @@ +import time +from typing import Any, Dict + +import msgpack + + +from typing import Any, Dict, List, Union + +import numpy as np +import torch + + +class TorchSerialize: + def encodes(self, o: Union[torch.Tensor, np.ndarray]) -> dict: + if isinstance(o, torch.Tensor): + np_data = o.numpy() + return { + "data": np_data.tobytes(), "dtype": np_data.dtype.str, "encoding": "raw_bytes", "shape": o.shape, "type": "tensor" + } + elif isinstance(o, np.ndarray): + return { + "data": o.tobytes(), "shape": o.shape, "dtype": o.dtype.str, "encoding": "raw_bytes", "type": "array" + } + else: + return o + + # @timeit(logger) + def decodes(self, o: Dict) -> Union[torch.Tensor, np.ndarray]: + dtype = o["dtype"] + t = o["type"] + arr = np.frombuffer(o["data"], dtype=dtype) + arr = arr.reshape(o["shape"]) + + if t == "tensor": + retval = torch.as_tensor(arr) + elif t == "array": + retval = arr + return retval + +class JsonLikeSerialize: + def encodes(self, o: Union[torch.Tensor, np.ndarray, Any]) -> dict: + if not isinstance(o, list): + data = o.tolist() + return { + "data": data, "dtype": "tensor", "encoding": "json" + } + +img_serializer = TorchSerialize() + +class TensorEncoder: + + def __init__(self, image_data_function) -> None: + self.image_data_function = image_data_function + + def __call__(self, obj: Any) -> Any: + if isinstance(obj, (np.ndarray, torch.Tensor)): + return self.image_data_function(obj) + else: + return obj + +class TensorDecoder: + def __init__(self, image_data_function) -> None: + self.image_data_function = image_data_function + + def __call__(self, obj: Any) -> Any: + if '__image_data__' in obj: + return self.image_data_function(obj) + else: + return obj + + +def encode_image_data(obj: Union[torch.Tensor, np.ndarray]) -> Dict: + return {"__image_data__": True, "as_str": img_serializer.encodes(obj)} + +def encode_image_data_json(obj: Union[torch.Tensor, np.ndarray]) -> List: + return img_serializer.encodes(obj) + +def decode_image_data(obj: Dict) -> Union[torch.Tensor, np.ndarray]: + return 
img_serializer.decodes(obj["as_str"]) + +encoder_data = TensorEncoder(image_data_function=encode_image_data) +decoder_data = TensorDecoder(image_data_function=decode_image_data) +json_like_encoder = TensorEncoder(image_data_function=encode_image_data_json) + +def envelope(msg: Any) -> Dict: + """Wrap a message in an envelope. Just a dict for future json serialization. + + Args: + msg (Any): Any serializable message, might be (int,string,float,list,dict,tuple) + + Returns: + dict: Returns the msg wrapped in an envelope. + """ + return {"payload": msg, "monotonic_time": time.monotonic()} + + +def rm_envelope(msg: dict) -> Any: + """Remove the envelope from a message. + + Args: + msg (dict): The wrapped message + + Returns: + Any: The payload of the message + """ + return msg["payload"] + + +def pack_msg(msg: Any, json_like=False) -> bytes: + """Packs msg to a bytearray following the msgpack specification. + + Args: + msg (Any): Any message to send. + + Returns: + bytearray: [description] + """ + encoder = json_like_encoder if json_like else encoder_data + msg = envelope(msg) # Wrap message in an envelope + return msgpack.packb(msg, default=encoder, use_bin_type=True) # Pack to byte array using msgpack + + +def unpack_msg(packed: bytes, with_header: bool = False) -> Any: + """Unpack an image message message. + + Args: + packed (bytearray): bytearray containin a msgpack message + """ + unpacked = msgpack.unpackb( + packed, raw=False, object_hook=decoder_data + ) + if with_header: + if isinstance(unpacked["monotonic_time"], list): + unpacked["monotonic_time"] = unpacked["monotonic_time"][0] + return unpacked + else: + return rm_envelope(unpacked) + From d6010a4eed1bb7f1141866e6737af063815f3d0b Mon Sep 17 00:00:00 2001 From: Denis Grachev Date: Mon, 27 Oct 2025 12:11:04 +0100 Subject: [PATCH 2/9] Simplify changes --- examples/remote/remote_policy_server.py | 21 ++++++---------- .../policies/remote/modeling_remote.py | 24 +------------------ 2 files changed, 8 insertions(+), 37 deletions(-) diff --git a/examples/remote/remote_policy_server.py b/examples/remote/remote_policy_server.py index 8c3928bebb..306efdf344 100644 --- a/examples/remote/remote_policy_server.py +++ b/examples/remote/remote_policy_server.py @@ -1,3 +1,4 @@ +import torch import numpy as np from fastapi import FastAPI, Request, Response @@ -12,27 +13,19 @@ async def predict(request: Request): obs_input = unpack_msg(data) inf_cfg = obs_input.get("inference_config", {}) - n_action_steps = ( - inf_cfg.get("n_action_steps") - or inf_cfg.get("n_actions") - or inf_cfg.get("chunk_size") - or inf_cfg.get("horizon") - or 1 - ) + dataset_info = obs_input.get("dataset_info", {}) + n_action_steps = inf_cfg.get("n_action_steps", 10) + action_dim = dataset_info.get("action_dof", 7) # Try to infer batch size from any array-like input B = None for v in obs_input.values(): - if isinstance(v, np.ndarray): + if isinstance(v, torch.Tensor) or isinstance(v, np.ndarray): if v.ndim >= 1: B = int(v.shape[0]) break - if B is None: - # Fallback to 1 if nothing array-like found - B = 1 - action_dim = 7 # set to your actual action dimension - actions = np.zeros((B, n_action_steps, action_dim), dtype=np.float32) + actions = torch.zeros((B, n_action_steps, action_dim), dtype=torch.float32) - packed = pack_msg({"actions": actions}) + packed = pack_msg(actions) return Response(content=packed, media_type="application/octet-stream") \ No newline at end of file diff --git a/src/lerobot/policies/remote/modeling_remote.py 
b/src/lerobot/policies/remote/modeling_remote.py index e4c2abb8a8..d48aad6c57 100644 --- a/src/lerobot/policies/remote/modeling_remote.py +++ b/src/lerobot/policies/remote/modeling_remote.py @@ -34,29 +34,9 @@ def reset(self): def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict] | tuple[Tensor, None]: raise NotImplementedError("RemotePolicy is inference-only") - def custom_prepare_batch(self, batch: dict[str, Tensor]) -> dict[str, Tensor]: - batch.pop('action') - batch.pop('next.reward') - batch.pop('next.done') - batch.pop('next.truncated') - batch.pop('info') - - task = batch.pop('task') - batch['observation.task_instr'] = task - - if not hasattr(self, 'previous_state'): - self.previous_state = batch["observation.state"].clone() - - batch["observation.state"] = torch.stack([self.previous_state, batch["observation.state"]], dim=1) - self.previous_state = batch["observation.state"][:, -1].clone() - - return batch - @torch.no_grad() def predict_action_chunk(self, batch: dict[str, Tensor], **kwargs) -> Tensor: # Build payload with raw tensors/arrays; pack_msg handles encoding - - batch = self.custom_prepare_batch(batch) add_args = self.config.additional_args or {} payload = batch | add_args @@ -83,10 +63,8 @@ def predict_action_chunk(self, batch: dict[str, Tensor], **kwargs) -> Tensor: actions_np = np.asarray(unpacked) device = torch.device(self.config.device) - any_tensor = next((v for v in batch.values() if isinstance(v, torch.Tensor)), None) - dtype = any_tensor.dtype if isinstance(any_tensor, torch.Tensor) else torch.float32 - actions = torch.from_numpy(actions_np).to(device=device, dtype=dtype) + actions = torch.from_numpy(actions_np).to(device=device, dtype=torch.float32) return actions @torch.no_grad() From 52581d22a8d241bf085287b14c1869733e34cee1 Mon Sep 17 00:00:00 2001 From: Denis Grachev Date: Mon, 27 Oct 2025 14:20:54 +0100 Subject: [PATCH 3/9] Fix max_parallel_workers and some other little bugs --- README.md | 33 +++++++++++++++ src/lerobot/async_inference/constants.py | 2 +- src/lerobot/policies/factory.py | 2 +- .../policies/remote/modeling_remote.py | 40 +++++++++++++------ src/lerobot/utils/messaging.py | 2 +- 5 files changed, 64 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 56d82c0c70..e34265bd1c 100644 --- a/README.md +++ b/README.md @@ -290,6 +290,39 @@ lerobot-train --config_path=lerobot/diffusion_pusht reproduces SOTA results for Diffusion Policy on the PushT task. +### Remote policy evaluation (experimental) +In case you have a custom model served through an HTTP API, +you can delegate action selection to an external HTTP service by using the `remote` policy. +Install the dedicated dependencies and start the demo server: + +```bash +pip install -e ".[server]" +uvicorn examples.remote.remote_policy_server:app --host 0.0.0.0 --port 8000 +``` + +The sample FastAPI app simply echoes zero actions with the requested shape, which is useful to validate end-to-end wiring before deploying a real model. 
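As a quick sanity check, you can also exercise the stub directly with the same msgpack helpers the policy uses under the hood. The snippet below is only an illustration: the observation keys and shapes are arbitrary (the stub merely infers the batch size from the first array-like value and reads `n_action_steps` / `action_dof` from the optional config dicts):

```python
# Minimal round-trip against the demo server, assumed to be running on localhost:8000.
# The observation keys and dimensions here are illustrative, not required by the stub.
import numpy as np
import requests

from lerobot.utils.messaging import pack_msg, unpack_msg

payload = {
    "observation.state": np.zeros((1, 8), dtype=np.float32),  # (batch, state_dim)
    "inference_config": {"n_action_steps": 10},
    "dataset_info": {"action_dof": 7},
}
resp = requests.post(
    "http://localhost:8000/predict",
    data=pack_msg(payload),
    headers={"Content-Type": "application/octet-stream"},
    timeout=30,
)
resp.raise_for_status()
actions = unpack_msg(resp.content)
print(actions.shape)  # expected: (1, 10, 7) zero actions from the stub
```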
+ +To evaluate the Libero benchmark through the remote policy, run: + +```bash +lerobot-eval \ + --env.type=libero \ + --env.task=libero_spatial \ + --env.max_parallel_tasks=1 \ + --eval.batch_size=1 \ + --eval.n_episodes=3 \ + --policy.type=remote \ + --policy.server_url=http://localhost:8000 \ + --policy.timeout=30 \ + --policy.attempts=3 \ + --policy.n_action_steps=10 \ + --policy.additional_args='{"dataset_info":{"action_type":"eef","robot_embodiment":"single_arm","robot_type":"franka","stereo_replace_depth":false,"handheld":false,"no_state":false,"obs_dof":8,"action_dof":7},"inference_config":{"n_actions":6,"n_inference_steps":10}}' \ + --rename_map='{"observation.images.image":"observation.images.static1","observation.images.image2":"observation.images.wrist1"}' \ + --output_dir=./eval_logs_libero_spatial +``` + +The `additional_args` payload is forwarded to the remote server alongside the observation batch and can be adjusted to match your remote model’s expectations. + ## Contribute If you would like to contribute to 🤗 LeRobot, please check out our [contribution guide](https://github.com/huggingface/lerobot/blob/main/CONTRIBUTING.md). diff --git a/src/lerobot/async_inference/constants.py b/src/lerobot/async_inference/constants.py index 1b1dac0f57..d55aae9c56 100644 --- a/src/lerobot/async_inference/constants.py +++ b/src/lerobot/async_inference/constants.py @@ -23,7 +23,7 @@ DEFAULT_OBS_QUEUE_TIMEOUT = 2 # All action chunking policies -SUPPORTED_POLICIES = ["act", "smolvla", "diffusion", "tdmpc", "vqbet", "pi0", "pi05"] +SUPPORTED_POLICIES = ["act", "smolvla", "diffusion", "tdmpc", "vqbet", "pi0", "pi05", "remote"] # TODO: Add all other robots SUPPORTED_ROBOTS = ["so100_follower", "so101_follower", "bi_so100_follower"] diff --git a/src/lerobot/policies/factory.py b/src/lerobot/policies/factory.py index 42e13cc501..d37d9817fa 100644 --- a/src/lerobot/policies/factory.py +++ b/src/lerobot/policies/factory.py @@ -306,7 +306,7 @@ def make_pre_post_processors( processors = make_remote_pre_post_processors( config=policy_cfg, dataset_stats=kwargs.get("dataset_stats"), - rename_map=kwargs.get("preprocessor_overrides").get("rename_observations_processor").get("rename_map"), + rename_map=kwargs.get("preprocessor_overrides", {}).get("rename_observations_processor", {}).get("rename_map", {}), ) else: diff --git a/src/lerobot/policies/remote/modeling_remote.py b/src/lerobot/policies/remote/modeling_remote.py index d48aad6c57..4086323f92 100644 --- a/src/lerobot/policies/remote/modeling_remote.py +++ b/src/lerobot/policies/remote/modeling_remote.py @@ -1,7 +1,9 @@ +from collections import deque +import threading + import numpy as np import requests import torch -from collections import deque from torch import Tensor from lerobot.utils.messaging import pack_msg, unpack_msg @@ -20,22 +22,32 @@ class RemotePolicy(PreTrainedPolicy): def __init__(self, config: RemoteConfig): super().__init__(config) self.server_url = config.server_url.rstrip("/") - self.session = requests.Session() self.timeout = config.timeout + self._thread_state = threading.local() self.reset() def get_optim_params(self) -> dict: return {} def reset(self): - # Queue emits one action per env step; refilled when empty - self._action_queue = deque(maxlen=self.config.n_action_steps) + # Reinitialize thread-local state so each worker gets its own queue/session + self._thread_state = threading.local() + + def _state(self): + state = self._thread_state + if not hasattr(state, "session"): + state.session = requests.Session() + if not 
hasattr(state, "action_queue"): + state.action_queue = deque(maxlen=self.config.n_action_steps) + return state def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict] | tuple[Tensor, None]: raise NotImplementedError("RemotePolicy is inference-only") @torch.no_grad() def predict_action_chunk(self, batch: dict[str, Tensor], **kwargs) -> Tensor: + state = self._state() + # Build payload with raw tensors/arrays; pack_msg handles encoding add_args = self.config.additional_args or {} payload = batch | add_args @@ -45,7 +57,7 @@ def predict_action_chunk(self, batch: dict[str, Tensor], **kwargs) -> Tensor: last_exception = None for _ in range(self.config.attempts): try: - resp = self.session.post( + resp = state.session.post( f"{self.server_url}/predict", data=packed, headers={"Content-Type": "application/octet-stream"}, @@ -60,19 +72,23 @@ def predict_action_chunk(self, batch: dict[str, Tensor], **kwargs) -> Tensor: raise last_exception unpacked = unpack_msg(resp.content) - actions_np = np.asarray(unpacked) + if isinstance(unpacked, torch.Tensor): + actions = unpacked + else: + actions_np = np.asarray(unpacked) + actions = torch.from_numpy(actions_np) device = torch.device(self.config.device) - - actions = torch.from_numpy(actions_np).to(device=device, dtype=torch.float32) - return actions + return actions.to(device=device, dtype=torch.float32) @torch.no_grad() def select_action(self, batch: dict[str, Tensor], **kwargs) -> Tensor: self.eval() - if len(self._action_queue) == 0: + queue = self._state().action_queue + + if len(queue) == 0: actions = self.predict_action_chunk(batch)[:, : self.config.n_action_steps] - self._action_queue.extend(actions.transpose(0, 1)) # [(B, A)] x T + queue.extend(actions.transpose(0, 1)) # [(B, A)] x T - return self._action_queue.popleft() \ No newline at end of file + return queue.popleft() diff --git a/src/lerobot/utils/messaging.py b/src/lerobot/utils/messaging.py index 34d85cc3c7..5382b2dc84 100644 --- a/src/lerobot/utils/messaging.py +++ b/src/lerobot/utils/messaging.py @@ -13,7 +13,7 @@ class TorchSerialize: def encodes(self, o: Union[torch.Tensor, np.ndarray]) -> dict: if isinstance(o, torch.Tensor): - np_data = o.numpy() + np_data = o.detach().cpu().numpy() return { "data": np_data.tobytes(), "dtype": np_data.dtype.str, "encoding": "raw_bytes", "shape": o.shape, "type": "tensor" } From 76d64e6aacac0f178e83ca5fcc7488aee99f4784 Mon Sep 17 00:00:00 2001 From: Denis Grachev Date: Tue, 28 Oct 2025 13:17:36 +0100 Subject: [PATCH 4/9] Little bug fixed --- src/lerobot/policies/factory.py | 4 +++- src/lerobot/utils/messaging.py | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/lerobot/policies/factory.py b/src/lerobot/policies/factory.py index d37d9817fa..e1bc581ee9 100644 --- a/src/lerobot/policies/factory.py +++ b/src/lerobot/policies/factory.py @@ -303,10 +303,12 @@ def make_pre_post_processors( elif isinstance(policy_cfg, RemoteConfig): from lerobot.policies.remote.processor_remote import make_remote_pre_post_processors + overrides = kwargs.get("preprocessor_overrides") or {} + processors = make_remote_pre_post_processors( config=policy_cfg, dataset_stats=kwargs.get("dataset_stats"), - rename_map=kwargs.get("preprocessor_overrides", {}).get("rename_observations_processor", {}).get("rename_map", {}), + rename_map=overrides.get("rename_observations_processor", {}).get("rename_map", {}), ) else: diff --git a/src/lerobot/utils/messaging.py b/src/lerobot/utils/messaging.py index 5382b2dc84..b6a27073ce 100644 --- 
a/src/lerobot/utils/messaging.py +++ b/src/lerobot/utils/messaging.py @@ -30,6 +30,8 @@ def decodes(self, o: Dict) -> Union[torch.Tensor, np.ndarray]: t = o["type"] arr = np.frombuffer(o["data"], dtype=dtype) arr = arr.reshape(o["shape"]) + if not arr.flags.writeable: + arr = arr.copy() if t == "tensor": retval = torch.as_tensor(arr) @@ -135,4 +137,3 @@ def unpack_msg(packed: bytes, with_header: bool = False) -> Any: return unpacked else: return rm_envelope(unpacked) - From d441d5f19a02eba32da7c57c02b495b19e63cf27 Mon Sep 17 00:00:00 2001 From: Denis Grachev Date: Tue, 28 Oct 2025 14:15:21 +0100 Subject: [PATCH 5/9] Adjust for copilot codereview --- examples/remote/remote_policy_server.py | 18 ++++--- .../policies/remote/configuration_remote.py | 5 +- src/lerobot/utils/messaging.py | 54 ++++++++++--------- 3 files changed, 43 insertions(+), 34 deletions(-) diff --git a/examples/remote/remote_policy_server.py b/examples/remote/remote_policy_server.py index 306efdf344..a9d8343339 100644 --- a/examples/remote/remote_policy_server.py +++ b/examples/remote/remote_policy_server.py @@ -1,5 +1,5 @@ -import torch import numpy as np +import torch from fastapi import FastAPI, Request, Response from lerobot.utils.messaging import pack_msg, unpack_msg @@ -18,14 +18,16 @@ async def predict(request: Request): action_dim = dataset_info.get("action_dof", 7) # Try to infer batch size from any array-like input - B = None + batch_size = None for v in obs_input.values(): - if isinstance(v, torch.Tensor) or isinstance(v, np.ndarray): - if v.ndim >= 1: - B = int(v.shape[0]) - break + if isinstance(v, (torch.Tensor, np.ndarray)) and v.ndim > 0: + batch_size = int(v.shape[0]) + break + + if batch_size is None: + batch_size = 1 # Default to batch size 1 if no array-like inputs found - actions = torch.zeros((B, n_action_steps, action_dim), dtype=torch.float32) + actions = torch.zeros((batch_size, n_action_steps, action_dim), dtype=torch.float32) packed = pack_msg(actions) - return Response(content=packed, media_type="application/octet-stream") \ No newline at end of file + return Response(content=packed, media_type="application/octet-stream") diff --git a/src/lerobot/policies/remote/configuration_remote.py b/src/lerobot/policies/remote/configuration_remote.py index c923cc13e6..b3053e11fe 100644 --- a/src/lerobot/policies/remote/configuration_remote.py +++ b/src/lerobot/policies/remote/configuration_remote.py @@ -4,6 +4,7 @@ from lerobot.configs.policies import PreTrainedConfig from lerobot.optim.optimizers import AdamWConfig + @PreTrainedConfig.register_subclass("remote") @dataclass class RemoteConfig(PreTrainedConfig): @@ -28,7 +29,7 @@ class RemoteConfig(PreTrainedConfig): # --- Abstract API implementations required by PreTrainedConfig --- def get_optimizer_preset(self) -> AdamWConfig: - """Remote policy is inference-only; return a inert preset for API compatibility.""" + """Remote policy is inference-only; return an inert preset for API compatibility.""" return AdamWConfig(lr=1e-5, weight_decay=0.0, grad_clip_norm=1.0) def get_scheduler_preset(self): @@ -52,4 +53,4 @@ def action_delta_indices(self): @property def reward_delta_indices(self): - return None \ No newline at end of file + return None diff --git a/src/lerobot/utils/messaging.py b/src/lerobot/utils/messaging.py index b6a27073ce..97f1134faa 100644 --- a/src/lerobot/utils/messaging.py +++ b/src/lerobot/utils/messaging.py @@ -1,31 +1,34 @@ import time -from typing import Any, Dict +from typing import Any import msgpack - - -from typing import Any, Dict, 
List, Union - import numpy as np import torch class TorchSerialize: - def encodes(self, o: Union[torch.Tensor, np.ndarray]) -> dict: + def encodes(self, o: torch.Tensor | np.ndarray) -> dict: if isinstance(o, torch.Tensor): np_data = o.detach().cpu().numpy() return { - "data": np_data.tobytes(), "dtype": np_data.dtype.str, "encoding": "raw_bytes", "shape": o.shape, "type": "tensor" + "data": np_data.tobytes(), + "dtype": np_data.dtype.str, + "encoding": "raw_bytes", + "shape": o.shape, + "type": "tensor", } elif isinstance(o, np.ndarray): return { - "data": o.tobytes(), "shape": o.shape, "dtype": o.dtype.str, "encoding": "raw_bytes", "type": "array" + "data": o.tobytes(), + "shape": o.shape, + "dtype": o.dtype.str, + "encoding": "raw_bytes", + "type": "array", } else: return o - # @timeit(logger) - def decodes(self, o: Dict) -> Union[torch.Tensor, np.ndarray]: + def decodes(self, o: dict) -> torch.Tensor | np.ndarray: dtype = o["dtype"] t = o["type"] arr = np.frombuffer(o["data"], dtype=dtype) @@ -39,18 +42,18 @@ def decodes(self, o: Dict) -> Union[torch.Tensor, np.ndarray]: retval = arr return retval + class JsonLikeSerialize: - def encodes(self, o: Union[torch.Tensor, np.ndarray, Any]) -> dict: + def encodes(self, o: torch.Tensor | np.ndarray | Any) -> dict: if not isinstance(o, list): data = o.tolist() - return { - "data": data, "dtype": "tensor", "encoding": "json" - } + return {"data": data, "dtype": "tensor", "encoding": "json"} + img_serializer = TorchSerialize() -class TensorEncoder: +class TensorEncoder: def __init__(self, image_data_function) -> None: self.image_data_function = image_data_function @@ -60,31 +63,36 @@ def __call__(self, obj: Any) -> Any: else: return obj + class TensorDecoder: def __init__(self, image_data_function) -> None: self.image_data_function = image_data_function def __call__(self, obj: Any) -> Any: - if '__image_data__' in obj: + if "__image_data__" in obj: return self.image_data_function(obj) else: return obj -def encode_image_data(obj: Union[torch.Tensor, np.ndarray]) -> Dict: +def encode_image_data(obj: torch.Tensor | np.ndarray) -> dict: return {"__image_data__": True, "as_str": img_serializer.encodes(obj)} -def encode_image_data_json(obj: Union[torch.Tensor, np.ndarray]) -> List: + +def encode_image_data_json(obj: torch.Tensor | np.ndarray) -> list: return img_serializer.encodes(obj) -def decode_image_data(obj: Dict) -> Union[torch.Tensor, np.ndarray]: + +def decode_image_data(obj: dict) -> torch.Tensor | np.ndarray: return img_serializer.decodes(obj["as_str"]) + encoder_data = TensorEncoder(image_data_function=encode_image_data) decoder_data = TensorDecoder(image_data_function=decode_image_data) json_like_encoder = TensorEncoder(image_data_function=encode_image_data_json) -def envelope(msg: Any) -> Dict: + +def envelope(msg: Any) -> dict: """Wrap a message in an envelope. Just a dict for future json serialization. Args: @@ -126,11 +134,9 @@ def unpack_msg(packed: bytes, with_header: bool = False) -> Any: """Unpack an image message message. 
Args: - packed (bytearray): bytearray containin a msgpack message + packed (bytearray): bytearray containing a msgpack message """ - unpacked = msgpack.unpackb( - packed, raw=False, object_hook=decoder_data - ) + unpacked = msgpack.unpackb(packed, raw=False, object_hook=decoder_data) if with_header: if isinstance(unpacked["monotonic_time"], list): unpacked["monotonic_time"] = unpacked["monotonic_time"][0] From 30c91a0d298e4b02f233ed931ccb8c70031d93f5 Mon Sep 17 00:00:00 2001 From: Denis Grachev Date: Wed, 29 Oct 2025 15:29:55 +0100 Subject: [PATCH 6/9] move to grpc --- README.md | 34 ++- examples/remote/remote_policy_server.py | 39 +-- src/lerobot/policies/factory.py | 7 +- .../policies/remote/configuration_remote.py | 83 +++++- .../policies/remote/modeling_remote.py | 271 +++++++++++++++--- 5 files changed, 344 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index e34265bd1c..131c8fe455 100644 --- a/README.md +++ b/README.md @@ -291,18 +291,20 @@ lerobot-train --config_path=lerobot/diffusion_pusht reproduces SOTA results for Diffusion Policy on the PushT task. ### Remote policy evaluation (experimental) -In case you have a custom model served through an HTTP API, -you can delegate action selection to an external HTTP service by using the `remote` policy. -Install the dedicated dependencies and start the demo server: + +You can delegate action selection to a remote machine by pointing the `remote` +policy to the async inference gRPC policy server. Start the server either +directly or through the compatibility wrapper: ```bash -pip install -e ".[server]" -uvicorn examples.remote.remote_policy_server:app --host 0.0.0.0 --port 8000 -``` +# Option 1: run the async inference server module +python -m lerobot.async_inference.policy_server --host 0.0.0.0 --port 8080 -The sample FastAPI app simply echoes zero actions with the requested shape, which is useful to validate end-to-end wiring before deploying a real model. +# Option 2: backward-compatible entry point +python examples/remote/remote_policy_server.py --host 0.0.0.0 --port 8080 +``` -To evaluate the Libero benchmark through the remote policy, run: +Then launch evaluation with the remote policy pointing to that server: ```bash lerobot-eval \ @@ -312,16 +314,22 @@ lerobot-eval \ --eval.batch_size=1 \ --eval.n_episodes=3 \ --policy.type=remote \ - --policy.server_url=http://localhost:8000 \ - --policy.timeout=30 \ - --policy.attempts=3 \ + --policy.server_address=localhost:8080 \ + --policy.request_timeout=30 \ + --policy.retries=3 \ --policy.n_action_steps=10 \ - --policy.additional_args='{"dataset_info":{"action_type":"eef","robot_embodiment":"single_arm","robot_type":"franka","stereo_replace_depth":false,"handheld":false,"no_state":false,"obs_dof":8,"action_dof":7},"inference_config":{"n_actions":6,"n_inference_steps":10}}' \ + --policy.remote_policy_type=pi05 \ + --policy.remote_pretrained_name_or_path=lerobot/pi05_libero_finetuned \ + --policy.remote_policy_device=cuda \ --rename_map='{"observation.images.image":"observation.images.static1","observation.images.image2":"observation.images.wrist1"}' \ --output_dir=./eval_logs_libero_spatial ``` -The `additional_args` payload is forwarded to the remote server alongside the observation batch and can be adjusted to match your remote model’s expectations. +The optional `additional_args` payload is forwarded to the async inference server +alongside the observation batch and can be adjusted to match your remote model’s +expectations. 
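For workflows outside `lerobot-eval`, the same remote policy can be driven directly from Python. The sketch below is illustrative only: the feature keys, shapes, and checkpoint name are placeholders, and it assumes an async inference policy server is already listening on `server_address` (the gRPC handshake happens lazily on the first `select_action` call):

```python
# Hedged sketch of calling the remote policy from Python; keys, shapes, and repo id are placeholders.
import torch

from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.policies.remote import RemoteConfig, RemotePolicy

cfg = RemoteConfig(
    server_address="localhost:8080",
    remote_policy_type="act",                              # example policy type
    remote_pretrained_name_or_path="user/act_checkpoint",  # hypothetical repo id or local path
    remote_policy_device="cuda",
    n_action_steps=10,
    input_features={
        "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(8,)),
        "observation.images.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
    },
    output_features={"action": PolicyFeature(type=FeatureType.ACTION, shape=(7,))},
)
policy = RemotePolicy(cfg)

batch = {
    "observation.state": torch.zeros(1, 8),
    "observation.images.image": torch.zeros(1, 3, 224, 224),
}
action = policy.select_action(batch)  # one env-step action; the rest of the chunk stays queued
```

Because the returned chunk is cached in a per-thread action queue, repeated `select_action` calls only hit the server once every `n_action_steps` steps.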
+ +If you omit `--policy.remote_policy_type`, the remote checkpoint’s config is loaded to infer it automatically. ## Contribute diff --git a/examples/remote/remote_policy_server.py b/examples/remote/remote_policy_server.py index a9d8343339..b502902c14 100644 --- a/examples/remote/remote_policy_server.py +++ b/examples/remote/remote_policy_server.py @@ -1,33 +1,12 @@ -import numpy as np -import torch -from fastapi import FastAPI, Request, Response +""" +Backward-compatible entry point for the async inference policy server. -from lerobot.utils.messaging import pack_msg, unpack_msg +Rather than running a custom FastAPI stub, the remote policy now relies on the +shared async inference gRPC implementation. You can start the server from this +module or via ``python -m lerobot.async_inference.policy_server``. +""" -app = FastAPI() +from lerobot.async_inference.policy_server import serve - -@app.post("/predict") -async def predict(request: Request): - data = await request.body() - obs_input = unpack_msg(data) - - inf_cfg = obs_input.get("inference_config", {}) - dataset_info = obs_input.get("dataset_info", {}) - n_action_steps = inf_cfg.get("n_action_steps", 10) - action_dim = dataset_info.get("action_dof", 7) - - # Try to infer batch size from any array-like input - batch_size = None - for v in obs_input.values(): - if isinstance(v, (torch.Tensor, np.ndarray)) and v.ndim > 0: - batch_size = int(v.shape[0]) - break - - if batch_size is None: - batch_size = 1 # Default to batch size 1 if no array-like inputs found - - actions = torch.zeros((batch_size, n_action_steps, action_dim), dtype=torch.float32) - - packed = pack_msg(actions) - return Response(content=packed, media_type="application/octet-stream") +if __name__ == "__main__": + serve() diff --git a/src/lerobot/policies/factory.py b/src/lerobot/policies/factory.py index e1bc581ee9..e4173c9b20 100644 --- a/src/lerobot/policies/factory.py +++ b/src/lerobot/policies/factory.py @@ -33,12 +33,12 @@ from lerobot.policies.pi0.configuration_pi0 import PI0Config from lerobot.policies.pi05.configuration_pi05 import PI05Config from lerobot.policies.pretrained import PreTrainedPolicy +from lerobot.policies.remote.configuration_remote import RemoteConfig from lerobot.policies.sac.configuration_sac import SACConfig from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig -from lerobot.policies.remote.configuration_remote import RemoteConfig from lerobot.processor import PolicyAction, PolicyProcessorPipeline from lerobot.processor.converters import ( batch_to_transition, @@ -299,7 +299,7 @@ def make_pre_post_processors( config=policy_cfg, dataset_stats=kwargs.get("dataset_stats"), ) - + elif isinstance(policy_cfg, RemoteConfig): from lerobot.policies.remote.processor_remote import make_remote_pre_post_processors @@ -368,6 +368,9 @@ def make_policy( policy_cls = get_policy_class(cfg.type) kwargs = {} + if cfg.type == "remote": + cfg.rename_map = rename_map or {} + if ds_meta is not None: features = dataset_to_policy_features(ds_meta.features) else: diff --git a/src/lerobot/policies/remote/configuration_remote.py b/src/lerobot/policies/remote/configuration_remote.py index b3053e11fe..9cd13ee664 100644 --- a/src/lerobot/policies/remote/configuration_remote.py +++ b/src/lerobot/policies/remote/configuration_remote.py @@ 
-16,10 +16,48 @@ class RemoteConfig(PreTrainedConfig): # How many environment steps to execute per policy call. Used by the runtime action queue. n_action_steps: int = field(default=1, metadata={"help": "Number of env steps to execute per call"}) - # Remote-specific - server_url: str = field(default="http://localhost:8000", metadata={"help": "Remote policy server URL"}) - timeout: float = field(default=30.0, metadata={"help": "HTTP timeout in seconds"}) - attempts: int = field(default=1, metadata={"help": "Number of retry attempts for failed requests"}) + # Remote-specific (gRPC policy server) + server_address: str = field( + default="localhost:8080", metadata={"help": "Async inference policy server address (host:port)"} + ) + request_timeout: float = field(default=30.0, metadata={"help": "gRPC request timeout in seconds"}) + retries: int = field(default=3, metadata={"help": "Number of retry attempts for failed RPC calls"}) + + remote_policy_type: str = field( + default="", + metadata={"help": "Policy type for the async inference server to load (e.g. act, diffusion)"}, + ) + remote_pretrained_name_or_path: str = field( + default="", + metadata={ + "help": ( + "Pretrained model repo ID or path for the async inference server. " + "Should match a directory containing policy weights or a Hugging Face repo ID." + ) + }, + ) + remote_policy_device: str = field( + default="cpu", metadata={"help": "Device on which the async inference server loads the policy"} + ) + + actions_per_chunk: int | None = field( + default=None, + metadata={ + "help": ( + "Number of actions returned per chunk by the remote server. " + "Defaults to `n_action_steps` when not provided." + ) + }, + ) + rename_map: dict[str, str] = field( + default_factory=dict, + metadata={ + "help": ( + "Observation rename map forwarded to the async inference server so it can match " + "environment keys to the policy's expected features." + ) + }, + ) # Additional arguments to inject directly into the observation dict (e.g. {"inference_config": {...}}) additional_args: dict[str, Any] = field( @@ -37,9 +75,42 @@ def get_scheduler_preset(self): return None def validate_features(self) -> None: - # Minimal validation: allow any combination, but require at least one input feature + if not self.remote_pretrained_name_or_path: + raise ValueError( + "RemoteConfig expects `remote_pretrained_name_or_path` to be provided so the server can load the policy." + ) + + remote_cfg: PreTrainedConfig | None = None + if not self.remote_policy_type or not self.input_features or not self.output_features: + remote_cfg = PreTrainedConfig.from_pretrained(self.remote_pretrained_name_or_path) + + if not self.remote_policy_type: + self.remote_policy_type = remote_cfg.type if remote_cfg is not None else "" + + if remote_cfg is not None and remote_cfg.type != self.remote_policy_type: + raise ValueError( + f"Loaded remote policy config type '{remote_cfg.type}' does not match " + f"requested remote_policy_type '{self.remote_policy_type}'." 
+ ) + + if not self.input_features and remote_cfg is not None: + self.input_features = remote_cfg.input_features + + if not self.output_features and remote_cfg is not None: + self.output_features = remote_cfg.output_features + if not self.input_features: - raise ValueError("RemoteConfig requires at least one input feature to be defined.") + raise ValueError("RemoteConfig requires `input_features` to be defined.") + if not self.remote_policy_type: + raise ValueError("RemoteConfig expects `remote_policy_type` to be set for async inference.") + if self.effective_actions_per_chunk <= 0: + raise ValueError("RemoteConfig requires `actions_per_chunk` or `n_action_steps` to be positive.") + if self.retries < 1: + raise ValueError("RemoteConfig expects `retries` to be at least 1.") + + @property + def effective_actions_per_chunk(self) -> int: + return self.actions_per_chunk or self.n_action_steps @property def observation_delta_indices(self): diff --git a/src/lerobot/policies/remote/modeling_remote.py b/src/lerobot/policies/remote/modeling_remote.py index 4086323f92..616284fdae 100644 --- a/src/lerobot/policies/remote/modeling_remote.py +++ b/src/lerobot/policies/remote/modeling_remote.py @@ -1,19 +1,29 @@ -from collections import deque +import logging +import pickle # nosec B403 - trusted channel between client/server import threading +import time +from collections import deque +from typing import Any -import numpy as np -import requests +import grpc import torch from torch import Tensor -from lerobot.utils.messaging import pack_msg, unpack_msg +from lerobot.async_inference.helpers import RemotePolicyConfig, TimedAction, TimedObservation +from lerobot.configs.types import FeatureType from lerobot.policies.pretrained import PreTrainedPolicy +from lerobot.transport import services_pb2, services_pb2_grpc +from lerobot.transport.utils import grpc_channel_options, send_bytes_in_chunks +from lerobot.utils.constants import OBS_STR + from .configuration_remote import RemoteConfig +logger = logging.getLogger(__name__) + class RemotePolicy(PreTrainedPolicy): """ - A policy that proxies inference to a remote HTTP server. + A policy that proxies inference to the async inference gRPC policy server. 
""" config_class = RemoteConfig @@ -21,8 +31,10 @@ class RemotePolicy(PreTrainedPolicy): def __init__(self, config: RemoteConfig): super().__init__(config) - self.server_url = config.server_url.rstrip("/") - self.timeout = config.timeout + config.validate_features() + self._vector_name_map: dict[str, list[str]] = {} + self._image_key_map: dict[str, str] = {} + self._lerobot_features = self._build_lerobot_features() self._thread_state = threading.local() self.reset() @@ -35,51 +47,232 @@ def reset(self): def _state(self): state = self._thread_state - if not hasattr(state, "session"): - state.session = requests.Session() if not hasattr(state, "action_queue"): state.action_queue = deque(maxlen=self.config.n_action_steps) + if not hasattr(state, "stub") or state.stub is None: + self._initialize_connection(state) return state + def _initialize_connection(self, state) -> None: + state.channel = grpc.insecure_channel( + self.config.server_address, + options=grpc_channel_options(), + ) + state.stub = services_pb2_grpc.AsyncInferenceStub(state.channel) + state.next_timestep = 0 + + policy_cfg = RemotePolicyConfig( + policy_type=self.config.remote_policy_type, + pretrained_name_or_path=self.config.remote_pretrained_name_or_path, + lerobot_features=self._lerobot_features, + actions_per_chunk=self.config.effective_actions_per_chunk, + device=self.config.remote_policy_device, + rename_map=self.config.rename_map, + ) + + payload = pickle.dumps(policy_cfg) # nosec B301 - config originates from local process + request = services_pb2.PolicySetup(data=payload) + + for attempt in range(1, self.config.retries + 1): + try: + state.stub.Ready(services_pb2.Empty(), timeout=self.config.request_timeout) + state.stub.SendPolicyInstructions(request, timeout=self.config.request_timeout) + logger.debug("Remote policy handshake completed on attempt %d", attempt) + return + except grpc.RpcError as err: + logger.warning("Remote policy handshake failed on attempt %d: %s", attempt, err) + self._close_channel(state) + if attempt == self.config.retries: + raise + time.sleep(0.1) + state.channel = grpc.insecure_channel( + self.config.server_address, + options=grpc_channel_options(), + ) + state.stub = services_pb2_grpc.AsyncInferenceStub(state.channel) + + def _close_channel(self, state) -> None: + if getattr(state, "channel", None) is not None: + state.channel.close() + state.stub = None + + def _build_lerobot_features(self) -> dict[str, dict[str, Any]]: + """ + Build a hw-style feature dictionary expected by the async inference server. + Vector features (state/env) are split into individual scalar names, while image features + are mapped to (H, W, C) tensors keyed by their camera name. + """ + features: dict[str, dict[str, Any]] = {} + vector_name_map: dict[str, list[str]] = {} + image_key_map: dict[str, str] = {} + + for key, feature in self.config.input_features.items(): + if feature.type in (FeatureType.STATE, FeatureType.ENV): + if not feature.shape or len(feature.shape) != 1: + raise ValueError( + f"RemotePolicy only supports 1D state features, got shape {feature.shape} for '{key}'." + ) + dim = feature.shape[0] + names = [f"{key.replace('.', '_')}_d{idx}" for idx in range(dim)] + features[key] = { + "dtype": "float32", + "shape": (dim,), + "names": names, + } + vector_name_map[key] = names + elif feature.type is FeatureType.VISUAL: + if not feature.shape or len(feature.shape) != 3: + raise ValueError( + f"RemotePolicy only supports 3D visual features, got shape {feature.shape} for '{key}'." 
+ ) + channels, height, width = feature.shape + camera_base = key.removeprefix(f"{OBS_STR}.images.") + # Ensure uniqueness if multiple features share the same suffix + raw_key = camera_base + counter = 1 + while raw_key in image_key_map.values(): + raw_key = f"{camera_base}_{counter}" + counter += 1 + + features[key] = { + "dtype": "video", + "shape": (height, width, channels), + "names": ["height", "width", "channels"], + } + image_key_map[key] = raw_key + else: + logger.debug("Skipping unsupported feature '%s' of type '%s'", key, feature.type) + + self._vector_name_map = vector_name_map + self._image_key_map = image_key_map + return features + + def _prepare_payload(self, batch: dict[str, Tensor]) -> dict[str, Any]: + if not batch: + raise ValueError("RemotePolicy received an empty batch.") + + payload: dict[str, Any] = {} + cpu_batch: dict[str, Any] = { + key: value.detach().cpu() if isinstance(value, torch.Tensor) else value + for key, value in batch.items() + } + + # Serialize vector features (state/env) into individual scalar entries + for key, names in self._vector_name_map.items(): + tensor = cpu_batch.get(key) + if tensor is None: + continue + + if isinstance(tensor, torch.Tensor): + if tensor.ndim == 2: + tensor = tensor.squeeze(0) + tensor = tensor.flatten() + if tensor.numel() != len(names): + raise ValueError( + f"Feature '{key}' expected {len(names)} values, got shape {tuple(tensor.shape)}." + ) + for idx, name in enumerate(names): + payload[name] = float(tensor[idx].item()) + else: + raise TypeError(f"Expected tensor for feature '{key}', got {type(tensor)}") + + # Serialize image features (convert to HWC uint8 tensors) + for key, raw_key in self._image_key_map.items(): + tensor = cpu_batch.get(key) + if tensor is None: + continue + + if not isinstance(tensor, torch.Tensor): + raise TypeError(f"Expected tensor for image feature '{key}', got {type(tensor)}") + + if tensor.ndim == 4: + tensor = tensor.squeeze(0) + if tensor.ndim != 3: + raise ValueError( + f"Image feature '{key}' must have 3 dimensions after squeeze, got {tensor.ndim}" + ) + + if tensor.dtype != torch.uint8: + tensor = (tensor.clamp(0.0, 1.0) * 255.0).to(torch.uint8) + + payload[raw_key] = tensor.permute(1, 2, 0).contiguous() + + # Optional task/instruction keys + for extra_key in ["task", "instruction"]: + if extra_key in cpu_batch: + payload[extra_key] = cpu_batch[extra_key] + + for key, value in (self.config.additional_args or {}).items(): + payload[key] = value + + return payload + + def _timed_actions_to_tensor(self, timed_actions: list[TimedAction]) -> Tensor: + if not timed_actions: + raise RuntimeError("Remote policy server returned an empty action chunk.") + + actions = [] + for timed_action in timed_actions: + action = timed_action.get_action() + if isinstance(action, torch.Tensor): + actions.append(action.detach().cpu()) + else: + actions.append(torch.as_tensor(action, dtype=torch.float32)) + + stacked = torch.stack(actions, dim=0).unsqueeze(0) # (B=1, T, A) + return stacked.to(device=self.config.device, dtype=torch.float32) + + def _request_action_chunk(self, state, batch: dict[str, Tensor]) -> Tensor: + payload = self._prepare_payload(batch) + timestamp = time.time() + timestep = state.next_timestep + state.next_timestep += 1 + + observation = TimedObservation( + timestamp=timestamp, + timestep=timestep, + observation=payload, + must_go=True, + ) + packed = pickle.dumps(observation) # nosec B301 - observation built locally + + iterator = send_bytes_in_chunks( + packed, + services_pb2.Observation, + 
log_prefix="[RemotePolicy]", + silent=True, + ) + state.stub.SendObservations(iterator, timeout=self.config.request_timeout) + actions_msg = state.stub.GetActions(services_pb2.Empty(), timeout=self.config.request_timeout) + if not actions_msg.data: + raise RuntimeError("Remote policy server returned an empty response payload.") + + timed_actions = pickle.loads(actions_msg.data) # nosec B301 - server is trusted peer + return self._timed_actions_to_tensor(timed_actions) + def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict] | tuple[Tensor, None]: raise NotImplementedError("RemotePolicy is inference-only") @torch.no_grad() def predict_action_chunk(self, batch: dict[str, Tensor], **kwargs) -> Tensor: - state = self._state() + last_error: Exception | None = None - # Build payload with raw tensors/arrays; pack_msg handles encoding - add_args = self.config.additional_args or {} - payload = batch | add_args - - packed = pack_msg(payload) - - last_exception = None - for _ in range(self.config.attempts): + for attempt in range(1, self.config.retries + 1): + state = self._state() try: - resp = state.session.post( - f"{self.server_url}/predict", - data=packed, - headers={"Content-Type": "application/octet-stream"}, - timeout=self.timeout, - ) - resp.raise_for_status() + return self._request_action_chunk(state, batch) + except grpc.RpcError as err: + logger.warning("Remote policy RPC failed on attempt %d: %s", attempt, err) + last_error = err + self._close_channel(state) + time.sleep(0.1) + except Exception as err: + logger.error("Unexpected error when requesting remote action chunk: %s", err) + last_error = err break - except requests.RequestException as e: - last_exception = e - - if last_exception: - raise last_exception - - unpacked = unpack_msg(resp.content) - if isinstance(unpacked, torch.Tensor): - actions = unpacked - else: - actions_np = np.asarray(unpacked) - actions = torch.from_numpy(actions_np) - device = torch.device(self.config.device) - return actions.to(device=device, dtype=torch.float32) + assert last_error is not None + raise last_error @torch.no_grad() def select_action(self, batch: dict[str, Tensor], **kwargs) -> Tensor: From 23e8fdab8c1aa53eafad896ababf2ef78e44b397 Mon Sep 17 00:00:00 2001 From: Denis Grachev Date: Wed, 29 Oct 2025 15:31:58 +0100 Subject: [PATCH 7/9] Remove unused messaging.py --- src/lerobot/utils/messaging.py | 145 --------------------------------- 1 file changed, 145 deletions(-) delete mode 100644 src/lerobot/utils/messaging.py diff --git a/src/lerobot/utils/messaging.py b/src/lerobot/utils/messaging.py deleted file mode 100644 index 97f1134faa..0000000000 --- a/src/lerobot/utils/messaging.py +++ /dev/null @@ -1,145 +0,0 @@ -import time -from typing import Any - -import msgpack -import numpy as np -import torch - - -class TorchSerialize: - def encodes(self, o: torch.Tensor | np.ndarray) -> dict: - if isinstance(o, torch.Tensor): - np_data = o.detach().cpu().numpy() - return { - "data": np_data.tobytes(), - "dtype": np_data.dtype.str, - "encoding": "raw_bytes", - "shape": o.shape, - "type": "tensor", - } - elif isinstance(o, np.ndarray): - return { - "data": o.tobytes(), - "shape": o.shape, - "dtype": o.dtype.str, - "encoding": "raw_bytes", - "type": "array", - } - else: - return o - - def decodes(self, o: dict) -> torch.Tensor | np.ndarray: - dtype = o["dtype"] - t = o["type"] - arr = np.frombuffer(o["data"], dtype=dtype) - arr = arr.reshape(o["shape"]) - if not arr.flags.writeable: - arr = arr.copy() - - if t == "tensor": - retval = 
torch.as_tensor(arr) - elif t == "array": - retval = arr - return retval - - -class JsonLikeSerialize: - def encodes(self, o: torch.Tensor | np.ndarray | Any) -> dict: - if not isinstance(o, list): - data = o.tolist() - return {"data": data, "dtype": "tensor", "encoding": "json"} - - -img_serializer = TorchSerialize() - - -class TensorEncoder: - def __init__(self, image_data_function) -> None: - self.image_data_function = image_data_function - - def __call__(self, obj: Any) -> Any: - if isinstance(obj, (np.ndarray, torch.Tensor)): - return self.image_data_function(obj) - else: - return obj - - -class TensorDecoder: - def __init__(self, image_data_function) -> None: - self.image_data_function = image_data_function - - def __call__(self, obj: Any) -> Any: - if "__image_data__" in obj: - return self.image_data_function(obj) - else: - return obj - - -def encode_image_data(obj: torch.Tensor | np.ndarray) -> dict: - return {"__image_data__": True, "as_str": img_serializer.encodes(obj)} - - -def encode_image_data_json(obj: torch.Tensor | np.ndarray) -> list: - return img_serializer.encodes(obj) - - -def decode_image_data(obj: dict) -> torch.Tensor | np.ndarray: - return img_serializer.decodes(obj["as_str"]) - - -encoder_data = TensorEncoder(image_data_function=encode_image_data) -decoder_data = TensorDecoder(image_data_function=decode_image_data) -json_like_encoder = TensorEncoder(image_data_function=encode_image_data_json) - - -def envelope(msg: Any) -> dict: - """Wrap a message in an envelope. Just a dict for future json serialization. - - Args: - msg (Any): Any serializable message, might be (int,string,float,list,dict,tuple) - - Returns: - dict: Returns the msg wrapped in an envelope. - """ - return {"payload": msg, "monotonic_time": time.monotonic()} - - -def rm_envelope(msg: dict) -> Any: - """Remove the envelope from a message. - - Args: - msg (dict): The wrapped message - - Returns: - Any: The payload of the message - """ - return msg["payload"] - - -def pack_msg(msg: Any, json_like=False) -> bytes: - """Packs msg to a bytearray following the msgpack specification. - - Args: - msg (Any): Any message to send. - - Returns: - bytearray: [description] - """ - encoder = json_like_encoder if json_like else encoder_data - msg = envelope(msg) # Wrap message in an envelope - return msgpack.packb(msg, default=encoder, use_bin_type=True) # Pack to byte array using msgpack - - -def unpack_msg(packed: bytes, with_header: bool = False) -> Any: - """Unpack an image message message. 
-
-    Args:
-        packed (bytearray): bytearray containing a msgpack message
-    """
-    unpacked = msgpack.unpackb(packed, raw=False, object_hook=decoder_data)
-    if with_header:
-        if isinstance(unpacked["monotonic_time"], list):
-            unpacked["monotonic_time"] = unpacked["monotonic_time"][0]
-        return unpacked
-    else:
-        return rm_envelope(unpacked)

From d6427f0d1e75808c2dbe5f8df3bbd6d62654613f Mon Sep 17 00:00:00 2001
From: Denis Grachev 
Date: Wed, 29 Oct 2025 15:33:24 +0100
Subject: [PATCH 8/9] restore toml file

---
 pyproject.toml | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 8a0862fb3a..7f194b4227 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -134,12 +134,6 @@ pusht = ["gym-pusht>=0.1.5,<0.2.0", "pymunk>=6.6.0,<7.0.0"] # TODO: Fix pymunk v
 libero = ["lerobot[transformers-dep]", "libero @ git+https://github.com/huggingface/lerobot-libero.git@main#egg=libero"]
 metaworld = ["metaworld==3.0.0"]
 
-# HTTP server extra
-server = [
-    "fastapi>=0.115.0,<1.0.0",
-    "uvicorn[standard]>=0.30.0,<1.0.0",
-    "msgpack>=1.0.8,<2.0.0",
-]
 
 # All
 all = [
@@ -162,7 +156,6 @@ all = [
     "lerobot[phone]",
     "lerobot[libero]",
     "lerobot[metaworld]",
-    "lerobot[server]",
 ]
 
 [project.scripts]

From 5b582694be9a163fba038fbaf63dc0fa4d954aee Mon Sep 17 00:00:00 2001
From: Denis Grachev 
Date: Wed, 29 Oct 2025 15:37:01 +0100
Subject: [PATCH 9/9] cosmetic fixes

---
 README.md      | 2 +-
 pyproject.toml | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 131c8fe455..5d8ad6719d 100644
--- a/README.md
+++ b/README.md
@@ -321,7 +321,7 @@ lerobot-eval \
   --policy.remote_policy_type=pi05 \
   --policy.remote_pretrained_name_or_path=lerobot/pi05_libero_finetuned \
   --policy.remote_policy_device=cuda \
-  --rename_map='{"observation.images.image":"observation.images.static1","observation.images.image2":"observation.images.wrist1"}' \
+  --rename_map='{"observation.images.empty_camera_0":"observation.images.image"}' \
   --output_dir=./eval_logs_libero_spatial
 ```
 
diff --git a/pyproject.toml b/pyproject.toml
index 7f194b4227..1c71acec07 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -134,7 +134,6 @@ pusht = ["gym-pusht>=0.1.5,<0.2.0", "pymunk>=6.6.0,<7.0.0"] # TODO: Fix pymunk v
 libero = ["lerobot[transformers-dep]", "libero @ git+https://github.com/huggingface/lerobot-libero.git@main#egg=libero"]
 metaworld = ["metaworld==3.0.0"]
 
-
 # All
 all = [
     "lerobot[dynamixel]",