1010import pkg_resources
1111import logging
1212
13- logging .basicConfig (level = logging .INFO , format = '%(asctime)s - %(levelname)s - %(message)s' )
13+ logging .basicConfig (level = logging .INFO ,
14+ format = '%(asctime)s - %(levelname)s - %(message)s' )
15+
1416
1517def check_for_new_files ():
1618 """
@@ -32,6 +34,7 @@ def check_for_new_files():
3234 logging .info (f"New files detected: { [record [1 ] for record in records ]} " )
3335 return records
3436
37+
3538def process_files (** kwargs ):
3639 """
3740 Processes the files detected from the database and marks them as processed.
@@ -47,7 +50,7 @@ def process_files(**kwargs):
4750 for record in records :
4851 key , value = record
4952 logging .info (f"Processing file: { key } , uploaded at { value } " )
50-
53+
5154 bucketName , fileName = key .split ('/' , 1 )
5255 logging .info (f"{ bucketName } ,{ fileName } " )
5356
@@ -62,17 +65,22 @@ def process_files(**kwargs):
6265 pg_hook .run (update_sql )
6366
6467# Function to install a library if not already installed
68+
69+
6570def install (package , version = "7.0.3" ):
6671 try :
6772 # Check if the package is installed
6873 pkg_resources .get_distribution (package )
6974 logging .info (f"{ package } is already installed." )
7075 except pkg_resources .DistributionNotFound :
7176 logging .info (f"{ package } not found. Installing version { version } ..." )
72- subprocess .check_call ([sys .executable , "-m" , "pip" , "install" , f"{ package } =={ version } " ])
77+ subprocess .check_call (
78+ [sys .executable , "-m" , "pip" , "install" , f"{ package } =={ version } " ])
79+
7380
7481def fileProcessor (bucket_name , object_name ):
75- install ('minio' , version = "7.0.3" ) # Install compatible Minio version if needed
82+ # Install compatible Minio version if needed
83+ install ('minio' , version = "7.0.3" )
7684 from minio import Minio
7785 from minio .error import S3Error
7886
@@ -97,7 +105,8 @@ def fileProcessor(bucket_name, object_name):
97105 try :
98106 # Download the object from MinIO to a local file
99107 minio_client .fget_object (bucket_name , object_name , local_file_path )
100- logging .info (f"File '{ object_name } ' successfully downloaded from MinIO." )
108+ logging .info (
109+ f"File '{ object_name } ' successfully downloaded from MinIO." )
101110
102111 # Process the file here
103112 sendProcessedInfoToDb (local_file_path )
@@ -111,13 +120,15 @@ def fileProcessor(bucket_name, object_name):
111120 except Exception as e :
112121 logging .info (f"Unexpected error: { e } " )
113122
123+
114124def sendProcessedInfoToDb (localFilePath ):
115125 dataCoffeePrice = pd .read_csv (localFilePath )
116126
117- coffeePrice = dataCoffeePrice ["value" ]
118- atTime = dataCoffeePrice ["date" ]
127+ coffeePrice = dataCoffeePrice ["value" ]
128+ atTime = dataCoffeePrice ["date" ]
119129
120- logging .info (f"The coffee price is { coffeePrice .iloc [0 ]} at date { atTime .iloc [0 ]} ." )
130+ logging .info (
131+ f"The coffee price is { coffeePrice .iloc [0 ]} at date { atTime .iloc [0 ]} ." )
121132
122133 pg_hook = PostgresHook (postgres_conn_id = 'postgres' )
123134
@@ -130,7 +141,8 @@ def sendProcessedInfoToDb(localFilePath):
130141 pg_hook .run (insertSql )
131142
132143 logging .info ("Newest price instance added to the coffee_price table" )
133-
144+
145+
134146# Define default arguments for the DAG
135147default_args = {
136148 'owner' : 'your_name' ,
@@ -165,5 +177,3 @@ def sendProcessedInfoToDb(localFilePath):
165177
166178# Define task dependencies
167179check_files_task >> process_files_task
168-
169-
0 commit comments