-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathr2dc_create.py
More file actions
166 lines (130 loc) · 5.24 KB
/
r2dc_create.py
File metadata and controls
166 lines (130 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
R2 Data Catalog - Create Operations
Handles CREATE operations for namespaces and tables in Apache Iceberg.
"""
from r2dc_spark_config import get_spark_session
import argparse
import os
def create_namespace(spark, namespace_name, properties=None):
    """
    Create a namespace in the catalog if it does not already exist.

    Args:
        spark: SparkSession instance used to run the DDL statement
        namespace_name (str): Name of the namespace to create
        properties (dict): Optional namespace properties, rendered as
            a WITH PROPERTIES clause

    Returns:
        DataFrame: Result of the CREATE NAMESPACE statement
    """
    # Start from the bare statement and append the properties clause only
    # when the caller supplied a non-empty mapping.
    create_query = f"CREATE NAMESPACE IF NOT EXISTS {namespace_name}"
    if properties:
        rendered = ", ".join(f"'{key}' = '{value}'" for key, value in properties.items())
        create_query = f"{create_query} WITH PROPERTIES ({rendered})"
    print(f"Executing: {create_query}")
    return spark.sql(create_query)
def create_table_from_sql(spark, sql_file_path):
    """
    Creates a table by reading a CREATE TABLE statement from a SQL file.

    Args:
        spark: SparkSession instance
        sql_file_path (str): Path to the SQL file containing CREATE TABLE statement

    Returns:
        DataFrame: Result of the create table operation

    Raises:
        FileNotFoundError: If the SQL file does not exist
        ValueError: If the SQL file is empty (after stripping whitespace)
    """
    if not os.path.exists(sql_file_path):
        raise FileNotFoundError(f"SQL file not found: {sql_file_path}")
    # Explicit encoding: without it, open() falls back to the platform
    # locale default, which breaks non-ASCII SQL files on some systems.
    with open(sql_file_path, 'r', encoding='utf-8') as f:
        sql_content = f.read().strip()
    if not sql_content:
        raise ValueError("SQL file is empty")
    print(f"Executing SQL from {sql_file_path}:")
    print(sql_content)
    result = spark.sql(sql_content)
    return result
def create_table_direct(spark, table_name, schema_definition, partition_by=None, properties=None):
    """
    Create a table directly from a caller-supplied schema definition.

    Args:
        spark: SparkSession instance used to run the DDL statement
        table_name (str): Fully qualified table name (e.g., 'namespace.table')
        schema_definition (str): Column definitions (e.g., 'id INT, name STRING, created_at TIMESTAMP')
        partition_by (list): Optional columns for a PARTITIONED BY clause
        properties (dict): Optional table properties for a TBLPROPERTIES clause

    Returns:
        DataFrame: Result of the CREATE TABLE statement
    """
    # Collect the statement as a list of clauses, then join once; optional
    # clauses are appended only when their argument is truthy.
    clauses = [f"CREATE TABLE IF NOT EXISTS {table_name} ({schema_definition})"]
    if partition_by:
        clauses.append(f"PARTITIONED BY ({', '.join(partition_by)})")
    if properties:
        rendered = ", ".join(f"'{key}' = '{value}'" for key, value in properties.items())
        clauses.append(f"TBLPROPERTIES ({rendered})")
    create_query = " ".join(clauses)
    print(f"Executing: {create_query}")
    return spark.sql(create_query)
def show_namespaces(spark):
    """
    Return the list of namespaces visible in the catalog.

    Args:
        spark: SparkSession instance

    Returns:
        DataFrame: One row per namespace
    """
    query = "SHOW NAMESPACES"
    return spark.sql(query)
def show_tables(spark, namespace=None):
    """
    Return the tables in a namespace, or in the current one if omitted.

    Args:
        spark: SparkSession instance
        namespace (str): Optional namespace to scope the listing to

    Returns:
        DataFrame: One row per table
    """
    # Scope the listing only when a namespace was actually provided.
    query = f"SHOW TABLES IN {namespace}" if namespace else "SHOW TABLES"
    return spark.sql(query)
def describe_table(spark, table_name):
    """
    Return the extended schema description of a table.

    Args:
        spark: SparkSession instance
        table_name (str): Fully qualified table name

    Returns:
        DataFrame: Schema and metadata rows from DESCRIBE EXTENDED
    """
    describe_query = f"DESCRIBE EXTENDED {table_name}"
    return spark.sql(describe_query)
if __name__ == "__main__":
    # CLI entry point: each flag triggers one catalog operation; multiple
    # flags may be combined in a single invocation.
    cli = argparse.ArgumentParser(description='Create namespaces and tables in Iceberg catalog')
    cli.add_argument('--namespace', help='Create a namespace with this name')
    cli.add_argument('--sql-file', help='Path to SQL file containing CREATE TABLE statement')
    cli.add_argument('--list-namespaces', action='store_true', help='List all namespaces')
    cli.add_argument('--list-tables', help='List tables in specified namespace')
    cli.add_argument('--describe', help='Describe a table (provide table name)')
    opts = cli.parse_args()
    spark = get_spark_session("R2DataCatalog-Create")
    try:
        # Track whether any operation ran so we can fall back to help text.
        handled = False
        if opts.namespace:
            create_namespace(spark, opts.namespace)
            print(f"Namespace '{opts.namespace}' created successfully")
            handled = True
        if opts.sql_file:
            create_table_from_sql(spark, opts.sql_file)
            print("Table created successfully from SQL file")
            handled = True
        if opts.list_namespaces:
            print("\nAvailable namespaces:")
            show_namespaces(spark).show(truncate=False)
            handled = True
        if opts.list_tables:
            print(f"\nTables in namespace '{opts.list_tables}':")
            show_tables(spark, opts.list_tables).show(truncate=False)
            handled = True
        if opts.describe:
            print(f"\nTable description for '{opts.describe}':")
            describe_table(spark, opts.describe).show(truncate=False)
            handled = True
        if not handled:
            cli.print_help()
    except Exception as e:
        # Surface the failure message, then re-raise for a non-zero exit.
        print(f"Error during create operation: {e}")
        raise
    finally:
        # Always release the Spark session, even on failure.
        spark.stop()