@@ -15,6 +15,11 @@ def generate_test_row(number, start_range):
1515 doubleCol = float (number / 3.14 )
1616 )
1717
18+ def drop_if_exists (instance , table_name ):
19+ table = instance .table (table_id = table_name )
20+ if table .exists ():
21+ print (f"Deleting existing table: { table_name } " )
22+ table .delete ()
1823
1924def generate_table_id (test_name ):
2025 return f"cbt-{ test_name } -{ uuid .uuid4 ().hex [:20 ]} "
@@ -92,6 +97,12 @@ def delete_bigtable_table(table_name, admin_client):
9297project_id = "gcp-open-lineage-testing"
9398instance_id = "openlineage-test"
9499
100+ client = bigtable .Client (project = project_id , admin = True )
101+ instance = client .instance (instance_id )
102+
103+ drop_if_exists (instance , input_table )
104+ drop_if_exists (instance , output_table )
105+
95106write_dataframe_to_bigtable (test_df , raw_basic_catalog , project_id , instance_id , True )
96107
97108read_df = read_dataframe_from_bigtable (spark , raw_basic_catalog , project_id , instance_id )
@@ -113,11 +124,7 @@ def delete_bigtable_table(table_name, admin_client):
113124
114125spark .stop ()
115126
116- #Cleanup after the run
117-
118- client = bigtable .Client (project = project_id , admin = True )
119-
120- instance = client .instance (instance_id )
127+ # Cleanup after the run
121128
122129bt_table1 = instance .table (table_id = input_table )
123130bt_table2 = instance .table (table_id = output_table )
0 commit comments