 # ===============================================================
 
 def start_spark():
-    return (
+    from pyspark.sql import SparkSession
+    import os
+
+    S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
+    S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
+    S3_ENDPOINT = os.getenv("S3_ENDPOINT", "https://rgw.nau.fccn.pt")
+
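+    # Note: credentials are read from environment variables; the hadoop-aws/S3A and
+    # Delta Lake dependencies are assumed to be on the Spark classpath for the configs below.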
+    spark = (
         SparkSession.builder
         .appName("dim_external_course_etl")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
-        .getOrCreate()
+        # S3A / Ceph config
+        .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
+        .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
+        .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
+        .config("spark.hadoop.fs.s3a.path.style.access", "true")
+        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
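+        # Path-style access is typically required for S3-compatible stores such as Ceph RGW,
+        # where buckets are not addressable via virtual-hosted-style DNS.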
     )
 
-spark = start_spark()
+    return spark.getOrCreate()
 
+spark = start_spark()
 
 # ===============================================================
 # 2. Configuration (paths)
@@ -152,14 +165,14 @@ def clean_course_bronze(df_bronze):
             "display_org_with_default",
             "display_name",
             "display_number_with_default",
-            F.col("created").cast("timestamp"),
-            F.col("modified").cast("timestamp"),
-            F.col("start").cast("timestamp"),
-            F.col("end").cast("timestamp"),
-            F.col("enrollment_start").cast("timestamp"),
-            F.col("enrollment_end").cast("timestamp"),
-            "certificate_available_date",
-            "announcement",
+            F.col("created").cast("timestamp").alias("created"),
+            F.col("modified").cast("timestamp").alias("modified"),
+            F.col("start").cast("timestamp").alias("start"),
+            F.col("end").cast("timestamp").alias("end"),
+            F.col("enrollment_start").cast("timestamp").alias("enrollment_start"),
+            F.col("enrollment_end").cast("timestamp").alias("enrollment_end"),
+            F.col("certificate_available_date").cast("timestamp").alias("certificate_available_date"),
+            F.col("announcement").cast("timestamp").alias("announcement"),
             "catalog_visibility",
             "self_paced",
             "visible_to_staff_only",
@@ -172,7 +185,7 @@ def clean_course_bronze(df_bronze):
             "has_any_active_web_certificate",
             "cert_name_short",
             "cert_name_long",
-            "lowest_passing_grade",
+            F.col("lowest_passing_grade").cast("decimal(5,2)").alias("lowest_passing_grade"),
             "advertised_start",
             "effort",
             "short_description",
@@ -182,12 +195,14 @@ def clean_course_bronze(df_bronze):
             "marketing_url",
             "social_sharing_url",
             "language",
-            "max_student_enrollments_allowed",
-            "ingestion_date",
+            F.col("max_student_enrollments_allowed").cast("int").alias("max_student_enrollments_allowed"),
+            F.col("ingestion_date").cast("timestamp").alias("ingestion_date"),
             "source_name"
         )
         .dropDuplicates(["course_id"])
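+        # course_id is the natural key used for deduplication, the surrogate key and the MERGE,
+        # so rows without it are dropped below.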
+        .filter(F.col("course_id").isNotNull())
     )
+
     logger.info(f"Records after clean_course_bronze: {df.count()}")
     return df
 
@@ -196,38 +211,70 @@ def clean_course_bronze(df_bronze):
 # ===============================================================
 
 def apply_hash(df):
-    business_cols = [c for c in df.columns if c not in ("record_hash",)]
-    df_hash = df.withColumn(
+    """
+    Computes a deterministic hash using business columns only.
+    Excludes technical columns (SKs, SCD timestamps, ingestion metadata, previous hash).
+    """
+
+    technical_cols = {
+        "course_sk",
+        "organization_sk",
+        "record_hash",
+        "valid_from",
+        "valid_to",
+        "is_current",
+        "ingestion_timestamp",
+        "ingestion_date",
+        "source_name"
+    }
+
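+    # Every remaining column feeds the SHA-256 below, so a change in any business attribute
+    # yields a new record_hash, which the downstream merge can use to detect changed rows.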
+    business_cols = [c for c in df.columns if c not in technical_cols]
+
+    return df.withColumn(
         "record_hash",
         F.sha2(
             F.concat_ws(
-                "||", *(F.coalesce(F.col(c).cast("string"), F.lit("NULL")) for c in business_cols)
+                "||",
+                *[
+                    F.coalesce(F.col(c).cast("string"), F.lit("NULL"))
+                    for c in business_cols
+                ]
             ),
             256
         )
     )
-    return df_hash
+
 
 
 # ===============================================================
 # 8. Join with Dim_Organizations enriched
 # ===============================================================
 
 def join_dim_org(df_course, df_dim_org_enriched):
+    # 1) Build a lookup with one row per normalized organization code
+    df_org_lookup = (
+        df_dim_org_enriched
+        .select(
+            normalize_org_code("organization_code").alias("org_code_dim"),
+            "organization_sk_current"
+        )
+        .dropDuplicates(["org_code_dim"])  # guarantees a 1:1 match per code
+    )
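+    # Without the deduplication above, an organization code present in several dimension rows
+    # would fan out the course rows on the left join below.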
+
+    # 2) Join each course to its current organization
     df_join = (
         df_course.alias("c")
         .join(
-            df_dim_org_enriched.select(
-                normalize_org_code("organization_code").alias("org_code_dim"),
-                "organization_sk_current"
-            ).alias("o"),
+            df_org_lookup.alias("o"),
             F.col("c.course_org_code") == F.col("o.org_code_dim"),
             "left"
         )
         .withColumnRenamed("organization_sk_current", "organization_sk")
+        .drop("org_code_dim")
     )
     return df_join
 
+
 # ===============================================================
 # 9. Create Course Unique ID (Surrogate Key)
 # ===============================================================
@@ -308,7 +355,28 @@ def merge_table(df_stage):
     spark.sql(merge_sql)
 
 # ===============================================================
-# 11. Main pipeline
+# 11. Validate Silver Table
+# ===============================================================
+
+def validate_silver_table(spark, path: str):
+    logger.info(f"Validating Silver table at: {path}")
+    row_count = None  # stays None if validation fails
+    try:
+        df_silver = spark.read.format("delta").load(path)
+        logger.info("Dim_External_Course (Silver) schema:")
+        df_silver.printSchema()
+
+        try:
+            row_count = df_silver.count()
+            logger.info(f"Total records in Dim_External_Course (Silver): {row_count}")
+        except Exception as e:
+            logger.warning(f"count() on the Silver table failed (Spark session possibly closed). Error: {e}")
+
+    except Exception as e:
+        logger.error(f"Failed to validate the Silver table: {e}")
+
+    return row_count
+# ===============================================================
+# 12. Main pipeline
 # ===============================================================
 
 def main():
@@ -320,11 +388,11 @@ def main():
     df_clean = clean_course_bronze(df_bronze)
     df_hash = apply_hash(df_clean)
     df_joined = join_dim_org(df_hash, df_dim_org_enriched)
-    df_with_sk = add_course_sk(df_joined)  # ⬅️ NOVO
+    df_with_sk = add_course_sk(df_joined)
     df_stage = build_stage(df_with_sk)
 
     merge_table(df_stage)
-
+    validate_silver_table(spark, SILVER_PATH_COURSE)
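+    # Validation runs after the merge so the check reads the freshly written Silver table.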
     logger.info("End of Dim_External_Course ETL")
 
 
@@ -333,8 +401,6 @@ def main():
 # ===============================================================
 
 if __name__ == "__main__":
-    try:
-        main()
-    finally:
-        spark.stop()
-        logger.info("SparkSession stopped.")
+    main()
+    spark.stop()
+    logger.info("SparkSession stopped.")