11use crate :: errors:: DatabaseError ;
2- use crate :: execution:: { build_read, Executor , WriteExecutor } ;
2+ use crate :: execution:: { build_read, spawn_executor , Executor , WriteExecutor } ;
33use crate :: planner:: LogicalPlan ;
44use crate :: storage:: { StatisticsMetaCache , TableCache , ViewCache } ;
55use crate :: types:: index:: { Index , IndexType } ;
@@ -9,9 +9,6 @@ use crate::types::value::DataValue;
99use crate :: {
1010 planner:: operator:: alter_table:: add_column:: AddColumnOperator , storage:: Transaction , throw,
1111} ;
12- use std:: ops:: Coroutine ;
13- use std:: ops:: CoroutineState ;
14- use std:: pin:: Pin ;
1512
1613pub struct AddColumn {
1714 op : AddColumnOperator ,
@@ -30,73 +27,83 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn {
3027 cache : ( & ' a TableCache , & ' a ViewCache , & ' a StatisticsMetaCache ) ,
3128 transaction : * mut T ,
3229 ) -> Executor < ' a > {
33- Box :: new (
34- #[ coroutine]
35- move || {
36- let AddColumnOperator {
37- table_name,
38- column,
39- if_not_exists,
40- } = & self . op ;
30+ spawn_executor ( move |co| async move {
31+ let AddColumnOperator {
32+ table_name,
33+ column,
34+ if_not_exists,
35+ } = & self . op ;
4136
42- let mut unique_values = column. desc ( ) . is_unique ( ) . then ( Vec :: new) ;
43- let mut tuples = Vec :: new ( ) ;
44- let schema = self . input . output_schema ( ) ;
45- let mut types = Vec :: with_capacity ( schema. len ( ) + 1 ) ;
37+ let mut unique_values = column. desc ( ) . is_unique ( ) . then ( Vec :: new) ;
38+ let mut tuples = Vec :: new ( ) ;
39+ let schema = self . input . output_schema ( ) ;
40+ let mut types = Vec :: with_capacity ( schema. len ( ) + 1 ) ;
4641
47- for column_ref in schema. iter ( ) {
48- types. push ( column_ref. datatype ( ) . clone ( ) ) ;
49- }
50- types. push ( column. datatype ( ) . clone ( ) ) ;
42+ for column_ref in schema. iter ( ) {
43+ types. push ( column_ref. datatype ( ) . clone ( ) ) ;
44+ }
45+ types. push ( column. datatype ( ) . clone ( ) ) ;
5146
52- let mut coroutine = build_read ( self . input , cache, transaction) ;
47+ let mut coroutine = build_read ( self . input , cache, transaction) ;
5348
54- while let CoroutineState :: Yielded ( tuple) = Pin :: new ( & mut coroutine) . resume ( ( ) ) {
55- let mut tuple: Tuple = throw ! ( tuple) ;
49+ while let Some ( tuple) = coroutine. next ( ) {
50+ let mut tuple: Tuple = throw ! ( co , tuple) ;
5651
57- if let Some ( value) = throw ! ( column. default_value( ) ) {
58- if let Some ( unique_values) = & mut unique_values {
59- unique_values. push ( (
60- throw ! ( tuple. pk. clone( ) . ok_or( DatabaseError :: PrimaryKeyNotFound ) ) ,
61- value. clone ( ) ,
62- ) ) ;
63- }
64- tuple. values . push ( value) ;
65- } else {
66- tuple. values . push ( DataValue :: Null ) ;
52+ if let Some ( value) = throw ! ( co, column. default_value( ) ) {
53+ if let Some ( unique_values) = & mut unique_values {
54+ unique_values. push ( (
55+ throw ! (
56+ co,
57+ tuple. pk. clone( ) . ok_or( DatabaseError :: PrimaryKeyNotFound )
58+ ) ,
59+ value. clone ( ) ,
60+ ) ) ;
6761 }
68- tuples. push ( tuple) ;
62+ tuple. values . push ( value) ;
63+ } else {
64+ tuple. values . push ( DataValue :: Null ) ;
6965 }
70- drop ( coroutine) ;
66+ tuples. push ( tuple) ;
67+ }
68+ drop ( coroutine) ;
7169
72- for tuple in tuples {
73- throw ! ( unsafe { & mut ( * transaction) }
74- . append_tuple( table_name, tuple, & types, true ) ) ;
75- }
76- let col_id = throw ! ( unsafe { & mut ( * transaction) } . add_column(
70+ for tuple in tuples {
71+ throw ! (
72+ co,
73+ unsafe { & mut ( * transaction) } . append_tuple( table_name, tuple, & types, true )
74+ ) ;
75+ }
76+ let col_id = throw ! (
77+ co,
78+ unsafe { & mut ( * transaction) } . add_column(
7779 cache. 0 ,
7880 table_name,
7981 column,
8082 * if_not_exists
81- ) ) ;
83+ )
84+ ) ;
8285
83- // Unique Index
84- if let ( Some ( unique_values) , Some ( unique_meta) ) = (
85- unique_values,
86- throw ! ( unsafe { & mut ( * transaction) } . table( cache. 0 , table_name. clone( ) ) )
87- . and_then ( |table| table. get_unique_index ( & col_id) )
88- . cloned ( ) ,
89- ) {
90- for ( tuple_id, value) in unique_values {
91- let index = Index :: new ( unique_meta. id , & value, IndexType :: Unique ) ;
92- throw ! (
93- unsafe { & mut ( * transaction) } . add_index( table_name, index, & tuple_id)
94- ) ;
95- }
86+ // Unique Index
87+ if let ( Some ( unique_values) , Some ( unique_meta) ) = (
88+ unique_values,
89+ throw ! (
90+ co,
91+ unsafe { & mut ( * transaction) } . table( cache. 0 , table_name. clone( ) )
92+ )
93+ . and_then ( |table| table. get_unique_index ( & col_id) )
94+ . cloned ( ) ,
95+ ) {
96+ for ( tuple_id, value) in unique_values {
97+ let index = Index :: new ( unique_meta. id , & value, IndexType :: Unique ) ;
98+ throw ! (
99+ co,
100+ unsafe { & mut ( * transaction) } . add_index( table_name, index, & tuple_id)
101+ ) ;
96102 }
103+ }
97104
98- yield Ok ( TupleBuilder :: build_result ( "1" . to_string ( ) ) ) ;
99- } ,
100- )
105+ co . yield_ ( Ok ( TupleBuilder :: build_result ( "1" . to_string ( ) ) ) )
106+ . await ;
107+ } )
101108 }
102109}
0 commit comments