Skip to content

Commit ff4d589

Browse files
committed
add course_sk in table: dim_external_course
1 parent 9817bdc commit ff4d589

File tree

1 file changed

+52
-9
lines changed

1 file changed

+52
-9
lines changed

src/silver/python/dim_external_course.py

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
11
# ===============================================================
22
# Dim External Course - Silver ETL
33
# Produção | Sistema NAU | FCCN
4-
# Autor: Manoel
54
# Estrutura com boas práticas para pipelines Spark/Delta
65
# ===============================================================
76

87
import os
98
import sys
9+
import logging
1010
from datetime import datetime
11+
1112
from pyspark.sql import SparkSession
1213
from pyspark.sql import functions as F
1314
from pyspark.sql.window import Window
1415
from pyspark.sql.utils import AnalysisException
1516

17+
# ===============================================================
18+
# Logging
19+
# ===============================================================
20+
21+
logging.basicConfig(
22+
level=logging.INFO,
23+
format="%(asctime)s [%(levelname)s] %(message)s",
24+
)
25+
logger = logging.getLogger("dim_external_course_etl")
26+
1627
# ===============================================================
1728
# 1. Configuração do Spark
1829
# ===============================================================
@@ -177,9 +188,9 @@ def clean_course_bronze(df_bronze):
177188
)
178189
.dropDuplicates(["course_id"])
179190
)
191+
logger.info(f"Registos após clean_course_bronze: {df.count()}")
180192
return df
181193

182-
183194
# ===============================================================
184195
# 7. Apply Hash
185196
# ===============================================================
@@ -217,9 +228,30 @@ def join_dim_org(df_course, df_dim_org_enriched):
217228
)
218229
return df_join
219230

231+
# ===============================================================
# 9. Create Course Unique ID (Surrogate Key)
# ===============================================================
def add_course_sk(df):
    """
    Add a sequential surrogate key ``course_sk`` (1..N) to *df*.

    The key is assigned with ``row_number`` over a window ordered by
    ``course_id``, so it is reproducible for a given input dataset.

    NOTE(review): the window has no ``partitionBy``, so Spark moves all
    rows into a single partition to number them; the key is also only
    stable while the set of course_ids is unchanged between runs —
    confirm both are acceptable for the SCD1 merge downstream.

    Raises:
        ValueError: if *df* has no ``course_id`` column.
    """
    if "course_id" not in df.columns:
        raise ValueError("DataFrame não contém coluna 'course_id'; não é possível criar course_sk.")

    sk_window = Window.orderBy("course_id")

    enriched = df.withColumn(
        "course_sk",
        F.row_number().over(sk_window).cast("int"),
    )

    # Surrogate key first; remaining columns keep their current order.
    remaining = [c for c in enriched.columns if c != "course_sk"]
    return enriched.select("course_sk", *remaining)
220252

221253
# ===============================================================
222-
# 9. Build staging for SCD1
254+
# 10. Build staging for SCD1
223255
# ===============================================================
224256

225257
def build_stage(df):
@@ -246,6 +278,8 @@ def merge_table(df_stage):
246278
table_exists = False
247279

248280
if not table_exists:
281+
logger.info("Tabela alvo não existe. Criando Delta inicial em %s", SILVER_PATH_COURSE)
282+
249283
(
250284
df_stage.write
251285
.format("delta")
@@ -261,6 +295,8 @@ def merge_table(df_stage):
261295
""")
262296
return
263297

298+
logger.info("Executando MERGE SCD1 em %s", TARGET_TABLE)
299+
264300
merge_sql = f"""
265301
MERGE INTO {TARGET_TABLE} AS t
266302
USING stg_dim_external_course AS s
@@ -271,27 +307,34 @@ def merge_table(df_stage):
271307

272308
spark.sql(merge_sql)
273309

274-
275310
# ===============================================================
# 11. Pipeline principal
# ===============================================================

def main():
    """Run the Dim_External_Course silver ETL end to end."""
    logger.info("Início do ETL Dim_External_Course")

    # Extract: bronze courses and the organisation dimension.
    df_bronze, df_dim_org = load_sources()

    # Transform: enrich orgs, clean + hash courses, join, key, stage.
    df_dim_org_enriched = build_dim_org_enriched(df_dim_org)
    df_hash = apply_hash(clean_course_bronze(df_bronze))
    df_joined = join_dim_org(df_hash, df_dim_org_enriched)
    df_stage = build_stage(add_course_sk(df_joined))

    # Load: SCD1 merge into the Delta target table.
    merge_table(df_stage)

    logger.info("Fim do ETL Dim_External_Course")

291331
# ===============================================================
# Entry point
# ===============================================================

if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Log the failure (with traceback) before teardown, so an
        # aborted run is visible in this ETL's own log stream; then
        # re-raise to keep the non-zero exit status.
        logger.exception("ETL Dim_External_Course falhou.")
        raise
    finally:
        # Always release the SparkSession, success or failure.
        spark.stop()
        logger.info("SparkSession stopped.")

0 commit comments

Comments
 (0)