Skip to content

Commit 9e34793

Browse files
committed
feat: add flow schema processing pyfunction
1 parent 0660f34 commit 9e34793

File tree

1 file changed

+58
-0
lines changed

1 file changed

+58
-0
lines changed

src/py/mod.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::prelude::*;
22

3+
use crate::base::schema::{DataSchema, EnrichedValueType, FieldSchema, ValueType};
34
use crate::base::spec::VectorSimilarityMetric;
45
use crate::execution::query;
56
use crate::lib_context::{clear_lib_context, get_auth_registry, init_lib_context};
@@ -13,6 +14,7 @@ use pyo3::{exceptions::PyException, prelude::*};
1314
use pyo3_async_runtimes::tokio::future_into_py;
1415
use std::collections::btree_map;
1516
use std::fmt::Write;
17+
use std::sync::Arc;
1618

1719
mod convert;
1820
pub use convert::*;
@@ -365,6 +367,61 @@ fn add_auth_entry(key: String, value: Pythonized<serde_json::Value>) -> PyResult
365367
Ok(())
366368
}
367369

370+
#[pyfunction]
371+
fn get_flow_schema(_py: Python<'_>, flow_name: String) -> PyResult<Vec<(String, String, String)>> {
372+
let lib_context = get_lib_context().map_err(|e| PyException::new_err(e.to_string()))?;
373+
let flow_ctx = lib_context
374+
.get_flow_context(&flow_name)
375+
.map_err(|e| PyException::new_err(e.to_string()))?;
376+
let schema = flow_ctx.flow.data_schema.clone();
377+
378+
let mut result = Vec::new();
379+
fn process_fields(
380+
fields: &[FieldSchema],
381+
prefix: &str,
382+
result: &mut Vec<(String, String, String)>,
383+
) {
384+
for field in fields {
385+
let field_name = format!("{}{}", prefix, field.name);
386+
387+
let mut field_type = format!("{}", field.value_type.typ);
388+
if field.value_type.nullable {
389+
field_type.push('?');
390+
}
391+
392+
let attr_str = if field.value_type.attrs.is_empty() {
393+
String::new()
394+
} else {
395+
field
396+
.value_type
397+
.attrs
398+
.iter()
399+
.map(|(k, v)| {
400+
let v_str = serde_json::to_string(v).unwrap_or_default();
401+
format!("{}: {}", k, v_str.chars().take(50).collect::<String>())
402+
})
403+
.collect::<Vec<_>>()
404+
.join(", ")
405+
};
406+
407+
result.push((field_name.clone(), field_type, attr_str));
408+
409+
match &field.value_type.typ {
410+
ValueType::Struct(s) => {
411+
process_fields(&s.fields, &format!("{}.", field_name), result);
412+
}
413+
ValueType::Table(t) => {
414+
process_fields(&t.row.fields, &format!("{}.", field_name), result);
415+
}
416+
ValueType::Basic(_) => {}
417+
}
418+
}
419+
}
420+
421+
process_fields(&schema.schema.fields, "", &mut result);
422+
Ok(result)
423+
}
424+
368425
/// A Python module implemented in Rust.
369426
#[pymodule]
370427
#[pyo3(name = "_engine")]
@@ -378,6 +435,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
378435
m.add_function(wrap_pyfunction!(apply_setup_changes, m)?)?;
379436
m.add_function(wrap_pyfunction!(flow_names_with_setup, m)?)?;
380437
m.add_function(wrap_pyfunction!(add_auth_entry, m)?)?;
438+
m.add_function(wrap_pyfunction!(get_flow_schema, m)?)?;
381439

382440
m.add_class::<builder::flow_builder::FlowBuilder>()?;
383441
m.add_class::<builder::flow_builder::DataCollector>()?;

0 commit comments

Comments
 (0)