Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/python/iii-example/src/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

from iii import ApiRequest, ApiResponse, FunctionInfo, get_context

from .iii import iii
from .iii import get_iii


def use_api(
config: dict[str, Any],
handler: Callable[[ApiRequest[Any], Any], Awaitable[ApiResponse[Any]]],
) -> None:
iii = get_iii()
api_path = config["api_path"]
http_method = config["http_method"]
function_id = f"api.{http_method.lower()}.{api_path}"
Expand All @@ -33,4 +34,5 @@ async def wrapped(data: Any) -> dict[str, Any]:


def use_functions_available(callback: Callable[[list[FunctionInfo]], None]) -> Callable[[], None]:
iii = get_iii()
return iii.on_functions_available(callback)
23 changes: 21 additions & 2 deletions packages/python/iii-example/src/iii.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
import os

from iii import III, InitOptions
from iii import III, InitOptions, init

engine_ws_url = os.environ.get("III_BRIDGE_URL", "ws://localhost:49134")

iii = III(address=engine_ws_url, options=InitOptions(worker_name="iii-example"))
_iii: III | None = None


def init_iii() -> III:
global _iii
if _iii is None:
_iii = init(
address=engine_ws_url,
options=InitOptions(
worker_name="iii-example",
otel={"enabled": True, "service_name": "iii-example"},
),
)
return _iii


def get_iii() -> III:
if _iii is None:
raise RuntimeError("III client is not initialized. Call init_iii() from an active async context first.")
return _iii
44 changes: 32 additions & 12 deletions packages/python/iii-example/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import time
import urllib.request
from datetime import datetime, timezone
from typing import Any

from iii import ApiRequest, ApiResponse

from .hooks import use_api, use_functions_available
from .state import state
from .stream import streams
state: Any = None
streams: Any = None


def _generate_todo_id() -> str:
Expand All @@ -19,11 +19,23 @@ def _generate_todo_id() -> str:


def _setup() -> None:
use_functions_available(lambda functions: print(
"--------------------------------\n"
f"Functions available: {len(functions)}\n"
"--------------------------------"
))
from .hooks import use_api, use_functions_available
from .state import state as state_client
from .stream import register_streams
from .stream import streams as streams_client

global state, streams
state = state_client
streams = streams_client
register_streams()

use_functions_available(
lambda functions: print(
"--------------------------------\n"
f"Functions available: {len(functions)}\n"
"--------------------------------"
)
)

use_api(
{"api_path": "todo", "http_method": "POST", "description": "Create a new todo", "metadata": {"tags": ["todo"]}},
Expand Down Expand Up @@ -51,12 +63,20 @@ def _setup() -> None:
)

use_api(
{"api_path": "http-fetch", "http_method": "GET", "description": "Fetch a todo from JSONPlaceholder (tests urllib instrumentation)"},
{
"api_path": "http-fetch",
"http_method": "GET",
"description": "Fetch a todo from JSONPlaceholder (tests urllib instrumentation)",
},
_fetch_example,
)

use_api(
{"api_path": "http-fetch", "http_method": "POST", "description": "Post data to httpbin (tests urllib instrumentation)"},
{
"api_path": "http-fetch",
"http_method": "POST",
"description": "Post data to httpbin (tests urllib instrumentation)",
},
_post_example,
)

Expand Down Expand Up @@ -165,10 +185,10 @@ async def _post_example(req: ApiRequest, ctx) -> ApiResponse:


async def _async_main() -> None:
from .iii import iii
from .iii import init_iii

init_iii()
_setup()
await iii.connect()

while True:
await asyncio.sleep(60)
Expand Down
5 changes: 4 additions & 1 deletion packages/python/iii-example/src/state.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from typing import Any

from .iii import iii
from .iii import get_iii


class State:
async def get(self, scope: str, key: str) -> Any | None:
iii = get_iii()
return await iii.call("state::get", {"scope": scope, "key": key})

async def set(self, scope: str, key: str, value: Any) -> Any:
return await iii.call("state::set", {"scope": scope, "key": key, "value": value})

async def delete(self, scope: str, key: str) -> None:
iii = get_iii()
return await iii.call("state::delete", {"scope": scope, "key": key})

async def get_group(self, scope: str) -> list[Any]:
iii = get_iii()
return await iii.call("state::list", {"scope": scope})


Expand Down
18 changes: 15 additions & 3 deletions packages/python/iii-example/src/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,45 @@
from iii import (
IStream,
StreamDeleteInput,
StreamListInput,
StreamGetInput,
StreamListGroupsInput,
StreamListInput,
StreamSetInput,
StreamSetResult,
StreamUpdateInput,
)

from .iii import iii
from .iii import get_iii
from .models import Todo


class StreamClient:
async def get(self, stream_name: str, group_id: str, item_id: str) -> Any | None:
iii = get_iii()
return await iii.call(
"stream::get", {"stream_name": stream_name, "group_id": group_id, "item_id": item_id}
)

