Skip to content

Commit 72a4cc3

Browse files
committed
refactor: use Flow pymethod to get schema
1 parent 1065c37 commit 72a4cc3

File tree

3 files changed

+78
-88
lines changed

3 files changed

+78
-88
lines changed

python/cocoindex/cli.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,19 @@ def show(flow_name: str | None, color: bool):
6565
console = Console(no_color=not color)
6666
console.print(flow._render_text())
6767

68-
async def render_schema_and_print():
69-
table = Table(title=f"Schema for Flow: {flow.name}", show_header=True, header_style="bold magenta")
70-
table.add_column("Field", style="cyan")
71-
table.add_column("Type", style="green")
72-
table.add_column("Attributes", style="yellow")
73-
for field_name, field_type, attr_str in await flow._render_schema():
74-
table.add_row(field_name, field_type, attr_str)
75-
console.print(table)
76-
77-
asyncio.run(render_schema_and_print())
68+
table = Table(
69+
title=f"Schema for Flow: {flow.name}",
70+
show_header=True,
71+
header_style="bold magenta"
72+
)
73+
table.add_column("Field", style="cyan")
74+
table.add_column("Type", style="green")
75+
table.add_column("Attributes", style="yellow")
76+
77+
for field_name, field_type, attr_str in flow._render_schema():
78+
table.add_row(field_name, field_type, attr_str)
79+
80+
console.print(table)
7881

7982
@cli.command()
8083
def setup():

python/cocoindex/flow.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -504,11 +504,8 @@ def _render_text(self) -> Text:
504504
except json.JSONDecodeError:
505505
return Text(flow_spec_str)
506506

507-
async def _render_schema(self) -> list[tuple[str, str, str]]:
508-
"""
509-
Render the schema as a list of (field_name, field_type, attributes) tuples.
510-
"""
511-
return await _engine.format_flow_schema(self.name)
507+
def _render_schema(self) -> list[tuple[str, str, str]]:
508+
return self._lazy_engine_flow().get_schema()
512509

513510
def __str__(self):
514511
return str(self._render_text())

src/py/mod.rs

Lines changed: 63 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@ use crate::ops::interface::{QueryResult, QueryResults};
88
use crate::ops::py_factory::PyOpArgSchema;
99
use crate::ops::{interface::ExecutorFactory, py_factory::PyFunctionFactory, register_factory};
1010
use crate::server::{self, ServerSettings};
11-
use crate::service::flows::get_flow_schema;
1211
use crate::settings::Settings;
1312
use crate::setup;
14-
use axum::extract::{Path, State};
1513
use pyo3::{exceptions::PyException, prelude::*};
1614
use pyo3_async_runtimes::tokio::future_into_py;
1715
use std::collections::btree_map;
@@ -197,6 +195,69 @@ impl Flow {
197195
Ok(())
198196
})
199197
}
198+
199+
pub fn get_schema(&self) -> Vec<(String, String, String)> {
200+
let schema = &self.0.flow.data_schema;
201+
let mut result = Vec::new();
202+
203+
fn process_fields(
204+
fields: &[FieldSchema],
205+
prefix: &str,
206+
result: &mut Vec<(String, String, String)>,
207+
) {
208+
for field in fields {
209+
let field_name = format!("{}{}", prefix, field.name);
210+
211+
let mut field_type = match &field.value_type.typ {
212+
ValueType::Basic(basic) => match basic {
213+
BasicValueType::Vector(v) => {
214+
let dim = v.dimension.map_or("*".to_string(), |d| d.to_string());
215+
let elem = match *v.element_type {
216+
BasicValueType::Float32 => "Float32",
217+
BasicValueType::Float64 => "Float64",
218+
_ => "Unknown",
219+
};
220+
format!("Vector[{}, {}]", dim, elem)
221+
}
222+
other => format!("{:?}", other),
223+
},
224+
ValueType::Table(t) => format!("{:?}", t.kind),
225+
ValueType::Struct(_) => "Struct".to_string(),
226+
};
227+
228+
if field.value_type.nullable {
229+
field_type.push('?');
230+
}
231+
232+
let attr_str = if field.value_type.attrs.is_empty() {
233+
String::new()
234+
} else {
235+
field
236+
.value_type
237+
.attrs
238+
.keys()
239+
.map(|k| k.to_string())
240+
.collect::<Vec<_>>()
241+
.join(", ")
242+
};
243+
244+
result.push((field_name.clone(), field_type, attr_str));
245+
246+
match &field.value_type.typ {
247+
ValueType::Struct(s) => {
248+
process_fields(&s.fields, &format!("{}.", field_name), result);
249+
}
250+
ValueType::Table(t) => {
251+
process_fields(&t.row.fields, &format!("{}.", field_name), result);
252+
}
253+
ValueType::Basic(_) => {}
254+
}
255+
}
256+
}
257+
258+
process_fields(&schema.schema.fields, "", &mut result);
259+
result
260+
}
200261
}
201262

