 import csv
 import os
 import random
+import sys
 
 from faker import Faker
 from probes.logging_config import initialize_logger
@@ -24,21 +25,25 @@ def generate_random_data(num_records: int, file_path: str) -> str:
     Returns:
         str: File path to CSV file
     """
-    fake = Faker()
-    with open(file_path, mode="w", newline="", encoding="utf-8") as csvfile:
-        writer = csv.writer(csvfile, quoting=csv.QUOTE_ALL)
-        writer.writerow(["id", "name", "email", "address"])
-        for i in range(1, num_records + 1):
-            writer.writerow([i, fake.name(), fake.email(), fake.address()])
-    with open(file_path, newline="", encoding="utf-8") as csvfile:
-        reader = csv.reader(csvfile)
-        rows = list(reader)
-        # Subtract 1 for the header row
-        actual_records = len(rows) - 1
-        assert actual_records == num_records, logger.error(
-            f"Expected {num_records} records, but found {actual_records}."
-        )
-    return file_path
+    try:
+        fake = Faker()
+        with open(file_path, mode="w", newline="", encoding="utf-8") as csvfile:
+            writer = csv.writer(csvfile, quoting=csv.QUOTE_ALL)
+            writer.writerow(["id", "name", "email", "address"])
+            for i in range(1, num_records + 1):
+                writer.writerow([i, fake.name(), fake.email(), fake.address()])
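+        # Re-read the file to verify that the expected number of data rows was written.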
+        with open(file_path, newline="", encoding="utf-8") as csvfile:
+            reader = csv.reader(csvfile)
+            rows = list(reader)
+            # Subtract 1 for the header row
+            actual_records = len(rows) - 1
+            assert actual_records == num_records, logger.error(
+                f"Expected {num_records} records, but found {actual_records}."
+            )
+        return file_path
+    except Exception as e:
+        logger.error(f"Error generating random data: {e}")
+        sys.exit(1)
 
 
 def create_data_table(cursor: snowflake.connector.cursor.SnowflakeCursor) -> str:
@@ -48,20 +53,26 @@ def create_data_table(cursor: snowflake.connector.cursor.SnowflakeCursor) -> str
     Returns:
         str: The name of the created table.
     """
-    table_name = random_string(7, "test_data_")
-    create_table_query = f"""
-    CREATE OR REPLACE TABLE {table_name} (
-        id INT,
-        name STRING,
-        email STRING,
-        address STRING
-    );
-    """
-    cursor.execute(create_table_query)
-    if cursor.fetchone():
-        print({"created_table": True})
-    else:
+    try:
+        table_name = random_string(7, "test_data_")
+        create_table_query = f"""
+        CREATE OR REPLACE TABLE {table_name} (
+            id INT,
+            name STRING,
+            email STRING,
+            address STRING
+        );
+        """
+        cursor.execute(create_table_query)
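+        # The CREATE statement returns a single status row; a non-empty result means it executed.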
+        if cursor.fetchone():
+            print({"created_table": True})
+        else:
+            print({"created_table": False})
+            sys.exit(1)
+    except Exception as e:
+        logger.error(f"Error creating table: {e}")
         print({"created_table": False})
+        sys.exit(1)
     return table_name
 
 
@@ -72,15 +83,21 @@ def create_data_stage(cursor: snowflake.connector.cursor.SnowflakeCursor) -> str
     Returns:
         str: The name of the created stage.
     """
-    stage_name = random_string(7, "test_data_stage_")
-    create_stage_query = f"CREATE OR REPLACE STAGE {stage_name};"
+    try:
+        stage_name = random_string(7, "test_data_stage_")
+        create_stage_query = f"CREATE OR REPLACE STAGE {stage_name};"
 
-    cursor.execute(create_stage_query)
-    if cursor.fetchone():
-        print({"created_stage": True})
-    else:
+        cursor.execute(create_stage_query)
+        if cursor.fetchone():
+            print({"created_stage": True})
+        else:
+            print({"created_stage": False})
+            sys.exit(1)
+        return stage_name
+    except Exception as e:
         print({"created_stage": False})
-    return stage_name
+        logger.error(f"Error creating stage: {e}")
+        sys.exit(1)
 
 
 def copy_into_table_from_stage(
@@ -94,18 +111,24 @@ def copy_into_table_from_stage(
         stage_name (str): The name of the stage from which data will be copied.
         cur (snowflake.connector.cursor.SnowflakeCursor): The cursor to execute the SQL command.
     """
-    cur.execute(
-        f"""
-        COPY INTO {table_name}
-        FROM @{stage_name}
-        FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1);"""
-    )
-
-    # Check if the data was loaded successfully
-    if cur.fetchall()[0][1] == "LOADED":
-        print({"copied_data_from_stage_into_table": True})
-    else:
+    try:
+        cur.execute(
+            f"""
+            COPY INTO {table_name}
+            FROM @{stage_name}
+            FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1);"""
+        )
+
+        # Check if the data was loaded successfully
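+        # Index 1 of each COPY INTO result row is that file's load status ("LOADED" on success).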
+        if cur.fetchall()[0][1] == "LOADED":
+            print({"copied_data_from_stage_into_table": True})
+        else:
+            print({"copied_data_from_stage_into_table": False})
+            sys.exit(1)
+    except Exception as e:
+        logger.error(f"Error copying data from stage to table: {e}")
         print({"copied_data_from_stage_into_table": False})
+        sys.exit(1)
 
 
 def put_file_to_stage(
@@ -119,25 +142,37 @@ def put_file_to_stage(
         stage_name (str): The name of the stage where the file will be uploaded.
         cur (snowflake.connector.cursor.SnowflakeCursor): The cursor to execute the SQL command.
     """
-    response = cur.execute(
-        f"PUT file://{file_name} @{stage_name} AUTO_COMPRESS=TRUE"
-    ).fetchall()
-    logger.error(response)
-
-    if response[0][6] == "UPLOADED":
-        print({"PUT_operation": True})
-    else:
+    try:
+        response = cur.execute(
+            f"PUT file://{file_name} @{stage_name} AUTO_COMPRESS=TRUE"
+        ).fetchall()
+        logger.error(response)
+
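+        # Index 6 of the PUT result row is the upload status ("UPLOADED", or "SKIPPED" if the file is already staged).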
+        if response[0][6] == "UPLOADED":
+            print({"PUT_operation": True})
+        else:
+            print({"PUT_operation": False})
+            sys.exit(1)
+    except Exception as e:
+        logger.error(f"Error uploading file to stage: {e}")
         print({"PUT_operation": False})
+        sys.exit(1)
 
 
 def count_data_from_table(
     table_name: str, num_records: int, cur: snowflake.connector.cursor.SnowflakeCursor
 ):
-    count = cur.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
-    if count == num_records:
-        print({"data_transferred_completely": True})
-    else:
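+    # Verify that the number of rows loaded into the table matches the number of generated records.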
+    try:
+        count = cur.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
+        if count == num_records:
+            print({"data_transferred_completely": True})
+        else:
+            print({"data_transferred_completely": False})
+            sys.exit(1)
+    except Exception as e:
+        logger.error(f"Error counting data from table: {e}")
         print({"data_transferred_completely": False})
+        sys.exit(1)
 
 
 def compare_fetched_data(
@@ -157,21 +192,25 @@ def compare_fetched_data(
         repetitions (int): Number of times to repeat the comparison. Default is 10.
         fetch_limit (int): Number of rows to fetch from the table for comparison. Default is 100.
     """
-
-    fetched_data = cur.execute(
-        f"SELECT * FROM {table_name} LIMIT {fetch_limit}"
-    ).fetchall()
-
-    with open(file_name, newline="", encoding="utf-8") as csvfile:
-        reader = csv.reader(csvfile)
-        csv_data = list(reader)[1:]  # Skip header row
-    for _ in range(repetitions):
-        random_index = random.randint(0, fetch_limit - 1)
-        for y in range(len(fetched_data[0])):
-            if str(fetched_data[random_index][y]) != csv_data[random_index][y]:
-                print({"data_integrity_check": False})
-                break
-        print({"data_integrity_check": True})
+    try:
+        fetched_data = cur.execute(
+            f"SELECT * FROM {table_name} LIMIT {fetch_limit}"
+        ).fetchall()
+
+        with open(file_name, newline="", encoding="utf-8") as csvfile:
+            reader = csv.reader(csvfile)
+            csv_data = list(reader)[1:]  # Skip header row
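+        # Spot-check `repetitions` randomly chosen rows against the source CSV, column by column.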
+        for _ in range(repetitions):
+            random_index = random.randint(0, fetch_limit - 1)
+            for y in range(len(fetched_data[0])):
+                if str(fetched_data[random_index][y]) != csv_data[random_index][y]:
+                    print({"data_integrity_check": False})
+                    break
+            print({"data_integrity_check": True})
+    except Exception as e:
+        logger.error(f"Error comparing fetched data: {e}")
+        print({"data_integrity_check": False})
+        sys.exit(1)
 
 
 def execute_get_command(stage_name: str, conn: snowflake.connector.SnowflakeConnection):
@@ -194,7 +233,12 @@ def execute_get_command(stage_name: str, conn: snowflake.connector.SnowflakeConn
             print({"GET_operation": True})
         else:
             print({"GET_operation": False})
+            sys.exit(1)
 
+    except Exception as e:
+        logger.error(f"Error downloading file from stage: {e}")
+        print({"GET_operation": False})
+        sys.exit(1)
     finally:
         try:
             for file in os.listdir(download_dir):
@@ -206,6 +250,7 @@ def execute_get_command(stage_name: str, conn: snowflake.connector.SnowflakeConn
             logger.error(
                 f"Error cleaning up directory {download_dir}. It may not exist or be empty."
             )
+            sys.exit(1)
 
 
 def perform_put_fetch_get(connection_parameters: dict, num_records: int = 1000):
@@ -252,20 +297,25 @@ def perform_put_fetch_get(connection_parameters: dict, num_records: int = 1000):
         logger.error("Performing GET operation")
         execute_get_command(stage_name, conn)
         logger.error("File downloaded from stage to local directory")
-
     except Exception as e:
-        logger.error(f"Error during PUT/GET operation: {e}")
+        logger.error(f"Error during PUT_FETCH_GET operation: {e}")
+        sys.exit(1)
 
     finally:
+        logger.error("INFORMATIONING")
+        try:
         # Cleanup: Remove data from the stage and delete table
-        with connect(connection_parameters) as conn:
-            with conn.cursor() as cur:
-                cur.execute(f"REMOVE @{stage_name}")
-                cur.execute(f"DROP TABLE {table_name}")
+            with connect(connection_parameters) as conn:
+                with conn.cursor() as cur:
+                    cur.execute(f"REMOVE @{stage_name}")
+                    cur.execute(f"DROP TABLE {table_name}")
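+            # Report success with exit code 0 once the stage and table have been cleaned up.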
+            sys.exit(0)
+        except Exception as e:
+            logger.error(f"Error during cleanup: {e}")
+            sys.exit(1)
 
 
-# Disabled in MVP, uncomment to run
-# @prober_function
+@prober_function
 def perform_put_fetch_get_100_lines(connection_parameters: dict):
     """
     Performs a PUT and GET operation for 1,000 rows using the provided connection parameters.