@@ -8,6 +8,7 @@ use nemo_physical::{
88} ;
99
1010use crate :: {
11+ error:: Error ,
1112 execution:: {
1213 ExecutionEngine ,
1314 execution_engine:: tracing:: node_query:: {
@@ -18,9 +19,12 @@ use crate::{
1819 unique_variables, valid_tables_plan, variable_translation,
1920 } ,
2021 } ,
21- planning:: normalization:: { atom:: ground:: GroundAtom , program:: NormalizedProgram } ,
22+ planning:: normalization:: {
23+ atom:: ground:: GroundAtom , program:: NormalizedProgram , rule:: NormalizedRule ,
24+ } ,
2225 selection_strategy:: strategy:: RuleSelectionStrategy ,
2326 tracing:: {
27+ error:: TracingError ,
2428 node_query:: {
2529 TableEntriesForTreeNodesQuery , TableEntriesForTreeNodesQueryInner ,
2630 TableEntriesForTreeNodesResponse , TableEntriesForTreeNodesResponseElement ,
@@ -46,9 +50,11 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
4650 address : TreeAddress ,
4751 predicate : & Tag ,
4852 program : & NormalizedProgram ,
49- ) {
53+ ) -> Result < ( ) , TracingError > {
5054 if !node. queries . is_empty ( ) {
51- let arity = self . predicate_arity ( predicate) . expect ( "invalid predicate" ) ;
55+ let arity = self
56+ . predicate_arity ( predicate)
57+ . expect ( "predicate has been used in the normalized program" ) ;
5258
5359 // We collect facts that are part of the query
5460 // in a light-weight table
@@ -59,21 +65,28 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
5965 let terms = match query {
6066 TableEntryQuery :: Entry ( row_index) => {
6167 let terms_to_trace: Vec < AnyDataValue > = self
62- . predicate_rows ( predicate)
68+ . table_manager
69+ . row_by_id ( predicate, * row_index)
6370 . await
64- . expect ( "unknown predicate" )
65- . into_iter ( )
66- . flatten ( )
67- . nth ( * row_index)
68- . expect ( "invalid id" ) ;
71+ . ok_or ( TracingError :: InvalidFactId {
72+ predicate : predicate. to_string ( ) ,
73+ id : * row_index,
74+ } ) ?;
6975
7076 terms_to_trace
7177 }
7278 TableEntryQuery :: Query ( query_string) => {
73- let atom = Atom :: parse ( & format ! ( "P({query_string})" ) )
74- . expect ( "invalid query string" ) ;
79+ let atom = Atom :: parse ( & format ! ( "P({query_string})" ) ) . map_err ( |_| {
80+ TracingError :: InvalidFact {
81+ fact : query_string. clone ( ) ,
82+ }
83+ } ) ?;
84+
85+ let ground =
86+ GroundAtom :: try_from ( atom) . map_err ( |_| TracingError :: InvalidFact {
87+ fact : query_string. clone ( ) ,
88+ } ) ?;
7589
76- let ground = GroundAtom :: try_from ( atom) . expect ( "only support ground fact" ) ;
7790 ground. datavalues ( ) . collect :: < Vec < _ > > ( )
7891 }
7992 } ;
@@ -115,9 +128,11 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
115128 & atom. predicate ( ) ,
116129 program,
117130 ) )
118- . await ;
131+ . await ? ;
119132 }
120133 }
134+
135+ Ok ( ( ) )
121136 }
122137
123138 /// Phase 2 of `trace_node_execute`
@@ -139,12 +154,14 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
139154 manager. add_discard ( & address, discarded_columns) ;
140155
141156 if let Some ( successor) = & node. next {
142- // True if all children do not have any restrictions
143- let simple_successor = successor. children . iter ( ) . all ( |child| child. is_simple ( ) ) ;
144-
145157 let rule = program. rules ( ) [ successor. rule ] . clone ( ) ;
146158 let order = rule. variable_order ( ) . clone ( ) ;
147159
160+ // True if all children do not have any restrictions
161+ // and there is only one way to derive the body predicates of the current rule
162+ let simple_successor = rule_is_simple ( program, & rule)
163+ && successor. children . iter ( ) . all ( |child| child. is_simple ( ) ) ;
164+
148165 // Any facts derived after this point are not relevant for this node
149166 let next_step = {
150167 self . rule_history [ ..before_step]
@@ -222,7 +239,7 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
222239 . database_mut ( )
223240 . execute_plan ( plan)
224241 . await
225- . expect ( "execute plan failed " ) ;
242+ . expect ( "plan does not require loading sources " ) ;
226243
227244 if let Some ( id_valid) = id_valid {
228245 if let Some ( result_id) = execution_results. get ( & id_valid) {
@@ -399,7 +416,7 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
399416 & mut self ,
400417 query : & TableEntriesForTreeNodesQuery ,
401418 program : & NormalizedProgram ,
402- ) -> TraceNodeManager {
419+ ) -> Result < TraceNodeManager , TracingError > {
403420 let mut manager = TraceNodeManager :: default ( ) ;
404421 let address = TreeAddress :: default ( ) ;
405422 let predicate = Tag :: new ( query. predicate . clone ( ) ) ;
@@ -408,7 +425,7 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
408425 let discarded_columns = Vec :: default ( ) ;
409426
410427 self . trace_node_restriction ( & mut manager, node, address. clone ( ) , & predicate, program)
411- . await ;
428+ . await ? ;
412429
413430 self . trace_node_valid (
414431 & mut manager,
@@ -425,10 +442,10 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
425442 . trace_node_filter ( & mut manager, node, address. clone ( ) , program)
426443 . await ;
427444
428- manager
445+ Ok ( manager)
429446 }
430447
431- /// Collect the answer to the node query using [TreeTableManager ]
448+ /// Collect the answer to the node query using [TraceNodeManager ]
432449 /// and write it into the prepared [TableEntriesForTreeNodesResponse]
433450 async fn trace_node_answer (
434451 & mut self ,
@@ -469,7 +486,7 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
469486 . table_manager
470487 . table_row_id ( & Tag :: new ( element. predicate . clone ( ) ) , & row)
471488 . await
472- . expect ( "row should be contained somewhere " ) ;
489+ . expect ( "if a row appears in an answer it must have an id " ) ;
473490
474491 let table_response = TableEntryResponse {
475492 entry_id,
@@ -493,7 +510,7 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
493510 address : TreeAddress ,
494511 predicate : & Tag ,
495512 program : & NormalizedProgram ,
496- ) {
513+ ) -> Result < ( ) , Error > {
497514 // Collect all (syntactly) possible rules
498515 // that could be triggered by or could trigger
499516 // the predicate assigned to the current node
@@ -564,7 +581,7 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
564581 & next_predicate,
565582 program,
566583 ) )
567- . await ;
584+ . await ? ;
568585 }
569586
570587 // The content of the negated nodes is just the complete table
@@ -574,15 +591,12 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
574591 let mut next_address = address. clone ( ) ;
575592 next_address. push ( rule. positive_all ( ) . len ( ) + index) ;
576593
577- let rows = if let Some ( rows) = self
578- . predicate_rows ( & negative_atom. predicate ( ) )
579- . await
580- . expect ( "collect negation rows failed" )
581- {
582- rows. collect :: < Vec < _ > > ( )
583- } else {
584- Vec :: default ( )
585- } ;
594+ let rows =
595+ if let Some ( rows) = self . predicate_rows ( & negative_atom. predicate ( ) ) . await ? {
596+ rows. collect :: < Vec < _ > > ( )
597+ } else {
598+ Vec :: default ( )
599+ } ;
586600
587601 let mut entries = Vec :: default ( ) ;
588602
@@ -591,7 +605,7 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
591605 . table_manager
592606 . table_row_id ( & negative_atom. predicate ( ) , & row)
593607 . await
594- . expect ( "row should be contained somewhere " ) ;
608+ . expect ( "rows has been filled from tables and therefore must have an id " ) ;
595609
596610 let table_response = TableEntryResponse {
597611 entry_id,
@@ -616,6 +630,8 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
616630 elements. push ( negation_element) ;
617631 }
618632 }
633+
634+ Ok ( ( ) )
619635 }
620636
621637 /// For a given [TableEntriesForTreeNodesQuery]
@@ -625,7 +641,7 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
625641 & mut self ,
626642 query : & TableEntriesForTreeNodesQuery ,
627643 program : & NormalizedProgram ,
628- ) -> TableEntriesForTreeNodesResponse {
644+ ) -> Result < TableEntriesForTreeNodesResponse , Error > {
629645 let mut elements = Vec :: < TableEntriesForTreeNodesResponseElement > :: default ( ) ;
630646
631647 self . trace_node_prepare_recursive (
@@ -635,23 +651,33 @@ impl<Strategy: RuleSelectionStrategy> ExecutionEngine<Strategy> {
635651 & Tag :: new ( query. predicate . clone ( ) ) ,
636652 program,
637653 )
638- . await ;
654+ . await ? ;
639655
640- TableEntriesForTreeNodesResponse { elements }
656+ Ok ( TableEntriesForTreeNodesResponse { elements } )
641657 }
642658
643659 /// Evaluate a [TableEntriesForTreeNodesQuery].
644660 pub async fn trace_node (
645661 & mut self ,
646662 query : & TableEntriesForTreeNodesQuery ,
647- ) -> TableEntriesForTreeNodesResponse {
663+ ) -> Result < TableEntriesForTreeNodesResponse , Error > {
648664 let program = self . program . clone ( ) ;
649665
650- let response = self . trace_node_prepare ( query, & program) . await ;
651- let manager = self . trace_node_execute ( query, & program) . await ;
666+ let response = self . trace_node_prepare ( query, & program) . await ? ;
667+ let manager = self . trace_node_execute ( query, & program) . await ? ;
652668
653- self . trace_node_answer ( & manager, response)
669+ let result = self
670+ . trace_node_answer ( & manager, response)
654671 . await
655- . unwrap_or_default ( )
672+ . unwrap_or_default ( ) ;
673+
674+ Ok ( result)
656675 }
657676}
677+
678+ /// Check whether the body atoms of a rule can only be derived in one way.
679+ fn rule_is_simple ( program : & NormalizedProgram , rule : & NormalizedRule ) -> bool {
680+ rule. positive ( )
681+ . iter ( )
682+ . all ( |atom| program. rules_with_head_predicate ( & atom. predicate ( ) ) . count ( ) <= 1 )
683+ }
0 commit comments