Skip to content

Commit 936dd0b

Browse files
committed
feat(subprocessing): implement subprocessing for GPU workloads
1 parent 120dd2a commit 936dd0b

File tree

6 files changed

+309
-77
lines changed

6 files changed

+309
-77
lines changed

python/cocoindex/cli.py

Lines changed: 5 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@
44
import importlib.util
55
import os
66
import signal
7-
import sys
87
import threading
9-
import types
108
from types import FrameType
119
from typing import Any, Iterable
1210

@@ -20,6 +18,8 @@
2018
from . import flow, lib, setting
2119
from .setup import flow_names_with_setup
2220
from .runtime import execution_context
21+
from .subprocess_exec import add_user_app
22+
from .user_app_loader import load_user_app
2323

2424
# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc.
2525
COCOINDEX_HOST = "https://cocoindex.io"
@@ -76,50 +76,9 @@ def _get_app_ref_from_specifier(
7676
return app_ref
7777

7878

79-
def _load_user_app(app_target: str) -> types.ModuleType:
80-
"""
81-
Loads the user's application, which can be a file path or an installed module name.
82-
Exits on failure.
83-
"""
84-
if not app_target:
85-
raise click.ClickException("Application target not provided.")
86-
87-
looks_like_path = os.sep in app_target or app_target.lower().endswith(".py")
88-
89-
if looks_like_path:
90-
if not os.path.isfile(app_target):
91-
raise click.ClickException(f"Application file path not found: {app_target}")
92-
app_path = os.path.abspath(app_target)
93-
app_dir = os.path.dirname(app_path)
94-
module_name = os.path.splitext(os.path.basename(app_path))[0]
95-
96-
if app_dir not in sys.path:
97-
sys.path.insert(0, app_dir)
98-
try:
99-
spec = importlib.util.spec_from_file_location(module_name, app_path)
100-
if spec is None:
101-
raise ImportError(f"Could not create spec for file: {app_path}")
102-
module = importlib.util.module_from_spec(spec)
103-
sys.modules[spec.name] = module
104-
if spec.loader is None:
105-
raise ImportError(f"Could not create loader for file: {app_path}")
106-
spec.loader.exec_module(module)
107-
return module
108-
except (ImportError, FileNotFoundError, PermissionError) as e:
109-
raise click.ClickException(f"Failed importing file '{app_path}': {e}")
110-
finally:
111-
if app_dir in sys.path and sys.path[0] == app_dir:
112-
sys.path.pop(0)
113-
114-
# Try as module
115-
try:
116-
return importlib.import_module(app_target)
117-
except ImportError as e:
118-
raise click.ClickException(f"Failed to load module '{app_target}': {e}")
119-
except Exception as e:
120-
raise click.ClickException(
121-
f"Unexpected error importing module '{app_target}': {e}"
122-
)
79+
def _load_user_app(app_target: str) -> None:
80+
load_user_app(app_target)
81+
add_user_app(app_target)
12382

12483

12584
def _initialize_cocoindex_in_process() -> None:

python/cocoindex/functions.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class SentenceTransformerEmbedExecutor:
8989
spec: SentenceTransformerEmbed
9090
_model: Any | None = None
9191

92-
def analyze(self, _text: Any) -> type:
92+
def analyze(self) -> type:
9393
try:
9494
# Only import sentence_transformers locally when it's needed, as its import is very slow.
9595
import sentence_transformers # pylint: disable=import-outside-toplevel
@@ -245,7 +245,7 @@ class ColPaliEmbedImageExecutor:
245245
spec: ColPaliEmbedImage
246246
_model_info: ColPaliModelInfo
247247

248-
def analyze(self, _img_bytes: Any) -> type:
248+
def analyze(self) -> type:
249249
# Get shared model and dimension
250250
self._model_info = _get_colpali_model_and_processor(self.spec.model)
251251

@@ -321,7 +321,7 @@ class ColPaliEmbedQueryExecutor:
321321
spec: ColPaliEmbedQuery
322322
_model_info: ColPaliModelInfo
323323

324-
def analyze(self, _query: Any) -> type:
324+
def analyze(self) -> type:
325325
# Get shared model and dimension
326326
self._model_info = _get_colpali_model_and_processor(self.spec.model)
327327

python/cocoindex/op.py

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,17 @@
1111
Awaitable,
1212
Callable,
1313
Protocol,
14+
ParamSpec,
15+
TypeVar,
16+
Type,
17+
cast,
1418
dataclass_transform,
1519
Annotated,
1620
get_args,
1721
)
1822

