diff --git a/.gitignore b/.gitignore index e69de29..c18dd8d 100644 --- a/.gitignore +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/flowstate_credentials_proxy/LICENSE b/flowstate_credentials_proxy/LICENSE new file mode 100644 index 0000000..8906ebc --- /dev/null +++ b/flowstate_credentials_proxy/LICENSE @@ -0,0 +1,19 @@ +Copyright 2024 Intrinsic Innovation LLC + +You are hereby granted a non-exclusive, worldwide, royalty-free license to use, +copy, modify, and distribute this Intrinsic SDK in source code or binary form for use +in connection with the services and APIs provided by Intrinsic Innovation LLC (“Intrinsic”). + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +If you use this Intrinsic SDK with any Intrinsic services, your use is subject to the Intrinsic +Platform Terms of Service [https://intrinsic.ai/platform-terms]. If you create works that call +Intrinsic APIs, you must agree to the terms of service for those APIs separately. This license +does not grant you any special rights to use the services. + +This copyright notice shall be included in all copies or substantial portions of the software. diff --git a/flowstate_credentials_proxy/flowstate_credentials_proxy/__init__.py b/flowstate_credentials_proxy/flowstate_credentials_proxy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flowstate_credentials_proxy/flowstate_credentials_proxy/__main__.py b/flowstate_credentials_proxy/flowstate_credentials_proxy/__main__.py new file mode 100644 index 0000000..1cd59ba --- /dev/null +++ b/flowstate_credentials_proxy/flowstate_credentials_proxy/__main__.py @@ -0,0 +1,126 @@ +import argparse +import asyncio +import sys + +import aiohttp +from aiohttp import web + +from flowstate_credentials_proxy import auth +from flowstate_credentials_proxy.auth import InvalidOrganizationError + + +async def proxy( + req: web.Request, token_source: auth.TokenSource, cluster: str, service: str +) -> web.WebSocketResponse: + """Proxy a single websocket connection to flowstate.""" + + ws_response = web.WebSocketResponse() + await ws_response.prepare(req) + + try: + token = await token_source.get_token() + except Exception as e: + print(f"Error getting token: {e}", file=sys.stderr) + await ws_response.close() + return ws_response + + uri = f"wss://www.endpoints.{token_source.project}.cloud.goog/onprem/client/{cluster}/api/resourceinstances/{service}" + headers = {"cookie": f"auth-proxy={token}"} + + try: + async with aiohttp.ClientSession() as session: + async with session.ws_connect(uri, headers=headers) as upstream: + print("Connected to upstream websocket") + + async def forward_to_upstream( + d: web.WebSocketResponse, u: aiohttp.ClientWebSocketResponse + ): + async for msg in d: + if msg.type == aiohttp.WSMsgType.TEXT: + await u.send_str(msg.data) + elif msg.type == aiohttp.WSMsgType.BINARY: + await u.send_bytes(msg.data) + elif msg.type == aiohttp.WSMsgType.CLOSE: + await u.close() + break + else: + break + + async def forward_to_downstream( + u: aiohttp.ClientWebSocketResponse, d: web.WebSocketResponse + ): + async for msg in u: + if msg.type == aiohttp.WSMsgType.TEXT: + await d.send_str(msg.data) + elif msg.type == aiohttp.WSMsgType.BINARY: + await d.send_bytes(msg.data) + elif msg.type in ( + aiohttp.WSMsgType.CLOSED, + aiohttp.WSMsgType.ERROR, + ): + await d.close(code=u.close_code or 1000) + break + + await asyncio.gather( + forward_to_upstream(ws_response, upstream), + forward_to_downstream(upstream, ws_response), + ) + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + finally: + return ws_response + + +def main(): + parser = argparse.ArgumentParser( + prog="flowstate_credentials_proxy", + description="Start a websocket proxy to allow a local zenoh router to connect to flowstate.", + ) + parser.add_argument( + "--org", + required=True, + help="The organization name in the format organization@project.", + ) + parser.add_argument("--cluster", required=True, help="Name of the cluster to use.") + parser.add_argument( + "--service", required=True, help="Name of the flowstate_ros_bridge instance." + ) + parser.add_argument("--port", default=7448, help="Port to listen on.") + args = parser.parse_args() + + try: + token_source = auth.TokenSource(args.org) + except FileNotFoundError: + print( + f"""Error: Credentials for given organization "{args.org}" not found. +Run 'inctl auth login --org {args.org}' to add it. + +Download inctl here https://github.com/intrinsic-ai/sdk/releases""", + file=sys.stderr, + ) + return + except InvalidOrganizationError: + print("Organization is invalid", file=sys.stderr) + return + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + return + + app = web.Application() + app.add_routes( + [web.get("/", lambda req: proxy(req, token_source, args.cluster, args.service))] + ) + + print(f"Starting zenoh proxy on port {args.port}.") + print( + f"""Start rmw_zenohd and connect it to the proxy by running: +``` +export ZENOH_CONFIG_OVERRIDE='connect/endpoints=["ws/localhost:{args.port}"];routing/router/peers_failover_brokering=true' +ros2 run rmw_zenoh_cpp rmw_zenohd +```""" + ) + web.run_app(app, port=args.port, handle_signals=True, print=None) + + +if __name__ == "__main__": + main() diff --git a/flowstate_credentials_proxy/flowstate_credentials_proxy/auth.py b/flowstate_credentials_proxy/flowstate_credentials_proxy/auth.py new file mode 100644 index 0000000..afdf88d --- /dev/null +++ b/flowstate_credentials_proxy/flowstate_credentials_proxy/auth.py @@ -0,0 +1,101 @@ +import base64 +import json +import pathlib +import re +from datetime import datetime, timedelta, timezone +from os import environ + +import aiohttp + +INTRINSIC_CONFIG_LOCATION = pathlib.Path(environ["HOME"], ".config", "intrinsic") +ORGANIZATION_PATTERN = re.compile(r"[a-zA-Z][\w-]*@[a-zA-Z][\w-]*") +FLOWSTATE_ADDR = "https://flowstate.intrinsic.ai" + + +class InvalidOrganizationError(ValueError): + pass + + +def get_project_from_organization(org: str) -> str: + """ + Get the project from an organization@project string. + + Raises: + InvalidOrganizationError: If the organization is invalid. + """ + if ORGANIZATION_PATTERN.fullmatch(org) is None: + raise InvalidOrganizationError("invalid organization") + return org.split("@")[1] + + +def get_api_key(project: str): + """ + Get the cached API key for an organization. + + Raies: + JSONDecodeError: If the API key file is malformed. + KeyError: If the API key file is invalid. + FileNotFoundError: If the API key file does not exist. + """ + user_token = json.load( + pathlib.Path( + INTRINSIC_CONFIG_LOCATION, "projects", f"{project}.user-token" + ).open() + ) + api_key = user_token["tokens"]["default"]["apiKey"] + return api_key + + +class TokenSource: + def __init__(self, org: str): + """ + Create a new instance of TokenSource. + + Raies: + InvalidOrganizationError: If the organization is invalid. + JSONDecodeError: If the API key file is malformed. + KeyError: If the API key file is invalid. + FileNotFoundError: If the API key file does not exist. + """ + self.org = org + self.project = get_project_from_organization(self.org) + self.api_key = get_api_key(self.project) + self.__cached_token: str | None = None + + @staticmethod + def __token_expiring(token: str) -> bool: + payload = json.loads(base64.b64decode(token.split(".")[1] + "==")) + return datetime.now(timezone.utc) > ( + datetime.fromtimestamp(payload["exp"], tz=timezone.utc) + - timedelta(minutes=5) + ) + + async def get_token(self) -> str: + """ + Get an access token. + + Raises: + aiohttp.ClientError: If the request fails. + KeyError: Cannot find the access token in the response. + """ + if self.__cached_token is not None and not self.__token_expiring( + self.__cached_token + ): + return self.__cached_token + + async with aiohttp.ClientSession( + base_url=FLOWSTATE_ADDR, + headers={"content-type": "application/json"}, + raise_for_status=True, + ) as session: + async with session.post( + f"/api/v1/accountstokens:idtoken", + json={ + "apiKey": self.api_key, + "do_fan_out": False, + "api_key_project_hint": self.project, + }, + ) as resp: + access_token: str = (await resp.json())["idToken"] + self.__cached_token = access_token + return self.__cached_token diff --git a/flowstate_credentials_proxy/flowstate_credentials_proxy/test_auth.py b/flowstate_credentials_proxy/flowstate_credentials_proxy/test_auth.py new file mode 100644 index 0000000..dd5b10e --- /dev/null +++ b/flowstate_credentials_proxy/flowstate_credentials_proxy/test_auth.py @@ -0,0 +1,111 @@ +import json +import unittest +from contextlib import asynccontextmanager +from unittest.mock import AsyncMock, MagicMock, mock_open, patch + +import aiohttp + +from flowstate_credentials_proxy.auth import InvalidOrganizationError, TokenSource + + +class TestTokenSource(unittest.IsolatedAsyncioTestCase): + @staticmethod + def make_mock_response(json): + @asynccontextmanager + async def mock_response(): + try: + resp = AsyncMock() + resp.json.return_value = json + yield resp + finally: + pass + + return mock_response() + + mock_user_token = { + "name": "test_project", + "tokens": {"default": {"apiKey": "test-api-key"}}, + } + + @patch("pathlib.Path.open", new_callable=mock_open) + @patch("flowstate_credentials_proxy.auth.aiohttp.ClientSession.post") + async def test_get_access_token_success( + self, mock_post: MagicMock, mock_open_file: MagicMock + ) -> None: + mock_open_file.return_value.read.return_value = json.dumps(self.mock_user_token) + + # { + # "iss": "test", + # "sub": "test", + # "aud": "test", + # "iat": 0, + # "exp": 2177596800 # 2039-01-01 + # } + test_token = "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MCwiZXhwIjoyMTc3NTk2ODAwfQ." + + @asynccontextmanager + async def mock_response(): + try: + resp = AsyncMock() + resp.json.return_value = {"idToken": test_token} + yield resp + finally: + pass + + mock_post.return_value = self.make_mock_response({"idToken": test_token}) + + ts = TokenSource("user@project") + access_token = await ts.get_token() + + self.assertEqual(access_token, test_token) + mock_post.assert_called_once_with( + "/api/v1/accountstokens:idtoken", + json={ + "apiKey": "test-api-key", + "do_fan_out": False, + "api_key_project_hint": "project", + }, + ) + + def test_invalid_organization(self) -> None: + with self.assertRaises(InvalidOrganizationError) as e: + TokenSource("invalid-organization") + self.assertEqual(str(e.exception), "invalid organization") + + @patch("pathlib.Path.open", new_callable=mock_open) + def test_invalid_json(self, mock_open_file: MagicMock) -> None: + mock_open_file.return_value.read.return_value = "invalid json" + with self.assertRaises(json.JSONDecodeError): + TokenSource("user@project") + + @patch("pathlib.Path.open", new_callable=mock_open) + def test_missing_key(self, mock_open_file: MagicMock) -> None: + mock_open_file.return_value.read.return_value = json.dumps({}) + with self.assertRaises(KeyError): + TokenSource("user@project") + + @patch("pathlib.Path.open", new_callable=mock_open) + @patch("flowstate_credentials_proxy.auth.aiohttp.ClientSession.post") + async def test_get_token_http_error( + self, mock_post: MagicMock, mock_open_file: MagicMock + ) -> None: + mock_open_file.return_value.read.return_value = json.dumps(self.mock_user_token) + mock_post.side_effect = aiohttp.ClientError + + with self.assertRaises(aiohttp.ClientError): + await TokenSource("user@project").get_token() + + @patch("pathlib.Path.open", new_callable=mock_open) + @patch("flowstate_credentials_proxy.auth.aiohttp.ClientSession.post") + async def test_get_access_token_missing_key( + self, mock_post: MagicMock, mock_open_file: MagicMock + ) -> None: + mock_open_file.return_value.read.return_value = json.dumps(self.mock_user_token) + mock_post.return_value = self.make_mock_response({}) + + with self.assertRaises(KeyError): + await TokenSource("user@project").get_token() + + +if __name__ == "__main__": + unittest.main() diff --git a/flowstate_credentials_proxy/package.xml b/flowstate_credentials_proxy/package.xml new file mode 100644 index 0000000..fb1f252 --- /dev/null +++ b/flowstate_credentials_proxy/package.xml @@ -0,0 +1,15 @@ + + + + flowstate_credentials_proxy + 0.0.1 + websocket proxy to authenticate to flowstate + koonpeng + Intrinsic License + + python3-aiohttp + + + ament_python + + diff --git a/flowstate_credentials_proxy/resource/flowstate_credentials_proxy b/flowstate_credentials_proxy/resource/flowstate_credentials_proxy new file mode 100644 index 0000000..e69de29 diff --git a/flowstate_credentials_proxy/setup.cfg b/flowstate_credentials_proxy/setup.cfg new file mode 100644 index 0000000..c154926 --- /dev/null +++ b/flowstate_credentials_proxy/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/flowstate_credentials_proxy +[install] +install_scripts=$base/lib/flowstate_credentials_proxy diff --git a/flowstate_credentials_proxy/setup.py b/flowstate_credentials_proxy/setup.py new file mode 100644 index 0000000..af64e03 --- /dev/null +++ b/flowstate_credentials_proxy/setup.py @@ -0,0 +1,24 @@ +from setuptools import find_packages, setup + +package_name = "flowstate_credentials_proxy" + +setup( + name=package_name, + version="0.0.1", + packages=find_packages(exclude=["test_*.py"]), + data_files=[ + ("share/ament_index/resource_index/packages", ["resource/" + package_name]), + ("share/" + package_name, ["package.xml"]), + ], + install_requires=["setuptools", "aiohttp"], + zip_safe=True, + maintainer="koonpeng", + maintainer_email="koonpeng@intrinsic.ai", + description="websocket proxy to authenticate to flowstate", + license="Intrinsic License", + entry_points={ + "console_scripts": [ + "credentials_proxy = flowstate_credentials_proxy.__main__:main", + ], + }, +) diff --git a/flowstate_ros_bridge/CMakeLists.txt b/flowstate_ros_bridge/CMakeLists.txt index c831a6f..992978b 100644 --- a/flowstate_ros_bridge/CMakeLists.txt +++ b/flowstate_ros_bridge/CMakeLists.txt @@ -1,6 +1,8 @@ cmake_minimum_required(VERSION 3.8) project(flowstate_ros_bridge) +include(GNUInstallDirs) + # Default to C++17 if(NOT CMAKE_CXX_STANDARD) set(CMAKE_CXX_STANDARD 17) @@ -92,6 +94,7 @@ target_include_directories(flowstate_ros_bridge_interface $ $ ) +install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/ TYPE INCLUDE) #=============================================================================== diff --git a/flowstate_ros_bridge/Dockerfile.service b/flowstate_ros_bridge/Dockerfile.service index ba9dd4e..ecd06b5 100644 --- a/flowstate_ros_bridge/Dockerfile.service +++ b/flowstate_ros_bridge/Dockerfile.service @@ -19,8 +19,9 @@ ADD src/sdk-ros /opt/ros/underlay/src/sdk-ros RUN . /opt/ros/jazzy/setup.sh \ && apt-get update \ - && apt install -y ros-jazzy-ament-cmake-vendor-package \ + && apt install -y ros-jazzy-ament-cmake-vendor-package libprotobuf-dev libzmq3-dev libbz2-dev clang-19 \ && colcon build \ + --cmake-args -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_COMPILER=/usr/bin/clang-19 -DCMAKE_CXX_COMPILER=/usr/bin/clang++-19 \ --continue-on-error \ --event-handlers=console_direct+ \ --merge-install \ @@ -41,6 +42,7 @@ RUN . /opt/ros/jazzy/setup.sh \ && . /opt/ros/underlay/install/setup.sh \ && cd /opt/ros/overlay \ && colcon build \ + --cmake-args -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_COMPILER=/usr/bin/clang-19 -DCMAKE_CXX_COMPILER=/usr/bin/clang++-19 \ --event-handlers=console_direct+ \ --merge-install \ --packages-up-to=${SERVICE_PACKAGE} \ @@ -53,7 +55,7 @@ ARG SERVICE_PACKAGE ARG SERVICE_NAME RUN apt-get update \ - && apt-get install -y ros-jazzy-rmw-zenoh-cpp \ + && apt-get install -y ros-jazzy-rmw-zenoh-cpp libzmq3-dev libbz2-dev \ && rm -rf /var/lib/apt/lists/* COPY --from=overlay /opt/ros/underlay/install /opt/ros/underlay/install diff --git a/flowstate_ros_bridge/flowstate_ros_bridge.manifest.textproto b/flowstate_ros_bridge/flowstate_ros_bridge.manifest.textproto index e10bedd..c354748 100644 --- a/flowstate_ros_bridge/flowstate_ros_bridge.manifest.textproto +++ b/flowstate_ros_bridge/flowstate_ros_bridge.manifest.textproto @@ -22,6 +22,7 @@ service_def { archive_filename: "flowstate_ros_bridge.tar" } } + http_config {} } assets { default_configuration_filename: "default_config.binarypb", diff --git a/flowstate_ros_bridge/include/flowstate_ros_bridge/executive.hpp b/flowstate_ros_bridge/include/flowstate_ros_bridge/executive.hpp index deada13..55bbeab 100644 --- a/flowstate_ros_bridge/include/flowstate_ros_bridge/executive.hpp +++ b/flowstate_ros_bridge/include/flowstate_ros_bridge/executive.hpp @@ -28,10 +28,8 @@ #include #include -#include #include #include -#include #include #include "absl/status/statusor.h" @@ -40,10 +38,7 @@ #include "intrinsic/executive/proto/executive_service.grpc.pb.h" #include "intrinsic/executive/proto/executive_service.pb.h" #include "intrinsic/frontend/solution_service/proto/solution_service.grpc.pb.h" -#include "intrinsic/frontend/solution_service/proto/solution_service.pb.h" #include "intrinsic/skills/proto/skill_registry.grpc.pb.h" -#include "intrinsic/skills/proto/skill_registry.pb.h" -#include "intrinsic/util/grpc/grpc.h" #include "nlohmann/json.hpp" namespace flowstate_ros_bridge { @@ -65,12 +60,9 @@ class Executive : public std::enable_shared_from_this { * for its required dependent services and setting default timing parameters * for its operations. * - * @param executive_service_address The network address (e.g., "host:port" or - * URI) of the main Executive Service to connect to. - * @param skill_registry_address The network address (e.g., "host:port" or - * URI) of the Skill Registry Service. - * @param solution_service_address The network address (e.g., "host:port" or - * URI) of the Solution Service. + * @param executive_channel The grpc channel for the Executive Service. + * @param skill_registry_channel The grpc channel for the Skill Registry Service. + * @param solution_channel The grpc channel the Solution Service. * @param deadline_seconds The default timeout duration in seconds for * operations initiated by this Executive. Defaults to 5 seconds. * @param update_rate_millis The frequency in milliseconds at which the @@ -81,9 +73,9 @@ class Executive : public std::enable_shared_from_this { * checked upon attempting connection, not necessarily within this * constructor. */ - Executive(const std::string &executive_service_address, - const std::string &skill_registry_address, - const std::string &solution_service_address, + Executive(std::shared_ptr executive_channel, + std::shared_ptr skill_registry_channel, + std::shared_ptr solution_channel, std::size_t deadline_seconds = 5, std::size_t update_rate_millis = 1000); @@ -147,9 +139,9 @@ class Executive : public std::enable_shared_from_this { private: mutable std::recursive_mutex mutex_; - std::string executive_service_address_; - std::string skill_registry_address_; - std::string solution_service_address_; + std::shared_ptr executive_channel_; + std::shared_ptr skill_registry_channel_; + std::shared_ptr solution_channel_; std::size_t deadline_seconds_; std::size_t update_rate_millis_; bool connected_; diff --git a/flowstate_ros_bridge/include/flowstate_ros_bridge/world.hpp b/flowstate_ros_bridge/include/flowstate_ros_bridge/world.hpp index 257e246..f70c6fa 100644 --- a/flowstate_ros_bridge/include/flowstate_ros_bridge/world.hpp +++ b/flowstate_ros_bridge/include/flowstate_ros_bridge/world.hpp @@ -27,14 +27,13 @@ #define FLOWSTATE_ROS_BRIDGE__WORLD_HPP_ #include +#include #include #include "absl/status/statusor.h" #include "intrinsic/geometry/proto/geometry_service.grpc.pb.h" -#include "intrinsic/geometry/proto/geometry_service.pb.h" #include "intrinsic/math/proto/tf_message.pb.h" #include "intrinsic/platform/pubsub/pubsub.h" -#include "intrinsic/platform/pubsub/zenoh_publisher_data.h" #include "intrinsic/world/objects/object_world_client.h" #include "intrinsic/world/objects/world_object.h" #include "intrinsic/world/proto/object_world_service.grpc.pb.h" @@ -49,8 +48,8 @@ class World : public std::enable_shared_from_this { public: World(std::shared_ptr pubsub, - const std::string& world_service_address, - const std::string& geometry_service_address, + std::shared_ptr world_channel, + std::shared_ptr geometry_channel, std::size_t deadline_seconds = 10); absl::StatusOr> CreateTfSubscription( @@ -75,8 +74,8 @@ class World : public std::enable_shared_from_this { private: std::shared_ptr pubsub_; - std::string world_service_address_; - std::string geometry_service_address_; + std::shared_ptr world_channel_; + std::shared_ptr geometry_channel_; size_t deadline_seconds_ = 10; bool connected_ = false; std::shared_ptr object_world_client_; diff --git a/flowstate_ros_bridge/package.xml b/flowstate_ros_bridge/package.xml index f91eb86..4cde55a 100644 --- a/flowstate_ros_bridge/package.xml +++ b/flowstate_ros_bridge/package.xml @@ -2,7 +2,7 @@ flowstate_ros_bridge - 0.0.1 + 0.0.2 A bridge between Flowstate services and ROS applications Morgan Quigley Yadunund Vijay diff --git a/flowstate_ros_bridge/src/executive.cpp b/flowstate_ros_bridge/src/executive.cpp index ae4ac25..817a2d2 100644 --- a/flowstate_ros_bridge/src/executive.cpp +++ b/flowstate_ros_bridge/src/executive.cpp @@ -38,11 +38,9 @@ #include "absl/log/log.h" #include "absl/status/status.h" #include "absl/status/statusor.h" -#include "absl/synchronization/notification.h" #include "intrinsic/executive/proto/run_metadata.pb.h" #include "intrinsic/skills/proto/skills.pb.h" #include "intrinsic/util/grpc/grpc.h" -#include "intrinsic/util/proto/descriptors.h" #include "intrinsic/util/status/status_conversion_grpc.h" #include "intrinsic/util/status/status_macros.h" @@ -56,18 +54,18 @@ static constexpr std::string PROCESS_PARAMS_KEY = "parameters"; static constexpr std::string RESOURCE_PARAMS_KEY = "resources"; ///============================================================================= -Executive::Executive(const std::string& executive_service_address, - const std::string& skill_registry_address, - const std::string& solution_service_address, - std::size_t deadline_seconds, - std::size_t update_rate_millis) - : executive_service_address_(std::move(executive_service_address)), - skill_registry_address_(std::move(skill_registry_address)), - solution_service_address_(std::move(solution_service_address)), +Executive::Executive( + std::shared_ptr executive_channel, + std::shared_ptr skill_registry_channel, + std::shared_ptr solution_channel, + std::size_t deadline_seconds, + std::size_t update_rate_millis) + : executive_channel_(std::move(executive_channel)), + skill_registry_channel_(std::move(skill_registry_channel)), + solution_channel_(std::move(solution_channel)), deadline_seconds_(deadline_seconds), update_rate_millis_(update_rate_millis), - connected_(false), - current_process_(nullptr) { + connected_(false), current_process_(nullptr) { // Do nothing. } @@ -79,71 +77,28 @@ absl::Status Executive::connect() { std::lock_guard lock(mutex_); - grpc::ChannelArguments channel_args = intrinsic::DefaultGrpcChannelArgs(); - // The skill registry may need to call out to one or more skill information - // services. Those services might not be ready at startup. We configure a - // retry policy to mitigate b/283020857. - // (See - // https://github.com/grpc/grpc-go/blob/master/examples/features/retry/README.md - // for an example of this gRPC feature.) - channel_args.SetServiceConfigJSON(R"( - { - "methodConfig": [{ - "name": [{"service": "intrinsic_proto.skills.SkillRegistry"}], - "waitForReady": true, - "timeout": "300s", - "retryPolicy": { - "maxAttempts": 10, - "initialBackoff": "0.1s", - "maxBackoff": "10s", - "backoffMultiplier": 1.5, - "retryableStatusCodes": [ "UNAVAILABLE" ] - } - }] - })"); - channel_args.SetMaxReceiveMessageSize(10000000); // 10 MB - channel_args.SetMaxSendMessageSize(10000000); // 10 MB - // Connect to the executive service. - LOG(INFO) << "Connecting to executive service at address " - << executive_service_address_; - INTR_ASSIGN_OR_RETURN( - std::shared_ptr executive_channel, - intrinsic::CreateClientChannel( - executive_service_address_, - absl::Now() + absl::Seconds(deadline_seconds_), channel_args)); + LOG(INFO) << "Connecting to executive service"; INTR_RETURN_IF_ERROR(intrinsic::WaitForChannelConnected( - executive_service_address_, executive_channel, + "executive service", executive_channel_, absl::Now() + absl::Seconds(deadline_seconds_))); - executive_stub_ = ExecutiveService::NewStub(std::move(executive_channel)); + executive_stub_ = ExecutiveService::NewStub(executive_channel_); LOG(INFO) << "Successfully connected to executive service!"; // Connect to the solution service. - LOG(INFO) << "Connecting to solution service at address " - << solution_service_address_; - INTR_ASSIGN_OR_RETURN( - std::shared_ptr solution_channel, - intrinsic::CreateClientChannel( - solution_service_address_, - absl::Now() + absl::Seconds(deadline_seconds_), channel_args)); + LOG(INFO) << "Connecting to solution service"; INTR_RETURN_IF_ERROR(intrinsic::WaitForChannelConnected( - solution_service_address_, solution_channel, + "solution service", solution_channel_, absl::Now() + absl::Seconds(deadline_seconds_))); - solution_stub_ = SolutionService::NewStub(std::move(solution_channel)); + solution_stub_ = SolutionService::NewStub(solution_channel_); LOG(INFO) << "Successfully connected to solution service!"; // Connect to the skill registry. - LOG(INFO) << "Connecting to skill registry at address " - << skill_registry_address_; - INTR_ASSIGN_OR_RETURN( - std::shared_ptr skill_channel, - intrinsic::CreateClientChannel( - skill_registry_address_, - absl::Now() + absl::Seconds(deadline_seconds_), channel_args)); + LOG(INFO) << "Connecting to skill registry"; INTR_RETURN_IF_ERROR(intrinsic::WaitForChannelConnected( - skill_registry_address_, skill_channel, + "skill registry", skill_registry_channel_, absl::Now() + absl::Seconds(deadline_seconds_))); - skill_registry_stub_ = SkillRegistry::NewStub(std::move(skill_channel)); + skill_registry_stub_ = SkillRegistry::NewStub(skill_registry_channel_); LOG(INFO) << "Successfully connected to the skill registry!"; // TODO(Yadunund): Check if we can add hooks to reconnect if disconnected diff --git a/flowstate_ros_bridge/src/flowstate_ros_bridge.cpp b/flowstate_ros_bridge/src/flowstate_ros_bridge.cpp index bc41750..ab4849b 100644 --- a/flowstate_ros_bridge/src/flowstate_ros_bridge.cpp +++ b/flowstate_ros_bridge/src/flowstate_ros_bridge.cpp @@ -28,6 +28,7 @@ #include "absl/flags/flag.h" #include "flowstate_ros_bridge/bridge_interface.hpp" #include "intrinsic/platform/pubsub/zenoh_util/zenoh_config.h" +#include namespace flowstate_ros_bridge { constexpr const char* kExecutiveAddressParamName = "executive_service_address"; @@ -41,6 +42,9 @@ constexpr const char* kGeometryAddressParamName = "geometry_service_address"; constexpr const char* kServiceTunnelParamName = "service_tunnel"; constexpr const char* kBridgePluginParamName = "bridge_plugins"; +const std::size_t kExecutiveDeadlineSeconds = 5; +const std::size_t kWorldDeadlineSeconds = 10; + ///============================================================================= FlowstateROSBridge::FlowstateROSBridge(const rclcpp::NodeOptions& options) : LifecycleNode("flowstate_ros_bridge", options), @@ -91,18 +95,6 @@ FlowstateROSBridge::FlowstateROSBridge(const rclcpp::NodeOptions& options) .get_value()); this->pubsub_ = std::make_shared(this->get_name()); - // Initialize the executive client. - this->executive_ = std::make_shared( - this->get_parameter(kExecutiveAddressParamName).get_value(), - this->get_parameter(kSkillAddressParamName).get_value(), - this->get_parameter(kSolutionAddressParamName).get_value()); - - // Initialize the world client. - this->world_ = std::make_shared( - this->pubsub_, - this->get_parameter(kWorldAddressParamName).get_value(), - this->get_parameter(kGeometryAddressParamName).get_value()); - bridge_ids_ = this->get_parameter(kBridgePluginParamName).as_string_array(); for (const auto& id : bridge_ids_) { @@ -126,6 +118,96 @@ FlowstateROSBridge::FlowstateROSBridge(const rclcpp::NodeOptions& options) ///============================================================================= auto FlowstateROSBridge::on_configure( const rclcpp_lifecycle::State& /*previous_state*/) -> CallbackReturn { + grpc::ChannelArguments executive_channel_args = intrinsic::DefaultGrpcChannelArgs(); + // The skill registry may need to call out to one or more skill information + // services. Those services might not be ready at startup. We configure a + // retry policy to mitigate b/283020857. + // (See + // https://github.com/grpc/grpc-go/blob/master/examples/features/retry/README.md + // for an example of this gRPC feature.) + executive_channel_args.SetServiceConfigJSON(R"( + { + "methodConfig": [{ + "name": [{"service": "intrinsic_proto.skills.SkillRegistry"}], + "waitForReady": true, + "timeout": "300s", + "retryPolicy": { + "maxAttempts": 10, + "initialBackoff": "0.1s", + "maxBackoff": "10s", + "backoffMultiplier": 1.5, + "retryableStatusCodes": [ "UNAVAILABLE" ] + } + }] + })"); + executive_channel_args.SetMaxReceiveMessageSize(10000000); // 10 MB + executive_channel_args.SetMaxSendMessageSize(10000000); // 10 MB + + auto executive_address = this->get_parameter(kExecutiveAddressParamName).get_value(); + auto maybe_executive_channel = intrinsic::CreateClientChannel( + executive_address, absl::Now() + absl::Seconds(kExecutiveDeadlineSeconds)); + if (!maybe_executive_channel.ok()) { + LOG(ERROR) << maybe_executive_channel; + return CallbackReturn::FAILURE; + } + auto executive_channel = maybe_executive_channel.value(); + LOG(INFO) << "Executive address: " << executive_address; + + auto skill_registry_address = this->get_parameter(kSkillAddressParamName).get_value(); + auto maybe_skill_registry_channel = intrinsic::CreateClientChannel( + skill_registry_address, absl::Now() + absl::Seconds(kExecutiveDeadlineSeconds)); + if (!maybe_skill_registry_channel.ok()) { + LOG(ERROR) << maybe_skill_registry_channel; + return CallbackReturn::FAILURE; + } + auto skill_registry_channel = maybe_skill_registry_channel.value(); + LOG(INFO) << "Skill registry address: " << skill_registry_address; + + auto solution_address = this->get_parameter(kSolutionAddressParamName).get_value(); + auto maybe_solution_channel = intrinsic::CreateClientChannel( + solution_address, absl::Now() + absl::Seconds(kExecutiveDeadlineSeconds)); + if (!maybe_solution_channel.ok()) { + LOG(ERROR) << maybe_solution_channel; + return CallbackReturn::FAILURE; + } + auto solution_channel = maybe_solution_channel.value(); + LOG(INFO) << "Solution service address: " << solution_address; + + // Initialize the executive client. + this->executive_ = std::make_shared( + executive_channel, skill_registry_channel, solution_channel, kExecutiveDeadlineSeconds); + + grpc::ChannelArguments world_channel_args = intrinsic::DefaultGrpcChannelArgs(); + // We might eventually need a retry policy here, like in executive (?) + // Some of the meshes that we'll receive in the geometry client are large, + // like a few 10's of MB. + world_channel_args.SetMaxReceiveMessageSize(-1); // no limit + world_channel_args.SetMaxSendMessageSize(10000000); // 10 MB + + auto world_address = this->get_parameter(kWorldAddressParamName).get_value(); + auto maybe_world_channel = intrinsic::CreateClientChannel( + world_address, absl::Now() + absl::Seconds(kWorldDeadlineSeconds)); + if (!maybe_world_channel.ok()) { + LOG(ERROR) << maybe_world_channel; + return CallbackReturn::FAILURE; + } + auto world_channel = maybe_world_channel.value(); + LOG(INFO) << "World address: " << world_address; + + auto geometry_address = this->get_parameter(kGeometryAddressParamName).get_value(); + auto maybe_geometry_channel = intrinsic::CreateClientChannel( + geometry_address, absl::Now() + absl::Seconds(kWorldDeadlineSeconds)); + if (!maybe_geometry_channel.ok()) { + LOG(ERROR) << maybe_geometry_channel; + return CallbackReturn::FAILURE; + } + auto geometry_channel = maybe_geometry_channel.value(); + LOG(INFO) << "Geometry address: " << geometry_address; + + // Initialize the world client. + this->world_ = std::make_shared( + this->pubsub_, world_channel, geometry_channel, kWorldDeadlineSeconds); + // Configure client services. if (!this->executive_->connect().ok()) { return CallbackReturn::FAILURE; diff --git a/flowstate_ros_bridge/src/flowstate_ros_bridge_main.cpp b/flowstate_ros_bridge/src/flowstate_ros_bridge_main.cpp index 93b995b..cbf51de 100644 --- a/flowstate_ros_bridge/src/flowstate_ros_bridge_main.cpp +++ b/flowstate_ros_bridge/src/flowstate_ros_bridge_main.cpp @@ -39,7 +39,9 @@ int main(int argc, char* argv[]) { // If external router address is provided, override the Zenoh environment // variable - std::string zenoh_config_override = "connect/endpoints=[\""; + std::string zenoh_config_override = + "listen/endpoints=[\"ws/0.0.0.0:" + + std::to_string(runtime_context.http_port()) + "\"];connect/endpoints=[\""; if (!external_router_address.empty()) { zenoh_config_override += external_router_address; } else { diff --git a/flowstate_ros_bridge/src/world.cpp b/flowstate_ros_bridge/src/world.cpp index f07c9e4..f107e93 100644 --- a/flowstate_ros_bridge/src/world.cpp +++ b/flowstate_ros_bridge/src/world.cpp @@ -30,7 +30,6 @@ #include "absl/container/flat_hash_set.h" #include "absl/log/log.h" #include "absl/status/status.h" -#include "intrinsic/geometry/proto/geometry_service.grpc.pb.h" #include "intrinsic/util/grpc/grpc.h" #include "intrinsic/util/status/status_conversion_grpc.h" #include "intrinsic/util/status/status_macros.h" @@ -39,12 +38,12 @@ namespace flowstate_ros_bridge { World::World(std::shared_ptr pubsub, - const std::string& world_service_address, - const std::string& geometry_service_address, + std::shared_ptr world_channel, + std::shared_ptr geometry_channel, std::size_t deadline_seconds) : pubsub_(pubsub), - world_service_address_(world_service_address), - geometry_service_address_(geometry_service_address), + world_channel_(std::move(world_channel)), + geometry_channel_(std::move(geometry_channel)), deadline_seconds_(deadline_seconds) { // Do nothing. Connections will happen later. } @@ -65,39 +64,21 @@ absl::Status World::connect() { return absl::OkStatus(); } - grpc::ChannelArguments channel_args = intrinsic::DefaultGrpcChannelArgs(); - // We might eventually need a retry policy here, like in executive (?) - // Some of the meshes that we'll receive in the geometry client are large, - // like a few 10's of MB. - channel_args.SetMaxReceiveMessageSize(-1); // no limit - channel_args.SetMaxSendMessageSize(10000000); // 10 MB - - LOG(INFO) << "Connecting to world service at " << world_service_address_; - INTR_ASSIGN_OR_RETURN( - std::shared_ptr world_channel, - intrinsic::CreateClientChannel( - world_service_address_, - absl::Now() + absl::Seconds(deadline_seconds_), channel_args)); + LOG(INFO) << "Connecting to world service"; INTR_RETURN_IF_ERROR(intrinsic::WaitForChannelConnected( - world_service_address_, world_channel, + "world service", world_channel_, absl::Now() + absl::Seconds(deadline_seconds_))); std::shared_ptr object_world_stub = - ObjectWorldService::NewStub(std::move(world_channel)); + ObjectWorldService::NewStub(world_channel_); object_world_client_ = std::make_shared( "world", std::move(object_world_stub)); LOG(INFO) << "Successfully connected to world service!"; - LOG(INFO) << "Connecting to geometry service at " - << geometry_service_address_; - INTR_ASSIGN_OR_RETURN( - std::shared_ptr geometry_channel, - intrinsic::CreateClientChannel( - geometry_service_address_, - absl::Now() + absl::Seconds(deadline_seconds_), channel_args)); + LOG(INFO) << "Connecting to geometry service"; INTR_RETURN_IF_ERROR(intrinsic::WaitForChannelConnected( - geometry_service_address_, geometry_channel, + "geometry service", geometry_channel_, absl::Now() + absl::Seconds(deadline_seconds_))); - geometry_stub_ = GeometryService::NewStub(std::move(geometry_channel)); + geometry_stub_ = GeometryService::NewStub(std::move(geometry_channel_)); LOG(INFO) << "Successfully connected to geometry service!"; connected_ = true; diff --git a/intrinsic_sdk_cmake/cmake/fetch_sdk.cmake b/intrinsic_sdk_cmake/cmake/fetch_sdk.cmake index bea0090..187fda8 100644 --- a/intrinsic_sdk_cmake/cmake/fetch_sdk.cmake +++ b/intrinsic_sdk_cmake/cmake/fetch_sdk.cmake @@ -1,6 +1,7 @@ # Fetch the sdk and make it available for use locally. set(SDK_VERSION_JSON_FILE "${CMAKE_CURRENT_SOURCE_DIR}/cmake/sdk_version.json") +set_property(DIRECTORY APPEND PROPERTY CMAKE_CONFIGURE_DEPENDS ${SDK_VERSION_JSON_FILE}) file(READ "${SDK_VERSION_JSON_FILE}" sdk_version_json) string(JSON sdk_version GET ${sdk_version_json} "sdk_version") string(JSON sdk_checksum GET ${sdk_version_json} "sdk_checksum") diff --git a/intrinsic_sdk_cmake/cmake/sdk_version.json b/intrinsic_sdk_cmake/cmake/sdk_version.json index dbb0ac1..ed13e1c 100644 --- a/intrinsic_sdk_cmake/cmake/sdk_version.json +++ b/intrinsic_sdk_cmake/cmake/sdk_version.json @@ -1,4 +1,4 @@ { - "sdk_version": "v1.23.20250825", - "sdk_checksum": "SHA256=e4fc5b71f63aed1e69fe283df2bf8d89e0ca54acf320185d41a1034c361aac71" + "sdk_version": "v1.24.20250929", + "sdk_checksum": "SHA256=39e88fa8f265f39856d15bde14996c0fd0ba0d63e8e633bd1713e799bb3e1dd7" } diff --git a/local_flowstate_ros_bridge/CMakeLists.txt b/local_flowstate_ros_bridge/CMakeLists.txt new file mode 100644 index 0000000..94dfa87 --- /dev/null +++ b/local_flowstate_ros_bridge/CMakeLists.txt @@ -0,0 +1,40 @@ +cmake_minimum_required(VERSION 3.8) +project(local_flowstate_ros_bridge) + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +find_package(flowstate_ros_bridge REQUIRED) +find_package(Boost REQUIRED COMPONENTS program_options) +find_package(rclcpp_lifecycle REQUIRED) +find_package(pluginlib REQUIRED) +find_package(rclcpp_components REQUIRED) + +add_library(local_flowstate_ros_bridge_component SHARED + src/local_flowstate_ros_bridge.cpp + src/local_flowstate_ros_bridge_main.cpp +) +target_link_libraries(local_flowstate_ros_bridge_component + PRIVATE + flowstate_ros_bridge::flowstate_ros_bridge_interface + Boost::program_options + rclcpp_lifecycle::rclcpp_lifecycle + pluginlib::pluginlib + rclcpp_components::component +) + +rclcpp_components_register_node(local_flowstate_ros_bridge_component + PLUGIN "local_flowstate_ros_bridge::LocalFlowstateROSBridge" + EXECUTABLE local_flowstate_ros_bridge + EXECUTOR EventsExecutor +) + +install(TARGETS + local_flowstate_ros_bridge_component + RUNTIME DESTINATION lib/${PROJECT_NAME} +) + +ament_package() diff --git a/local_flowstate_ros_bridge/package.xml b/local_flowstate_ros_bridge/package.xml new file mode 100644 index 0000000..2281f62 --- /dev/null +++ b/local_flowstate_ros_bridge/package.xml @@ -0,0 +1,21 @@ + + + + local_flowstate_ros_bridge + 0.0.1 + A bridge between Flowstate services and ROS applications + koonpeng + Intrinsic License + + ament_cmake + + flowstate_interfaces + flowstate_ros_bridge + libboost-program-options + + libboost-program-options-dev + + + ament_cmake + + diff --git a/local_flowstate_ros_bridge/src/local_flowstate_ros_bridge.cpp b/local_flowstate_ros_bridge/src/local_flowstate_ros_bridge.cpp new file mode 100644 index 0000000..1a99587 --- /dev/null +++ b/local_flowstate_ros_bridge/src/local_flowstate_ros_bridge.cpp @@ -0,0 +1,188 @@ +// Copyright 2025 Intrinsic Innovation LLC +// +// You are hereby granted a non-exclusive, worldwide, royalty-free license to +// use, copy, modify, and distribute this Intrinsic SDK in source code or binary +// form for use in connection with the services and APIs provided by Intrinsic +// Innovation LLC (“Intrinsic”). +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// +// If you use this Intrinsic SDK with any Intrinsic services, your use is +// subject to the Intrinsi Platform Terms of Service +// [https://intrinsic.ai/platform-terms]. If you create works that call +// Intrinsic APIs, you must agree to the terms of service for those APIs +// separately. This license does not grant you any special rights to use the +// services. +// +// This copyright notice shall be included in all copies or substantial portions +// of the software. + +#include "local_flowstate_ros_bridge.hpp" + +#include +#include +#include +#include + +#define ASSIGN_OR_RETURN_FAILURE_CONCAT_IMPL(x, y) x##y +#define ASSIGN_OR_RETURN_FAILURE_CONCAT(x, y) \ + ASSIGN_OR_RETURN_FAILURE_CONCAT_IMPL(x, y) +#define ASSIGN_OR_RETURN_FAILURE(lhs, rhs) \ + auto ASSIGN_OR_RETURN_FAILURE_CONCAT(macro_internal_status_, __LINE__) = \ + (rhs); \ + if (!ASSIGN_OR_RETURN_FAILURE_CONCAT(macro_internal_status_, __LINE__) \ + .ok()) { \ + LOG(ERROR) << ASSIGN_OR_RETURN_FAILURE_CONCAT(macro_internal_status_, \ + __LINE__) \ + .status(); \ + return CallbackReturn::FAILURE; \ + } \ + lhs = std::move( \ + ASSIGN_OR_RETURN_FAILURE_CONCAT(macro_internal_status_, __LINE__) \ + .value()) + +namespace local_flowstate_ros_bridge { + +constexpr const char *kOrgProjectParamName = "org"; +constexpr const char *kClusterParamName = "cluster"; +constexpr const char *kAutostartParamName = "autostart"; +constexpr const char *kBridgePluginParamName = "bridge_plugins"; + +LocalFlowstateROSBridge::LocalFlowstateROSBridge( + const rclcpp::NodeOptions &options) + : LifecycleNode("flowstate_ros_bridge", options), + bridge_loader_("flowstate_ros_bridge", + "flowstate_ros_bridge::BridgeInterface") { + rcl_interfaces::msg::ParameterDescriptor org_pd; + org_pd.name = kOrgProjectParamName; + org_pd.description = "Organization in the form org@project"; + this->declare_parameter(kOrgProjectParamName, "", org_pd); + + rcl_interfaces::msg::ParameterDescriptor cluster_pd; + cluster_pd.name = kClusterParamName; + cluster_pd.description = "Target cluster"; + this->declare_parameter(kClusterParamName, "", cluster_pd); + + const bool autostart = this->declare_parameter(kAutostartParamName, true); + + this->declare_parameter(kBridgePluginParamName, std::vector{}); + + this->pubsub_ = std::make_shared(this->get_name()); + + bridge_ids_ = this->get_parameter(kBridgePluginParamName).as_string_array(); + + for (const auto &id : bridge_ids_) { + try { + auto bridge = bridge_loader_.createUniqueInstance(id); + bridge->declare_ros_parameters(*this); + bridges_.push_back(std::move(bridge)); + } catch (const std::exception &e) { + LOG(ERROR) << "Failed to load bridge_plugin [" << id + << "]. Detailed error: " << e.what(); + continue; + } + } + + if (autostart) { + this->configure(); + this->activate(); + } +} + +auto LocalFlowstateROSBridge::on_configure( + const rclcpp_lifecycle::State & /*previous_state*/) -> CallbackReturn { + auto org_project = + this->get_parameter(kOrgProjectParamName).get_value(); + ASSIGN_OR_RETURN_FAILURE( + auto org_info, intrinsic::Channel::OrgInfo::FromString(org_project)); + + auto cluster = + this->get_parameter(kClusterParamName).get_value(); + + auto maybe_cluster_channel = + intrinsic::Channel::MakeFromCluster(org_info, cluster); + if (!maybe_cluster_channel.ok()) { + if (maybe_cluster_channel.status().code() == absl::StatusCode::kNotFound) { + std::cerr << "Cannot find authentication information." << std::endl + << "Run `inctl auth login --org=" << org_project + << "` to authenticate to flowstate." << std::endl + << std::endl + << "Download inctl from " + "https://github.com/intrinsic-ai/sdk/releases." + << std::endl; + } else { + LOG(ERROR) << maybe_cluster_channel.status(); + } + return CallbackReturn::FAILURE; + } + auto grpc_channel = maybe_cluster_channel.value()->GetChannel(); + + // Initialize the executive client. + this->executive_ = std::make_shared( + grpc_channel, grpc_channel, grpc_channel); + + // Initialize the world client. + // FIXME(koonpeng): pubsub does not work because flowstate does not expose it + // atm. + this->world_ = std::make_shared( + this->pubsub_, grpc_channel, grpc_channel); + + // Configure client services. + auto connect_result = this->executive_->connect(); + if (!connect_result.ok()) { + LOG(ERROR) << connect_result; + return CallbackReturn::FAILURE; + } + if (!this->world_->connect().ok()) { + return CallbackReturn::FAILURE; + } + + return CallbackReturn::SUCCESS; +} + +auto LocalFlowstateROSBridge::on_activate( + const rclcpp_lifecycle::State & /*previous_state*/) -> CallbackReturn { + // Initialize/start the various bridges. + for (const auto &b : bridges_) { + try { + if (!b->initialize(*this, this->executive_, this->world_)) { + return CallbackReturn::FAILURE; + } + } catch (const std::exception &e) { + LOG(ERROR) << "Error initializing bridges: " << e.what(); + return CallbackReturn::FAILURE; + } + } + + return CallbackReturn::SUCCESS; +} + +auto LocalFlowstateROSBridge::on_deactivate( + const rclcpp_lifecycle::State & /*previous_state*/) -> CallbackReturn { + // TODO(Yadunund): Should bridges have a "deactivate" api? + return CallbackReturn::SUCCESS; +} + +auto LocalFlowstateROSBridge::on_cleanup( + const rclcpp_lifecycle::State & /*previous_state*/) -> CallbackReturn { + bridges_.clear(); + executive_.reset(); + world_.reset(); + LOG(INFO) << "Closing PubSub session..."; + pubsub_.reset(); + LOG(INFO) << "Done."; + return CallbackReturn::SUCCESS; +} + +} // namespace local_flowstate_ros_bridge + +#include + +RCLCPP_COMPONENTS_REGISTER_NODE( + local_flowstate_ros_bridge::LocalFlowstateROSBridge) diff --git a/local_flowstate_ros_bridge/src/local_flowstate_ros_bridge.hpp b/local_flowstate_ros_bridge/src/local_flowstate_ros_bridge.hpp new file mode 100644 index 0000000..f9ab423 --- /dev/null +++ b/local_flowstate_ros_bridge/src/local_flowstate_ros_bridge.hpp @@ -0,0 +1,72 @@ +// Copyright 2025 Intrinsic Innovation LLC +// +// You are hereby granted a non-exclusive, worldwide, royalty-free license to +// use, copy, modify, and distribute this Intrinsic SDK in source code or binary +// form for use in connection with the services and APIs provided by Intrinsic +// Innovation LLC (“Intrinsic”). +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// +// If you use this Intrinsic SDK with any Intrinsic services, your use is +// subject to the Intrinsi Platform Terms of Service +// [https://intrinsic.ai/platform-terms]. If you create works that call +// Intrinsic APIs, you must agree to the terms of service for those APIs +// separately. This license does not grant you any special rights to use the +// services. +// +// This copyright notice shall be included in all copies or substantial portions +// of the software. + +#ifndef LOCAL_FLOWSTATE_ROS_BRIDGE_HPP_ +#define LOCAL_FLOWSTATE_ROS_BRIDGE_HPP_ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace local_flowstate_ros_bridge { + +class LocalFlowstateROSBridge : public rclcpp_lifecycle::LifecycleNode { +public: + using CallbackReturn = + rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn; + + /// Constructor. + explicit LocalFlowstateROSBridge( + const rclcpp::NodeOptions &options = rclcpp::NodeOptions()); + + /// Lifecycle methods. + CallbackReturn + on_configure(const rclcpp_lifecycle::State &previous_state) override; + CallbackReturn + on_activate(const rclcpp_lifecycle::State &previous_state) override; + CallbackReturn + on_deactivate(const rclcpp_lifecycle::State &previous_state) override; + CallbackReturn + on_cleanup(const rclcpp_lifecycle::State &previous_state) override; + +private: + pluginlib::ClassLoader bridge_loader_; + std::vector> + bridges_; + std::shared_ptr executive_; + std::shared_ptr world_; + std::shared_ptr pubsub_; + std::vector bridge_ids_; +}; +} // namespace local_flowstate_ros_bridge + +#endif // FLOWSTATE_ROS_BRIDGE_HPP_ diff --git a/local_flowstate_ros_bridge/src/local_flowstate_ros_bridge_main.cpp b/local_flowstate_ros_bridge/src/local_flowstate_ros_bridge_main.cpp new file mode 100644 index 0000000..782829a --- /dev/null +++ b/local_flowstate_ros_bridge/src/local_flowstate_ros_bridge_main.cpp @@ -0,0 +1,66 @@ +// Copyright 2025 Intrinsic Innovation LLC +// +// You are hereby granted a non-exclusive, worldwide, royalty-free license to +// use, copy, modify, and distribute this Intrinsic SDK in source code or binary +// form for use in connection with the services and APIs provided by Intrinsic +// Innovation LLC (“Intrinsic”). +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// +// If you use this Intrinsic SDK with any Intrinsic services, your use is +// subject to the Intrinsi Platform Terms of Service +// [https://intrinsic.ai/platform-terms]. If you create works that call +// Intrinsic APIs, you must agree to the terms of service for those APIs +// separately. This license does not grant you any special rights to use the +// services. +// +// This copyright notice shall be included in all copies or substantial portions +// of the software. + +#include + +#include +#include + +#include +#include + +namespace po = boost::program_options; + +int main(int argc, char *argv[]) { + po::options_description desc( + "Bridge between Flowstate services and ROS applications"); + std::string solution; + std::string org; + + desc.add_options()("help", "produce help message")( + "solution", po::value(&solution)->required(), "solution name")( + "org", po::value(&org)->required(), "organization name"); + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + + if (vm.count("help")) { + std::cout << desc << std::endl; + return 0; + } + + try { + po::notify(vm); + } catch (const po::error &e) { + std::cerr << "Error: " << e.what() << std::endl; + std::cerr << desc << std::endl; + return 1; + } + + std::cout << "solution: " << solution << std::endl; + std::cout << "org: " << org << std::endl; + + return 0; +}