diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index db83db953..284484e4f 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -4,9 +4,7 @@ import importlib.util import os import signal -import sys import threading -import types from types import FrameType from typing import Any, Iterable @@ -20,6 +18,8 @@ from . import flow, lib, setting from .setup import flow_names_with_setup from .runtime import execution_context +from .subprocess_exec import add_user_app +from .user_app_loader import load_user_app # Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc. COCOINDEX_HOST = "https://cocoindex.io" @@ -76,50 +76,9 @@ def _get_app_ref_from_specifier( return app_ref -def _load_user_app(app_target: str) -> types.ModuleType: - """ - Loads the user's application, which can be a file path or an installed module name. - Exits on failure. - """ - if not app_target: - raise click.ClickException("Application target not provided.") - - looks_like_path = os.sep in app_target or app_target.lower().endswith(".py") - - if looks_like_path: - if not os.path.isfile(app_target): - raise click.ClickException(f"Application file path not found: {app_target}") - app_path = os.path.abspath(app_target) - app_dir = os.path.dirname(app_path) - module_name = os.path.splitext(os.path.basename(app_path))[0] - - if app_dir not in sys.path: - sys.path.insert(0, app_dir) - try: - spec = importlib.util.spec_from_file_location(module_name, app_path) - if spec is None: - raise ImportError(f"Could not create spec for file: {app_path}") - module = importlib.util.module_from_spec(spec) - sys.modules[spec.name] = module - if spec.loader is None: - raise ImportError(f"Could not create loader for file: {app_path}") - spec.loader.exec_module(module) - return module - except (ImportError, FileNotFoundError, PermissionError) as e: - raise click.ClickException(f"Failed importing file '{app_path}': {e}") - finally: - if app_dir in sys.path and sys.path[0] == app_dir: - sys.path.pop(0) - - # Try as module - try: - return importlib.import_module(app_target) - except ImportError as e: - raise click.ClickException(f"Failed to load module '{app_target}': {e}") - except Exception as e: - raise click.ClickException( - f"Unexpected error importing module '{app_target}': {e}" - ) +def _load_user_app(app_target: str) -> None: + load_user_app(app_target) + add_user_app(app_target) def _initialize_cocoindex_in_process() -> None: diff --git a/python/cocoindex/functions.py b/python/cocoindex/functions.py index 675e0ff73..73d3f43b9 100644 --- a/python/cocoindex/functions.py +++ b/python/cocoindex/functions.py @@ -89,7 +89,7 @@ class SentenceTransformerEmbedExecutor: spec: SentenceTransformerEmbed _model: Any | None = None - def analyze(self, _text: Any) -> type: + def analyze(self) -> type: try: # Only import sentence_transformers locally when it's needed, as its import is very slow. import sentence_transformers # pylint: disable=import-outside-toplevel @@ -245,7 +245,7 @@ class ColPaliEmbedImageExecutor: spec: ColPaliEmbedImage _model_info: ColPaliModelInfo - def analyze(self, _img_bytes: Any) -> type: + def analyze(self) -> type: # Get shared model and dimension self._model_info = _get_colpali_model_and_processor(self.spec.model) @@ -321,7 +321,7 @@ class ColPaliEmbedQueryExecutor: spec: ColPaliEmbedQuery _model_info: ColPaliModelInfo - def analyze(self, _query: Any) -> type: + def analyze(self) -> type: # Get shared model and dimension self._model_info = _get_colpali_model_and_processor(self.spec.model) diff --git a/python/cocoindex/op.py b/python/cocoindex/op.py index d2422ba3c..d2ca3068c 100644 --- a/python/cocoindex/op.py +++ b/python/cocoindex/op.py @@ -11,12 +11,17 @@ Awaitable, Callable, Protocol, + ParamSpec, + TypeVar, + Type, + cast, dataclass_transform, Annotated, get_args, ) from . import _engine # type: ignore +from .subprocess_exec import executor_stub from .convert import ( make_engine_value_encoder, make_engine_value_decoder, @@ -85,11 +90,13 @@ class Executor(Protocol): op_category: OpCategory -def _load_spec_from_engine(spec_cls: type, spec: dict[str, Any]) -> Any: +def _load_spec_from_engine( + spec_loader: Callable[..., Any], spec: dict[str, Any] +) -> Any: """ Load a spec from the engine. """ - return spec_cls(**spec) + return spec_loader(**spec) def _get_required_method(cls: type, name: str) -> Callable[..., Any]: @@ -101,18 +108,18 @@ def _get_required_method(cls: type, name: str) -> Callable[..., Any]: return method -class _FunctionExecutorFactory: - _spec_cls: type +class _EngineFunctionExecutorFactory: + _spec_loader: Callable[..., Any] _executor_cls: type - def __init__(self, spec_cls: type, executor_cls: type): - self._spec_cls = spec_cls + def __init__(self, spec_loader: Callable[..., Any], executor_cls: type): + self._spec_loader = spec_loader self._executor_cls = executor_cls def __call__( self, spec: dict[str, Any], *args: Any, **kwargs: Any ) -> tuple[dict[str, Any], Executor]: - spec = _load_spec_from_engine(self._spec_cls, spec) + spec = _load_spec_from_engine(self._spec_loader, spec) executor = self._executor_cls(spec) result_type = executor.analyze_schema(*args, **kwargs) return (result_type, executor) @@ -166,31 +173,32 @@ def _register_op_factory( category: OpCategory, expected_args: list[tuple[str, inspect.Parameter]], expected_return: Any, - executor_cls: type, - spec_cls: type, + executor_factory: Any, + spec_loader: Callable[..., Any], + op_kind: str, op_args: OpArgs, -) -> type: +) -> None: """ Register an op factory. """ - class _Fallback: - def enable_cache(self) -> bool: - return op_args.cache - - def behavior_version(self) -> int | None: - return op_args.behavior_version - - class _WrappedClass(executor_cls, _Fallback): # type: ignore[misc] + class _WrappedExecutor: + _executor: Any _args_info: list[_ArgInfo] _kwargs_info: dict[str, _ArgInfo] - _acall: Callable[..., Awaitable[Any]] _result_encoder: Callable[[Any], Any] + _acall: Callable[..., Awaitable[Any]] | None = None def __init__(self, spec: Any) -> None: - super().__init__() - self.spec = spec - self._acall = _to_async_call(super().__call__) + executor: Any + + if op_args.gpu: + executor = executor_stub(executor_factory, spec) + else: + executor = executor_factory() + executor.spec = spec + + self._executor = executor def analyze_schema( self, *args: _engine.OpArgSchema, **kwargs: _engine.OpArgSchema @@ -294,9 +302,9 @@ def process_arg( if len(missing_args) > 0: raise ValueError(f"Missing arguments: {', '.join(missing_args)}") - base_analyze_method = getattr(self, "analyze", None) + base_analyze_method = getattr(self._executor, "analyze", None) if base_analyze_method is not None: - result_type = base_analyze_method(*args, **kwargs) + result_type = base_analyze_method() else: result_type = expected_return if len(attributes) > 0: @@ -316,9 +324,10 @@ async def prepare(self) -> None: Prepare for execution. It's executed after `analyze` and before any `__call__` execution. """ - prepare_method = getattr(super(), "prepare", None) + prepare_method = getattr(self._executor, "prepare", None) if prepare_method is not None: await _to_async_call(prepare_method)() + self._acall = _to_async_call(self._executor.__call__) async def __call__(self, *args: Any, **kwargs: Any) -> Any: decoded_args = [] @@ -338,6 +347,7 @@ async def __call__(self, *args: Any, **kwargs: Any) -> Any: return None decoded_kwargs[kwarg_name] = kwarg_info.decoder(arg) + assert self._acall is not None if op_args.gpu: # For GPU executions, data-level parallelism is applied, so we don't want to # execute different tasks in parallel. @@ -350,21 +360,19 @@ async def __call__(self, *args: Any, **kwargs: Any) -> Any: output = await self._acall(*decoded_args, **decoded_kwargs) return self._result_encoder(output) - _WrappedClass.__name__ = executor_cls.__name__ - _WrappedClass.__doc__ = executor_cls.__doc__ - _WrappedClass.__module__ = executor_cls.__module__ - _WrappedClass.__qualname__ = executor_cls.__qualname__ - _WrappedClass.__wrapped__ = executor_cls + def enable_cache(self) -> bool: + return op_args.cache + + def behavior_version(self) -> int | None: + return op_args.behavior_version if category == OpCategory.FUNCTION: _engine.register_function_factory( - spec_cls.__name__, _FunctionExecutorFactory(spec_cls, _WrappedClass) + op_kind, _EngineFunctionExecutorFactory(spec_loader, _WrappedExecutor) ) else: raise ValueError(f"Unsupported executor type {category}") - return _WrappedClass - def executor_class(**args: Any) -> Callable[[type], type]: """ @@ -382,18 +390,31 @@ def _inner(cls: type[Executor]) -> type: raise TypeError("Expect a `spec` field with type hint") spec_cls = resolve_forward_ref(type_hints["spec"]) sig = inspect.signature(cls.__call__) - return _register_op_factory( + _register_op_factory( category=spec_cls._op_category, expected_args=list(sig.parameters.items())[1:], # First argument is `self` expected_return=sig.return_annotation, - executor_cls=cls, - spec_cls=spec_cls, + executor_factory=cls, + spec_loader=spec_cls, + op_kind=spec_cls.__name__, op_args=op_args, ) + return cls return _inner +class _EmptyFunctionSpec(FunctionSpec): + pass + + +class _SimpleFunctionExecutor: + spec: Any + + def prepare(self) -> None: + self.__call__ = self.spec.__call__ + + def function(**args: Any) -> Callable[[Callable[..., Any]], FunctionSpec]: """ Decorate a function to provide a function for an op. @@ -404,30 +425,32 @@ def _inner(fn: Callable[..., Any]) -> FunctionSpec: # Convert snake case to camel case. op_name = "".join(word.capitalize() for word in fn.__name__.split("_")) sig = inspect.signature(fn) + full_name = f"{fn.__module__}.{fn.__qualname__}" - class _Executor: - def __call__(self, *args: Any, **kwargs: Any) -> Any: - return fn(*args, **kwargs) + # An object that is both callable and can act as a FunctionSpec. + class _CallableSpec(_EmptyFunctionSpec): + __call__ = staticmethod(fn) - class _Spec(FunctionSpec): - def __call__(self, *args: Any, **kwargs: Any) -> Any: - return fn(*args, **kwargs) + def __reduce__(self) -> str | tuple[Any, ...]: + return full_name - _Spec.__name__ = op_name - _Spec.__doc__ = fn.__doc__ - _Spec.__module__ = fn.__module__ - _Spec.__qualname__ = fn.__qualname__ + _CallableSpec.__name__ = op_name + _CallableSpec.__doc__ = fn.__doc__ + _CallableSpec.__qualname__ = fn.__qualname__ + _CallableSpec.__module__ = fn.__module__ + callable_spec = _CallableSpec() _register_op_factory( category=OpCategory.FUNCTION, expected_args=list(sig.parameters.items()), expected_return=sig.return_annotation, - executor_cls=_Executor, - spec_cls=_Spec, + executor_factory=_SimpleFunctionExecutor, + spec_loader=lambda: callable_spec, + op_kind=op_name, op_args=op_args, ) - return _Spec() + return callable_spec return _inner diff --git a/python/cocoindex/subprocess_exec.py b/python/cocoindex/subprocess_exec.py new file mode 100644 index 000000000..db8f22f8c --- /dev/null +++ b/python/cocoindex/subprocess_exec.py @@ -0,0 +1,167 @@ +""" +Lightweight subprocess-backed executor stub. + +- Uses a single global ProcessPoolExecutor (max_workers=1), created lazily. +- In the subprocess, maintains a registry of executor instances keyed by + (executor_factory, pickled spec) to enable reuse. +- Caches analyze() and prepare() results per key to avoid repeated calls + even if key collision happens. +""" + +from __future__ import annotations + +from concurrent.futures import ProcessPoolExecutor +from dataclasses import dataclass, field +from typing import Any, Callable +import pickle +import threading +import asyncio +from .user_app_loader import load_user_app + + +# --------------------------------------------- +# Main process: single, lazily-created pool +# --------------------------------------------- +_pool_lock = threading.Lock() +_pool: ProcessPoolExecutor | None = None +_user_apps: list[str] = [] + + +def _get_pool() -> ProcessPoolExecutor: + global _pool + with _pool_lock: + if _pool is None: + # Single worker process as requested + _pool = ProcessPoolExecutor( + max_workers=1, initializer=_subprocess_init, initargs=(_user_apps,) + ) + return _pool + + +def add_user_app(app_target: str) -> None: + with _pool_lock: + _user_apps.append(app_target) + + +# --------------------------------------------- +# Subprocess: executor registry and helpers +# --------------------------------------------- + + +def _subprocess_init(user_apps: list[str]) -> None: + for app_target in user_apps: + load_user_app(app_target) + + +class _OnceResult: + _result: Any = None + _done: bool = False + + def run_once(self, method: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: + if self._done: + return self._result + self._result = _call_method(method, *args, **kwargs) + self._done = True + return self._result + + +@dataclass +class _ExecutorEntry: + executor: Any + prepare: _OnceResult = field(default_factory=_OnceResult) + analyze: _OnceResult = field(default_factory=_OnceResult) + ready_to_call: bool = False + + +_SUBPROC_EXECUTORS: dict[bytes, _ExecutorEntry] = {} + + +def _call_method(method: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: + """Run an awaitable/coroutine to completion synchronously, otherwise return as-is.""" + if asyncio.iscoroutinefunction(method): + return asyncio.run(method(*args, **kwargs)) + else: + return method(*args, **kwargs) + + +def _get_or_create_entry(key_bytes: bytes) -> _ExecutorEntry: + entry = _SUBPROC_EXECUTORS.get(key_bytes) + if entry is None: + executor_factory, spec = pickle.loads(key_bytes) + inst = executor_factory() + inst.spec = spec + entry = _ExecutorEntry(executor=inst) + _SUBPROC_EXECUTORS[key_bytes] = entry + return entry + + +def _sp_analyze(key_bytes: bytes) -> Any: + entry = _get_or_create_entry(key_bytes) + return entry.analyze.run_once(entry.executor.analyze) + + +def _sp_prepare(key_bytes: bytes) -> Any: + entry = _get_or_create_entry(key_bytes) + return entry.prepare.run_once(entry.executor.prepare) + + +def _sp_call(key_bytes: bytes, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: + entry = _get_or_create_entry(key_bytes) + # There's a chance that the subprocess crashes and restarts in the middle. + # So we want to always make sure the executor is ready before each call. + if not entry.ready_to_call: + if analyze_fn := getattr(entry.executor, "analyze", None): + entry.analyze.run_once(analyze_fn) + if prepare_fn := getattr(entry.executor, "prepare", None): + entry.prepare.run_once(prepare_fn) + entry.ready_to_call = True + return _call_method(entry.executor.__call__, *args, **kwargs) + + +# --------------------------------------------- +# Public stub +# --------------------------------------------- + + +class _ExecutorStub: + _pool: ProcessPoolExecutor + _key_bytes: bytes + + def __init__(self, executor_factory: type[Any], spec: Any) -> None: + self._pool = _get_pool() + self._key_bytes = pickle.dumps( + (executor_factory, spec), protocol=pickle.HIGHEST_PROTOCOL + ) + + # Conditionally expose analyze if underlying class has it (sync-only in caller) + if hasattr(executor_factory, "analyze"): + # Bind as attribute so getattr(..., "analyze", None) works upstream + def _analyze() -> Any: + fut = self._pool.submit(_sp_analyze, self._key_bytes) + return fut.result() + + # Attach method + setattr(self, "analyze", _analyze) + + if hasattr(executor_factory, "prepare"): + + async def prepare() -> Any: + fut = self._pool.submit(_sp_prepare, self._key_bytes) + return await asyncio.wrap_future(fut) + + setattr(self, "prepare", prepare) + + async def __call__(self, *args: Any, **kwargs: Any) -> Any: + fut = self._pool.submit(_sp_call, self._key_bytes, args, kwargs) + return await asyncio.wrap_future(fut) + + +def executor_stub(executor_factory: type[Any], spec: Any) -> Any: + """ + Create a subprocess-backed stub for the given executor class/spec. + + - Lazily initializes a singleton ProcessPoolExecutor (max_workers=1). + - Returns a stub object exposing async __call__ and async prepare; analyze is + exposed if present on the original class. + """ + return _ExecutorStub(executor_factory, spec) diff --git a/python/cocoindex/tests/test_transform_flow.py b/python/cocoindex/tests/test_transform_flow.py index b3e7ff696..38b6a3a47 100644 --- a/python/cocoindex/tests/test_transform_flow.py +++ b/python/cocoindex/tests/test_transform_flow.py @@ -142,3 +142,35 @@ def transform_flow( result = transform_flow.eval(1, 2, None, None) assert result is None, f"Expected None, got {result}" + + +# Test GPU function behavior. +# They're not really executed on GPU, but we want to make sure they're scheduled on subprocesses correctly. + + +@cocoindex.op.function(gpu=True) +def gpu_append_world(text: str) -> str: + """Append ' world' to the input text.""" + return f"{text} world" + + +class GpuAppendSuffix(cocoindex.op.FunctionSpec): + suffix: str + + +@cocoindex.op.executor_class(gpu=True) +class GpuAppendSuffixExecutor: + spec: GpuAppendSuffix + + def __call__(self, text: str) -> str: + return f"{text}{self.spec.suffix}" + + +def test_gpu_function() -> None: + @cocoindex.transform_flow() + def transform_flow(text: cocoindex.DataSlice[str]) -> cocoindex.DataSlice[str]: + return text.transform(gpu_append_world).transform(GpuAppendSuffix(suffix="!")) + + result = transform_flow.eval("Hello") + expected = "Hello world!" + assert result == expected, f"Expected {expected}, got {result}" diff --git a/python/cocoindex/user_app_loader.py b/python/cocoindex/user_app_loader.py new file mode 100644 index 000000000..18d7e007d --- /dev/null +++ b/python/cocoindex/user_app_loader.py @@ -0,0 +1,51 @@ +import os +import sys +import importlib +import click +import types + + +def load_user_app(app_target: str) -> types.ModuleType: + """ + Loads the user's application, which can be a file path or an installed module name. + Exits on failure. + """ + if not app_target: + raise click.ClickException("Application target not provided.") + + looks_like_path = os.sep in app_target or app_target.lower().endswith(".py") + + if looks_like_path: + if not os.path.isfile(app_target): + raise click.ClickException(f"Application file path not found: {app_target}") + app_path = os.path.abspath(app_target) + app_dir = os.path.dirname(app_path) + module_name = os.path.splitext(os.path.basename(app_path))[0] + + if app_dir not in sys.path: + sys.path.insert(0, app_dir) + try: + spec = importlib.util.spec_from_file_location(module_name, app_path) + if spec is None: + raise ImportError(f"Could not create spec for file: {app_path}") + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + if spec.loader is None: + raise ImportError(f"Could not create loader for file: {app_path}") + spec.loader.exec_module(module) + return module + except (ImportError, FileNotFoundError, PermissionError) as e: + raise click.ClickException(f"Failed importing file '{app_path}': {e}") + finally: + if app_dir in sys.path and sys.path[0] == app_dir: + sys.path.pop(0) + + # Try as module + try: + return importlib.import_module(app_target) + except ImportError as e: + raise click.ClickException(f"Failed to load module '{app_target}': {e}") + except Exception as e: + raise click.ClickException( + f"Unexpected error importing module '{app_target}': {e}" + )