Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__/
4 changes: 2 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2024 Intrinsic Innovation LLC
# Copyright 2025 Intrinsic Innovation LLC

Copyright 2024 Intrinsic Innovation LLC
Copyright 2025 Intrinsic Innovation LLC

You are hereby granted a non-exclusive, worldwide, royalty-free license to use,
copy, modify, and distribute this Intrinsic SDK in source code or binary form for use
Expand Down
19 changes: 19 additions & 0 deletions flowstate_credentials_proxy/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Copyright 2024 Intrinsic Innovation LLC

You are hereby granted a non-exclusive, worldwide, royalty-free license to use,
copy, modify, and distribute this Intrinsic SDK in source code or binary form for use
in connection with the services and APIs provided by Intrinsic Innovation LLC (“Intrinsic”).

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

If you use this Intrinsic SDK with any Intrinsic services, your use is subject to the Intrinsic
Platform Terms of Service [https://intrinsic.ai/platform-terms]. If you create works that call
Intrinsic APIs, you must agree to the terms of service for those APIs separately. This license
does not grant you any special rights to use the services.

This copyright notice shall be included in all copies or substantial portions of the software.
Empty file.
126 changes: 126 additions & 0 deletions flowstate_credentials_proxy/flowstate_credentials_proxy/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import argparse
import asyncio
import sys

import aiohttp
from aiohttp import web

from flowstate_credentials_proxy import auth
from flowstate_credentials_proxy.auth import InvalidOrganizationError


async def proxy(
req: web.Request, token_source: auth.TokenSource, cluster: str, service: str
) -> web.WebSocketResponse:
"""Proxy a single websocket connection to flowstate."""

ws_response = web.WebSocketResponse()
await ws_response.prepare(req)

try:
token = await token_source.get_token()
except Exception as e:
print(f"Error getting token: {e}", file=sys.stderr)
await ws_response.close()
return ws_response

uri = f"wss://www.endpoints.{token_source.project}.cloud.goog/onprem/client/{cluster}/api/resourceinstances/{service}"
headers = {"cookie": f"auth-proxy={token}"}

try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(uri, headers=headers) as upstream:
print("Connected to upstream websocket")

async def forward_to_upstream(
d: web.WebSocketResponse, u: aiohttp.ClientWebSocketResponse
):
async for msg in d:
if msg.type == aiohttp.WSMsgType.TEXT:
await u.send_str(msg.data)
elif msg.type == aiohttp.WSMsgType.BINARY:
await u.send_bytes(msg.data)
elif msg.type == aiohttp.WSMsgType.CLOSE:
await u.close()
break
else:
break

async def forward_to_downstream(
u: aiohttp.ClientWebSocketResponse, d: web.WebSocketResponse
):
async for msg in u:
if msg.type == aiohttp.WSMsgType.TEXT:
await d.send_str(msg.data)
elif msg.type == aiohttp.WSMsgType.BINARY:
await d.send_bytes(msg.data)
elif msg.type in (
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.ERROR,
):
await d.close(code=u.close_code or 1000)
break

await asyncio.gather(
forward_to_upstream(ws_response, upstream),
forward_to_downstream(upstream, ws_response),
)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
finally:
return ws_response


def main():
parser = argparse.ArgumentParser(
prog="flowstate_credentials_proxy",
description="Start a websocket proxy to allow a local zenoh router to connect to flowstate.",
)
parser.add_argument(
"--org",
required=True,
help="The organization name in the format organization@project.",
)
parser.add_argument("--cluster", required=True, help="Name of the cluster to use.")
parser.add_argument(
"--service", required=True, help="Name of the flowstate_ros_bridge instance."
)
parser.add_argument("--port", default=7448, help="Port to listen on.")
args = parser.parse_args()

try:
token_source = auth.TokenSource(args.org)
except FileNotFoundError:
print(
f"""Error: Credentials for given organization "{args.org}" not found.
Run 'inctl auth login --org {args.org}' to add it.

Download inctl here https://github.com/intrinsic-ai/sdk/releases""",
file=sys.stderr,
)
return
except InvalidOrganizationError:
print("Organization is invalid", file=sys.stderr)
return
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return

app = web.Application()
app.add_routes(
[web.get("/", lambda req: proxy(req, token_source, args.cluster, args.service))]
)

print(f"Starting zenoh proxy on port {args.port}.")
print(
f"""Start rmw_zenohd and connect it to the proxy by running:
```
export ZENOH_CONFIG_OVERRIDE='connect/endpoints=["ws/localhost:{args.port}"];routing/router/peers_failover_brokering=true'
ros2 run rmw_zenoh_cpp rmw_zenohd
```"""
)
web.run_app(app, port=args.port, handle_signals=True, print=None)


if __name__ == "__main__":
main()
101 changes: 101 additions & 0 deletions flowstate_credentials_proxy/flowstate_credentials_proxy/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import base64
import json
import pathlib
import re
from datetime import datetime, timedelta, timezone
from os import environ

import aiohttp

INTRINSIC_CONFIG_LOCATION = pathlib.Path(environ["HOME"], ".config", "intrinsic")
ORGANIZATION_PATTERN = re.compile(r"[a-zA-Z][\w-]*@[a-zA-Z][\w-]*")
FLOWSTATE_ADDR = "https://flowstate.intrinsic.ai"


class InvalidOrganizationError(ValueError):
pass


def get_project_from_organization(org: str) -> str:
"""
Get the project from an organization@project string.

Raises:
InvalidOrganizationError: If the organization is invalid.
"""
if ORGANIZATION_PATTERN.fullmatch(org) is None:
raise InvalidOrganizationError("invalid organization")
return org.split("@")[1]


def get_api_key(project: str):
"""
Get the cached API key for an organization.

Raies:
JSONDecodeError: If the API key file is malformed.
KeyError: If the API key file is invalid.
FileNotFoundError: If the API key file does not exist.
"""
user_token = json.load(
pathlib.Path(
INTRINSIC_CONFIG_LOCATION, "projects", f"{project}.user-token"
).open()
)
api_key = user_token["tokens"]["default"]["apiKey"]
return api_key


class TokenSource:
def __init__(self, org: str):
"""
Create a new instance of TokenSource.

Raies:
InvalidOrganizationError: If the organization is invalid.
JSONDecodeError: If the API key file is malformed.
KeyError: If the API key file is invalid.
FileNotFoundError: If the API key file does not exist.
"""
self.org = org
self.project = get_project_from_organization(self.org)
self.api_key = get_api_key(self.project)
self.__cached_token: str | None = None

@staticmethod
def __token_expiring(token: str) -> bool:
payload = json.loads(base64.b64decode(token.split(".")[1] + "=="))
return datetime.now(timezone.utc) > (
datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
- timedelta(minutes=5)
)

async def get_token(self) -> str:
"""
Get an access token.

Raises:
aiohttp.ClientError: If the request fails.
KeyError: Cannot find the access token in the response.
"""
if self.__cached_token is not None and not self.__token_expiring(
self.__cached_token
):
return self.__cached_token

async with aiohttp.ClientSession(
base_url=FLOWSTATE_ADDR,
headers={"content-type": "application/json"},
raise_for_status=True,
) as session:
async with session.post(
f"/api/v1/accountstokens:idtoken",
json={
"apiKey": self.api_key,
"do_fan_out": False,
"api_key_project_hint": self.project,
},
) as resp:
access_token: str = (await resp.json())["idToken"]
self.__cached_token = access_token
return self.__cached_token
111 changes: 111 additions & 0 deletions flowstate_credentials_proxy/flowstate_credentials_proxy/test_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import json
import unittest
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock, mock_open, patch

import aiohttp

from flowstate_credentials_proxy.auth import InvalidOrganizationError, TokenSource


class TestTokenSource(unittest.IsolatedAsyncioTestCase):
@staticmethod
def make_mock_response(json):
@asynccontextmanager
async def mock_response():
try:
resp = AsyncMock()
resp.json.return_value = json
yield resp
finally:
pass

return mock_response()

mock_user_token = {
"name": "test_project",
"tokens": {"default": {"apiKey": "test-api-key"}},
}

@patch("pathlib.Path.open", new_callable=mock_open)
@patch("flowstate_credentials_proxy.auth.aiohttp.ClientSession.post")
async def test_get_access_token_success(
self, mock_post: MagicMock, mock_open_file: MagicMock
) -> None:
mock_open_file.return_value.read.return_value = json.dumps(self.mock_user_token)

# {
# "iss": "test",
# "sub": "test",
# "aud": "test",
# "iat": 0,
# "exp": 2177596800 # 2039-01-01
# }
test_token = "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MCwiZXhwIjoyMTc3NTk2ODAwfQ."

@asynccontextmanager
async def mock_response():
try:
resp = AsyncMock()
resp.json.return_value = {"idToken": test_token}
yield resp
finally:
pass

mock_post.return_value = self.make_mock_response({"idToken": test_token})

ts = TokenSource("user@project")
access_token = await ts.get_token()

self.assertEqual(access_token, test_token)
mock_post.assert_called_once_with(
"/api/v1/accountstokens:idtoken",
json={
"apiKey": "test-api-key",
"do_fan_out": False,
"api_key_project_hint": "project",
},
)

def test_invalid_organization(self) -> None:
with self.assertRaises(InvalidOrganizationError) as e:
TokenSource("invalid-organization")
self.assertEqual(str(e.exception), "invalid organization")

@patch("pathlib.Path.open", new_callable=mock_open)
def test_invalid_json(self, mock_open_file: MagicMock) -> None:
mock_open_file.return_value.read.return_value = "invalid json"
with self.assertRaises(json.JSONDecodeError):
TokenSource("user@project")

@patch("pathlib.Path.open", new_callable=mock_open)
def test_missing_key(self, mock_open_file: MagicMock) -> None:
mock_open_file.return_value.read.return_value = json.dumps({})
with self.assertRaises(KeyError):
TokenSource("user@project")

@patch("pathlib.Path.open", new_callable=mock_open)
@patch("flowstate_credentials_proxy.auth.aiohttp.ClientSession.post")
async def test_get_token_http_error(
self, mock_post: MagicMock, mock_open_file: MagicMock
) -> None:
mock_open_file.return_value.read.return_value = json.dumps(self.mock_user_token)
mock_post.side_effect = aiohttp.ClientError

with self.assertRaises(aiohttp.ClientError):
await TokenSource("user@project").get_token()

@patch("pathlib.Path.open", new_callable=mock_open)
@patch("flowstate_credentials_proxy.auth.aiohttp.ClientSession.post")
async def test_get_access_token_missing_key(
self, mock_post: MagicMock, mock_open_file: MagicMock
) -> None:
mock_open_file.return_value.read.return_value = json.dumps(self.mock_user_token)
mock_post.return_value = self.make_mock_response({})

with self.assertRaises(KeyError):
await TokenSource("user@project").get_token()


if __name__ == "__main__":
unittest.main()
15 changes: 15 additions & 0 deletions flowstate_credentials_proxy/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>flowstate_credentials_proxy</name>
<version>0.0.1</version>
<description>websocket proxy to authenticate to flowstate</description>
<maintainer email="[email protected]">koonpeng</maintainer>
<license>Intrinsic License</license>

<depend>python3-aiohttp</depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
4 changes: 4 additions & 0 deletions flowstate_credentials_proxy/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/flowstate_credentials_proxy
[install]
install_scripts=$base/lib/flowstate_credentials_proxy
Loading