
Commit 4093a9a

Add Silver dimensions: dim_external_course and dim_organization
1 parent 5d1a626 commit 4093a9a

File tree: 2 files changed (+306, -0 lines changed)


src/silver/python/dim_external_course.py

Whitespace-only changes.
Lines changed: 306 additions & 0 deletions
@@ -0,0 +1,306 @@
# dim_organizations_scd2.py

import os
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window


# =============================================================================
# Path constants (bronze / silver)
# =============================================================================

BRONZE_BUCKET = os.getenv("S3_BUCKET_BRONZE", "s3a://nau-local-analytics-bronze")
SILVER_BUCKET = os.getenv("S3_BUCKET_SILVER", "s3a://nau-local-analytics-silver")

BRONZE_PATH_ORG_CURRENT = f"{BRONZE_BUCKET.rstrip('/')}/organizations_organization"
BRONZE_PATH_ORG_HIST = f"{BRONZE_BUCKET.rstrip('/')}/organizations_organizationhistory"

SILVER_PATH_DIM_ORG = f"{SILVER_BUCKET.rstrip('/')}/dim_organizations"
TARGET_TABLE_DIM_ORG = "default.dim_organizations"


# =============================================================================
# SparkSession creation
# =============================================================================

def get_spark() -> SparkSession:
    """
    Returns a SparkSession. Assumes the stack has already been started
    with the Delta configs via spark-submit / the cluster.
    """
    spark = (
        SparkSession.builder
        .appName("NAU - Dim_Organizations SCD2")
        .getOrCreate()
    )
    return spark

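# ---------------------------------------------------------------------------
# Illustrative sketch (assumption, not used by the job above): if the stack
# did not already inject the Delta Lake settings via spark-submit, an
# equivalent session could be built with the standard open-source Delta
# configs set explicitly. The function name is hypothetical.
# ---------------------------------------------------------------------------

def _get_spark_with_explicit_delta_configs() -> SparkSession:
    return (
        SparkSession.builder
        .appName("NAU - Dim_Organizations SCD2")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .getOrCreate()
    )
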
# =============================================================================
# 1. Reading and normalizing the Bronze layer
# =============================================================================

def read_org_current(spark: SparkSession) -> DataFrame:
    """
    Reads the organizations_organization table from the Bronze layer
    and applies basic normalization to its fields.
    """
    df = (
        spark.read
        .format("delta")
        .load(BRONZE_PATH_ORG_CURRENT)
    )

    df_clean = (
        df.select(
            F.col("id").cast("int").alias("organization_id"),
            F.trim(F.col("short_name")).alias("organization_code"),
            F.trim(F.col("name")).alias("organization_name"),
            F.when(F.trim(F.col("description")) == "", None)
            .otherwise(F.trim(F.col("description"))).alias("description"),
            F.trim(F.col("logo")).alias("logo"),
            F.col("modified").cast("timestamp").alias("modified_at"),
            F.col("ingestion_date").cast("timestamp").alias("ingestion_date"),
            F.col("source_name").alias("source_name"),
        )
    )

    return df_clean


def read_org_history(spark: SparkSession) -> DataFrame:
    """
    Reads the organizations_organizationhistory table from Bronze
    and normalizes the history fields.
    """
    df_hist = (
        spark.read
        .format("delta")
        .load(BRONZE_PATH_ORG_HIST)
    )

    df_hist_clean = (
        df_hist.select(
            F.col("id").cast("int").alias("organization_id"),
            F.trim(F.col("short_name")).alias("organization_code"),
            F.trim(F.col("name")).alias("organization_name"),
            F.when(F.trim(F.col("description")) == "", None)
            .otherwise(F.trim(F.col("description"))).alias("description"),
            F.trim(F.col("logo")).alias("logo"),
            F.col("modified").cast("timestamp").alias("modified_at"),
            F.col("history_date").cast("timestamp").alias("history_date"),
            F.col("history_type").alias("history_type"),
            F.col("history_user_id").cast("int").alias("history_user_id"),
            F.col("ingestion_date").cast("timestamp").alias("ingestion_date"),
            F.col("source_name").alias("source_name"),
        )
    )

    return df_hist_clean

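# ---------------------------------------------------------------------------
# Illustrative sketch (not used by the job above; toy data is assumed): the
# normalization above trims text fields and collapses blank descriptions to
# NULL. A quick local check of that expression:
# ---------------------------------------------------------------------------

def _demo_description_normalization(spark: SparkSession) -> None:
    demo = spark.createDataFrame(
        [("   ",), ("About the organization",), (None,)],
        ["description"],
    )
    demo.select(
        F.when(F.trim(F.col("description")) == "", None)
        .otherwise(F.trim(F.col("description"))).alias("description")
    ).show()
    # Expected rows: NULL, "About the organization", NULL
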
# =============================================================================
# 2. Building the SCD2 events (real history + synthetic)
# =============================================================================

def build_all_events(
    df_org_current: DataFrame,
    df_org_hist: DataFrame
) -> DataFrame:
    """
    Builds a dataframe with every organization change event:

    - Historical events coming from organizations_organizationhistory.
    - "Synthetic" events for organizations without history (at least 1 row
      per organization).
    """

    # IDs that already appear in the history table
    hist_ids = df_org_hist.select("organization_id").distinct()

    # Organizations without any history record
    df_orgs_sem_hist = (
        df_org_current
        .join(hist_ids, on="organization_id", how="left_anti")
    )

    # For these, we create a synthetic event (history_type '+')
    df_extra_events_sem_hist = (
        df_orgs_sem_hist.select(
            F.col("organization_id"),
            F.col("organization_code"),
            F.col("organization_name"),
            F.col("description"),
            F.col("logo"),
            F.col("modified_at"),
            # We use modified_at as the history_date for the synthetic event
            F.col("modified_at").alias("history_date"),
            F.lit("+").alias("history_type"),
            F.lit(None).cast("int").alias("history_user_id"),
            F.col("ingestion_date"),
            F.col("source_name"),
        )
    )

    # Union of real history + synthetic events
    df_all_events = df_org_hist.unionByName(df_extra_events_sem_hist)

    return df_all_events

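# ---------------------------------------------------------------------------
# Illustrative sketch (not used by the job above; toy data is assumed): the
# left_anti join is what isolates organizations that never appear in the
# history table, so each of them still ends up with one synthetic '+' event.
# ---------------------------------------------------------------------------

def _demo_orgs_without_history(spark: SparkSession) -> None:
    current = spark.createDataFrame(
        [(1, "ORG_A"), (2, "ORG_B"), (3, "ORG_C")],
        ["organization_id", "organization_code"],
    )
    history = spark.createDataFrame(
        [(1, "~"), (2, "+")],
        ["organization_id", "history_type"],
    )
    hist_ids = history.select("organization_id").distinct()
    # Only organization_id 3 survives: it has no history rows at all
    current.join(hist_ids, on="organization_id", how="left_anti").show()
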
# =============================================================================
# 3. Apply the SCD2 logic (effective_start, effective_end, is_current)
# =============================================================================

def build_scd2_dataframe(df_all_events: DataFrame) -> DataFrame:
    """
    Orders the events by organization and date, defines the validity
    intervals (effective_start, effective_end) and flags is_current.
    Also computes the record_hash and the surrogate key (organization_sk).
    """

    w = Window.partitionBy("organization_id").orderBy(
        F.col("history_date").asc(),
        F.col("modified_at").asc()
    )

    df_with_lag_lead = (
        df_all_events
        .withColumn("prev_modified_at", F.lag("modified_at").over(w))
        .withColumn("next_org_id", F.lead("organization_id").over(w))
    )

    start_default = F.to_timestamp(F.lit("1900-01-01 00:00:00"))

    df_scd2 = (
        df_with_lag_lead
        # effective_start
        .withColumn(
            "effective_start",
            F.when(F.col("history_type") == F.lit("+"), start_default)
            .otherwise(F.expr("prev_modified_at + INTERVAL 1 SECOND"))
        )
        # effective_end: NULL for the last record (current version)
        .withColumn(
            "effective_end",
            F.when(F.col("next_org_id").isNull(), F.lit(None).cast("timestamp"))
            .otherwise(F.col("modified_at"))
        )
        .drop("prev_modified_at", "next_org_id")
    )

    # record_hash based on the main business fields
    business_cols = [
        "organization_id",
        "organization_code",
        "organization_name",
        "description",
        "logo",
    ]

    df_scd2_hashed = (
        df_scd2.withColumn(
            "record_hash",
            F.sha2(
                F.concat_ws(
                    "||",
                    *[
                        F.coalesce(F.col(c).cast("string"), F.lit("NULL"))
                        for c in business_cols
                    ],
                ),
                256,
            ),
        )
        .withColumn("ingestion_timestamp", F.current_timestamp())
        .withColumn(
            "is_current",
            F.col("effective_end").isNull()
        )
    )

    # Surrogate key: row number within each organization, ordered by
    # effective_start (a per-organization version number, not globally unique)
    w_sk = Window.partitionBy("organization_id").orderBy(
        F.col("effective_start").asc(),
        F.col("history_date").asc(),
        F.col("modified_at").asc()
    )

    df_dim_organizations = (
        df_scd2_hashed
        .withColumn("organization_sk", F.row_number().over(w_sk).cast("long"))
        .select(
            "organization_sk",
            "organization_id",
            "organization_code",
            "organization_name",
            "description",
            "logo",
            "modified_at",
            "history_date",
            "history_type",
            "history_user_id",
            "effective_start",
            "effective_end",
            "is_current",
            "record_hash",
            "ingestion_date",
            "source_name",
            "ingestion_timestamp",
        )
    )

    return df_dim_organizations

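# ---------------------------------------------------------------------------
# Illustrative sketch (not used by the job above; toy data is assumed): a
# minimal run of build_scd2_dataframe on two events for one organization.
# Earlier versions get a closed effective_end; the last event per
# organization keeps effective_end = NULL and is_current = True, and
# organization_sk numbers the versions within that organization.
# ---------------------------------------------------------------------------

def _demo_scd2_intervals(spark: SparkSession) -> None:
    from datetime import datetime

    cols = [
        "organization_id", "organization_code", "organization_name",
        "description", "logo", "modified_at", "history_date",
        "history_type", "history_user_id", "ingestion_date", "source_name",
    ]
    rows = [
        (1, "ORG_A", "Org A", "first version", "logo.png",
         datetime(2024, 1, 1, 10, 0, 0), datetime(2024, 1, 1, 10, 0, 0),
         "+", 10, datetime(2024, 6, 1, 0, 0, 0), "demo"),
        (1, "ORG_A", "Org A (renamed)", "second version", "logo.png",
         datetime(2024, 3, 1, 12, 0, 0), datetime(2024, 3, 1, 12, 0, 0),
         "~", 10, datetime(2024, 6, 1, 0, 0, 0), "demo"),
    ]
    demo = spark.createDataFrame(rows, cols)

    build_scd2_dataframe(demo).select(
        "organization_sk", "organization_name",
        "effective_start", "effective_end", "is_current",
    ).show(truncate=False)
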
# =============================================================================
# 4. Write to Silver (full refresh / overwrite)
# =============================================================================

def write_dim_organizations(
    spark: SparkSession,
    df_dim_org: DataFrame
) -> None:
    """
    Writes the dimension to Silver in Delta format, in overwrite mode
    (full refresh), and ensures the Hive table is created/updated.
    """

    (
        df_dim_org.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(SILVER_PATH_DIM_ORG)
    )

    # Register/update the table in the metastore with the same schema
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {TARGET_TABLE_DIM_ORG}
        USING DELTA
        LOCATION '{SILVER_PATH_DIM_ORG}'
    """)

    # If the table already exists, CREATE TABLE IF NOT EXISTS does not change
    # its schema, but since we wrote with overwrite + overwriteSchema the
    # Delta table at the path is already consistent. If you want to force a
    # REPLACE TABLE, adapt as needed (one possible sketch follows below).

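# ---------------------------------------------------------------------------
# Illustrative sketch (assumption, not called by the job above): one way to
# force the metastore entry to be rebuilt, as hinted in the comment above.
# For a path-based external table, DROP TABLE removes only the metastore
# entry, not the Delta files at the LOCATION. The function name is
# hypothetical.
# ---------------------------------------------------------------------------

def _reregister_dim_organizations(spark: SparkSession) -> None:
    spark.sql(f"DROP TABLE IF EXISTS {TARGET_TABLE_DIM_ORG}")
    spark.sql(f"""
        CREATE TABLE {TARGET_TABLE_DIM_ORG}
        USING DELTA
        LOCATION '{SILVER_PATH_DIM_ORG}'
    """)
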
# =============================================================================
# 5. main()
# =============================================================================

def main() -> None:
    spark = get_spark()

    df_org_current = read_org_current(spark)
    df_org_hist = read_org_history(spark)

    df_all_events = build_all_events(df_org_current, df_org_hist)
    df_dim_org = build_scd2_dataframe(df_all_events)

    write_dim_organizations(spark, df_dim_org)


if __name__ == "__main__":
    main()
