feat: private connect function for python and rust#31
Conversation
📝 WalkthroughWalkthroughThe PR introduces an explicit async init/init_iii API across Python and Rust, moves from module-level III singletons to lazy/explicit initialization with get_iii() accessor, adds InitOptions/OtelConfig handling (including telemetry init), and updates examples, streams, and tests to use the new flow. Changes
Sequence Diagram(s)sequenceDiagram
participant App
participant InitFunc as init(address, InitOptions)
participant Telemetry as init_otel
participant IIIClient as III
participant StreamReg as register_streams
App->>InitFunc: call init(address, options)
note right of InitFunc: construct III (with metadata if provided)
InitFunc->>Telemetry: init_otel(config) (if otel present)
InitFunc->>IIIClient: await connect()
InitFunc-->>App: return connected III
App->>IIIClient: calls via get_iii() / client API
App->>StreamReg: register_streams() (idempotent)
StreamReg->>IIIClient: create_stream("todo", TodoStream)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
packages/python/iii/src/iii/iii.py (1)
94-94:⚠️ Potential issue | 🔴 Critical
InitOptions.oteltype is too narrow — blocks both mypy and directOtelConfigusageThe field is declared
dict[str, Any] | None, butconnect()already handles anOtelConfiginstance viaisinstancecheck (line 133). Callers who passOtelConfig(...)directly will get a mypy error at the call site, and the root cause of the pipeline failure stems from this mismatch propagating tootel_cfg's inferred type.🐛 Proposed fix
- otel: dict[str, Any] | None = None + otel: OtelConfig | dict[str, Any] | None = None🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/python/iii/src/iii/iii.py` at line 94, The InitOptions.otel annotation is too narrow (dict[str, Any] | None) while connect() handles an OtelConfig instance; change the type of InitOptions.otel to accept OtelConfig as well (e.g., OtelConfig | dict[str, Any] | None), import or forward-reference OtelConfig where InitOptions is defined, and update any related type hints (such as the local variable otel_cfg) so mypy recognizes both dict and OtelConfig inputs used by connect().packages/python/iii-example/src/main.py (1)
137-138:⚠️ Potential issue | 🟡 MinorCopy-paste bug: log message says "Creating new todo" in
_create_state.This handler is for creating state, but the log message was copied from
_create_todo. The misleading log will confuse debugging.Proposed fix
async def _create_state(req: ApiRequest, ctx) -> ApiResponse: - ctx.logger.info("Creating new todo", {"body": req.body}) + ctx.logger.info("Creating new state", {"body": req.body})🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/python/iii-example/src/main.py` around lines 137 - 138, The log in function _create_state incorrectly says "Creating new todo"; update the ctx.logger.info call inside _create_state to use a correct, specific message like "Creating new state" (or similar) and keep the same structured payload {"body": req.body} so logs reflect the actual handler name and payload; locate the ctx.logger.info in _create_state and change only the message string.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/python/iii/README.md`:
- Around line 48-66: The example returns from main() before the SDK's background
connect task runs, so move the code that yields to the event loop after calling
init() (or at the end of main) to allow the pending connect() to execute;
specifically, after calling init(...) in main() (where the variable iii is set)
add an await that yields control (e.g., await asyncio.sleep(0) or an appropriate
run_forever/wait-until-connected mechanism) before proceeding with
iii.register_function and iii.register_trigger so the SDK can establish the
connection and send those registrations.
In `@packages/python/iii/src/iii/iii.py`:
- Around line 131-137: The local variable otel_cfg is inferred as None by mypy;
explicitly annotate it as OtelConfig | None so assignments from
isinstance(self._options.otel, OtelConfig) and OtelConfig(**...) are
type-compatible and pass type checking; update the declaration before the
conditional to declare otel_cfg with the union type (referencing OtelConfig,
self._options.otel and the init_otel call) and keep the rest of the conditional
logic unchanged.
In `@packages/rust/iii/src/lib.rs`:
- Around line 56-61: The current auto-connect spawned task (using iii.clone(),
handle.spawn and calling client.connect()) hides connection outcome from callers
because connect() returns before actual connection; update the API so callers
can observe readiness: either make connect() await the initial connection
attempt (delegate to run_connection so connect() only returns after first
successful connect or a determinable failure), or add a public connection-state
accessor or awaitable like is_connected()/wait_for_ready() on the client that
reflects the background run_connection status; adjust connect(), run_connection
and any call sites (the spawn block) accordingly so callers can choose to await
readiness or poll the exposed state.
---
Outside diff comments:
In `@packages/python/iii-example/src/main.py`:
- Around line 137-138: The log in function _create_state incorrectly says
"Creating new todo"; update the ctx.logger.info call inside _create_state to use
a correct, specific message like "Creating new state" (or similar) and keep the
same structured payload {"body": req.body} so logs reflect the actual handler
name and payload; locate the ctx.logger.info in _create_state and change only
the message string.
In `@packages/python/iii/src/iii/iii.py`:
- Line 94: The InitOptions.otel annotation is too narrow (dict[str, Any] | None)
while connect() handles an OtelConfig instance; change the type of
InitOptions.otel to accept OtelConfig as well (e.g., OtelConfig | dict[str, Any]
| None), import or forward-reference OtelConfig where InitOptions is defined,
and update any related type hints (such as the local variable otel_cfg) so mypy
recognizes both dict and OtelConfig inputs used by connect().
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (3)
packages/python/iii/uv.lockis excluded by!**/*.lockpackages/rust/iii-example/Cargo.lockis excluded by!**/*.lockpackages/rust/iii/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (14)
packages/python/iii-example/src/hooks.pypackages/python/iii-example/src/iii.pypackages/python/iii-example/src/main.pypackages/python/iii-example/src/state.pypackages/python/iii-example/src/stream.pypackages/python/iii/README.mdpackages/python/iii/src/iii/__init__.pypackages/python/iii/src/iii/iii.pypackages/python/iii/tests/test_init_api.pypackages/rust/iii-example/src/main.rspackages/rust/iii/README.mdpackages/rust/iii/src/error.rspackages/rust/iii/src/lib.rspackages/rust/iii/tests/init_api.rs
There was a problem hiding this comment.
♻️ Duplicate comments (2)
packages/python/iii/README.md (1)
48-66:⚠️ Potential issue | 🟡 MinorSecond async example exits before the background
connect()task can run.In this snippet,
main()has no await after registration, soasyncio.run(main())can tear down the loop immediately after Line 65. Add a keep-alive await for worker-style usage.📝 Suggested docs fix
async def main(): iii = init( "ws://localhost:49134", InitOptions(otel={"enabled": True, "service_name": "iii-python-worker"}), ) iii.register_function("api.post.todo", create_todo) iii.register_trigger( type="http", function_id="api.post.todo", config={ "api_path": "/todo", "http_method": "POST", "description": "Create a new todo", }, ) + # Keep the process alive so the background connect task can run + await asyncio.Event().wait()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/python/iii/README.md` around lines 48 - 66, The example exits immediately because main() returns right after registering handlers; after calling init() and registering (iii.register_function, iii.register_trigger) you must add a keep-alive await so the background connect task can run (e.g., await a provided stop/wait method on the returned iii client such as iii.wait_until_stopped()/iii.run_forever() or a generic await asyncio.Event().wait()) before returning from main() so asyncio.run(main()) doesn't shut down the loop prematurely.packages/python/iii/src/iii/iii.py (1)
132-137:⚠️ Potential issue | 🟠 MajorGuard invalid
InitOptions.oteldicts to avoid background connect task failure.At Line 136,
OtelConfig(**self._options.otel)can raiseTypeError. Sinceconnect()is started via background task (packages/python/iii/src/iii/__init__.py, Line 47), this will crash the task without a deterministic client-facing failure state.🐛 Proposed fix
otel_cfg: OtelConfig | None = None - if self._options.otel: + if self._options.otel is not None: if isinstance(self._options.otel, OtelConfig): otel_cfg = self._options.otel else: - otel_cfg = OtelConfig(**self._options.otel) + try: + otel_cfg = OtelConfig(**self._options.otel) + except TypeError as exc: + log.error("Invalid InitOptions.otel config: %s", exc) + self._set_connection_state("failed") + return init_otel(config=otel_cfg, loop=loop)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/python/iii/src/iii/iii.py` around lines 132 - 137, The OtelConfig(**self._options.otel) call can raise TypeError for invalid dicts and currently will crash the background connect task; modify the block around self._options.otel to validate/guard construction of OtelConfig: attempt to construct otel_cfg from self._options.otel inside a try/except that catches TypeError (and other relevant validation exceptions), and on failure either raise a clear synchronous ValueError with a descriptive message or log the error and skip calling init_otel; ensure you reference the same symbols (self._options.otel, OtelConfig, init_otel) so the invalid config is handled before init_otel is invoked.
🧹 Nitpick comments (1)
packages/python/iii-example/src/stream.py (1)
91-99: Consider thread-safety for the registration guard.The
_streams_registeredflag prevents double registration but isn't thread-safe. Ifregister_streams()is called concurrently from multiple tasks before either completes, both could pass the check and register streams twice.For this example code, this is likely fine since:
register_streams()is typically called once during sequential setupcreate_streammay handle duplicate registration gracefullyIf this were production code, consider using a
threading.Lockor an async-aware guard.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/python/iii-example/src/stream.py` around lines 91 - 99, The registration guard using the module-level _streams_registered in register_streams is not thread-safe; wrap the check-and-set and the call to get_iii().create_stream("todo", TodoStream()) with a lock to prevent concurrent callers from both proceeding. Add a module-level lock (e.g., threading.Lock() for sync code or asyncio.Lock() for async contexts), acquire it at the start of register_streams, re-check _streams_registered inside the lock, perform the create_stream call and set _streams_registered = True while holding the lock, then release the lock.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@packages/python/iii/README.md`:
- Around line 48-66: The example exits immediately because main() returns right
after registering handlers; after calling init() and registering
(iii.register_function, iii.register_trigger) you must add a keep-alive await so
the background connect task can run (e.g., await a provided stop/wait method on
the returned iii client such as iii.wait_until_stopped()/iii.run_forever() or a
generic await asyncio.Event().wait()) before returning from main() so
asyncio.run(main()) doesn't shut down the loop prematurely.
In `@packages/python/iii/src/iii/iii.py`:
- Around line 132-137: The OtelConfig(**self._options.otel) call can raise
TypeError for invalid dicts and currently will crash the background connect
task; modify the block around self._options.otel to validate/guard construction
of OtelConfig: attempt to construct otel_cfg from self._options.otel inside a
try/except that catches TypeError (and other relevant validation exceptions),
and on failure either raise a clear synchronous ValueError with a descriptive
message or log the error and skip calling init_otel; ensure you reference the
same symbols (self._options.otel, OtelConfig, init_otel) so the invalid config
is handled before init_otel is invoked.
---
Nitpick comments:
In `@packages/python/iii-example/src/stream.py`:
- Around line 91-99: The registration guard using the module-level
_streams_registered in register_streams is not thread-safe; wrap the
check-and-set and the call to get_iii().create_stream("todo", TodoStream()) with
a lock to prevent concurrent callers from both proceeding. Add a module-level
lock (e.g., threading.Lock() for sync code or asyncio.Lock() for async
contexts), acquire it at the start of register_streams, re-check
_streams_registered inside the lock, perform the create_stream call and set
_streams_registered = True while holding the lock, then release the lock.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (3)
packages/python/iii/uv.lockis excluded by!**/*.lockpackages/rust/iii-example/Cargo.lockis excluded by!**/*.lockpackages/rust/iii/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (14)
packages/python/iii-example/src/hooks.pypackages/python/iii-example/src/iii.pypackages/python/iii-example/src/main.pypackages/python/iii-example/src/state.pypackages/python/iii-example/src/stream.pypackages/python/iii/README.mdpackages/python/iii/src/iii/__init__.pypackages/python/iii/src/iii/iii.pypackages/python/iii/tests/test_init_api.pypackages/rust/iii-example/src/main.rspackages/rust/iii/README.mdpackages/rust/iii/src/error.rspackages/rust/iii/src/lib.rspackages/rust/iii/tests/init_api.rs
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
packages/rust/iii-example/src/main.rs (1)
3-3: Optional: Make the OTEL feature dependency explicit in the example code.The example's
Cargo.tomlcorrectly enables theotelfeature for theiii-sdkdependency, so the imports and usage compile without issues. However, adding#[cfg(feature = "otel")]guards to the imports and field assignment would make the feature dependency more visible and provide clearer errors if the feature declaration is removed fromCargo.toml.Optional refactoring pattern
-use iii_sdk::{InitOptions, OtelConfig, Streams, UpdateBuilder, UpdateOp, init}; +use iii_sdk::{init, InitOptions, Streams, UpdateBuilder, UpdateOp}; +#[cfg(feature = "otel")] +use iii_sdk::OtelConfig; ... let iii = init( &iii_iii_url, InitOptions { + #[cfg(feature = "otel")] otel: Some(OtelConfig::default()), ..Default::default() }, )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/rust/iii-example/src/main.rs` at line 3, The example imports and uses OtelConfig without conditional compilation, so the OTEL feature dependency is implicit; update the example to guard the OTEL-specific import and usage with #[cfg(feature = "otel")] so missing the feature yields a clear compile error—specifically add #[cfg(feature = "otel")] before the import of OtelConfig and before the code that constructs/assigns OtelConfig in InitOptions (references: OtelConfig, InitOptions, init, and the top-level use statement in main.rs).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/python/iii-example/src/iii.py`:
- Around line 10-20: The init_iii() function can race when multiple coroutines
call it concurrently; introduce a module-level asyncio.Lock (e.g., _iii_lock)
and wrap the initialization path in an async with _iii_lock: block, performing a
second check of the global _iii inside the lock before awaiting init(...); this
ensures only one coroutine actually calls init() and others return the
already-initialized _iii. Use the lock around the critical section in init_iii()
and keep the existing global _iii variable and return behavior.
In `@packages/rust/iii/tests/init_api.rs`:
- Around line 5-7: Replace the hard-coded "ws://127.0.0.1:49134" in the init
call with an environment-backed URL (e.g., read REMOTE_III_URL first, fall back
to III_BRIDGE_URL, then default to "ws://127.0.0.1:49134") and stop using
.expect(...) so the test doesn’t unconditionally assume a live backend; instead
make the test async return a Result and propagate the init error (or
early-return/skip the test with a clear message) so failures are handled
gracefully. Apply this to the init(...) call and any other occurrences in the
file that use InitOptions::default() with the hard-coded URL.
---
Nitpick comments:
In `@packages/rust/iii-example/src/main.rs`:
- Line 3: The example imports and uses OtelConfig without conditional
compilation, so the OTEL feature dependency is implicit; update the example to
guard the OTEL-specific import and usage with #[cfg(feature = "otel")] so
missing the feature yields a clear compile error—specifically add #[cfg(feature
= "otel")] before the import of OtelConfig and before the code that
constructs/assigns OtelConfig in InitOptions (references: OtelConfig,
InitOptions, init, and the top-level use statement in main.rs).
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
packages/python/iii-example/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (8)
packages/python/iii-example/src/iii.pypackages/python/iii-example/src/main.pypackages/python/iii/src/iii/__init__.pypackages/python/iii/tests/test_iii_registration_dedup.pypackages/python/iii/tests/test_init_api.pypackages/rust/iii-example/src/main.rspackages/rust/iii/src/lib.rspackages/rust/iii/tests/init_api.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/python/iii/tests/test_init_api.py
d7c94d7 to
12f93f6
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
packages/python/iii/README.md (1)
23-27:⚠️ Potential issue | 🟠 MajorFix both README snippets:
init(...)must be awaited, and the trigger worker must stay alive.
initis async, soiii = init(...)returns a coroutine. Also, the trigger sample returns immediately after registration, so the worker won’t keep serving events.📝 Proposed docs fix
async def main(): - iii = init( + iii = await init( "ws://localhost:49134", InitOptions(otel={"enabled": True, "service_name": "iii-python-worker"}), ) iii.register_function("my.function", my_function) @@ async def main(): - iii = init( + iii = await init( "ws://localhost:49134", InitOptions(otel={"enabled": True, "service_name": "iii-python-worker"}), ) @@ iii.register_trigger( type="http", function_id="api.post.todo", config={ "api_path": "/todo", "http_method": "POST", "description": "Create a new todo", }, ) + + # Keep the worker alive to handle incoming trigger invocations + await asyncio.Event().wait()Also applies to: 49-66
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/python/iii/README.md` around lines 23 - 27, The README examples call the async init(...) without awaiting and immediately return after iii.register_function, so the coroutine isn't awaited and the worker exits; update both snippets to await the async init call (await init(...)) and keep the trigger worker process alive after registering the function (e.g., run the worker loop or await a long-lived run/serve method) so the registered handler (register_function / "my.function") continues to receive events.packages/python/iii-example/src/iii.py (1)
10-20:⚠️ Potential issue | 🟠 MajorProtect
init_iii()from concurrent double initialization.Parallel coroutines can pass the
if _iii is Nonecheck and each runinit(...), causing duplicate clients.🔒 Proposed fix
+import asyncio import os @@ _iii: III | None = None +_iii_init_lock = asyncio.Lock() @@ async def init_iii() -> III: global _iii - if _iii is None: - _iii = await init( - address=engine_ws_url, - options=InitOptions( - worker_name="iii-example", - otel={"enabled": True, "service_name": "iii-example"}, - ), - ) + if _iii is not None: + return _iii + + async with _iii_init_lock: + if _iii is None: + _iii = await init( + address=engine_ws_url, + options=InitOptions( + worker_name="iii-example", + otel={"enabled": True, "service_name": "iii-example"}, + ), + ) return _iii🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/python/iii-example/src/iii.py` around lines 10 - 20, init_iii() can race under concurrent coroutines; add an async initialization guard so only one caller runs init(...) and others await the result. Create a module-level asyncio.Lock (e.g., _iii_init_lock) and wrap the initialization in "async with _iii_init_lock:"; inside the lock re-check "_iii is None" before calling init(address=engine_ws_url, options=InitOptions(...)) and set _iii after success so duplicate clients are prevented; keep the existing return _iii for callers who find it already initialized.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/rust/iii/README.md`:
- Around line 24-32: The README example calls the async function init(...)
without awaiting it; update the example to await the async init call (i.e., call
init(...).await?) so the Result is resolved—leave the InitOptions and OtelConfig
usage unchanged and ensure the assignment uses the awaited result (symbol: init,
types: InitOptions, OtelConfig).
---
Duplicate comments:
In `@packages/python/iii-example/src/iii.py`:
- Around line 10-20: init_iii() can race under concurrent coroutines; add an
async initialization guard so only one caller runs init(...) and others await
the result. Create a module-level asyncio.Lock (e.g., _iii_init_lock) and wrap
the initialization in "async with _iii_init_lock:"; inside the lock re-check
"_iii is None" before calling init(address=engine_ws_url,
options=InitOptions(...)) and set _iii after success so duplicate clients are
prevented; keep the existing return _iii for callers who find it already
initialized.
In `@packages/python/iii/README.md`:
- Around line 23-27: The README examples call the async init(...) without
awaiting and immediately return after iii.register_function, so the coroutine
isn't awaited and the worker exits; update both snippets to await the async init
call (await init(...)) and keep the trigger worker process alive after
registering the function (e.g., run the worker loop or await a long-lived
run/serve method) so the registered handler (register_function / "my.function")
continues to receive events.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (4)
packages/python/iii-example/uv.lockis excluded by!**/*.lockpackages/python/iii/uv.lockis excluded by!**/*.lockpackages/rust/iii-example/Cargo.lockis excluded by!**/*.lockpackages/rust/iii/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (15)
packages/python/iii-example/src/hooks.pypackages/python/iii-example/src/iii.pypackages/python/iii-example/src/main.pypackages/python/iii-example/src/state.pypackages/python/iii-example/src/stream.pypackages/python/iii/README.mdpackages/python/iii/src/iii/__init__.pypackages/python/iii/src/iii/iii.pypackages/python/iii/tests/test_iii_registration_dedup.pypackages/python/iii/tests/test_init_api.pypackages/rust/iii-example/src/main.rspackages/rust/iii/README.mdpackages/rust/iii/src/error.rspackages/rust/iii/src/lib.rspackages/rust/iii/tests/init_api.rs
🚧 Files skipped from review as they are similar to previous changes (6)
- packages/python/iii-example/src/main.py
- packages/rust/iii/tests/init_api.rs
- packages/rust/iii-example/src/main.rs
- packages/python/iii-example/src/hooks.py
- packages/python/iii/tests/test_iii_registration_dedup.py
- packages/python/iii-example/src/state.py
12f93f6 to
203363e
Compare
|
moved to mono repo: |
Summary by CodeRabbit
New Features
Bug Fixes / Stability
Tests