1923
from . import _engine # type: ignore
24+
from .subprocess_exec import executor_stub
2025
from .convert import (
2126
make_engine_value_encoder,
2227
make_engine_value_decoder,
@@ -85,11 +90,13 @@ class Executor(Protocol):
8590
op_category: OpCategory
8691

8792

88-
def _load_spec_from_engine(spec_cls: type, spec: dict[str, Any]) -> Any:
93+
def _load_spec_from_engine(
94+
spec_loader: Callable[..., Any], spec: dict[str, Any]
95+
) -> Any:
8996
"""
9097
Load a spec from the engine.
9198
"""
92-
return spec_cls(**spec)
99+
return spec_loader(**spec)
93100

94101

95102
def _get_required_method(cls: type, name: str) -> Callable[..., Any]:
@@ -101,18 +108,18 @@ def _get_required_method(cls: type, name: str) -> Callable[..., Any]:
101108
return method
102109

103110

104-
class _FunctionExecutorFactory:
105-
_spec_cls: type
111+
class _EngineFunctionExecutorFactory:
112+
_spec_loader: Callable[..., Any]
106113
_executor_cls: type
107114

108-
def __init__(self, spec_cls: type, executor_cls: type):
109-
self._spec_cls = spec_cls
115+
def __init__(self, spec_loader: Callable[..., Any], executor_cls: type):
116+
self._spec_loader = spec_loader
110117
self._executor_cls = executor_cls
111118

112119
def __call__(
113120
self, spec: dict[str, Any], *args: Any, **kwargs: Any
114121
) -> tuple[dict[str, Any], Executor]:
115-
spec = _load_spec_from_engine(self._spec_cls, spec)
122+
spec = _load_spec_from_engine(self._spec_loader, spec)
116123
executor = self._executor_cls(spec)
117124
result_type = executor.analyze_schema(*args, **kwargs)
118125
return (result_type, executor)
@@ -166,8 +173,8 @@ def _register_op_factory(
166173
category: OpCategory,
167174
expected_args: list[tuple[str, inspect.Parameter]],
168175
expected_return: Any,
169-
executor_cls: type,
170-
spec_cls: type,
176+
executor_factory: Any,
177+
spec_loader: Callable[..., Any],
171178
op_kind: str,
172179
op_args: OpArgs,
173180
) -> None:
@@ -179,15 +186,19 @@ class _WrappedExecutor:
179186
_executor: Any
180187
_args_info: list[_ArgInfo]
181188
_kwargs_info: dict[str, _ArgInfo]
182-
_acall: Callable[..., Awaitable[Any]]
183189
_result_encoder: Callable[[Any], Any]
190+
_acall: Callable[..., Awaitable[Any]] | None = None
184191

185192
def __init__(self, spec: Any) -> None:
186-
executor: Any = executor_class()
187-
executor = executor_cls()
188-
executor.spec = spec
193+
executor: Any
194+
195+
if op_args.gpu:
196+
executor = executor_stub(executor_factory, spec)
197+
else:
198+
executor = executor_factory()
199+
executor.spec = spec
200+
189201
self._executor = executor
190-
self._acall = _to_async_call(executor.__call__)
191202

192203
def analyze_schema(
193204
self, *args: _engine.OpArgSchema, **kwargs: _engine.OpArgSchema
@@ -293,7 +304,7 @@ def process_arg(
293304

294305
base_analyze_method = getattr(self._executor, "analyze", None)
295306
if base_analyze_method is not None:
296-
result_type = base_analyze_method(*args, **kwargs)
307+
result_type = base_analyze_method()
297308
else:
298309
result_type = expected_return
299310
if len(attributes) > 0:
@@ -316,6 +327,7 @@ async def prepare(self) -> None:
316327
prepare_method = getattr(self._executor, "prepare", None)
317328
if prepare_method is not None:
318329
await _to_async_call(prepare_method)()
330+
self._acall = _to_async_call(self._executor.__call__)
319331

320332
async def __call__(self, *args: Any, **kwargs: Any) -> Any:
321333
decoded_args = []
@@ -335,6 +347,7 @@ async def __call__(self, *args: Any, **kwargs: Any) -> Any:
335347
return None
336348
decoded_kwargs[kwarg_name] = kwarg_info.decoder(arg)
337349

350+
assert self._acall is not None
338351
if op_args.gpu:
339352
# For GPU executions, data-level parallelism is applied, so we don't want to
340353
# execute different tasks in parallel.
@@ -355,7 +368,7 @@ def behavior_version(self) -> int | None:
355368

356369
if category == OpCategory.FUNCTION:
357370
_engine.register_function_factory(
358-
op_kind, _FunctionExecutorFactory(spec_cls, _WrappedExecutor)
371+
op_kind, _EngineFunctionExecutorFactory(spec_loader, _WrappedExecutor)
359372
)
360373
else:
361374
raise ValueError(f"Unsupported executor type {category}")
@@ -381,8 +394,8 @@ def _inner(cls: type[Executor]) -> type:
381394
category=spec_cls._op_category,
382395
expected_args=list(sig.parameters.items())[1:], # First argument is `self`
383396
expected_return=sig.return_annotation,
384-
executor_cls=cls,
385-
spec_cls=spec_cls,
397+
executor_factory=cls,
398+
spec_loader=spec_cls,
386399
op_kind=spec_cls.__name__,
387400
op_args=op_args,
388401
)
@@ -395,6 +408,13 @@ class _EmptyFunctionSpec(FunctionSpec):
395408
pass
396409

397410

411+
class _SimpleFunctionExecutor:
412+
spec: Any
413+
414+
def prepare(self) -> None:
415+
self.__call__ = self.spec.__call__
416+
417+
398418
def function(**args: Any) -> Callable[[Callable[..., Any]], FunctionSpec]:
399419
"""
400420
Decorate a function to provide a function for an op.
@@ -405,29 +425,32 @@ def _inner(fn: Callable[..., Any]) -> FunctionSpec:
405425
# Convert snake case to camel case.
406426
op_name = "".join(word.capitalize() for word in fn.__name__.split("_"))
407427
sig = inspect.signature(fn)
428+
full_name = f"{fn.__module__}.{fn.__qualname__}"
408429

409-
class _SpecExecutor(_EmptyFunctionSpec):
410-
def __call__(self, *args: Any, **kwargs: Any) -> Any:
411-
return fn(*args, **kwargs)
430+
# An object that is both callable and can act as a FunctionSpec.
431+
class _CallableSpec(_EmptyFunctionSpec):
432+
__call__ = staticmethod(fn)
412433

413434
def __reduce__(self) -> str | tuple[Any, ...]:
414-
return fn.__qualname__
435+
return full_name
415436

416-
_SpecExecutor.__name__ = op_name
417-
_SpecExecutor.__doc__ = fn.__doc__
418-
_SpecExecutor.__module__ = fn.__module__
437+
_CallableSpec.__name__ = op_name
438+
_CallableSpec.__doc__ = fn.__doc__
439+
_CallableSpec.__qualname__ = fn.__qualname__
440+
_CallableSpec.__module__ = fn.__module__
441+
callable_spec = _CallableSpec()
419442

420443
_register_op_factory(
421444
category=OpCategory.FUNCTION,
422445
expected_args=list(sig.parameters.items()),
423446
expected_return=sig.return_annotation,
424-
executor_cls=_SpecExecutor,
425-
spec_cls=_EmptyFunctionSpec,
447+
executor_factory=_SimpleFunctionExecutor,
448+
spec_loader=lambda: callable_spec,
426449
op_kind=op_name,
427450
op_args=op_args,
428451
)
429452

430-
return _SpecExecutor()
453+
return callable_spec
431454

432455
return _inner
433456

0 commit comments

Comments
 (0)