Skip to content

Commit 93e640b

Browse files
committed
Expose eval functionality by the CLI.
1 parent 21c3a03 commit 93e640b

File tree

4 files changed

+56
-13
lines changed

4 files changed

+56
-13
lines changed

python/cocoindex/cli.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import click
2+
import datetime
23

34
from . import flow, lib
45
from .setup import check_setup_status, CheckSetupStatusOptions, apply_setup_changes
@@ -52,6 +53,24 @@ def update(flow_name: str | None):
5253
stats = _flow_by_name(flow_name).update()
5354
print(stats)
5455

56+
@cli.command()
@click.argument("flow_name", type=str, required=False)
@click.option(
    "-o", "--output-dir", type=str, required=False,
    help="The directory to dump the evaluation output to.")
# Declared as an on/off pair: with a lone boolean flag whose default is already
# True, click derives the flag value as `not default`, so there was no way to
# spell "enable" and "disable" unambiguously from the command line.
@click.option(
    "-c", "--use-cache/--no-use-cache", is_flag=True, show_default=True, default=True,
    help="Use cached evaluation results if available.")
def evaluate(flow_name: str | None, output_dir: str | None, use_cache: bool = True):
    """
    Evaluate and dump the flow.
    """
    # NOTE: the docstring above is what click shows as the command help text,
    # so it is intentionally kept short; details live in these comments.
    #
    # flow_name:  name of the flow to evaluate, resolved by `_flow_by_name`
    #             (which also receives None when the argument is omitted).
    # output_dir: destination directory; when omitted, a name of the form
    #             `eval_<flow name>_<yymmdd_HHMMSS>` is generated from local time.
    # use_cache:  forwarded unchanged in the options handed to the flow.
    fl = _flow_by_name(flow_name)
    if output_dir is None:
        output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}"
    options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=use_cache)
    fl.evaluate_and_dump(options)
73+
5574
_default_server_settings = lib.ServerSettings.from_env()
5675

5776
@cli.command()

python/cocoindex/flow.py

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from typing import Any, Callable, Sequence, TypeVar, get_origin
1010
from threading import Lock
1111
from enum import Enum
12+
from dataclasses import dataclass
1213

