11import os
22import pandas as pd
3- from google .cloud import storage
43from abc import ABC , abstractmethod
54import requests
65from io import BytesIO
@@ -119,19 +118,18 @@ def validate_schema(self, schema: list[dict]):
119118 raise ValueError (f"Extra columns: { extra_columns } " )
120119
121120 type_mapping = {
122- "int" : lambda x : pd .to_numeric (x ),
123- "float" : lambda x : pd .to_numeric (x , downcast = "float " ),
121+ "int" : lambda x : pd .to_numeric (x , errors = "coerce" ),
122+ "float" : lambda x : pd .to_numeric (x , errors = "coerce " ),
124123 "string" : lambda x : x .astype (str ),
125124 "datetime" : lambda x : pd .to_datetime (x ),
126125 }
127126
128127 for col , expected_type in schema_dict .items ():
129128 try :
130129 self .data [col ] = type_mapping [expected_type ](self .data [col ])
131- if self .data [col ].isna ().any ():
132- logger .warning (f"Conversion failed for column '{ col } '" )
133- raise ValueError (f"Conversion failed for column '{ col } '" )
134- logger .info (f"Successfully converted column '{ col } ' to { expected_type } " )
130+ logger .info (
131+ f"[Schema Validation] Successfully converted column '{ col } ' to { expected_type } "
132+ )
135133 except KeyError as e :
136134 logger .warning (
137135 f"Unsupported type '{ expected_type } ' for column '{ col } ': { e } "
@@ -157,41 +155,3 @@ def cleanup(self, file_name: str):
157155 logging .info (f"File { file_name } cleaned up from local." )
158156 else :
159157 logging .info (f"File { file_name } does not exist. No need to clean up." )
160-
161-
162- class GCSUploader :
163- def __init__ (self , bucket_name : str ):
164- """
165- Create a class to upload data to Google Cloud Storage bucket
166-
167- Args:
168- bucket_name (str): Name of the bucket
169- """
170- self .client = storage .Client ()
171- self .bucket_name = bucket_name
172-
173- def upload (self , file_name : str , destination : str ):
174- """
175- Upload file to Google Cloud Storage bucket
176-
177- Args:
178- file_name (str): Name of the file to upload
179- destination (str): Destination path in the bucket
180- """
181- bucket = self .client .bucket (self .bucket_name )
182- blob = bucket .blob (destination )
183- blob .upload_from_filename (file_name )
184- logging .info (
185- f"File { file_name } uploaded to gs://{ self .bucket_name } /{ destination } "
186- )
187-
188- def check_file_exists (self , file_name : str ):
189- """
190- Check if the file exists in the bucket
191-
192- Args:
193- file_name (str): Name of the file to check
194- """
195- bucket = self .client .bucket (self .bucket_name )
196- blob = bucket .blob (file_name )
197- return blob .exists ()
0 commit comments