diff --git a/sdk/README.md b/sdk/README.md index c3567faae..a79ef6333 100644 --- a/sdk/README.md +++ b/sdk/README.md @@ -55,16 +55,20 @@ pip install iii-sdk ``` ```python -from iii import III - -iii = III("ws://localhost:49134") +import asyncio +from iii import init, InitOptions async def my_function(data): return {"result": "success"} -iii.register_function("my.function", my_function) +async def main(): + iii = init("ws://localhost:49134", InitOptions(worker_name="my-worker")) + iii.register_function("my.function", my_function) + + result = await iii.call("other.function", {"param": "value"}) + print(result) -result = await iii.call("other.function", {"param": "value"}) +asyncio.run(main()) ``` ### Rust @@ -75,13 +79,12 @@ iii-sdk = { path = "path/to/iii" } ``` ```rust -use iii_sdk::III; +use iii_sdk::{init, InitOptions}; use serde_json::json; #[tokio::main] async fn main() -> Result<(), Box> { - let iii = III::new("ws://127.0.0.1:49134"); - iii.connect().await?; + let iii = init("ws://127.0.0.1:49134", InitOptions::default())?; iii.register_function("my.function", |input| async move { Ok(json!({ "message": "Hello, world!", "input": input })) diff --git a/sdk/packages/python/iii-example/config.yaml b/sdk/packages/python/iii-example/config.yaml index 41ccf41f6..384ac3dcd 100644 --- a/sdk/packages/python/iii-example/config.yaml +++ b/sdk/packages/python/iii-example/config.yaml @@ -56,7 +56,7 @@ modules: memory_max_spans: ${OTEL_MEMORY_MAX_SPANS:10000} metrics_enabled: true - metrics_exporter: ${OTEL_METRICS_EXPORTER:both} + metrics_exporter: ${OTEL_METRICS_EXPORTER:memory} prometheus_port: ${PROMETHEUS_PORT:9464} metrics_retention_seconds: 3600 metrics_max_count: 10000 @@ -71,10 +71,10 @@ modules: collection_interval_seconds: 5 history_size: 60 - - class: modules::event::EventModule + - class: modules::queue::QueueModule config: adapter: - class: modules::event::RedisAdapter + class: modules::queue::RedisAdapter config: redis_url: redis://localhost:6379 diff --git a/sdk/packages/python/iii-example/src/hooks.py b/sdk/packages/python/iii-example/src/hooks.py index e7a30731a..cb97c7147 100644 --- a/sdk/packages/python/iii-example/src/hooks.py +++ b/sdk/packages/python/iii-example/src/hooks.py @@ -1,11 +1,10 @@ from typing import Any, Awaitable, Callable -from iii import ApiRequest, ApiResponse, FunctionInfo, get_context - -from .iii import iii +from iii import III, ApiRequest, ApiResponse, FunctionInfo, get_context def use_api( + iii: III, config: dict[str, Any], handler: Callable[[ApiRequest[Any], Any], Awaitable[ApiResponse[Any]]], ) -> None: @@ -32,5 +31,5 @@ async def wrapped(data: Any) -> dict[str, Any]: ) -def use_functions_available(callback: Callable[[list[FunctionInfo]], None]) -> Callable[[], None]: +def use_functions_available(iii: III, callback: Callable[[list[FunctionInfo]], None]) -> Callable[[], None]: return iii.on_functions_available(callback) diff --git a/sdk/packages/python/iii-example/src/iii.py b/sdk/packages/python/iii-example/src/iii.py deleted file mode 100644 index 940667300..000000000 --- a/sdk/packages/python/iii-example/src/iii.py +++ /dev/null @@ -1,7 +0,0 @@ -import os - -from iii import III, InitOptions - -engine_ws_url = os.environ.get("III_BRIDGE_URL", "ws://localhost:49134") - -iii = III(address=engine_ws_url, options=InitOptions(worker_name="iii-example")) diff --git a/sdk/packages/python/iii-example/src/main.py b/sdk/packages/python/iii-example/src/main.py index 230c14a21..6e4477784 100644 --- a/sdk/packages/python/iii-example/src/main.py +++ b/sdk/packages/python/iii-example/src/main.py @@ -1,16 +1,17 @@ import asyncio import json +import os import random import string import time import urllib.request from datetime import datetime, timezone +from typing import Any -from iii import ApiRequest, ApiResponse +from iii import ApiRequest, ApiResponse, InitOptions, init -from .hooks import use_api, use_functions_available -from .state import state -from .stream import streams +state: Any = None +streams: Any = None def _generate_todo_id() -> str: @@ -18,45 +19,72 @@ def _generate_todo_id() -> str: return f"todo-{int(time.time() * 1000)}-{suffix}" -def _setup() -> None: - use_functions_available(lambda functions: print( - "--------------------------------\n" - f"Functions available: {len(functions)}\n" - "--------------------------------" - )) +def _setup(iii) -> None: + from .hooks import use_api, use_functions_available + from .state import State + from .stream import StreamClient, register_streams + + global state, streams + state = State(iii) + streams = StreamClient(iii) + register_streams(iii) + + use_functions_available( + iii, + lambda functions: print( + "--------------------------------\n" + f"Functions available: {len(functions)}\n" + "--------------------------------" + ) + ) use_api( + iii, {"api_path": "todo", "http_method": "POST", "description": "Create a new todo", "metadata": {"tags": ["todo"]}}, _create_todo, ) use_api( + iii, {"api_path": "todo", "http_method": "DELETE", "description": "Delete a todo", "metadata": {"tags": ["todo"]}}, _delete_todo, ) use_api( + iii, {"api_path": "todo/:id", "http_method": "PUT", "description": "Update a todo", "metadata": {"tags": ["todo"]}}, _update_todo, ) use_api( + iii, {"api_path": "state", "http_method": "POST", "description": "Set application state"}, _create_state, ) use_api( + iii, {"api_path": "state/:id", "http_method": "GET", "description": "Get state by ID"}, _get_state, ) use_api( - {"api_path": "http-fetch", "http_method": "GET", "description": "Fetch a todo from JSONPlaceholder (tests urllib instrumentation)"}, + iii, + { + "api_path": "http-fetch", + "http_method": "GET", + "description": "Fetch a todo from JSONPlaceholder (tests urllib instrumentation)", + }, _fetch_example, ) use_api( - {"api_path": "http-fetch", "http_method": "POST", "description": "Post data to httpbin (tests urllib instrumentation)"}, + iii, + { + "api_path": "http-fetch", + "http_method": "POST", + "description": "Post data to httpbin (tests urllib instrumentation)", + }, _post_example, ) @@ -165,10 +193,15 @@ async def _post_example(req: ApiRequest, ctx) -> ApiResponse: async def _async_main() -> None: - from .iii import iii - - _setup() - await iii.connect() + engine_ws_url = os.environ.get("III_BRIDGE_URL", "ws://localhost:49134") + iii = init( + address=engine_ws_url, + options=InitOptions( + worker_name="iii-example", + otel={"enabled": True, "service_name": "iii-example"}, + ), + ) + _setup(iii) while True: await asyncio.sleep(60) diff --git a/sdk/packages/python/iii-example/src/state.py b/sdk/packages/python/iii-example/src/state.py index a147a1f4b..ca2388377 100644 --- a/sdk/packages/python/iii-example/src/state.py +++ b/sdk/packages/python/iii-example/src/state.py @@ -1,20 +1,20 @@ from typing import Any -from .iii import iii +from iii import III class State: + def __init__(self, iii: III) -> None: + self._iii = iii + async def get(self, scope: str, key: str) -> Any | None: - return await iii.call("state::get", {"scope": scope, "key": key}) + return await self._iii.call("state::get", {"scope": scope, "key": key}) async def set(self, scope: str, key: str, value: Any) -> Any: - return await iii.call("state::set", {"scope": scope, "key": key, "value": value}) + return await self._iii.call("state::set", {"scope": scope, "key": key, "value": value}) async def delete(self, scope: str, key: str) -> None: - return await iii.call("state::delete", {"scope": scope, "key": key}) + return await self._iii.call("state::delete", {"scope": scope, "key": key}) async def get_group(self, scope: str) -> list[Any]: - return await iii.call("state::list", {"scope": scope}) - - -state = State() + return await self._iii.call("state::list", {"scope": scope}) diff --git a/sdk/packages/python/iii-example/src/stream.py b/sdk/packages/python/iii-example/src/stream.py index 612b54237..236e89440 100644 --- a/sdk/packages/python/iii-example/src/stream.py +++ b/sdk/packages/python/iii-example/src/stream.py @@ -3,43 +3,46 @@ from typing import Any from iii import ( + III, IStream, StreamDeleteInput, - StreamListInput, StreamGetInput, StreamListGroupsInput, + StreamListInput, StreamSetInput, StreamSetResult, StreamUpdateInput, ) -from .iii import iii from .models import Todo class StreamClient: + def __init__(self, iii: III) -> None: + self._iii = iii + async def get(self, stream_name: str, group_id: str, item_id: str) -> Any | None: - return await iii.call( + return await self._iii.call( "stream::get", {"stream_name": stream_name, "group_id": group_id, "item_id": item_id} ) async def set(self, stream_name: str, group_id: str, item_id: str, data: Any) -> Any: - return await iii.call( + return await self._iii.call( "stream::set", {"stream_name": stream_name, "group_id": group_id, "item_id": item_id, "data": data} ) async def delete(self, stream_name: str, group_id: str, item_id: str) -> None: - return await iii.call( + return await self._iii.call( "stream::delete", {"stream_name": stream_name, "group_id": group_id, "item_id": item_id} ) async def get_group(self, stream_name: str, group_id: str) -> list[Any]: - return await iii.call( + return await self._iii.call( "stream::list", {"stream_name": stream_name, "group_id": group_id} ) async def list_groups(self, stream_name: str) -> list[str]: - return await iii.call("stream::list_groups", {"stream_name": stream_name}) + return await self._iii.call("stream::list_groups", {"stream_name": stream_name}) class TodoStream(IStream[dict[str, Any]]): @@ -82,6 +85,5 @@ async def update(self, input: StreamUpdateInput) -> StreamSetResult[dict[str, An return None -streams = StreamClient() - -iii.create_stream("todo", TodoStream()) +def register_streams(iii: III) -> None: + iii.create_stream("todo", TodoStream()) diff --git a/sdk/packages/python/iii-example/uv.lock b/sdk/packages/python/iii-example/uv.lock index e5a5e95d3..30c12acf5 100644 --- a/sdk/packages/python/iii-example/uv.lock +++ b/sdk/packages/python/iii-example/uv.lock @@ -30,7 +30,7 @@ requires-dist = [ [[package]] name = "iii-sdk" -version = "0.2.0" +version = "0.4.1" source = { editable = "../iii" } dependencies = [ { name = "pydantic" }, @@ -39,6 +39,8 @@ dependencies = [ [package.metadata] requires-dist = [ + { name = "aiohttp", marker = "extra == 'dev'", specifier = ">=3.9" }, + { name = "anyio", marker = "extra == 'dev'", specifier = ">=4.0" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.8" }, { name = "opentelemetry-api", marker = "extra == 'dev'", specifier = ">=1.25" }, { name = "opentelemetry-api", marker = "extra == 'otel'", specifier = ">=1.25" }, diff --git a/sdk/packages/python/iii/README.md b/sdk/packages/python/iii/README.md index 9e0955d31..683ec72b4 100644 --- a/sdk/packages/python/iii/README.md +++ b/sdk/packages/python/iii/README.md @@ -12,20 +12,24 @@ pip install iii-sdk ```python import asyncio -from iii import III +from iii import InitOptions, init + async def my_function(data): return {"result": "success"} -iii = III("ws://localhost:49134") -iii.register_function("my.function", my_function) async def main(): - await iii.connect() + iii = init( + "ws://localhost:49134", + InitOptions(otel={"enabled": True, "service_name": "iii-python-worker"}), + ) + iii.register_function("my.function", my_function) result = await iii.call("other.function", {"param": "value"}) print(result) + asyncio.run(main()) ``` @@ -33,29 +37,32 @@ asyncio.run(main()) ```python import asyncio -from iii import III, ApiRequest, ApiResponse +from iii import ApiRequest, ApiResponse, InitOptions, init -iii = III("ws://localhost:49134") -async def create_todo(data): +async def create_todo(data): req = ApiRequest(**data) return ApiResponse(status=201, data={"id": "123", "title": req.body.get("title")}) -iii.register_function("api.post.todo", create_todo) async def main(): - await iii.connect() + iii = init( + "ws://localhost:49134", + InitOptions(otel={"enabled": True, "service_name": "iii-python-worker"}), + ) + iii.register_function("api.post.todo", create_todo) iii.register_trigger( type="http", function_id="api.post.todo", config={ "api_path": "/todo", "http_method": "POST", - "description": "Create a new todo" - } + "description": "Create a new todo", + }, ) + asyncio.run(main()) ``` diff --git a/sdk/packages/python/iii/src/iii/__init__.py b/sdk/packages/python/iii/src/iii/__init__.py index 6ada90727..15a531300 100644 --- a/sdk/packages/python/iii/src/iii/__init__.py +++ b/sdk/packages/python/iii/src/iii/__init__.py @@ -1,5 +1,6 @@ """III SDK for Python.""" +import asyncio import logging from .channels import ChannelReader, ChannelWriter, ReadableStream, WritableStream @@ -41,6 +42,22 @@ ) +def init(address: str, options: InitOptions | None = None) -> III: + """Create an III client and auto-start its connection task.""" + client = III(address, options) + + try: + loop = asyncio.get_running_loop() + except RuntimeError as exc: + raise RuntimeError( + "iii.init() requires an active asyncio event loop. " + "Call it inside async code or use `client = III(...); await client.connect()`" + ) from exc + + loop.create_task(client.connect()) + return client + + def configure_logging(level: int = logging.INFO, format: str | None = None) -> None: """Configure logging for the III SDK. @@ -58,6 +75,7 @@ def configure_logging(level: int = logging.INFO, format: str | None = None) -> N __all__ = [ # Core "III", + "init", "InitOptions", "ReconnectionConfig", "IIIConnectionState", diff --git a/sdk/packages/python/iii/src/iii/iii.py b/sdk/packages/python/iii/src/iii/iii.py index 11a3c759b..e8fc5eb04 100644 --- a/sdk/packages/python/iii/src/iii/iii.py +++ b/sdk/packages/python/iii/src/iii/iii.py @@ -34,6 +34,7 @@ ) from .logger import Logger from .stream import IStream +from .telemetry_types import OtelConfig from .triggers import Trigger, TriggerConfig, TriggerHandler from .types import Channel, RemoteFunctionData, RemoteTriggerTypeData, is_channel_ref @@ -93,7 +94,7 @@ class InitOptions: enable_metrics_reporting: bool = True invocation_timeout_ms: int = DEFAULT_INVOCATION_TIMEOUT_MS reconnection_config: ReconnectionConfig | None = None - otel: dict[str, Any] | None = None + otel: OtelConfig | dict[str, Any] | None = None telemetry: TelemetryOptions | None = None @@ -131,7 +132,13 @@ async def connect(self) -> None: try: from .telemetry import attach_event_loop, init_otel loop = asyncio.get_running_loop() - init_otel(loop=loop) + otel_cfg: OtelConfig | None = None + if self._options.otel: + if isinstance(self._options.otel, OtelConfig): + otel_cfg = self._options.otel + else: + otel_cfg = OtelConfig(**self._options.otel) + init_otel(config=otel_cfg, loop=loop) attach_event_loop(loop) except ImportError: pass diff --git a/sdk/packages/python/iii/tests/test_init_api.py b/sdk/packages/python/iii/tests/test_init_api.py new file mode 100644 index 000000000..2d79828ae --- /dev/null +++ b/sdk/packages/python/iii/tests/test_init_api.py @@ -0,0 +1,62 @@ +import asyncio + +import pytest + +from iii import III, InitOptions, init + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +@pytest.mark.anyio +async def test_init_schedules_connect(monkeypatch: pytest.MonkeyPatch) -> None: + called = asyncio.Event() + + async def fake_connect(self: III) -> None: + called.set() + + monkeypatch.setattr(III, "connect", fake_connect) + + client = init("ws://fake") + assert isinstance(client, III) + + await asyncio.wait_for(called.wait(), timeout=0.2) + + +def test_init_requires_running_loop() -> None: + with pytest.raises(RuntimeError, match="active asyncio event loop"): + init("ws://fake") + + +@pytest.mark.anyio +async def test_connect_consumes_otel_from_init_options(monkeypatch: pytest.MonkeyPatch) -> None: + import iii.telemetry as telemetry + + captured = {"config": None} + + def fake_init_otel(config=None, loop=None): + captured["config"] = config + + def fake_attach_event_loop(loop): + return None + + async def fake_do_connect(self: III) -> None: + return None + + monkeypatch.setattr(telemetry, "init_otel", fake_init_otel) + monkeypatch.setattr(telemetry, "attach_event_loop", fake_attach_event_loop) + monkeypatch.setattr(III, "_do_connect", fake_do_connect) + + client = init( + "ws://fake", + InitOptions(otel={"enabled": True, "service_name": "iii-python-init-test"}), + ) + + # let scheduled connect task run + await asyncio.sleep(0) + + assert isinstance(client, III) + assert captured["config"] is not None + assert getattr(captured["config"], "service_name", None) == "iii-python-init-test" diff --git a/sdk/packages/rust/iii-example/Cargo.lock b/sdk/packages/rust/iii-example/Cargo.lock index 6c18a1e67..07bd00fb8 100644 --- a/sdk/packages/rust/iii-example/Cargo.lock +++ b/sdk/packages/rust/iii-example/Cargo.lock @@ -543,7 +543,7 @@ dependencies = [ [[package]] name = "iii-sdk" -version = "0.2.0" +version = "0.4.1" dependencies = [ "async-trait", "futures-util", diff --git a/sdk/packages/rust/iii-example/src/main.rs b/sdk/packages/rust/iii-example/src/main.rs index f6b6050ff..e735a5e7b 100644 --- a/sdk/packages/rust/iii-example/src/main.rs +++ b/sdk/packages/rust/iii-example/src/main.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use iii_sdk::{III, OtelConfig, Streams, UpdateBuilder, UpdateOp}; +use iii_sdk::{InitOptions, OtelConfig, Streams, UpdateBuilder, UpdateOp, init}; use serde_json::json; mod http_example; @@ -8,9 +8,13 @@ mod http_example; #[tokio::main] async fn main() -> Result<(), Box> { let iii_iii_url = std::env::var("REMOTE_III_URL").unwrap_or("ws://127.0.0.1:49134".into()); - let iii = III::new(&iii_iii_url); - iii.set_otel_config(OtelConfig::default()); - iii.connect().await?; + let iii = init( + &iii_iii_url, + InitOptions { + otel: Some(OtelConfig::default()), + ..Default::default() + }, + )?; // Register HTTP fetch API handlers (GET & POST http-fetch with OTel instrumentation) http_example::setup(&iii); diff --git a/sdk/packages/rust/iii/README.md b/sdk/packages/rust/iii/README.md index 6b59c95ff..783e8afd0 100644 --- a/sdk/packages/rust/iii/README.md +++ b/sdk/packages/rust/iii/README.md @@ -14,13 +14,21 @@ iii-sdk = { path = "../path/to/iii" } ## Usage ```rust -use iii_sdk::III; +use iii_sdk::{init, InitOptions}; +#[cfg(feature = "otel")] +use iii_sdk::OtelConfig; use serde_json::json; #[tokio::main] async fn main() -> Result<(), Box> { - let iii = III::new("ws://127.0.0.1:49134"); - iii.connect().await?; + let iii = init( + "ws://127.0.0.1:49134", + InitOptions { + #[cfg(feature = "otel")] + otel: Some(OtelConfig::default()), + ..Default::default() + }, + )?; iii.register_function("my.function", |input| async move { Ok(json!({ "message": "Hello, world!", "input": input })) @@ -45,6 +53,7 @@ async fn main() -> Result<(), Box> { ## Notes -- `III::connect` starts a background task and handles reconnection automatically. +- `init(...)` starts connection in the background (Node-style startup). +- `III::connect` remains available for explicit startup control. - The engine protocol currently supports `registertriggertype` but does not include an `unregistertriggertype` message; `unregister_trigger_type` only removes local handlers. diff --git a/sdk/packages/rust/iii/src/error.rs b/sdk/packages/rust/iii/src/error.rs index 9c81fc5e4..242a57ebe 100644 --- a/sdk/packages/rust/iii/src/error.rs +++ b/sdk/packages/rust/iii/src/error.rs @@ -6,6 +6,8 @@ pub enum IIIError { NotConnected, #[error("invocation timed out")] Timeout, + #[error("runtime error: {0}")] + Runtime(String), #[error("remote error ({code}): {message}")] Remote { code: String, message: String }, #[error("handler error: {0}")] diff --git a/sdk/packages/rust/iii/src/lib.rs b/sdk/packages/rust/iii/src/lib.rs index f4fb18bc4..e5f6a4c6d 100644 --- a/sdk/packages/rust/iii/src/lib.rs +++ b/sdk/packages/rust/iii/src/lib.rs @@ -34,6 +34,44 @@ pub use types::{ pub use serde_json::Value; +#[derive(Debug, Clone, Default)] +pub struct InitOptions { + pub metadata: Option, + #[cfg(feature = "otel")] + pub otel: Option, +} + +pub fn init(address: &str, options: InitOptions) -> Result { + let InitOptions { + metadata, + #[cfg(feature = "otel")] + otel, + } = options; + + let iii = if let Some(metadata) = metadata { + III::with_metadata(address, metadata) + } else { + III::new(address) + }; + + #[cfg(feature = "otel")] + if let Some(cfg) = otel { + iii.set_otel_config(cfg); + } + + let handle = tokio::runtime::Handle::try_current() + .map_err(|_| IIIError::Runtime("iii_sdk::init requires an active Tokio runtime".into()))?; + + let client = iii.clone(); + handle.spawn(async move { + if let Err(err) = client.connect().await { + tracing::warn!(error = %err, "iii_sdk::init auto-connect failed"); + } + }); + + Ok(iii) +} + // OpenTelemetry re-exports (behind "otel" feature flag) #[cfg(feature = "otel")] pub use telemetry::{ diff --git a/sdk/packages/rust/iii/tests/init_api.rs b/sdk/packages/rust/iii/tests/init_api.rs new file mode 100644 index 000000000..f0b5929b4 --- /dev/null +++ b/sdk/packages/rust/iii/tests/init_api.rs @@ -0,0 +1,39 @@ +use iii_sdk::{IIIError, InitOptions, init}; + +#[test] +fn init_without_runtime_returns_runtime_error() { + match init("ws://127.0.0.1:49134", InitOptions::default()) { + Err(IIIError::Runtime(_)) => {} + Err(other) => panic!("expected Runtime error, got {other:?}"), + Ok(_) => panic!("expected init to fail without Tokio runtime"), + } +} + +#[tokio::test] +async fn init_with_runtime_returns_sdk_instance() { + let client = init("ws://127.0.0.1:49134", InitOptions::default()) + .expect("init should succeed inside Tokio runtime"); + + // API should remain usable immediately after init() + client.register_function("test.echo", |input| async move { Ok(input) }); +} + +#[cfg(feature = "otel")] +#[tokio::test] +async fn init_applies_otel_config_before_auto_connect() { + use iii_sdk::OtelConfig; + + let client = init( + "ws://127.0.0.1:49134", + InitOptions { + otel: Some(OtelConfig { + service_name: Some("iii-rust-init-test".to_string()), + ..Default::default() + }), + ..Default::default() + }, + ) + .expect("init should succeed"); + + client.register_function("test.echo.otel", |input| async move { Ok(input) }); +}