async def set(self, stream_name: str, group_id: str, item_id: str, data: Any) -> Any:
iii = get_iii()
return await iii.call(
"stream::set", {"stream_name": stream_name, "group_id": group_id, "item_id": item_id, "data": data}
)

async def delete(self, stream_name: str, group_id: str, item_id: str) -> None:
iii = get_iii()
return await iii.call(
"stream::delete", {"stream_name": stream_name, "group_id": group_id, "item_id": item_id}
)

async def get_group(self, stream_name: str, group_id: str) -> list[Any]:
iii = get_iii()
return await iii.call(
"stream::list", {"stream_name": stream_name, "group_id": group_id}
)

async def list_groups(self, stream_name: str) -> list[str]:
iii = get_iii()
return await iii.call("stream::list_groups", {"stream_name": stream_name})


Expand Down Expand Up @@ -83,5 +88,12 @@ async def update(self, input: StreamUpdateInput) -> StreamSetResult[dict[str, An


streams = StreamClient()
_streams_registered = False


iii.create_stream("todo", TodoStream())
def register_streams() -> None:
global _streams_registered
if _streams_registered:
return
get_iii().create_stream("todo", TodoStream())
_streams_registered = True
29 changes: 18 additions & 11 deletions packages/python/iii/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,57 @@ pip install iii-sdk

```python
import asyncio
from iii import III
from iii import InitOptions, init


async def my_function(data):
return {"result": "success"}

iii = III("ws://localhost:49134")
iii.register_function("my.function", my_function)

async def main():
await iii.connect()
iii = init(
"ws://localhost:49134",
InitOptions(otel={"enabled": True, "service_name": "iii-python-worker"}),
)
iii.register_function("my.function", my_function)

result = await iii.call("other.function", {"param": "value"})
print(result)


asyncio.run(main())
```

### Register API trigger

```python
import asyncio
from iii import III, ApiRequest, ApiResponse
from iii import ApiRequest, ApiResponse, InitOptions, init

iii = III("ws://localhost:49134")

async def create_todo(data):
async def create_todo(data):
req = ApiRequest(**data)
return ApiResponse(status=201, data={"id": "123", "title": req.body.get("title")})

iii.register_function("api.post.todo", create_todo)

async def main():
await iii.connect()
iii = init(
"ws://localhost:49134",
InitOptions(otel={"enabled": True, "service_name": "iii-python-worker"}),
)

iii.register_function("api.post.todo", create_todo)
iii.register_trigger(
type="http",
function_id="api.post.todo",
config={
"api_path": "/todo",
"http_method": "POST",
"description": "Create a new todo"
}
"description": "Create a new todo",
},
)


asyncio.run(main())
```

Expand Down
18 changes: 18 additions & 0 deletions packages/python/iii/src/iii/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""III SDK for Python."""

import asyncio
import logging

from .channels import ChannelReader, ChannelWriter, ReadableStream, WritableStream
Expand Down Expand Up @@ -41,6 +42,22 @@
)


def init(address: str, options: InitOptions | None = None) -> III:
"""Create an III client and auto-start its connection task."""
client = III(address, options)

try:
loop = asyncio.get_running_loop()
except RuntimeError as exc:
raise RuntimeError(
"iii.init() requires an active asyncio event loop. "
"Call it inside async code or use `client = III(...); await client.connect()`"
) from exc

loop.create_task(client.connect())
return client


def configure_logging(level: int = logging.INFO, format: str | None = None) -> None:
"""Configure logging for the III SDK.

Expand All @@ -58,6 +75,7 @@ def configure_logging(level: int = logging.INFO, format: str | None = None) -> N
__all__ = [
# Core
"III",
"init",
"InitOptions",
"ReconnectionConfig",
"IIIConnectionState",
Expand Down
11 changes: 9 additions & 2 deletions packages/python/iii/src/iii/iii.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)
from .logger import Logger
from .stream import IStream
from .telemetry_types import OtelConfig
from .triggers import Trigger, TriggerConfig, TriggerHandler
from .types import Channel, RemoteFunctionData, RemoteTriggerTypeData, is_channel_ref

Expand Down Expand Up @@ -93,7 +94,7 @@ class InitOptions:
enable_metrics_reporting: bool = True
invocation_timeout_ms: int = DEFAULT_INVOCATION_TIMEOUT_MS
reconnection_config: ReconnectionConfig | None = None
otel: dict[str, Any] | None = None
otel: OtelConfig | dict[str, Any] | None = None
telemetry: TelemetryOptions | None = None


Expand Down Expand Up @@ -131,7 +132,13 @@ async def connect(self) -> None:
try:
from .telemetry import attach_event_loop, init_otel
loop = asyncio.get_running_loop()
init_otel(loop=loop)
otel_cfg: OtelConfig | None = None
if self._options.otel:
if isinstance(self._options.otel, OtelConfig):
otel_cfg = self._options.otel
else:
otel_cfg = OtelConfig(**self._options.otel)
init_otel(config=otel_cfg, loop=loop)
attach_event_loop(loop)
except ImportError:
pass
Expand Down
Loading