-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathr2dc_insert.py
More file actions
190 lines (150 loc) · 6.04 KB
/
r2dc_insert.py
File metadata and controls
190 lines (150 loc) · 6.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""
R2 Data Catalog - Insert Operations
Handles INSERT operations for Apache Iceberg tables.
"""
from r2dc_spark_config import get_spark_session
import argparse
import os
def insert_from_sql(spark, sql_file_path):
    """
    Execute an INSERT statement read from a SQL file.

    Args:
        spark: SparkSession instance
        sql_file_path (str): Path to the SQL file containing INSERT statement

    Returns:
        DataFrame: Result of the insert operation

    Raises:
        FileNotFoundError: If the SQL file does not exist.
        ValueError: If the SQL file is empty (or whitespace-only).
    """
    if not os.path.exists(sql_file_path):
        raise FileNotFoundError(f"SQL file not found: {sql_file_path}")
    # Explicit encoding so the read does not depend on the platform's
    # default locale encoding (previously omitted).
    with open(sql_file_path, 'r', encoding='utf-8') as f:
        sql_content = f.read().strip()
    if not sql_content:
        raise ValueError("SQL file is empty")
    print(f"Executing SQL from {sql_file_path}:")
    print(sql_content)
    result = spark.sql(sql_content)
    return result
def insert_values(spark, table_name, values):
    """
    Insert literal row values directly into a table.

    Args:
        spark: SparkSession instance
        table_name (str): Fully qualified table name
        values (str): Values to insert (e.g., "(1, 'Alice'), (2, 'Bob')")

    Returns:
        DataFrame: Result of the insert operation
    """
    # NOTE: values are interpolated verbatim into the statement; intended
    # for operator-supplied input, not untrusted data.
    statement = f"INSERT INTO {table_name} VALUES {values}"
    print(f"Executing: {statement}")
    return spark.sql(statement)
def insert_from_dataframe(spark, df, table_name, mode="append"):
    """
    Insert data from a DataFrame into an Iceberg table.

    Args:
        spark: SparkSession instance (unused; kept for a uniform call signature)
        df: DataFrame to insert
        table_name (str): Fully qualified table name
        mode (str): Write mode - 'append', 'overwrite', 'error', 'ignore'

    Returns:
        None
    """
    # df.count() triggers a Spark job; acceptable here since the row count
    # is part of the operator-facing log line.
    print(f"Inserting {df.count()} rows into {table_name} with mode '{mode}'")
    # Bug fix: df.writeTo(...) returns DataFrameWriterV2, which has no
    # .mode()/.save() methods, so the previous
    # writeTo(...).using("iceberg").mode(mode).save() chain raised
    # AttributeError. The v1 writer supports all four save modes directly.
    df.write.format("iceberg").mode(mode).saveAsTable(table_name)
    print("Insert completed successfully")
def insert_from_csv(spark, table_name, csv_path, header=True, mode="append"):
    """
    Read a CSV file and insert its rows into an Iceberg table.

    Args:
        spark: SparkSession instance
        table_name (str): Fully qualified table name
        csv_path (str): Path to CSV file
        header (bool): Whether CSV has header row
        mode (str): Write mode - 'append', 'overwrite', 'error', 'ignore'

    Returns:
        None

    Raises:
        FileNotFoundError: If the CSV file does not exist.
    """
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"CSV file not found: {csv_path}")
    print(f"Reading CSV file: {csv_path}")
    # Spark expects the header option as a lowercase string ("true"/"false").
    reader = spark.read.option("header", str(header).lower())
    frame = reader.csv(csv_path)
    print(f"CSV Schema:")
    frame.printSchema()
    # Delegate the actual write to the shared DataFrame insert helper.
    insert_from_dataframe(spark, frame, table_name, mode)
def insert_from_parquet(spark, table_name, parquet_path, mode="append"):
    """
    Read a Parquet file and insert its rows into an Iceberg table.

    Args:
        spark: SparkSession instance
        table_name (str): Fully qualified table name
        parquet_path (str): Path to Parquet file
        mode (str): Write mode - 'append', 'overwrite', 'error', 'ignore'

    Returns:
        None

    Raises:
        FileNotFoundError: If the Parquet file does not exist.
    """
    if not os.path.exists(parquet_path):
        raise FileNotFoundError(f"Parquet file not found: {parquet_path}")
    print(f"Reading Parquet file: {parquet_path}")
    frame = spark.read.parquet(parquet_path)
    print(f"Parquet Schema:")
    frame.printSchema()
    # Delegate the actual write to the shared DataFrame insert helper.
    insert_from_dataframe(spark, frame, table_name, mode)
def insert_select(spark, target_table, source_query):
    """
    Insert data using INSERT INTO ... SELECT.

    Args:
        spark: SparkSession instance
        target_table (str): Target table name
        source_query (str): SELECT query to get source data

    Returns:
        DataFrame: Result of the insert operation
    """
    statement = f"INSERT INTO {target_table} {source_query}"
    print(f"Executing: {statement}")
    return spark.sql(statement)
def show_table_data(spark, table_name, limit=10):
    """
    Return sample rows from a table.

    Args:
        spark: SparkSession instance
        table_name (str): Fully qualified table name
        limit (int): Number of rows to fetch

    Returns:
        DataFrame: Sample data
    """
    query = f"SELECT * FROM {table_name} LIMIT {limit}"
    return spark.sql(query)
if __name__ == "__main__":
    # CLI entry point. Options are not mutually exclusive: several actions
    # (SQL file, CSV, Parquet, show) can run in a single Spark session.
    parser = argparse.ArgumentParser(description='Insert data into Iceberg tables')
    parser.add_argument('--sql-file', help='Path to SQL file containing INSERT statement')
    parser.add_argument('--csv-file', help='Path to CSV file to insert')
    parser.add_argument('--parquet-file', help='Path to Parquet file to insert')
    parser.add_argument('--table', help='Target table name (required with --csv-file or --parquet-file)')
    parser.add_argument('--mode', default='append', choices=['append', 'overwrite', 'error', 'ignore'],
                        help='Write mode (default: append)')
    parser.add_argument('--show', help='Show sample data from table')
    parser.add_argument('--limit', type=int, default=10, help='Number of rows to show (default: 10)')
    args = parser.parse_args()

    spark = get_spark_session("R2DataCatalog-Insert")
    try:
        if args.sql_file:
            # The returned DataFrame is not needed; executing the statement
            # is the side effect we care about (previously bound to an
            # unused `result` local).
            insert_from_sql(spark, args.sql_file)
            print("Insert operation completed successfully")
        if args.csv_file:
            if not args.table:
                raise ValueError("--table is required when using --csv-file")
            insert_from_csv(spark, args.table, args.csv_file, mode=args.mode)
        if args.parquet_file:
            if not args.table:
                raise ValueError("--table is required when using --parquet-file")
            insert_from_parquet(spark, args.table, args.parquet_file, mode=args.mode)
        if args.show:
            print(f"\nSample data from '{args.show}' (limit {args.limit}):")
            show_table_data(spark, args.show, args.limit).show(truncate=False)
        if not any([args.sql_file, args.csv_file, args.parquet_file, args.show]):
            # No action requested: print usage instead of exiting silently.
            parser.print_help()
    except Exception as e:
        # Log, then re-raise so the process exits non-zero for callers/CI.
        print(f"Error during insert operation: {e}")
        raise
    finally:
        # Always release the Spark session, even on failure.
        spark.stop()