Skip to content

Latest commit

 

History

History
323 lines (250 loc) · 8.39 KB

File metadata and controls

323 lines (250 loc) · 8.39 KB

Oomol Connect SDK for Python

中文文档

Official Python SDK for Oomol Connect API. This SDK provides a complete, type-safe interface for interacting with Oomol Connect services.

Features

  • Complete API Coverage - Full support for all Oomol Connect API endpoints
  • 🔄 Smart Polling - Intelligent polling with exponential backoff strategy
  • 📊 Progress Monitoring - Real-time callbacks for task progress and logs
  • 📁 File Upload - Support for single and multiple file uploads
  • 🎯 Type Safe - Full type annotations with TypedDict support
  • Async First - Modern async design based on asyncio and httpx
  • 🛡️ Error Handling - Comprehensive error classification and handling

Installation

pip install oomol-connect-sdk

Quick Start

import asyncio
from oomol_connect_sdk import OomolConnectClient

async def main():
    async with OomolConnectClient(
        base_url="http://localhost:3000/api",
        api_token="your-api-token"
    ) as client:
        # Run a task and get results
        result = await client.tasks.run({
            "blockId": "audio-lab::text-to-audio",
            "inputValues": {"text": "Hello, World"}
        })

        print(f"Task ID: {result['task_id']}")
        print(f"Result: {result['result']}")

asyncio.run(main())

Core Concepts

Client Initialization

from oomol_connect_sdk import OomolConnectClient

client = OomolConnectClient(
    base_url="/api",              # API base URL
    api_token="your-token",       # API token (auto-added to Authorization header)
    default_headers={},           # Custom headers (optional)
    timeout=30.0                  # Request timeout in seconds
)

Task Management

# Simple task execution (recommended)
result = await client.tasks.run({
    "blockId": "audio-lab::text-to-audio",
    "inputValues": {"text": "Hello"}
})

# With progress monitoring
result = await client.tasks.run(
    {
        "blockId": "audio-lab::text-to-audio",
        "inputValues": {"text": "Hello"}
    },
    {
        "interval_ms": 1000,
        "timeout_ms": 60000,
        "on_progress": lambda task: print(f"Status: {task['status']}"),
        "on_log": lambda log: print(f"Log: {log['type']}")
    }
)

Input Value Formats

The SDK automatically normalizes three input formats:

# Format 1: Simple object (most common)
{"input1": "value1", "input2": 123}

# Format 2: Array format
[
    {"handle": "input1", "value": "value1"},
    {"handle": "input2", "value": 123}
]

# Format 3: Node format (for multi-node scenarios)
[
    {
        "nodeId": "node1",
        "inputs": [{"handle": "input1", "value": "value1"}]
    }
]

File Upload

# Single file upload
with open("test.txt", "rb") as f:
    result = await client.tasks.run_with_files(
        "pkg::file-processor",
        {"input1": "value"},
        f
    )

# Multiple files upload
with open("file1.txt", "rb") as f1, open("file2.txt", "rb") as f2:
    result = await client.tasks.run_with_files(
        "pkg::multi-file-processor",
        {"input1": "value"},
        [f1, f2]
    )

API Reference

Tasks Client

Core API for task management:

  • list() - List all tasks
  • create(request) - Create a task (JSON format)
  • create_with_files(block_id, input_values, files) - Create task with file upload
  • get(task_id) - Get task details
  • stop(task_id) - Stop a task
  • get_logs(task_id) - Get task logs
  • wait_for_completion(task_id, options) - Poll until task completes
  • create_and_wait(request, polling_options) - Create and wait for completion
  • run(request, polling_options) - Recommended - One-step run and get results
  • run_with_files(block_id, input_values, files, polling_options) - One-step run with files

Blocks Client

# List all blocks (only latest versions by default)
blocks_response = await client.blocks.list()
for block in blocks_response["blocks"]:
    print(f"{block['blockId']} - v{block['version']}")
    # Example output: ffmpeg::audio_video_separation - v0.1.9