1314
from . import _engine
1415
from . import vector
@@ -61,18 +62,18 @@ def _create_data_slice(
6162
def _spec_kind(spec: Any) -> str:
6263
return spec.__class__.__name__
6364

64-
def _spec_value_dump(v: Any) -> Any:
65-
"""Recursively dump a spec object and its nested attributes to a dictionary."""
65+
def _dump_engine_object(v: Any) -> Any:
66+
"""Recursively dump an object for engine. Engine side uses `Pythonzized` to catch."""
6667
if isinstance(v, type) or get_origin(v) is not None:
6768
return encode_enriched_type(v)
6869
elif isinstance(v, Enum):
6970
return v.value
7071
elif hasattr(v, '__dict__'):
71-
return {k: _spec_value_dump(v) for k, v in v.__dict__.items()}
72+
return {k: _dump_engine_object(v) for k, v in v.__dict__.items()}
7273
elif isinstance(v, (list, tuple)):
73-
return [_spec_value_dump(item) for item in v]
74+
return [_dump_engine_object(item) for item in v]
7475
elif isinstance(v, dict):
75-
return {k: _spec_value_dump(v) for k, v in v.items()}
76+
return {k: _dump_engine_object(v) for k, v in v.items()}
7677
return v
7778

7879
T = TypeVar('T')
@@ -177,7 +178,7 @@ def transform(self, fn_spec: op.FunctionSpec, *args, **kwargs) -> DataSlice:
177178
lambda target_scope, name:
178179
flow_builder_state.engine_flow_builder.transform(
179180
_spec_kind(fn_spec),
180-
_spec_value_dump(fn_spec),
181+
_dump_engine_object(fn_spec),
181182
transform_args,
182183
target_scope,
183184
flow_builder_state.field_name_builder.build_name(
@@ -267,7 +268,7 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *,
267268
{"field_name": field_name, "metric": metric.value}
268269
for field_name, metric in vector_index]
269270
self._flow_builder_state.engine_flow_builder.export(
270-
name, _spec_kind(target_spec), _spec_value_dump(target_spec),
271+
name, _spec_kind(target_spec), _dump_engine_object(target_spec),
271272
index_options, self._engine_data_collector)
272273

273274

@@ -316,13 +317,20 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli
316317
self._state,
317318
lambda target_scope, name: self._state.engine_flow_builder.add_source(
318319
_spec_kind(spec),
319-
_spec_value_dump(spec),
320+
_dump_engine_object(spec),
320321
target_scope,
321322
self._state.field_name_builder.build_name(
322323
name, prefix=_to_snake_case(_spec_kind(spec))+'_'),
323324
),
324325
name
325326
)
327+
@dataclass
class EvaluateAndDumpOptions:
    """
    Settings passed to `Flow.evaluate_and_dump`.
    """
    # Directory the evaluation output is dumped to.
    output_dir: str
    # Use cached evaluation results if available; on by default.
    use_cache: bool = True
326334

327335
class Flow:
328336
"""
@@ -348,20 +356,32 @@ def __str__(self):
348356
def __repr__(self):
349357
return repr(self._lazy_engine_flow())
350358

359+
@property
def name(self) -> str:
    """
    The flow's name, as reported by the engine flow's `name()`.
    """
    engine_flow = self._lazy_engine_flow()
    return engine_flow.name()
365+
351366
def update(self):
    """
    Bring the index defined by this flow up to date.

    When this call returns, the index is fresh up to the moment the call was made.
    Returns whatever the engine flow's `update()` returns.
    """
    engine_flow = self._lazy_engine_flow()
    return engine_flow.update()
357372

373+
def evaluate_and_dump(self, options: EvaluateAndDumpOptions):
    """
    Evaluate the flow and dump the result.

    `options` is converted with `_dump_engine_object` before being handed to
    the engine flow's `evaluate_and_dump`; that call's result is returned.
    """
    dumped_options = _dump_engine_object(options)
    engine_flow = self._lazy_engine_flow()
    return engine_flow.evaluate_and_dump(dumped_options)
378+
358379
def internal_flow(self) -> _engine.Flow:
    """
    Return the underlying engine flow, as produced by `_lazy_engine_flow()`.
    """
    engine_flow = self._lazy_engine_flow()
    return engine_flow
363384

364-
365385
def _create_lazy_flow(name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow:
366386
"""
367387
Create a flow without really building it yet.

src/execution/dumper.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
1515
use crate::utils::yaml_ser::YamlSerializer;
1616

1717
/// Options for `evaluate_and_dump`.
///
/// Derives `Deserialize`; the Python binding accepts it wrapped in `Pythonized`.
#[derive(Debug, Clone, Deserialize)]
18-
pub struct DumpEvaluationOutputOptions {
18+
pub struct EvaluateAndDumpOptions {
1919
/// Output directory; `evaluate_and_dump` turns this into a `Path`.
pub output_dir: String,
2020
/// Cache toggle. NOTE(review): presumably selects reuse of cached evaluation
/// results, matching the CLI help text; its use is not visible in this diff.
pub use_cache: bool,
2121
}
@@ -54,7 +54,7 @@ struct Dumper<'a> {
5454
plan: &'a ExecutionPlan,
5555
schema: &'a schema::DataSchema,
5656
pool: &'a PgPool,
57-
options: DumpEvaluationOutputOptions,
57+
options: EvaluateAndDumpOptions,
5858
}
5959

6060
impl<'a> Dumper<'a> {
@@ -215,7 +215,7 @@ impl<'a> Dumper<'a> {
215215
pub async fn evaluate_and_dump(
216216
plan: &ExecutionPlan,
217217
schema: &schema::DataSchema,
218-
options: DumpEvaluationOutputOptions,
218+
options: EvaluateAndDumpOptions,
219219
pool: &PgPool,
220220
) -> Result<()> {
221221
let output_dir = Path::new(&options.output_dir);

src/py/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ impl Flow {
103103
self.__str__()
104104
}
105105

106+
/// Returns the flow's name, borrowed from the wrapped flow instance.
pub fn name(&self) -> &str {
107+
&self.0.flow_instance.name
108+
}
109+
106110
pub fn update(&self, py: Python<'_>) -> PyResult<IndexUpdateInfo> {
107111
py.allow_threads(|| {
108112
let lib_context = get_lib_context()
@@ -122,7 +126,7 @@ impl Flow {
122126
pub fn evaluate_and_dump(
123127
&self,
124128
py: Python<'_>,
125-
options: Pythonized<execution::dumper::DumpEvaluationOutputOptions>,
129+
options: Pythonized<execution::dumper::EvaluateAndDumpOptions>,
126130
) -> PyResult<()> {
127131
py.allow_threads(|| {
128132
let lib_context = get_lib_context()

0 commit comments

Comments
 (0)