@@ -62,7 +62,7 @@ use tracing::{Instrument, info_span, warn};
6262use crate :: AdapterError ;
6363use crate :: catalog:: state:: LocalExpressionCache ;
6464use crate :: catalog:: { BuiltinTableUpdate , CatalogState } ;
65- use crate :: coord:: controller_commands :: parsed_state_updates:: { self , ParsedStateUpdate } ;
65+ use crate :: coord:: catalog_implications :: parsed_state_updates:: { self , ParsedStateUpdate } ;
6666use crate :: util:: index_sql;
6767
6868/// Maintains the state of retractions while applying catalog state updates for a single timestamp.
@@ -108,7 +108,7 @@ impl CatalogState {
108108 Vec < ParsedStateUpdate > ,
109109 ) {
110110 let mut builtin_table_updates = Vec :: with_capacity ( updates. len ( ) ) ;
111- let mut controller_state_updates = Vec :: with_capacity ( updates. len ( ) ) ;
111+ let mut catalog_updates = Vec :: with_capacity ( updates. len ( ) ) ;
112112 let updates = sort_updates ( updates) ;
113113
114114 let mut groups: Vec < Vec < _ > > = Vec :: new ( ) ;
@@ -121,29 +121,28 @@ impl CatalogState {
121121
122122 for update in updates {
123123 let next_apply_state = BootstrapApplyState :: new ( update) ;
124- let ( next_apply_state, ( builtin_table_update, controller_state_update) ) =
125- apply_state
126- . step (
127- next_apply_state,
128- self ,
129- & mut retractions,
130- local_expression_cache,
131- )
132- . await ;
124+ let ( next_apply_state, ( builtin_table_update, catalog_update) ) = apply_state
125+ . step (
126+ next_apply_state,
127+ self ,
128+ & mut retractions,
129+ local_expression_cache,
130+ )
131+ . await ;
133132 apply_state = next_apply_state;
134133 builtin_table_updates. extend ( builtin_table_update) ;
135- controller_state_updates . extend ( controller_state_update ) ;
134+ catalog_updates . extend ( catalog_update ) ;
136135 }
137136
138137 // Apply remaining state.
139- let ( builtin_table_update, controller_state_update ) = apply_state
138+ let ( builtin_table_update, catalog_update ) = apply_state
140139 . apply ( self , & mut retractions, local_expression_cache)
141140 . await ;
142141 builtin_table_updates. extend ( builtin_table_update) ;
143- controller_state_updates . extend ( controller_state_update ) ;
142+ catalog_updates . extend ( catalog_update ) ;
144143 }
145144
146- ( builtin_table_updates, controller_state_updates )
145+ ( builtin_table_updates, catalog_updates )
147146 }
148147
149148 /// Update in-memory catalog state from a list of updates made to the durable catalog state.
@@ -161,7 +160,7 @@ impl CatalogState {
161160 CatalogError ,
162161 > {
163162 let mut builtin_table_updates = Vec :: with_capacity ( updates. len ( ) ) ;
164- let mut controller_state_updates = Vec :: with_capacity ( updates. len ( ) ) ;
163+ let mut catalog_updates = Vec :: with_capacity ( updates. len ( ) ) ;
165164
166165 // First, consolidate updates. The code that applies parsed state
167166 // updates _requires_ that the given updates are consolidated. There
@@ -175,16 +174,16 @@ impl CatalogState {
175174
176175 for ( _, updates) in & updates. into_iter ( ) . chunk_by ( |update| update. ts ) {
177176 let mut retractions = InProgressRetractions :: default ( ) ;
178- let ( builtin_table_update, parsed_catalog_updates_op ) = self . apply_updates_inner (
177+ let ( builtin_table_update, catalog_updates_op ) = self . apply_updates_inner (
179178 updates. collect ( ) ,
180179 & mut retractions,
181180 & mut LocalExpressionCache :: Closed ,
182181 ) ?;
183182 builtin_table_updates. extend ( builtin_table_update) ;
184- controller_state_updates . extend ( parsed_catalog_updates_op ) ;
183+ catalog_updates . extend ( catalog_updates_op ) ;
185184 }
186185
187- Ok ( ( builtin_table_updates, controller_state_updates ) )
186+ Ok ( ( builtin_table_updates, catalog_updates ) )
188187 }
189188
190189 /// It can happen that the sequencing logic creates "fluctuating" updates
@@ -236,7 +235,7 @@ impl CatalogState {
236235 let mut update_system_config = false ;
237236
238237 let mut builtin_table_updates = Vec :: with_capacity ( updates. len ( ) ) ;
239- let mut controller_state_updates = Vec :: new ( ) ;
238+ let mut catalog_updates = Vec :: new ( ) ;
240239
241240 for state_update in updates {
242241 if matches ! ( state_update. kind, StateUpdateKind :: SystemConfiguration ( _) ) {
@@ -245,14 +244,13 @@ impl CatalogState {
245244
246245 match state_update. diff {
247246 StateDiff :: Retraction => {
248- // We want the parsed controller state updates to match the
249- // state of the catalog _before_ applying a retraction. So
250- // that we can still have useful in-memory state to work
251- // with.
247+ // We want the parsed catalog updates to match the state of
248+ // the catalog _before_ applying a retraction. So that we can
249+ // still have useful in-memory state to work with.
252250 if let Some ( update) =
253251 parsed_state_updates:: parse_state_update ( self , state_update. clone ( ) )
254252 {
255- controller_state_updates . push ( update) ;
253+ catalog_updates . push ( update) ;
256254 }
257255
258256 // We want the builtin table retraction to match the state of the catalog
@@ -283,12 +281,12 @@ impl CatalogState {
283281 state_update. diff ,
284282 ) ) ;
285283
286- // We want the parsed controller state updates to match the
287- // state of the catalog _after_ applying an addition.
284+ // We want the parsed catalog updates to match the state of
285+ // the catalog _after_ applying an addition.
288286 if let Some ( update) =
289287 parsed_state_updates:: parse_state_update ( self , state_update. clone ( ) )
290288 {
291- controller_state_updates . push ( update) ;
289+ catalog_updates . push ( update) ;
292290 }
293291 }
294292 }
@@ -298,7 +296,7 @@ impl CatalogState {
298296 self . system_configuration . dyncfg_updates ( ) ;
299297 }
300298
301- Ok ( ( builtin_table_updates, controller_state_updates ) )
299+ Ok ( ( builtin_table_updates, catalog_updates ) )
302300 }
303301
304302 #[ instrument( level = "debug" ) ]
0 commit comments