202263
#[pyclass]
@@ -369,76 +430,6 @@ fn add_auth_entry(key: String, value: Pythonized<serde_json::Value>) -> PyResult
369430
Ok(())
370431
}
371432

372-
#[pyfunction]
373-
fn format_flow_schema<'py>(py: Python<'py>, flow_name: String) -> PyResult<Bound<'py, PyAny>> {
374-
future_into_py(py, async move {
375-
let lib_context = get_lib_context().into_py_result()?;
376-
let schema = get_flow_schema(Path(flow_name), State(lib_context))
377-
.await
378-
.into_py_result()?;
379-
380-
let mut result = Vec::new();
381-
382-
fn process_fields(
383-
fields: &[FieldSchema],
384-
prefix: &str,
385-
result: &mut Vec<(String, String, String)>,
386-
) {
387-
for field in fields {
388-
let field_name = format!("{}{}", prefix, field.name);
389-
390-
let mut field_type = match &field.value_type.typ {
391-
ValueType::Basic(basic) => match basic {
392-
BasicValueType::Vector(v) => {
393-
let dim = v.dimension.map_or("*".to_string(), |d| d.to_string());
394-
let elem = match *v.element_type {
395-
BasicValueType::Float32 => "Float32",
396-
BasicValueType::Float64 => "Float64",
397-
_ => "Unknown",
398-
};
399-
format!("Vector[{}, {}]", dim, elem)
400-
}
401-
other => format!("{:?}", other),
402-
},
403-
ValueType::Table(t) => format!("{:?}", t.kind),
404-
ValueType::Struct(_) => "Struct".to_string(),
405-
};
406-
407-
if field.value_type.nullable {
408-
field_type.push('?');
409-
}
410-
411-
let attr_str = if field.value_type.attrs.is_empty() {
412-
String::new()
413-
} else {
414-
field
415-
.value_type
416-
.attrs
417-
.keys()
418-
.map(|k| k.to_string())
419-
.collect::<Vec<_>>()
420-
.join(", ")
421-
};
422-
423-
result.push((field_name.clone(), field_type, attr_str));
424-
425-
match &field.value_type.typ {
426-
ValueType::Struct(s) => {
427-
process_fields(&s.fields, &format!("{}.", field_name), result);
428-
}
429-
ValueType::Table(t) => {
430-
process_fields(&t.row.fields, &format!("{}.", field_name), result);
431-
}
432-
ValueType::Basic(_) => {}
433-
}
434-
}
435-
}
436-
437-
process_fields(&schema.schema.fields, "", &mut result);
438-
Ok(result)
439-
})
440-
}
441-
442433
/// A Python module implemented in Rust.
443434
#[pymodule]
444435
#[pyo3(name = "_engine")]
@@ -452,7 +443,6 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
452443
m.add_function(wrap_pyfunction!(apply_setup_changes, m)?)?;
453444
m.add_function(wrap_pyfunction!(flow_names_with_setup, m)?)?;
454445
m.add_function(wrap_pyfunction!(add_auth_entry, m)?)?;
455-
m.add_function(wrap_pyfunction!(format_flow_schema, m)?)?;
456446

457447
m.add_class::<builder::flow_builder::FlowBuilder>()?;
458448
m.add_class::<builder::flow_builder::DataCollector>()?;

0 commit comments

Comments
 (0)