66from pyspark .sql import DataFrame
77from pyspark .sql .types import StringType , StructField , StructType
88
9- from databricks .labs .ucx .config import InventoryConfig
109from databricks .labs .ucx .inventory .types import (
1110 AclItemsContainer ,
1211 LogicalObjectType ,
1817logger = logging .getLogger (__name__ )
1918
2019
class PermissionsInventoryTable(SparkMixin):
    """Persistence layer for the UCX permissions inventory, backed by a
    Delta table in the Hive metastore."""

    def __init__(self, inventory_database: str, ws: WorkspaceClient):
        super().__init__(ws)
        # Fully-qualified name of the Delta table holding serialized permission items.
        self._table = f"hive_metastore.{inventory_database}.permissions"
2524
2625 @property
2726 def _table_schema (self ) -> StructType :
@@ -35,26 +34,25 @@ def _table_schema(self) -> StructType:
3534 )
3635
3736 @property
38- def _table (self ) -> DataFrame :
39- assert self .config .table , "Inventory table name is not set"
40- return self .spark .table (self .config .table .to_spark ())
37+ def _df (self ) -> DataFrame :
38+ return self .spark .table (self ._table )
4139
4240 def cleanup (self ):
43- logger .info (f"Cleaning up inventory table { self .config . table } " )
44- self .spark .sql (f"DROP TABLE IF EXISTS { self .config . table . to_spark () } " )
41+ logger .info (f"Cleaning up inventory table { self ._table } " )
42+ self .spark .sql (f"DROP TABLE IF EXISTS { self ._table } " )
4543 logger .info ("Inventory table cleanup complete" )
4644
4745 def save (self , items : list [PermissionsInventoryItem ]):
4846 # TODO: update instead of append
49- logger .info (f"Saving { len (items )} items to inventory table { self .config . table } " )
47+ logger .info (f"Saving { len (items )} items to inventory table { self ._table } " )
5048 serialized_items = pd .DataFrame ([item .as_dict () for item in items ])
5149 df = self .spark .createDataFrame (serialized_items , schema = self ._table_schema )
52- df .write .mode ("append" ).format ("delta" ).saveAsTable (self .config . table . to_spark () )
50+ df .write .mode ("append" ).format ("delta" ).saveAsTable (self ._table )
5351 logger .info ("Successfully saved the items to inventory table" )
5452
5553 def load_all (self ) -> list [PermissionsInventoryItem ]:
56- logger .info (f"Loading inventory table { self .config . table } " )
57- df = self ._table .toPandas ()
54+ logger .info (f"Loading inventory table { self ._table } " )
55+ df = self ._df .toPandas ()
5856
5957 logger .info ("Successfully loaded the inventory table" )
6058 return PermissionsInventoryItem .from_pandas (df )
@@ -78,8 +76,8 @@ def _is_item_relevant_to_groups(item: PermissionsInventoryItem, groups: list[str
7876 raise NotImplementedError (msg )
7977
8078 def load_for_groups (self , groups : list [str ]) -> list [PermissionsInventoryItem ]:
81- logger .info (f"Loading inventory table { self .config . table } and filtering it to relevant groups" )
82- df = self ._table .toPandas ()
79+ logger .info (f"Loading inventory table { self ._table } and filtering it to relevant groups" )
80+ df = self ._df .toPandas ()
8381 all_items = PermissionsInventoryItem .from_pandas (df )
8482 filtered_items = [item for item in all_items if self ._is_item_relevant_to_groups (item , groups )]
8583 logger .info (f"Found { len (filtered_items )} items relevant to the groups among { len (all_items )} items" )
0 commit comments