Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,20 @@ pip install iii-sdk
```

```python
from iii import III

iii = III("ws://localhost:49134")
import asyncio
from iii import init, InitOptions

async def my_function(data):
return {"result": "success"}

iii.register_function("my.function", my_function)
async def main():
iii = init("ws://localhost:49134", InitOptions(worker_name="my-worker"))
iii.register_function("my.function", my_function)

result = await iii.call("other.function", {"param": "value"})
print(result)

result = await iii.call("other.function", {"param": "value"})
asyncio.run(main())
```

### Rust
Expand All @@ -75,13 +79,12 @@ iii-sdk = { path = "path/to/iii" }
```

```rust
use iii_sdk::III;
use iii_sdk::{init, InitOptions};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let iii = III::new("ws://127.0.0.1:49134");
iii.connect().await?;
let iii = init("ws://127.0.0.1:49134", InitOptions::default())?;

iii.register_function("my.function", |input| async move {
Ok(json!({ "message": "Hello, world!", "input": input }))
Expand Down
6 changes: 3 additions & 3 deletions sdk/packages/python/iii-example/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ modules:
memory_max_spans: ${OTEL_MEMORY_MAX_SPANS:10000}

metrics_enabled: true
metrics_exporter: ${OTEL_METRICS_EXPORTER:both}
metrics_exporter: ${OTEL_METRICS_EXPORTER:memory}
prometheus_port: ${PROMETHEUS_PORT:9464}
metrics_retention_seconds: 3600
metrics_max_count: 10000
Expand All @@ -71,10 +71,10 @@ modules:
collection_interval_seconds: 5
history_size: 60

- class: modules::event::EventModule
- class: modules::queue::QueueModule
config:
adapter:
class: modules::event::RedisAdapter
class: modules::queue::RedisAdapter
config:
redis_url: redis://localhost:6379

Expand Down
4 changes: 3 additions & 1 deletion sdk/packages/python/iii-example/src/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

from iii import ApiRequest, ApiResponse, FunctionInfo, get_context

from .iii import iii
from .iii import get_iii


def use_api(
config: dict[str, Any],
handler: Callable[[ApiRequest[Any], Any], Awaitable[ApiResponse[Any]]],
) -> None:
iii = get_iii()
api_path = config["api_path"]
http_method = config["http_method"]
function_id = f"api.{http_method.lower()}.{api_path}"
Expand All @@ -33,4 +34,5 @@ async def wrapped(data: Any) -> dict[str, Any]:


def use_functions_available(callback: Callable[[list[FunctionInfo]], None]) -> Callable[[], None]:
iii = get_iii()
return iii.on_functions_available(callback)
23 changes: 21 additions & 2 deletions sdk/packages/python/iii-example/src/iii.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
import os

from iii import III, InitOptions
from iii import III, InitOptions, init

engine_ws_url = os.environ.get("III_BRIDGE_URL", "ws://localhost:49134")

iii = III(address=engine_ws_url, options=InitOptions(worker_name="iii-example"))
_iii: III | None = None


def init_iii() -> III:
global _iii
if _iii is None:
_iii = init(
address=engine_ws_url,
options=InitOptions(
worker_name="iii-example",
otel={"enabled": True, "service_name": "iii-example"},
),
)
return _iii


def get_iii() -> III:
if _iii is None:
raise RuntimeError("III client is not initialized. Call init_iii() from an active async context first.")
return _iii
44 changes: 32 additions & 12 deletions sdk/packages/python/iii-example/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import time
import urllib.request
from datetime import datetime, timezone
from typing import Any

from iii import ApiRequest, ApiResponse

from .hooks import use_api, use_functions_available
from .state import state
from .stream import streams
state: Any = None
streams: Any = None


def _generate_todo_id() -> str:
Expand All @@ -19,11 +19,23 @@ def _generate_todo_id() -> str:


def _setup() -> None:
use_functions_available(lambda functions: print(
"--------------------------------\n"
f"Functions available: {len(functions)}\n"
"--------------------------------"
))
from .hooks import use_api, use_functions_available
from .state import state as state_client
from .stream import register_streams
from .stream import streams as streams_client

global state, streams
state = state_client
streams = streams_client
register_streams()

use_functions_available(
lambda functions: print(
"--------------------------------\n"
f"Functions available: {len(functions)}\n"
"--------------------------------"
)
)

use_api(
{"api_path": "todo", "http_method": "POST", "description": "Create a new todo", "metadata": {"tags": ["todo"]}},
Expand Down Expand Up @@ -51,12 +63,20 @@ def _setup() -> None:
)

use_api(
{"api_path": "http-fetch", "http_method": "GET", "description": "Fetch a todo from JSONPlaceholder (tests urllib instrumentation)"},
{
"api_path": "http-fetch",
"http_method": "GET",
"description": "Fetch a todo from JSONPlaceholder (tests urllib instrumentation)",
},
_fetch_example,
)

use_api(
{"api_path": "http-fetch", "http_method": "POST", "description": "Post data to httpbin (tests urllib instrumentation)"},
{
"api_path": "http-fetch",
"http_method": "POST",
"description": "Post data to httpbin (tests urllib instrumentation)",
},
_post_example,
)

Expand Down Expand Up @@ -165,10 +185,10 @@ async def _post_example(req: ApiRequest, ctx) -> ApiResponse:


async def _async_main() -> None:
from .iii import iii
from .iii import init_iii

init_iii()
_setup()
await iii.connect()

while True:
await asyncio.sleep(60)
Expand Down
5 changes: 4 additions & 1 deletion sdk/packages/python/iii-example/src/state.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from typing import Any

from .iii import iii
from .iii import get_iii


class State:
async def get(self, scope: str, key: str) -> Any | None:
iii = get_iii()
return await iii.call("state::get", {"scope": scope, "key": key})

async def set(self, scope: str, key: str, value: Any) -> Any:
return await iii.call("state::set", {"scope": scope, "key": key, "value": value})

async def delete(self, scope: str, key: str) -> None:
iii = get_iii()
return await iii.call("state::delete", {"scope": scope, "key": key})

async def get_group(self, scope: str) -> list[Any]:
iii = get_iii()
return await iii.call("state::list", {"scope": scope})


Expand Down
18 changes: 15 additions & 3 deletions sdk/packages/python/iii-example/src/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,45 @@
from iii import (
IStream,
StreamDeleteInput,
StreamListInput,
StreamGetInput,
StreamListGroupsInput,
StreamListInput,
StreamSetInput,
StreamSetResult,
StreamUpdateInput,
)

from .iii import iii
from .iii import get_iii
from .models import Todo


class StreamClient:
async def get(self, stream_name: str, group_id: str, item_id: str) -> Any | None:
iii = get_iii()
return await iii.call(
"stream::get", {"stream_name": stream_name, "group_id": group_id, "item_id": item_id}
)

async def set(self, stream_name: str, group_id: str, item_id: str, data: Any) -> Any:
iii = get_iii()
return await iii.call(
"stream::set", {"stream_name": stream_name, "group_id": group_id, "item_id": item_id, "data": data}
)

async def delete(self, stream_name: str, group_id: str, item_id: str) -> None:
iii = get_iii()
return await iii.call(
"stream::delete", {"stream_name": stream_name, "group_id": group_id, "item_id": item_id}
)

async def get_group(self, stream_name: str, group_id: str) -> list[Any]:
iii = get_iii()
return await iii.call(
"stream::list", {"stream_name": stream_name, "group_id": group_id}
)

async def list_groups(self, stream_name: str) -> list[str]:
iii = get_iii()
return await iii.call("stream::list_groups", {"stream_name": stream_name})


Expand Down Expand Up @@ -83,5 +88,12 @@ async def update(self, input: StreamUpdateInput) -> StreamSetResult[dict[str, An


streams = StreamClient()
_streams_registered = False


iii.create_stream("todo", TodoStream())
def register_streams() -> None:
global _streams_registered
if _streams_registered:
return
get_iii().create_stream("todo", TodoStream())
_streams_registered = True
4 changes: 3 additions & 1 deletion sdk/packages/python/iii-example/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 18 additions & 11 deletions sdk/packages/python/iii/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,57 @@ pip install iii-sdk

```python
import asyncio
from iii import III
from iii import InitOptions, init


