@@ -197,6 +197,11 @@ def name(self): # pragma: no cover
197197 """friendly name for this step"""
198198 return "Read"
199199
def sensors(self):
    """Return the parent class's sensor readings with ``committed_at`` added."""
    readings = super().sensors()
    # NOTE(review): the key is spelled "commited_at" (single "t") while the
    # attribute is `committed_at`; the key is left untouched here because
    # downstream readers of the sensor dict may depend on it - confirm.
    readings["commited_at"] = self.committed_at
    return readings
204+
200205 @property
201206 def config (self ):
202207 """Additional details for this step"""
@@ -216,6 +221,89 @@ def config(self):
216221 f"{ ' WITH(' + ',' .join (self .parameters .get ('hints' )) + ')' if self .parameters .get ('hints' ) else '' } )"
217222 )
218223
def plan_config(self) -> dict:
    """
    Structured configuration for planning/telemetry purposes.

    Returns a dict containing:
    - connector: the connector's ``__type__`` attribute (None when absent)
    - relation: the "relation" entry of this step's parameters
    - files: list of {path, rows, bytes}; rows and bytes are None when only
      pruned file paths (and no manifest) are available
    - selection_pushdown: the step's predicates, each rendered with str()
    - projection_pushdown: list of {schema_index, identity}, one entry per
      projected column; schema_index is None when the identity does not
      appear in the schema's columns
    - summary: {total-files, total-rows, total-bytes}; a total is None when
      any listed file lacks that figure, and 0 when no files are listed
    """
    config = {
        "connector": getattr(self.connector, "__type__", None),
        "relation": self.parameters.get("relation"),
        "files": [],
        "selection_pushdown": [],
        "projection_pushdown": [],
    }

    # Both values may live on the instance or in the parameters; the
    # instance attribute wins when it is set to something truthy.
    manifest = getattr(self, "manifest", None) or self.parameters.get("manifest")
    pruned = getattr(self, "pruned_files", None) or self.parameters.get("pruned_files")

    if manifest is not None:
        # The manifest carries per-file statistics, so prefer its entries.
        files = [
            {"path": f.file_path, "rows": f.record_count, "bytes": f.uncompressed_size}
            for f in manifest.files
        ]
        if pruned:
            # Pruning reduced the file list: keep only the surviving paths.
            pruned_set = set(pruned)
            files = [entry for entry in files if entry["path"] in pruned_set]
        config["files"] = files
    elif pruned:
        # Without a manifest only the paths are known.
        config["files"] = [{"path": p, "rows": None, "bytes": None} for p in pruned]

    # Selection pushdown is best-effort: a predicate that cannot be rendered
    # (or a missing `predicates` attribute) must not break plan reporting,
    # so any failure deliberately degrades to an empty list.
    try:
        config["selection_pushdown"] = [str(p) for p in (self.predicates or [])]
    except Exception:
        config["selection_pushdown"] = []

    # Projection pushdown: report each projected column's identity together
    # with its position in the schema.
    schema_columns = getattr(self.schema, "columns", []) or []
    projected = self.columns or []

    # Build the identity -> position map once rather than rescanning the
    # schema for every projected column. `setdefault` keeps the first
    # position when an identity occurs more than once in the schema.
    index_by_identity = {}
    if projected:
        for position, schema_column in enumerate(schema_columns):
            index_by_identity.setdefault(schema_column.identity, position)

    projection = []
    for column in projected:
        identity = column.schema_column.identity
        projection.append(
            {"schema_index": index_by_identity.get(identity), "identity": identity}
        )
    config["projection_pushdown"] = projection

    # Summary: a total is only reported when every file supplies the figure.
    # With no files both checks pass trivially and the sums are 0.
    files = config["files"]
    row_counts = [entry["rows"] for entry in files]
    byte_counts = [entry["bytes"] for entry in files]

    config["summary"] = {
        "total-files": len(files),
        "total-rows": None if any(v is None for v in row_counts) else sum(row_counts),
        "total-bytes": None if any(v is None for v in byte_counts) else sum(byte_counts),
    }

    return config
306+
219307 def execute (self , morsel , ** kwargs ) -> Generator :
220308 """Perform this step, time how long is spent doing work"""
221309 if morsel == EOS :
0 commit comments