# List all versions
all_blocks = await client.blocks.list(include_all_versions=True)

Packages Client

# List installed packages
packages = await client.packages.list()

# Install a package
install_task = await client.packages.install("package-name", "1.0.0")

# Install and wait for completion
install_task = await client.packages.install_and_wait("package-name", "1.0.0")

Applets Client

Applets are blocks with preset parameters. The Applets Client provides convenient methods to discover and run them:

# List all available applets
applets = await client.applets.list()
for applet in applets:
    print(f"Applet ID: {applet['appletId']}")
    print(f"  Title: {applet['data'].get('title', 'N/A')}")
    print(f"  Package: {applet['data']['packageId']}")
    print(f"  Block: {applet['data']['blockName']}")
    print(f"  Preset Inputs: {applet['data'].get('presetInputs', {})}")

# Run an applet with all preset parameters
result = await client.applets.run(
    applet_id="84dc8cac-7f91-4bd1-a3b6-6715bf4f81c9"
)
print(f"Result: {result['result']}")

# Run an applet and override some parameters
result = await client.applets.run(
    applet_id="84dc8cac-7f91-4bd1-a3b6-6715bf4f81c9",
    input_values={"content": "test", "indent": 4}
)

# Run with progress monitoring
result = await client.applets.run(
    applet_id="84dc8cac-7f91-4bd1-a3b6-6715bf4f81c9",
    input_values={"content": "test"},
    polling_options={
        "interval_ms": 2000,
        "on_progress": lambda task: print(f"Status: {task['status']}"),
        "on_log": lambda log: print(f"Log: {log['type']}")
    }
)

Note: Applets queries use a built-in query server (https://chat-data.oomol.com), while task execution uses your configured server.

Polling Options

from oomol_connect_sdk import BackoffStrategy

polling_options = {
    "interval_ms": 2000,                    # Polling interval (milliseconds)
    "timeout_ms": 300000,                   # Timeout (milliseconds)
    "max_interval_ms": 10000,               # Maximum interval (milliseconds)
    "backoff_strategy": BackoffStrategy.EXPONENTIAL,  # Backoff strategy
    "backoff_factor": 1.5,                  # Backoff factor
    "on_progress": lambda task: ...,        # Progress callback
    "on_log": lambda log: ...               # Log callback
}

Error Handling

from oomol_connect_sdk import (
    OomolConnectError,      # Base class
    ApiError,               # HTTP errors
    TaskFailedError,        # Task execution failed
    TaskStoppedError,       # Task was stopped
    TimeoutError,           # Polling timeout
    InstallFailedError      # Package installation failed
)

try:
    result = await client.tasks.run({
        "blockId": "audio-lab::text-to-audio",
        "inputValues": {"text": "test"}
    })
except TaskFailedError as e:
    print(f"Task failed: {e.task_id}")
except ApiError as e:
    print(f"HTTP {e.status}: {e.message}")

Advanced Usage

Concurrent Tasks

import asyncio

tasks = [
    client.tasks.run({
        "blockId": "audio-lab::text-to-audio",
        "inputValues": {"text": f"test-{i}"}
    })
    for i in range(5)
]

results = await asyncio.gather(*tasks)

Custom Backoff Strategy

from oomol_connect_sdk import BackoffStrategy

result = await client.tasks.run(
    {"blockId": "audio-lab::text-to-audio", "inputValues": {"text": "Hello"}},
    {
        "interval_ms": 1000,
        "max_interval_ms": 5000,
        "backoff_strategy": BackoffStrategy.EXPONENTIAL,
        "backoff_factor": 2.0
    }
)

Examples

See examples/ directory for more examples:

  • basic_usage.py - Basic usage examples
  • advanced_usage.py - Advanced features and patterns
  • applets_usage.py - Applets API usage examples

Development

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Type checking
mypy oomol_connect_sdk

Requirements

  • Python >= 3.8
  • httpx >= 0.27.0

License

MIT License - see LICENSE file for details

Links

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.