@@ -2,27 +2,34 @@ use crate::prelude::*;
22
33use super :: schema:: { EnrichedValueType , FieldSchema } ;
44use serde:: { Deserialize , Serialize } ;
5- use serde_json:: Value ;
65use std:: fmt;
76use std:: ops:: Deref ;
87
9- // Define SpecFormatMode enum for type-safe formatting
8+ /// OutputMode enum for displaying spec info in different granularity
109#[ derive( Debug , Clone , Copy , PartialEq ) ]
11- pub enum SpecFormatMode {
10+ pub enum OutputMode {
1211 Concise ,
1312 Verbose ,
1413}
1514
16- impl SpecFormatMode {
17- pub fn from_str ( s : & str ) -> Result < Self , String > {
15+ impl OutputMode {
16+ pub fn from_str ( s : & str ) -> Self {
1817 match s. to_lowercase ( ) . as_str ( ) {
19- "concise" => Ok ( SpecFormatMode :: Concise ) ,
20- "verbose" => Ok ( SpecFormatMode :: Verbose ) ,
21- _ => Err ( format ! ( "Invalid format mode: {}" , s) ) ,
18+ "concise" => OutputMode :: Concise ,
19+ "verbose" => OutputMode :: Verbose ,
20+ _ => unreachable ! (
21+ "Invalid format mode: {}. Expected 'concise' or 'verbose'." ,
22+ s
23+ ) ,
2224 }
2325 }
2426}
2527
28+ /// Formatting spec per output mode
29+ pub trait SpecFormatter {
30+ fn format ( & self , mode : OutputMode ) -> String ;
31+ }
32+
2633#[ derive( Debug , Clone , Serialize , Deserialize ) ]
2734#[ serde( tag = "kind" ) ]
2835pub enum SpecString {
@@ -233,69 +240,31 @@ pub struct OpSpec {
233240 pub spec : serde_json:: Map < String , serde_json:: Value > ,
234241}
235242
236- impl OpSpec {
237- pub fn format_concise ( & self ) -> String {
238- let mut parts = vec ! [ ] ;
239- for ( key, value) in self . spec . iter ( ) {
240- match value {
241- Value :: String ( s) => parts. push ( format ! ( "{}={}" , key, s) ) ,
242- Value :: Array ( arr) => {
243- let items = arr
244- . iter ( )
245- . filter_map ( |v| v. as_str ( ) )
246- . collect :: < Vec < _ > > ( )
247- . join ( "," ) ;
248- if !items. is_empty ( ) {
249- parts. push ( format ! ( "{}={}" , key, items) ) ;
250- }
251- }
252- Value :: Object ( obj) => {
253- if let Some ( model) = obj. get ( "model" ) . and_then ( |v| v. as_str ( ) ) {
254- parts. push ( format ! ( "{}={}" , key, model) ) ;
255- }
256- }
257- _ => { }
258- }
259- }
260- if parts. is_empty ( ) {
261- self . kind . clone ( )
262- } else {
263- format ! ( "{}({})" , self . kind, parts. join( ", " ) )
264- }
265- }
266-
267- pub fn format_verbose ( & self ) -> String {
268- let spec_str = serde_json:: to_string_pretty ( & self . spec )
269- . map ( |s| {
270- let lines: Vec < & str > = s. lines ( ) . collect ( ) ;
271- if lines. len ( ) < s. lines ( ) . count ( ) {
272- lines
273- . into_iter ( )
274- . chain ( [ "..." ] )
275- . collect :: < Vec < _ > > ( )
276- . join ( "\n " )
277- } else {
278- lines. join ( "\n " )
279- }
280- } )
281- . unwrap_or ( "#serde_error" . to_string ( ) ) ;
282- format ! ( "{}({})" , self . kind, spec_str)
283- }
284-
285- pub fn format ( & self , mode : SpecFormatMode ) -> String {
243+ impl SpecFormatter for OpSpec {
244+ fn format ( & self , mode : OutputMode ) -> String {
286245 match mode {
287- SpecFormatMode :: Concise => self . format_concise ( ) ,
288- SpecFormatMode :: Verbose => self . format_verbose ( ) ,
246+ OutputMode :: Concise => self . kind . clone ( ) ,
247+ OutputMode :: Verbose => {
248+ let spec_str = serde_json:: to_string_pretty ( & self . spec )
249+ . map ( |s| {
250+ let lines: Vec < & str > = s. lines ( ) . collect ( ) ;
251+ if lines. len ( ) < s. lines ( ) . count ( ) {
252+ lines
253+ . into_iter ( )
254+ . chain ( [ "..." ] )
255+ . collect :: < Vec < _ > > ( )
256+ . join ( "\n " )
257+ } else {
258+ lines. join ( "\n " )
259+ }
260+ } )
261+ . unwrap_or ( "#serde_error" . to_string ( ) ) ;
262+ format ! ( "{}({})" , self . kind, spec_str)
263+ }
289264 }
290265 }
291266}
292267
293- impl fmt:: Display for OpSpec {
294- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
295- write ! ( f, "{}" , self . format_concise( ) )
296- }
297- }
298-
299268#[ derive( Debug , Clone , Serialize , Deserialize , Default ) ]
300269pub struct SourceRefreshOptions {
301270 pub refresh_interval : Option < std:: time:: Duration > ,
@@ -319,53 +288,43 @@ pub struct ImportOpSpec {
319288 pub refresh_options : SourceRefreshOptions ,
320289}
321290
322- impl ImportOpSpec {
323- pub fn format ( & self , mode : SpecFormatMode ) -> String {
324- let source = match mode {
325- SpecFormatMode :: Concise => self . source . format_concise ( ) ,
326- SpecFormatMode :: Verbose => self . source . format_verbose ( ) ,
327- } ;
291+ impl SpecFormatter for ImportOpSpec {
292+ fn format ( & self , mode : OutputMode ) -> String {
293+ let source = self . source . format ( mode) ;
328294 format ! ( "source={}, refresh={}" , source, self . refresh_options)
329295 }
330296}
331297
332298impl fmt:: Display for ImportOpSpec {
333299 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
334- write ! ( f, "{}" , self . format( SpecFormatMode :: Concise ) )
300+ write ! ( f, "{}" , self . format( OutputMode :: Concise ) )
335301 }
336302}
337303
304+ /// Transform data using a given operator.
338305#[ derive( Debug , Clone , Serialize , Deserialize ) ]
339306pub struct TransformOpSpec {
340307 pub inputs : Vec < OpArgBinding > ,
341308 pub op : OpSpec ,
342309}
343310
344- impl TransformOpSpec {
345- pub fn format ( & self , mode : SpecFormatMode ) -> String {
311+ impl SpecFormatter for TransformOpSpec {
312+ fn format ( & self , mode : OutputMode ) -> String {
346313 let inputs = self
347314 . inputs
348315 . iter ( )
349316 . map ( ToString :: to_string)
350317 . collect :: < Vec < _ > > ( )
351318 . join ( "," ) ;
352- let op_str = match mode {
353- SpecFormatMode :: Concise => self . op . format_concise ( ) ,
354- SpecFormatMode :: Verbose => self . op . format_verbose ( ) ,
355- } ;
319+ let op_str = self . op . format ( mode) ;
356320 match mode {
357- SpecFormatMode :: Concise => format ! ( "op={}, inputs={}" , op_str, inputs) ,
358- SpecFormatMode :: Verbose => format ! ( "op={}, inputs=[{}]" , op_str, inputs) ,
321+ OutputMode :: Concise => format ! ( "op={}, inputs={}" , op_str, inputs) ,
322+ OutputMode :: Verbose => format ! ( "op={}, inputs=[{}]" , op_str, inputs) ,
359323 }
360324 }
361325}
362326
363- impl fmt:: Display for TransformOpSpec {
364- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
365- write ! ( f, "{}" , self . format( SpecFormatMode :: Concise ) )
366- }
367- }
368-
327+ /// Apply reactive operations to each row of the input field.
369328#[ derive( Debug , Clone , Serialize , Deserialize ) ]
370329pub struct ForEachOpSpec {
371330 /// Mapping that provides a table to apply reactive operations to.
@@ -377,21 +336,18 @@ impl ForEachOpSpec {
377336 pub fn get_label ( & self ) -> String {
378337 format ! ( "Loop over {}" , self . field_path)
379338 }
339+ }
380340
381- pub fn format ( & self , mode : SpecFormatMode ) -> String {
341+ impl SpecFormatter for ForEachOpSpec {
342+ fn format ( & self , mode : OutputMode ) -> String {
382343 match mode {
383- SpecFormatMode :: Concise => self . get_label ( ) ,
384- SpecFormatMode :: Verbose => format ! ( "field={}" , self . field_path) ,
344+ OutputMode :: Concise => self . get_label ( ) ,
345+ OutputMode :: Verbose => format ! ( "field={}" , self . field_path) ,
385346 }
386347 }
387348}
388349
389- impl fmt:: Display for ForEachOpSpec {
390- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
391- write ! ( f, "field={}" , self . field_path)
392- }
393- }
394-
350+ /// Emit data to a given collector at the given scope.
395351#[ derive( Debug , Clone , Serialize , Deserialize ) ]
396352pub struct CollectOpSpec {
397353 /// Field values to be collected.
@@ -405,17 +361,17 @@ pub struct CollectOpSpec {
405361 pub auto_uuid_field : Option < FieldName > ,
406362}
407363
408- impl CollectOpSpec {
409- pub fn format ( & self , mode : SpecFormatMode ) -> String {
364+ impl SpecFormatter for CollectOpSpec {
365+ fn format ( & self , mode : OutputMode ) -> String {
410366 let uuid = self . auto_uuid_field . as_deref ( ) . unwrap_or ( "none" ) ;
411367 match mode {
412- SpecFormatMode :: Concise => {
368+ OutputMode :: Concise => {
413369 format ! (
414370 "collector={}, input={}, uuid={}" ,
415371 self . collector_name, self . input, uuid
416372 )
417373 }
418- SpecFormatMode :: Verbose => {
374+ OutputMode :: Verbose => {
419375 format ! (
420376 "scope={}, collector={}, input=[{}], uuid={}" ,
421377 self . scope_name, self . collector_name, self . input, uuid
@@ -425,12 +381,6 @@ impl CollectOpSpec {
425381 }
426382}
427383
428- impl fmt:: Display for CollectOpSpec {
429- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
430- write ! ( f, "{}" , self . format( SpecFormatMode :: Concise ) )
431- }
432- }
433-
434384#[ derive( Debug , Clone , Copy , Serialize , Deserialize , PartialEq , Eq ) ]
435385pub enum VectorSimilarityMetric {
436386 CosineSimilarity ,
@@ -485,6 +435,7 @@ impl fmt::Display for IndexOptions {
485435 }
486436}
487437
438+ /// Store data to a given sink.
488439#[ derive( Debug , Clone , Serialize , Deserialize ) ]
489440pub struct ExportOpSpec {
490441 pub collector_name : FieldName ,
@@ -493,29 +444,21 @@ pub struct ExportOpSpec {
493444 pub setup_by_user : bool ,
494445}
495446
496- impl ExportOpSpec {
497- pub fn format ( & self , mode : SpecFormatMode ) -> String {
498- let target_str = match mode {
499- SpecFormatMode :: Concise => self . target . format_concise ( ) ,
500- SpecFormatMode :: Verbose => self . target . format_verbose ( ) ,
501- } ;
447+ impl SpecFormatter for ExportOpSpec {
448+ fn format ( & self , mode : OutputMode ) -> String {
449+ let target_str = self . target . format ( mode) ;
502450 let base = format ! (
503451 "collector={}, target={}, {}" ,
504452 self . collector_name, target_str, self . index_options
505453 ) ;
506454 match mode {
507- SpecFormatMode :: Concise => base,
508- SpecFormatMode :: Verbose => format ! ( "{}, setup_by_user={}" , base, self . setup_by_user) ,
455+ OutputMode :: Concise => base,
456+ OutputMode :: Verbose => format ! ( "{}, setup_by_user={}" , base, self . setup_by_user) ,
509457 }
510458 }
511459}
512460
513- impl fmt:: Display for ExportOpSpec {
514- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
515- write ! ( f, "{}" , self . format( SpecFormatMode :: Concise ) )
516- }
517- }
518-
461+ /// A reactive operation reacts on given input values.
519462#[ derive( Debug , Clone , Serialize , Deserialize ) ]
520463#[ serde( tag = "action" ) ]
521464pub enum ReactiveOpSpec {
@@ -524,25 +467,19 @@ pub enum ReactiveOpSpec {
524467 Collect ( CollectOpSpec ) ,
525468}
526469
527- impl ReactiveOpSpec {
528- pub fn format ( & self , mode : SpecFormatMode ) -> String {
470+ impl SpecFormatter for ReactiveOpSpec {
471+ fn format ( & self , mode : OutputMode ) -> String {
529472 match self {
530473 ReactiveOpSpec :: Transform ( t) => format ! ( "Transform: {}" , t. format( mode) ) ,
531474 ReactiveOpSpec :: ForEach ( fe) => match mode {
532- SpecFormatMode :: Concise => format ! ( "{}" , fe. get_label( ) ) ,
533- SpecFormatMode :: Verbose => format ! ( "ForEach: {}" , fe. format( mode) ) ,
475+ OutputMode :: Concise => format ! ( "{}" , fe. get_label( ) ) ,
476+ OutputMode :: Verbose => format ! ( "ForEach: {}" , fe. format( mode) ) ,
534477 } ,
535478 ReactiveOpSpec :: Collect ( c) => format ! ( "Collect: {}" , c. format( mode) ) ,
536479 }
537480 }
538481}
539482
540- impl fmt:: Display for ReactiveOpSpec {
541- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
542- write ! ( f, "{}" , self . format( SpecFormatMode :: Concise ) )
543- }
544- }
545-
546483#[ derive( Debug , Clone , Serialize , Deserialize ) ]
547484pub struct ReactiveOpScope {
548485 pub name : ScopeName ,
@@ -556,6 +493,7 @@ impl fmt::Display for ReactiveOpScope {
556493 }
557494}
558495
496+ /// A flow defines the rule to sync data from given sources to given sinks with given transformations.
559497#[ derive( Debug , Clone , Serialize , Deserialize ) ]
560498pub struct FlowInstanceSpec {
561499 /// Name of the flow instance.
0 commit comments