
Commit 9817bdc

Add dim_external_course in Silver Layer
1 parent d516b6d commit 9817bdc

File tree

1 file changed: +297 -0 lines changed
@@ -0,0 +1,297 @@
# ===============================================================
# Dim External Course - Silver ETL
# Production | NAU System | FCCN
# Author: Manoel
# Structured following best practices for Spark/Delta pipelines
# ===============================================================

import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException

# ===============================================================
# 1. Spark configuration
# ===============================================================

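# The two Delta settings below enable Delta's SQL extension and catalog so
# that MERGE INTO works; schema autoMerge lets the MERGE evolve the target
# schema if the staging DataFrame gains new columns.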
def start_spark():
    return (
        SparkSession.builder
        .appName("dim_external_course_etl")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
        .getOrCreate()
    )

spark = start_spark()

# ===============================================================
# 2. Configuration (paths)
# ===============================================================

BRONZE_PATH_COURSE = os.getenv(
    "BRONZE_PATH_COURSE",
    "s3a://nau-local-analytics-bronze/course_overviews_courseoverview"
)

SILVER_PATH_COURSE = os.getenv(
    "SILVER_PATH_COURSE",
    "s3a://nau-local-analytics-silver/dim_external_course"
)

DIM_ORG_PATH = os.getenv(
    "DIM_ORG_PATH",
    "s3a://nau-local-analytics-silver/dim_organizations"
)

TARGET_TABLE = "default.dim_external_course"
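# All of the paths above can be overridden per environment, e.g.
# (bucket name below is illustrative):
#   export SILVER_PATH_COURSE=s3a://my-bucket/dim_external_course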

# ===============================================================
# 3. Utility functions
# ===============================================================

def load_delta(path: str):
    """Safely load a Delta table from the given path."""
    try:
        return spark.read.format("delta").load(path)
    except Exception as e:
        raise RuntimeError(f"Failed to read Delta table at {path}: {e}") from e

def normalize_org_code(col):
    """Normalize organization codes (uppercase, trim)."""
    return F.upper(F.trim(col))

# ===============================================================
# 4. Read the Bronze and Silver source tables
# ===============================================================

def load_sources():
    df_bronze = load_delta(BRONZE_PATH_COURSE)

    df_dim_org = (
        load_delta(DIM_ORG_PATH)
        .select(
            "organization_sk",
            "organization_id",
            normalize_org_code("organization_code").alias("organization_code"),
            "effective_start",
            "effective_end",
            "is_current"
        )
    )

    return df_bronze, df_dim_org

# ===============================================================
# 5. Build the enriched Dim_Organizations view (SCD2)
# ===============================================================

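# Maps every historical SCD2 row to the surrogate key of the organization's
# current row (organization_sk_current), so facts joined against history can
# still roll up to the present-day organization.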
def build_dim_org_enriched(df_dim_org):
    """Build an SCD2 view enriched with organization_sk_current."""
    df_current = (
        df_dim_org
        .filter(F.col("is_current"))
        .select(
            "organization_id",
            normalize_org_code("organization_code").alias("org_code"),
            "organization_sk"
        )
        .withColumnRenamed("organization_sk", "organization_sk_current")
    )

    # Join historical rows to each organization's current row
    df_enriched = (
        df_dim_org
        .join(df_current, on="organization_id", how="left")
        .select(
            "organization_sk",
            "organization_sk_current",
            "organization_id",
            "organization_code",
            "effective_start",
            "effective_end",
            "is_current"
        )
    )

    return df_enriched

# ===============================================================
# 6. Clean and normalize the Bronze table
# ===============================================================

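# Note: dropDuplicates(["course_id"]) keeps an arbitrary row per course_id;
# if Bronze can carry several versions of a course, ordering by `modified`
# descending and keeping the first row would make the choice deterministic.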
def clean_course_bronze(df_bronze):
    df = (
        df_bronze
        .select(
            F.col("id").alias("course_id"),
            F.col("_location").alias("course_location"),
            F.col("version").cast("int").alias("course_version"),
            normalize_org_code("org").alias("course_org_code"),
            "display_org_with_default",
            "display_name",
            "display_number_with_default",
            F.col("created").cast("timestamp"),
            F.col("modified").cast("timestamp"),
            F.col("start").cast("timestamp"),
            F.col("end").cast("timestamp"),
            F.col("enrollment_start").cast("timestamp"),
            F.col("enrollment_end").cast("timestamp"),
            "certificate_available_date",
            "announcement",
            "catalog_visibility",
            "self_paced",
            "visible_to_staff_only",
            "invitation_only",
            "mobile_available",
            "eligible_for_financial_aid",
            "certificates_display_behavior",
            "certificates_show_before_end",
            "cert_html_view_enabled",
            "has_any_active_web_certificate",
            "cert_name_short",
            "cert_name_long",
            "lowest_passing_grade",
            "advertised_start",
            "effort",
            "short_description",
            "course_image_url",
            "banner_image_url",
            "course_video_url",
            "marketing_url",
            "social_sharing_url",
            "language",
            "max_student_enrollments_allowed",
            "ingestion_date",
            "source_name"
        )
        .dropDuplicates(["course_id"])
    )
    return df

# ===============================================================
# 7. Apply Hash
# ===============================================================

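# record_hash is a SHA-256 over every business column, with NULLs mapped to a
# sentinel and values joined by "||"; the MERGE in step 10 uses it to update
# only rows that actually changed. (Values containing "||" could in theory
# produce false matches; a stricter encoding would rule that out.)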
def apply_hash(df):
    business_cols = [c for c in df.columns if c not in ("record_hash",)]
    df_hash = df.withColumn(
        "record_hash",
        F.sha2(
            F.concat_ws(
                "||", *(F.coalesce(F.col(c).cast("string"), F.lit("NULL")) for c in business_cols)
            ),
            256
        )
    )
    return df_hash

# ===============================================================
# 8. Join with the enriched Dim_Organizations
# ===============================================================

def join_dim_org(df_course, df_dim_org_enriched):
    df_join = (
        df_course.alias("c")
        .join(
            df_dim_org_enriched.select(
                normalize_org_code("organization_code").alias("org_code_dim"),
                "organization_sk_current"
            ).dropDuplicates().alias("o"),  # dedupe: avoid fan-out across SCD2 history rows
            F.col("c.course_org_code") == F.col("o.org_code_dim"),
            "left"
        )
        .withColumnRenamed("organization_sk_current", "organization_sk")
        .drop("org_code_dim")  # drop the join helper so it does not land in the Silver table
    )
    return df_join

# ===============================================================
# 9. Build staging for SCD1
# ===============================================================

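# The SCD2-style columns set here (is_current, valid_from, valid_to) are
# populated on every run, but the MERGE overwrites matched rows in place,
# so the table effectively behaves as SCD1: no history is kept.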
def build_stage(df):
    return (
        df.withColumn("is_current", F.lit(True))
        .withColumn("valid_from", F.current_timestamp())
        .withColumn("valid_to", F.to_timestamp(F.lit("9999-12-31 23:59:59")))
        .withColumn("ingestion_timestamp", F.current_timestamp())
    )

# ===============================================================
# 10. Final MERGE (SCD1)
# ===============================================================

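# First run: write the staging DataFrame to the Silver path and register the
# Delta table over it. Later runs: MERGE on course_id, updating only rows
# whose record_hash changed and inserting courses not yet in the table.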
def merge_table(df_stage):
    df_stage.createOrReplaceTempView("stg_dim_external_course")

    try:
        spark.sql(f"DESCRIBE TABLE {TARGET_TABLE}")
        table_exists = True
    except AnalysisException:
        table_exists = False

    if not table_exists:
        (
            df_stage.write
            .format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .save(SILVER_PATH_COURSE)
        )

        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {TARGET_TABLE}
            USING DELTA
            LOCATION '{SILVER_PATH_COURSE}'
        """)
        return

    merge_sql = f"""
        MERGE INTO {TARGET_TABLE} AS t
        USING stg_dim_external_course AS s
        ON t.course_id = s.course_id
        WHEN MATCHED AND t.record_hash <> s.record_hash THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """

    spark.sql(merge_sql)

# ===============================================================
# 11. Main pipeline
# ===============================================================

def main():
    df_bronze, df_dim_org = load_sources()

    df_dim_org_enriched = build_dim_org_enriched(df_dim_org)
    df_clean = clean_course_bronze(df_bronze)
    df_hash = apply_hash(df_clean)
    df_joined = join_dim_org(df_hash, df_dim_org_enriched)
    df_stage = build_stage(df_joined)

    merge_table(df_stage)

# ===============================================================
# Entry point
# ===============================================================

if __name__ == "__main__":
    main()
    spark.stop()
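
# Example invocation, a sketch only: it assumes the Delta Lake and S3A
# connector jars are available on the cluster (the package version shown
# targets Spark 3.4, and the script file name is illustrative):
#   spark-submit --packages io.delta:delta-core_2.12:2.4.0 dim_external_course_etl.py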
