11use crate :: prelude:: * ;
22
3- use crate :: base:: schema:: { DataSchema , EnrichedValueType , FieldSchema , ValueType } ;
3+ use crate :: base:: schema:: { BasicValueType , FieldSchema , ValueType } ;
44use crate :: base:: spec:: VectorSimilarityMetric ;
55use crate :: execution:: query;
66use crate :: lib_context:: { clear_lib_context, get_auth_registry, init_lib_context} ;
77use crate :: ops:: interface:: { QueryResult , QueryResults } ;
88use crate :: ops:: py_factory:: PyOpArgSchema ;
99use crate :: ops:: { interface:: ExecutorFactory , py_factory:: PyFunctionFactory , register_factory} ;
1010use crate :: server:: { self , ServerSettings } ;
11+ use crate :: service:: flows:: get_flow_schema;
1112use crate :: settings:: Settings ;
1213use crate :: setup;
14+ use axum:: extract:: { Path , State } ;
1315use pyo3:: { exceptions:: PyException , prelude:: * } ;
1416use pyo3_async_runtimes:: tokio:: future_into_py;
1517use std:: collections:: btree_map;
@@ -368,58 +370,73 @@ fn add_auth_entry(key: String, value: Pythonized<serde_json::Value>) -> PyResult
368370}
369371
370372#[ pyfunction]
371- fn get_flow_schema ( _py : Python < ' _ > , flow_name : String ) -> PyResult < Vec < ( String , String , String ) > > {
372- let lib_context = get_lib_context ( ) . map_err ( |e| PyException :: new_err ( e. to_string ( ) ) ) ?;
373- let flow_ctx = lib_context
374- . get_flow_context ( & flow_name)
375- . map_err ( |e| PyException :: new_err ( e. to_string ( ) ) ) ?;
376- let schema = flow_ctx. flow . data_schema . clone ( ) ;
377-
378- let mut result = Vec :: new ( ) ;
379- fn process_fields (
380- fields : & [ FieldSchema ] ,
381- prefix : & str ,
382- result : & mut Vec < ( String , String , String ) > ,
383- ) {
384- for field in fields {
385- let field_name = format ! ( "{}{}" , prefix, field. name) ;
386-
387- let mut field_type = format ! ( "{}" , field. value_type. typ) ;
388- if field. value_type . nullable {
389- field_type. push ( '?' ) ;
390- }
373+ fn format_flow_schema < ' py > ( py : Python < ' py > , flow_name : String ) -> PyResult < Bound < ' py , PyAny > > {
374+ future_into_py ( py, async move {
375+ let lib_context = get_lib_context ( ) . into_py_result ( ) ?;
376+ let schema = get_flow_schema ( Path ( flow_name) , State ( lib_context) )
377+ . await
378+ . into_py_result ( ) ?;
391379
392- let attr_str = if field. value_type . attrs . is_empty ( ) {
393- String :: new ( )
394- } else {
395- field
396- . value_type
397- . attrs
398- . iter ( )
399- . map ( |( k, v) | {
400- let v_str = serde_json:: to_string ( v) . unwrap_or_default ( ) ;
401- format ! ( "{}: {}" , k, v_str. chars( ) . take( 50 ) . collect:: <String >( ) )
402- } )
403- . collect :: < Vec < _ > > ( )
404- . join ( ", " )
405- } ;
406-
407- result. push ( ( field_name. clone ( ) , field_type, attr_str) ) ;
408-
409- match & field. value_type . typ {
410- ValueType :: Struct ( s) => {
411- process_fields ( & s. fields , & format ! ( "{}." , field_name) , result) ;
380+ let mut result = Vec :: new ( ) ;
381+
382+ fn process_fields (
383+ fields : & [ FieldSchema ] ,
384+ prefix : & str ,
385+ result : & mut Vec < ( String , String , String ) > ,
386+ ) {
387+ for field in fields {
388+ let field_name = format ! ( "{}{}" , prefix, field. name) ;
389+
390+ let mut field_type = match & field. value_type . typ {
391+ ValueType :: Basic ( basic) => match basic {
392+ BasicValueType :: Vector ( v) => {
393+ let dim = v. dimension . map_or ( "*" . to_string ( ) , |d| d. to_string ( ) ) ;
394+ let elem = match * v. element_type {
395+ BasicValueType :: Float32 => "Float32" ,
396+ BasicValueType :: Float64 => "Float64" ,
397+ _ => "Unknown" ,
398+ } ;
399+ format ! ( "Vector[{}, {}]" , dim, elem)
400+ }
401+ other => format ! ( "{:?}" , other) ,
402+ } ,
403+ ValueType :: Table ( t) => format ! ( "{:?}" , t. kind) ,
404+ ValueType :: Struct ( _) => "Struct" . to_string ( ) ,
405+ } ;
406+
407+ if field. value_type . nullable {
408+ field_type. push ( '?' ) ;
412409 }
413- ValueType :: Table ( t) => {
414- process_fields ( & t. row . fields , & format ! ( "{}." , field_name) , result) ;
410+
411+ let attr_str = if field. value_type . attrs . is_empty ( ) {
412+ String :: new ( )
413+ } else {
414+ field
415+ . value_type
416+ . attrs
417+ . keys ( )
418+ . map ( |k| k. to_string ( ) )
419+ . collect :: < Vec < _ > > ( )
420+ . join ( ", " )
421+ } ;
422+
423+ result. push ( ( field_name. clone ( ) , field_type, attr_str) ) ;
424+
425+ match & field. value_type . typ {
426+ ValueType :: Struct ( s) => {
427+ process_fields ( & s. fields , & format ! ( "{}." , field_name) , result) ;
428+ }
429+ ValueType :: Table ( t) => {
430+ process_fields ( & t. row . fields , & format ! ( "{}." , field_name) , result) ;
431+ }
432+ ValueType :: Basic ( _) => { }
415433 }
416- ValueType :: Basic ( _) => { }
417434 }
418435 }
419- }
420436
421- process_fields ( & schema. schema . fields , "" , & mut result) ;
422- Ok ( result)
437+ process_fields ( & schema. schema . fields , "" , & mut result) ;
438+ Ok ( result)
439+ } )
423440}
424441
425442/// A Python module implemented in Rust.
@@ -435,7 +452,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
435452 m. add_function ( wrap_pyfunction ! ( apply_setup_changes, m) ?) ?;
436453 m. add_function ( wrap_pyfunction ! ( flow_names_with_setup, m) ?) ?;
437454 m. add_function ( wrap_pyfunction ! ( add_auth_entry, m) ?) ?;
438- m. add_function ( wrap_pyfunction ! ( get_flow_schema , m) ?) ?;
455+ m. add_function ( wrap_pyfunction ! ( format_flow_schema , m) ?) ?;
439456
440457 m. add_class :: < builder:: flow_builder:: FlowBuilder > ( ) ?;
441458 m. add_class :: < builder:: flow_builder:: DataCollector > ( ) ?;
0 commit comments