Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 5 additions & 46 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import importlib.util
import os
import signal
import sys
import threading
import types
from types import FrameType
from typing import Any, Iterable

Expand All @@ -20,6 +18,8 @@
from . import flow, lib, setting
from .setup import flow_names_with_setup
from .runtime import execution_context
from .subprocess_exec import add_user_app
from .user_app_loader import load_user_app

# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc.
COCOINDEX_HOST = "https://cocoindex.io"
Expand Down Expand Up @@ -76,50 +76,9 @@ def _get_app_ref_from_specifier(
return app_ref


def _load_user_app(app_target: str) -> types.ModuleType:
"""
Loads the user's application, which can be a file path or an installed module name.
Exits on failure.
"""
if not app_target:
raise click.ClickException("Application target not provided.")

looks_like_path = os.sep in app_target or app_target.lower().endswith(".py")

if looks_like_path:
if not os.path.isfile(app_target):
raise click.ClickException(f"Application file path not found: {app_target}")
app_path = os.path.abspath(app_target)
app_dir = os.path.dirname(app_path)
module_name = os.path.splitext(os.path.basename(app_path))[0]

if app_dir not in sys.path:
sys.path.insert(0, app_dir)
try:
spec = importlib.util.spec_from_file_location(module_name, app_path)
if spec is None:
raise ImportError(f"Could not create spec for file: {app_path}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
if spec.loader is None:
raise ImportError(f"Could not create loader for file: {app_path}")
spec.loader.exec_module(module)
return module
except (ImportError, FileNotFoundError, PermissionError) as e:
raise click.ClickException(f"Failed importing file '{app_path}': {e}")
finally:
if app_dir in sys.path and sys.path[0] == app_dir:
sys.path.pop(0)

# Try as module
try:
return importlib.import_module(app_target)
except ImportError as e:
raise click.ClickException(f"Failed to load module '{app_target}': {e}")
except Exception as e:
raise click.ClickException(
f"Unexpected error importing module '{app_target}': {e}"
)
def _load_user_app(app_target: str) -> None:
load_user_app(app_target)
add_user_app(app_target)


def _initialize_cocoindex_in_process() -> None:
Expand Down
6 changes: 3 additions & 3 deletions python/cocoindex/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class SentenceTransformerEmbedExecutor:
spec: SentenceTransformerEmbed
_model: Any | None = None

def analyze(self, _text: Any) -> type:
def analyze(self) -> type:
try:
# Only import sentence_transformers locally when it's needed, as its import is very slow.
import sentence_transformers # pylint: disable=import-outside-toplevel
Expand Down Expand Up @@ -245,7 +245,7 @@ class ColPaliEmbedImageExecutor:
spec: ColPaliEmbedImage
_model_info: ColPaliModelInfo

def analyze(self, _img_bytes: Any) -> type:
def analyze(self) -> type:
# Get shared model and dimension
self._model_info = _get_colpali_model_and_processor(self.spec.model)

Expand Down Expand Up @@ -321,7 +321,7 @@ class ColPaliEmbedQueryExecutor:
spec: ColPaliEmbedQuery
_model_info: ColPaliModelInfo

def analyze(self, _query: Any) -> type:
def analyze(self) -> type:
# Get shared model and dimension
self._model_info = _get_colpali_model_and_processor(self.spec.model)

Expand Down
121 changes: 72 additions & 49 deletions python/cocoindex/op.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@
Awaitable,
Callable,
Protocol,
ParamSpec,
TypeVar,
Type,
cast,
dataclass_transform,
Annotated,
get_args,
)

from . import _engine # type: ignore
from .subprocess_exec import executor_stub
from .convert import (
make_engine_value_encoder,
make_engine_value_decoder,
Expand Down Expand Up @@ -85,11 +90,13 @@ class Executor(Protocol):
op_category: OpCategory


def _load_spec_from_engine(spec_cls: type, spec: dict[str, Any]) -> Any:
def _load_spec_from_engine(
spec_loader: Callable[..., Any], spec: dict[str, Any]
) -> Any:
"""
Load a spec from the engine.
"""
return spec_cls(**spec)
return spec_loader(**spec)


def _get_required_method(cls: type, name: str) -> Callable[..., Any]:
Expand All @@ -101,18 +108,18 @@ def _get_required_method(cls: type, name: str) -> Callable[..., Any]:
return method


class _FunctionExecutorFactory:
_spec_cls: type
class _EngineFunctionExecutorFactory:
_spec_loader: Callable[..., Any]
_executor_cls: type

def __init__(self, spec_cls: type, executor_cls: type):
self._spec_cls = spec_cls
def __init__(self, spec_loader: Callable[..., Any], executor_cls: type):
self._spec_loader = spec_loader
self._executor_cls = executor_cls

def __call__(
self, spec: dict[str, Any], *args: Any, **kwargs: Any
) -> tuple[dict[str, Any], Executor]:
spec = _load_spec_from_engine(self._spec_cls, spec)
spec = _load_spec_from_engine(self._spec_loader, spec)
executor = self._executor_cls(spec)
result_type = executor.analyze_schema(*args, **kwargs)
return (result_type, executor)
Expand Down Expand Up @@ -166,31 +173,32 @@ def _register_op_factory(
category: OpCategory,
expected_args: list[tuple[str, inspect.Parameter]],
expected_return: Any,
executor_cls: type,
spec_cls: type,
executor_factory: Any,
spec_loader: Callable[..., Any],
op_kind: str,
op_args: OpArgs,
) -> type:
) -> None:
"""
Register an op factory.
"""

class _Fallback:
def enable_cache(self) -> bool:
return op_args.cache

def behavior_version(self) -> int | None:
return op_args.behavior_version

class _WrappedClass(executor_cls, _Fallback): # type: ignore[misc]
class _WrappedExecutor:
_executor: Any
_args_info: list[_ArgInfo]
_kwargs_info: dict[str, _ArgInfo]
_acall: Callable[..., Awaitable[Any]]
_result_encoder: Callable[[Any], Any]
_acall: Callable[..., Awaitable[Any]] | None = None

def __init__(self, spec: Any) -> None:
super().__init__()
self.spec = spec
self._acall = _to_async_call(super().__call__)
executor: Any

if op_args.gpu:
executor = executor_stub(executor_factory, spec)
else:
executor = executor_factory()
executor.spec = spec

self._executor = executor

def analyze_schema(
self, *args: _engine.OpArgSchema, **kwargs: _engine.OpArgSchema
Expand Down Expand Up @@ -294,9 +302,9 @@ def process_arg(
if len(missing_args) > 0:
raise ValueError(f"Missing arguments: {', '.join(missing_args)}")

base_analyze_method = getattr(self, "analyze", None)
base_analyze_method = getattr(self._executor, "analyze", None)
if base_analyze_method is not None:
result_type = base_analyze_method(*args, **kwargs)
result_type = base_analyze_method()
else:
result_type = expected_return
if len(attributes) > 0:
Expand All @@ -316,9 +324,10 @@ async def prepare(self) -> None:
Prepare for execution.
It's executed after `analyze` and before any `__call__` execution.
"""
prepare_method = getattr(super(), "prepare", None)
prepare_method = getattr(self._executor, "prepare", None)
if prepare_method is not None:
await _to_async_call(prepare_method)()
self._acall = _to_async_call(self._executor.__call__)

async def __call__(self, *args: Any, **kwargs: Any) -> Any:
decoded_args = []
Expand All @@ -338,6 +347,7 @@ async def __call__(self, *args: Any, **kwargs: Any) -> Any:
return None
decoded_kwargs[kwarg_name] = kwarg_info.decoder(arg)

assert self._acall is not None
if op_args.gpu:
# For GPU executions, data-level parallelism is applied, so we don't want to
# execute different tasks in parallel.
Expand All @@ -350,21 +360,19 @@ async def __call__(self, *args: Any, **kwargs: Any) -> Any:
output = await self._acall(*decoded_args, **decoded_kwargs)
return self._result_encoder(output)

_WrappedClass.__name__ = executor_cls.__name__
_WrappedClass.__doc__ = executor_cls.__doc__
_WrappedClass.__module__ = executor_cls.__module__
_WrappedClass.__qualname__ = executor_cls.__qualname__
_WrappedClass.__wrapped__ = executor_cls
def enable_cache(self) -> bool:
return op_args.cache

def behavior_version(self) -> int | None:
return op_args.behavior_version

if category == OpCategory.FUNCTION:
_engine.register_function_factory(
spec_cls.__name__, _FunctionExecutorFactory(spec_cls, _WrappedClass)
op_kind, _EngineFunctionExecutorFactory(spec_loader, _WrappedExecutor)
)
else:
raise ValueError(f"Unsupported executor type {category}")

return _WrappedClass


def executor_class(**args: Any) -> Callable[[type], type]:
"""
Expand All @@ -382,18 +390,31 @@ def _inner(cls: type[Executor]) -> type:
raise TypeError("Expect a `spec` field with type hint")
spec_cls = resolve_forward_ref(type_hints["spec"])
sig = inspect.signature(cls.__call__)
return _register_op_factory(
_register_op_factory(
category=spec_cls._op_category,
expected_args=list(sig.parameters.items())[1:], # First argument is `self`
expected_return=sig.return_annotation,
executor_cls=cls,
spec_cls=spec_cls,
executor_factory=cls,
spec_loader=spec_cls,
op_kind=spec_cls.__name__,
op_args=op_args,
)
return cls

return _inner


class _EmptyFunctionSpec(FunctionSpec):
pass


class _SimpleFunctionExecutor:
spec: Any

def prepare(self) -> None:
self.__call__ = self.spec.__call__


def function(**args: Any) -> Callable[[Callable[..., Any]], FunctionSpec]:
"""
Decorate a function to provide a function for an op.
Expand All @@ -404,30 +425,32 @@ def _inner(fn: Callable[..., Any]) -> FunctionSpec:
# Convert snake case to camel case.
op_name = "".join(word.capitalize() for word in fn.__name__.split("_"))
sig = inspect.signature(fn)
full_name = f"{fn.__module__}.{fn.__qualname__}"

class _Executor:
def __call__(self, *args: Any, **kwargs: Any) -> Any:
return fn(*args, **kwargs)
# An object that is both callable and can act as a FunctionSpec.
class _CallableSpec(_EmptyFunctionSpec):
__call__ = staticmethod(fn)

class _Spec(FunctionSpec):
def __call__(self, *args: Any, **kwargs: Any) -> Any:
return fn(*args, **kwargs)
def __reduce__(self) -> str | tuple[Any, ...]:
return full_name

_Spec.__name__ = op_name
_Spec.__doc__ = fn.__doc__
_Spec.__module__ = fn.__module__
_Spec.__qualname__ = fn.__qualname__
_CallableSpec.__name__ = op_name
_CallableSpec.__doc__ = fn.__doc__
_CallableSpec.__qualname__ = fn.__qualname__
_CallableSpec.__module__ = fn.__module__
callable_spec = _CallableSpec()

_register_op_factory(
category=OpCategory.FUNCTION,
expected_args=list(sig.parameters.items()),
expected_return=sig.return_annotation,
executor_cls=_Executor,
spec_cls=_Spec,
executor_factory=_SimpleFunctionExecutor,
spec_loader=lambda: callable_spec,
op_kind=op_name,
op_args=op_args,
)

return _Spec()
return callable_spec

return _inner

Expand Down
Loading
Loading