diff --git a/.gitignore b/.gitignore index e69de29..c18dd8d 100644 --- a/.gitignore +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/LICENSE b/LICENSE index ab17cca..7904a56 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ -# Copyright 2024 Intrinsic Innovation LLC +# Copyright 2025 Intrinsic Innovation LLC -Copyright 2024 Intrinsic Innovation LLC +Copyright 2025 Intrinsic Innovation LLC You are hereby granted a non-exclusive, worldwide, royalty-free license to use, copy, modify, and distribute this Intrinsic SDK in source code or binary form for use diff --git a/flowstate_credentials_proxy/LICENSE b/flowstate_credentials_proxy/LICENSE new file mode 100644 index 0000000..8906ebc --- /dev/null +++ b/flowstate_credentials_proxy/LICENSE @@ -0,0 +1,19 @@ +Copyright 2024 Intrinsic Innovation LLC + +You are hereby granted a non-exclusive, worldwide, royalty-free license to use, +copy, modify, and distribute this Intrinsic SDK in source code or binary form for use +in connection with the services and APIs provided by Intrinsic Innovation LLC (“Intrinsic”). + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +If you use this Intrinsic SDK with any Intrinsic services, your use is subject to the Intrinsic +Platform Terms of Service [https://intrinsic.ai/platform-terms]. If you create works that call +Intrinsic APIs, you must agree to the terms of service for those APIs separately. This license +does not grant you any special rights to use the services. + +This copyright notice shall be included in all copies or substantial portions of the software. diff --git a/flowstate_credentials_proxy/flowstate_credentials_proxy/__init__.py b/flowstate_credentials_proxy/flowstate_credentials_proxy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flowstate_credentials_proxy/flowstate_credentials_proxy/__main__.py b/flowstate_credentials_proxy/flowstate_credentials_proxy/__main__.py new file mode 100644 index 0000000..1cd59ba --- /dev/null +++ b/flowstate_credentials_proxy/flowstate_credentials_proxy/__main__.py @@ -0,0 +1,126 @@ +import argparse +import asyncio +import sys + +import aiohttp +from aiohttp import web + +from flowstate_credentials_proxy import auth +from flowstate_credentials_proxy.auth import InvalidOrganizationError + + +async def proxy( + req: web.Request, token_source: auth.TokenSource, cluster: str, service: str +) -> web.WebSocketResponse: + """Proxy a single websocket connection to flowstate.""" + + ws_response = web.WebSocketResponse() + await ws_response.prepare(req) + + try: + token = await token_source.get_token() + except Exception as e: + print(f"Error getting token: {e}", file=sys.stderr) + await ws_response.close() + return ws_response + + uri = f"wss://www.endpoints.{token_source.project}.cloud.goog/onprem/client/{cluster}/api/resourceinstances/{service}" + headers = {"cookie": f"auth-proxy={token}"} + + try: + async with aiohttp.ClientSession() as session: + async with session.ws_connect(uri, headers=headers) as upstream: + print("Connected to upstream websocket") + + async def forward_to_upstream( + d: web.WebSocketResponse, u: aiohttp.ClientWebSocketResponse + ): + async for msg in d: + if msg.type == aiohttp.WSMsgType.TEXT: + await u.send_str(msg.data) + elif msg.type == aiohttp.WSMsgType.BINARY: + await u.send_bytes(msg.data) + elif msg.type == aiohttp.WSMsgType.CLOSE: + await u.close() + break + else: + break + + async def forward_to_downstream( + u: aiohttp.ClientWebSocketResponse, d: web.WebSocketResponse + ): + async for msg in u: + if msg.type == aiohttp.WSMsgType.TEXT: + await d.send_str(msg.data) + elif msg.type == aiohttp.WSMsgType.BINARY: + await d.send_bytes(msg.data) + elif msg.type in ( + aiohttp.WSMsgType.CLOSED, + aiohttp.WSMsgType.ERROR, + ): + await d.close(code=u.close_code or 1000) + break + + await asyncio.gather( + forward_to_upstream(ws_response, upstream), + forward_to_downstream(upstream, ws_response), + ) + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + finally: + return ws_response + + +def main(): + parser = argparse.ArgumentParser( + prog="flowstate_credentials_proxy", + description="Start a websocket proxy to allow a local zenoh router to connect to flowstate.", + ) + parser.add_argument( + "--org", + required=True, + help="The organization name in the format organization@project.", + ) + parser.add_argument("--cluster", required=True, help="Name of the cluster to use.") + parser.add_argument( + "--service", required=True, help="Name of the flowstate_ros_bridge instance." + ) + parser.add_argument("--port", default=7448, help="Port to listen on.") + args = parser.parse_args() + + try: + token_source = auth.TokenSource(args.org) + except FileNotFoundError: + print( + f"""Error: Credentials for given organization "{args.org}" not found. +Run 'inctl auth login --org {args.org}' to add it. + +Download inctl here https://github.com/intrinsic-ai/sdk/releases""", + file=sys.stderr, + ) + return + except InvalidOrganizationError: + print("Organization is invalid", file=sys.stderr) + return + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + return + + app = web.Application() + app.add_routes( + [web.get("/", lambda req: proxy(req, token_source, args.cluster, args.service))] + ) + + print(f"Starting zenoh proxy on port {args.port}.") + print( + f"""Start rmw_zenohd and connect it to the proxy by running: +``` +export ZENOH_CONFIG_OVERRIDE='connect/endpoints=["ws/localhost:{args.port}"];routing/router/peers_failover_brokering=true' +ros2 run rmw_zenoh_cpp rmw_zenohd +```""" + ) + web.run_app(app, port=args.port, handle_signals=True, print=None) + + +if __name__ == "__main__": + main() diff --git a/flowstate_credentials_proxy/flowstate_credentials_proxy/auth.py b/flowstate_credentials_proxy/flowstate_credentials_proxy/auth.py new file mode 100644 index 0000000..afdf88d --- /dev/null +++ b/flowstate_credentials_proxy/flowstate_credentials_proxy/auth.py @@ -0,0 +1,101 @@ +import base64 +import json +import pathlib +import re +from datetime import datetime, timedelta, timezone +from os import environ + +import aiohttp + +INTRINSIC_CONFIG_LOCATION = pathlib.Path(environ["HOME"], ".config", "intrinsic") +ORGANIZATION_PATTERN = re.compile(r"[a-zA-Z][\w-]*@[a-zA-Z][\w-]*") +FLOWSTATE_ADDR = "https://flowstate.intrinsic.ai" + + +class InvalidOrganizationError(ValueError): + pass + + +def get_project_from_organization(org: str) -> str: + """ + Get the project from an organization@project string. + + Raises: + InvalidOrganizationError: If the organization is invalid. + """ + if ORGANIZATION_PATTERN.fullmatch(org) is None: + raise InvalidOrganizationError("invalid organization") + return org.split("@")[1] + + +def get_api_key(project: str): + """ + Get the cached API key for an organization. + + Raies: + JSONDecodeError: If the API key file is malformed. + KeyError: If the API key file is invalid. + FileNotFoundError: If the API key file does not exist. + """ + user_token = json.load( + pathlib.Path( + INTRINSIC_CONFIG_LOCATION, "projects", f"{project}.user-token" + ).open() + ) + api_key = user_token["tokens"]["default"]["apiKey"] + return api_key + + +class TokenSource: + def __init__(self, org: str): + """ + Create a new instance of TokenSource. + + Raies: + InvalidOrganizationError: If the organization is invalid. + JSONDecodeError: If the API key file is malformed. + KeyError: If the API key file is invalid. + FileNotFoundError: If the API key file does not exist. + """ + self.org = org + self.project = get_project_from_organization(self.org) + self.api_key = get_api_key(self.project) + self.__cached_token: str | None = None + + @staticmethod + def __token_expiring(token: str) -> bool: + payload = json.loads(base64.b64decode(token.split(".")[1] + "==")) + return datetime.now(timezone.utc) > ( + datetime.fromtimestamp(payload["exp"], tz=timezone.utc) + - timedelta(minutes=5) + ) + + async def get_token(self) -> str: + """ + Get an access token. + + Raises: + aiohttp.ClientError: If the request fails. + KeyError: Cannot find the access token in the response. + """ + if self.__cached_token is not None and not self.__token_expiring( + self.__cached_token + ): + return self.__cached_token + + async with aiohttp.ClientSession( + base_url=FLOWSTATE_ADDR, + headers={"content-type": "application/json"}, + raise_for_status=True, + ) as session: + async with session.post( + f"/api/v1/accountstokens:idtoken", + json={ + "apiKey": self.api_key, + "do_fan_out": False, + "api_key_project_hint": self.project, + }, + ) as resp: + access_token: str = (await resp.json())["idToken"] + self.__cached_token = access_token + return self.__cached_token diff --git a/flowstate_credentials_proxy/flowstate_credentials_proxy/test_auth.py b/flowstate_credentials_proxy/flowstate_credentials_proxy/test_auth.py new file mode 100644 index 0000000..dd5b10e --- /dev/null +++ b/flowstate_credentials_proxy/flowstate_credentials_proxy/test_auth.py @@ -0,0 +1,111 @@ +import json +import unittest +from contextlib import asynccontextmanager +from unittest.mock import AsyncMock, MagicMock, mock_open, patch + +import aiohttp + +from flowstate_credentials_proxy.auth import InvalidOrganizationError, TokenSource + + +class TestTokenSource(unittest.IsolatedAsyncioTestCase): + @staticmethod + def make_mock_response(json): + @asynccontextmanager + async def mock_response(): + try: + resp = AsyncMock() + resp.json.return_value = json + yield resp + finally: + pass + + return mock_response() + + mock_user_token = { + "name": "test_project", + "tokens": {"default": {"apiKey": "test-api-key"}}, + } + + @patch("pathlib.Path.open", new_callable=mock_open) + @patch("flowstate_credentials_proxy.auth.aiohttp.ClientSession.post") + async def test_get_access_token_success( + self, mock_post: MagicMock, mock_open_file: MagicMock + ) -> None: + mock_open_file.return_value.read.return_value = json.dumps(self.mock_user_token) + + # { + # "iss": "test", + # "sub": "test", + # "aud": "test", + # "iat": 0, + # "exp": 2177596800 # 2039-01-01 + # } + test_token = "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MCwiZXhwIjoyMTc3NTk2ODAwfQ." + + @asynccontextmanager + async def mock_response(): + try: + resp = AsyncMock() + resp.json.return_value = {"idToken": test_token} + yield resp + finally: + pass + + mock_post.return_value = self.make_mock_response({"idToken": test_token}) + + ts = TokenSource("user@project") + access_token = await ts.get_token() + + self.assertEqual(access_token, test_token) + mock_post.assert_called_once_with( + "/api/v1/accountstokens:idtoken", + json={ + "apiKey": "test-api-key", + "do_fan_out": False, + "api_key_project_hint": "project", + }, + ) + + def test_invalid_organization(self) -> None: + with self.assertRaises(InvalidOrganizationError) as e: + TokenSource("invalid-organization") + self.assertEqual(str(e.exception), "invalid organization") + + @patch("pathlib.Path.open", new_callable=mock_open) + def test_invalid_json(self, mock_open_file: MagicMock) -> None: + mock_open_file.return_value.read.return_value = "invalid json" + with self.assertRaises(json.JSONDecodeError): + TokenSource("user@project") + + @patch("pathlib.Path.open", new_callable=mock_open) + def test_missing_key(self, mock_open_file: MagicMock) -> None: + mock_open_file.return_value.read.return_value = json.dumps({}) + with self.assertRaises(KeyError): + TokenSource("user@project") + + @patch("pathlib.Path.open", new_callable=mock_open) + @patch("flowstate_credentials_proxy.auth.aiohttp.ClientSession.post") + async def test_get_token_http_error( + self, mock_post: MagicMock, mock_open_file: MagicMock + ) -> None: + mock_open_file.return_value.read.return_value = json.dumps(self.mock_user_token) + mock_post.side_effect = aiohttp.ClientError + + with self.assertRaises(aiohttp.ClientError): + await TokenSource("user@project").get_token() + + @patch("pathlib.Path.open", new_callable=mock_open) + @patch("flowstate_credentials_proxy.auth.aiohttp.ClientSession.post") + async def test_get_access_token_missing_key( + self, mock_post: MagicMock, mock_open_file: MagicMock + ) -> None: + mock_open_file.return_value.read.return_value = json.dumps(self.mock_user_token) + mock_post.return_value = self.make_mock_response({}) + + with self.assertRaises(KeyError): + await TokenSource("user@project").get_token() + + +if __name__ == "__main__": + unittest.main() diff --git a/flowstate_credentials_proxy/package.xml b/flowstate_credentials_proxy/package.xml new file mode 100644 index 0000000..fb1f252 --- /dev/null +++ b/flowstate_credentials_proxy/package.xml @@ -0,0 +1,15 @@ + + + + flowstate_credentials_proxy + 0.0.1 + websocket proxy to authenticate to flowstate + koonpeng + Intrinsic License + + python3-aiohttp + + + ament_python + + diff --git a/flowstate_credentials_proxy/resource/flowstate_credentials_proxy b/flowstate_credentials_proxy/resource/flowstate_credentials_proxy new file mode 100644 index 0000000..e69de29 diff --git a/flowstate_credentials_proxy/setup.cfg b/flowstate_credentials_proxy/setup.cfg new file mode 100644 index 0000000..c154926 --- /dev/null +++ b/flowstate_credentials_proxy/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/flowstate_credentials_proxy +[install] +install_scripts=$base/lib/flowstate_credentials_proxy diff --git a/flowstate_credentials_proxy/setup.py b/flowstate_credentials_proxy/setup.py new file mode 100644 index 0000000..af64e03 --- /dev/null +++ b/flowstate_credentials_proxy/setup.py @@ -0,0 +1,24 @@ +from setuptools import find_packages, setup + +package_name = "flowstate_credentials_proxy" + +setup( + name=package_name, + version="0.0.1", + packages=find_packages(exclude=["test_*.py"]), + data_files=[ + ("share/ament_index/resource_index/packages", ["resource/" + package_name]), + ("share/" + package_name, ["package.xml"]), + ], + install_requires=["setuptools", "aiohttp"], + zip_safe=True, + maintainer="koonpeng", + maintainer_email="koonpeng@intrinsic.ai", + description="websocket proxy to authenticate to flowstate", + license="Intrinsic License", + entry_points={ + "console_scripts": [ + "credentials_proxy = flowstate_credentials_proxy.__main__:main", + ], + }, +) diff --git a/flowstate_ros_bridge/Dockerfile.service b/flowstate_ros_bridge/Dockerfile.service index ba9dd4e..ecd06b5 100644 --- a/flowstate_ros_bridge/Dockerfile.service +++ b/flowstate_ros_bridge/Dockerfile.service @@ -19,8 +19,9 @@ ADD src/sdk-ros /opt/ros/underlay/src/sdk-ros RUN . /opt/ros/jazzy/setup.sh \ && apt-get update \ - && apt install -y ros-jazzy-ament-cmake-vendor-package \ + && apt install -y ros-jazzy-ament-cmake-vendor-package libprotobuf-dev libzmq3-dev libbz2-dev clang-19 \ && colcon build \ + --cmake-args -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_COMPILER=/usr/bin/clang-19 -DCMAKE_CXX_COMPILER=/usr/bin/clang++-19 \ --continue-on-error \ --event-handlers=console_direct+ \ --merge-install \ @@ -41,6 +42,7 @@ RUN . /opt/ros/jazzy/setup.sh \ && . /opt/ros/underlay/install/setup.sh \ && cd /opt/ros/overlay \ && colcon build \ + --cmake-args -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_COMPILER=/usr/bin/clang-19 -DCMAKE_CXX_COMPILER=/usr/bin/clang++-19 \ --event-handlers=console_direct+ \ --merge-install \ --packages-up-to=${SERVICE_PACKAGE} \ @@ -53,7 +55,7 @@ ARG SERVICE_PACKAGE ARG SERVICE_NAME RUN apt-get update \ - && apt-get install -y ros-jazzy-rmw-zenoh-cpp \ + && apt-get install -y ros-jazzy-rmw-zenoh-cpp libzmq3-dev libbz2-dev \ && rm -rf /var/lib/apt/lists/* COPY --from=overlay /opt/ros/underlay/install /opt/ros/underlay/install diff --git a/flowstate_ros_bridge/flowstate_ros_bridge.manifest.textproto b/flowstate_ros_bridge/flowstate_ros_bridge.manifest.textproto index e10bedd..c354748 100644 --- a/flowstate_ros_bridge/flowstate_ros_bridge.manifest.textproto +++ b/flowstate_ros_bridge/flowstate_ros_bridge.manifest.textproto @@ -22,6 +22,7 @@ service_def { archive_filename: "flowstate_ros_bridge.tar" } } + http_config {} } assets { default_configuration_filename: "default_config.binarypb", diff --git a/flowstate_ros_bridge/package.xml b/flowstate_ros_bridge/package.xml index f91eb86..4cde55a 100644 --- a/flowstate_ros_bridge/package.xml +++ b/flowstate_ros_bridge/package.xml @@ -2,7 +2,7 @@ flowstate_ros_bridge - 0.0.1 + 0.0.2 A bridge between Flowstate services and ROS applications Morgan Quigley Yadunund Vijay diff --git a/flowstate_ros_bridge/src/flowstate_ros_bridge_main.cpp b/flowstate_ros_bridge/src/flowstate_ros_bridge_main.cpp index 93b995b..cbf51de 100644 --- a/flowstate_ros_bridge/src/flowstate_ros_bridge_main.cpp +++ b/flowstate_ros_bridge/src/flowstate_ros_bridge_main.cpp @@ -39,7 +39,9 @@ int main(int argc, char* argv[]) { // If external router address is provided, override the Zenoh environment // variable - std::string zenoh_config_override = "connect/endpoints=[\""; + std::string zenoh_config_override = + "listen/endpoints=[\"ws/0.0.0.0:" + + std::to_string(runtime_context.http_port()) + "\"];connect/endpoints=[\""; if (!external_router_address.empty()) { zenoh_config_override += external_router_address; } else {