async def my_function(data):
return {"result": "success"}

iii = III("ws://localhost:49134")
iii.register_function("my.function", my_function)

async def main():
await iii.connect()
iii = init(
"ws://localhost:49134",
InitOptions(otel={"enabled": True, "service_name": "iii-python-worker"}),
)
iii.register_function("my.function", my_function)

result = await iii.call("other.function", {"param": "value"})
print(result)


asyncio.run(main())
```

### Register API trigger

```python
import asyncio
from iii import III, ApiRequest, ApiResponse
from iii import ApiRequest, ApiResponse, InitOptions, init

iii = III("ws://localhost:49134")

async def create_todo(data):
async def create_todo(data):
req = ApiRequest(**data)
return ApiResponse(status=201, data={"id": "123", "title": req.body.get("title")})

iii.register_function("api.post.todo", create_todo)

async def main():
await iii.connect()
iii = init(
"ws://localhost:49134",
InitOptions(otel={"enabled": True, "service_name": "iii-python-worker"}),
)

iii.register_function("api.post.todo", create_todo)
iii.register_trigger(
type="http",
function_id="api.post.todo",
config={
"api_path": "/todo",
"http_method": "POST",
"description": "Create a new todo"
}
"description": "Create a new todo",
},
)


asyncio.run(main())
```

Expand Down
Loading
Loading