@@ -143,6 +143,33 @@ impl KafkaPostProcessPlanner {
143143 }
144144 }
145145
146+ /// Compares schemas for equality, including metadata, except that physical_schema is allowed to
147+ /// have non-nullable versions of the target schema's field. This function is defined this way
148+ /// (instead of some perhaps more generalizable way) because it conservatively replaces an
149+ /// equality comparison.
150+ fn is_compatible_schema ( target_schema : & Schema , physical_schema : & Schema ) -> bool {
151+ if target_schema. metadata != physical_schema. metadata
152+ || target_schema. fields . len ( ) != physical_schema. fields . len ( )
153+ {
154+ return false ;
155+ }
156+ for ( target_field, physical_field) in target_schema
157+ . fields
158+ . iter ( )
159+ . zip ( physical_schema. fields . iter ( ) )
160+ {
161+ // See the >= there on is_nullable.
162+ if !( target_field. name ( ) == physical_field. name ( )
163+ && target_field. data_type ( ) == physical_field. data_type ( )
164+ && target_field. is_nullable ( ) >= physical_field. is_nullable ( )
165+ && target_field. metadata ( ) == physical_field. metadata ( ) )
166+ {
167+ return false ;
168+ }
169+ }
170+ return true ;
171+ }
172+
146173 pub async fn build (
147174 & self ,
148175 select_statement : String ,
@@ -169,7 +196,7 @@ impl KafkaPostProcessPlanner {
169196 let ( projection_plan, filter_plan) = self
170197 . make_projection_and_filter_physical_plans ( & logical_plan)
171198 . await ?;
172- if target_schema != projection_plan. schema ( ) {
199+ if ! Self :: is_compatible_schema ( target_schema. as_ref ( ) , projection_plan. schema ( ) . as_ref ( ) ) {
173200 return Err ( CubeError :: user ( format ! (
174201 "Table schema: {:?} don't match select_statement result schema: {:?}" ,
175202 target_schema,
0 commit comments