Commit 7b5d15d

add dim_date.py in silver bucket.

1 parent 7c91332 · commit 7b5d15d

File tree

1 file changed: +273 −0 lines


src/silver/python/dim_date.py

Lines changed: 273 additions & 0 deletions
@@ -0,0 +1,273 @@
# ===============================================================
# Dim Date - Silver ETL
# Production | NAU System | FCCN
# Structured following best practices for Spark/Delta pipelines
# ===============================================================
import os
import sys
import logging
from pyspark.sql import SparkSession, functions as F


# -----------------------------
# Logger
# -----------------------------
def get_logger(name: str = "dim_date") -> logging.Logger:
    logger = logging.getLogger(name)

    if logger.handlers:
        return logger

    level_str = os.getenv("LOG_LEVEL", "INFO").upper()
    level = getattr(logging, level_str, logging.INFO)
    logger.setLevel(level)

    h = logging.StreamHandler(sys.stdout)
    h.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s - %(message)s"))
    logger.addHandler(h)
    logger.propagate = False
    return logger


logger = get_logger("dim_date")


# -----------------------------
# SparkSession
# -----------------------------
def get_spark_session() -> SparkSession:
    S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
    S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
    S3_ENDPOINT = os.getenv("S3_ENDPOINT")

    if not S3_ACCESS_KEY or not S3_SECRET_KEY or not S3_ENDPOINT:
        raise RuntimeError("Missing S3_ACCESS_KEY / S3_SECRET_KEY / S3_ENDPOINT")

    spark = (
        SparkSession.builder
        .appName("NAU Analytics - Dim_Date")
        # Enable Delta Lake SQL support (MERGE, DESCRIBE DETAIL, delta.`path`).
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        # S3A connector pointed at the configured S3-compatible endpoint.
        .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
        .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
        .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .getOrCreate()
    )

    spark.sparkContext.setLogLevel("WARN")
    spark.conf.set("spark.sql.shuffle.partitions", os.getenv("SPARK_SHUFFLE_PARTITIONS", "4"))
    return spark


# -----------------------------
# Helpers
# -----------------------------
def is_delta_path(spark: SparkSession, path: str) -> bool:
    # Probe the path with DESCRIBE DETAIL; it fails if no Delta table exists there.
    try:
        spark.sql(f"DESCRIBE DETAIL delta.`{path}`").collect()
        return True
    except Exception:
        return False
def enrich_date_attributes(df):
    # df must have a column: date (date type)
    dias_pt = F.array(
        F.lit("Domingo"),
        F.lit("Segunda-feira"),
        F.lit("Terça-feira"),
        F.lit("Quarta-feira"),
        F.lit("Quinta-feira"),
        F.lit("Sexta-feira"),
        F.lit("Sábado"),
    )

    meses_pt = F.array(
        F.lit("Janeiro"),
        F.lit("Fevereiro"),
        F.lit("Março"),
        F.lit("Abril"),
        F.lit("Maio"),
        F.lit("Junho"),
        F.lit("Julho"),
        F.lit("Agosto"),
        F.lit("Setembro"),
        F.lit("Outubro"),
        F.lit("Novembro"),
        F.lit("Dezembro"),
    )

    df = (
        df
        .withColumn("date_sk", F.date_format("date", "yyyyMMdd").cast("int"))
        .withColumn("year", F.year("date"))
        .withColumn("month", F.month("date"))
        .withColumn("day", F.dayofmonth("date"))
        .withColumn("day_of_week", F.dayofweek("date"))  # 1 = Sunday
        .withColumn("week_of_year", F.weekofyear("date"))
        .withColumn("day_name_pt", dias_pt[F.col("day_of_week") - 1])
        .withColumn("month_name_pt", meses_pt[F.col("month") - 1])
        .withColumn("year_month", (F.col("year") * 100 + F.col("month")).cast("int"))
        .withColumn("semester", F.when(F.col("month") <= 6, F.lit(1)).otherwise(F.lit(2)))
        .withColumn("quarter", F.ceil(F.col("month") / F.lit(3)).cast("int"))
        .withColumn("bimester", F.ceil(F.col("month") / F.lit(2)).cast("int"))
        .withColumn("year_semester", F.concat_ws(".", F.col("year").cast("string"), F.col("semester").cast("string")))
        .withColumn("year_quarter", F.concat_ws(".", F.col("year").cast("string"), F.col("quarter").cast("string")))
        .withColumn("year_bimester", F.concat_ws(".", F.col("year").cast("string"), F.col("bimester").cast("string")))
        .withColumn(
            "date_long_pt",
            F.concat(
                F.lpad(F.col("day").cast("string"), 2, "0"),
                F.lit(" de "),
                F.col("month_name_pt"),
                F.lit(" de "),
                F.col("year").cast("string"),
            )
        )
        .withColumn("is_weekend", F.when(F.col("day_of_week").isin(1, 7), F.lit(True)).otherwise(F.lit(False)))
        .withColumn("ingestion_timestamp", F.current_timestamp())
    )

    return df.select(
        "date_sk",
        "date",
        "year",
        "month",
        "day",
        "day_of_week",
        "day_name_pt",
        "month_name_pt",
        "week_of_year",
        "year_month",
        "semester",
        "quarter",
        "bimester",
        "year_semester",
        "year_quarter",
        "year_bimester",
        "date_long_pt",
        "is_weekend",
        "ingestion_timestamp",
    )


def build_dim_date_df(spark: SparkSession, start_date: str, end_date: str):
    # Pure Spark SQL: fast and simple
    df = spark.sql(f"""
        SELECT explode(sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day)) AS date
    """)
    return enrich_date_attributes(df)


def upsert_dim_date(spark: SparkSession, silver_path: str, desired_start: str, desired_end: str):
    if not is_delta_path(spark, silver_path):
        logger.info(f"No Delta table at {silver_path}. Creating initial table...")
        df_all = build_dim_date_df(spark, desired_start, desired_end)
        df_all.write.format("delta").mode("overwrite").save(silver_path)
        logger.info(f"Dim_Date created with {df_all.count()} rows.")
        return

    # Table exists: only extend it (a full upsert would also work, but is unnecessary)
    cur = spark.read.format("delta").load(silver_path)

    limits = cur.agg(
        F.min("date").alias("min_date"),
        F.max("date").alias("max_date"),
        F.count("*").alias("cnt"),
    ).collect()[0]

    min_date = limits["min_date"]
    max_date = limits["max_date"]
    logger.info(f"Current Dim_Date: min_date={min_date}, max_date={max_date}, rows={limits['cnt']}")

    # If desired_end is already covered, exit early
    desired_end_row = spark.sql(f"SELECT to_date('{desired_end}') AS d").collect()[0]
    desired_end_dt = desired_end_row["d"]

    if max_date is not None and max_date >= desired_end_dt:
        logger.info("Nothing to do: table already covers the desired range.")
        return

    # New start = max_date + 1 (or desired_start if the table is empty)
    if max_date is None:
        new_start = desired_start
    else:
        new_start = spark.sql(f"SELECT date_add(to_date('{max_date}'), 1) AS d").collect()[0]["d"]

    logger.info(f"Generating only the new dates: {new_start} -> {desired_end}")
    df_new = build_dim_date_df(spark, str(new_start), desired_end)

    df_new.createOrReplaceTempView("stg_dim_date")

    spark.sql(f"""
        MERGE INTO delta.`{silver_path}` t
        USING stg_dim_date s
        ON t.date = s.date
        WHEN NOT MATCHED THEN INSERT *
    """)

    logger.info(f"Merge completed. New rows (staging): {df_new.count()}")


def validate_dim_date(spark: SparkSession, silver_path: str):
    df = spark.read.format("delta").load(silver_path)
    stats = df.agg(
        F.min("date").alias("min_date"),
        F.max("date").alias("max_date"),
        F.count("*").alias("total"),
        F.countDistinct("date").alias("distinct_dates"),
        F.countDistinct("date_sk").alias("distinct_sk"),
    ).collect()[0]

    logger.info(f"VALIDATION: total={stats['total']} distinct_dates={stats['distinct_dates']} distinct_sk={stats['distinct_sk']}")
    logger.info(f"VALIDATION: min_date={stats['min_date']} max_date={stats['max_date']}")

    if stats["total"] != stats["distinct_dates"]:
        logger.warning("Duplicates detected in date (total != distinct_dates).")
    if stats["total"] != stats["distinct_sk"]:
        logger.warning("Duplicates detected in date_sk (total != distinct_sk).")


# -----------------------------
# Main
# -----------------------------
def main():
    logger.info("Starting Dim_Date ETL")
    spark = None

    try:
        start_date = os.getenv("DIM_DATE_START", "2018-01-01")
        end_date = os.getenv("DIM_DATE_END", "2032-12-31")

        silver_base = os.getenv("SILVER_BUCKET")
        if not silver_base:
            raise RuntimeError("SILVER_BUCKET not set")

        silver_path = f"{silver_base.rstrip('/')}/dim_date"
        logger.info(f"Params: start={start_date} end={end_date}")
        logger.info(f"Target: {silver_path}")

        spark = get_spark_session()

        upsert_dim_date(spark, silver_path, start_date, end_date)
        validate_dim_date(spark, silver_path)

        logger.info("Dim_Date ETL finished successfully")

    except Exception:
        logger.exception("Error while running Dim_Date ETL")
        raise SystemExit(1)

    finally:
        if spark is not None:
            try:
                spark.stop()
                logger.info("SparkSession stopped")
            except Exception:
                logger.warning("Failed to stop SparkSession cleanly", exc_info=True)


if __name__ == "__main__":
    main()
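
For a quick sanity check of the table this job writes, the output can be read back with PySpark. The sketch below is illustrative only: it assumes the job has already run, that SILVER_BUCKET and the S3 credentials are set as above, and that get_spark_session can be imported from this module (the import path shown is hypothetical).

# Illustrative read-back check; not part of this commit.
# Assumes SILVER_BUCKET / S3 env vars are set and that this module is importable
# as `dim_date` (hypothetical import path; adjust to the packaging in use).
import os

from pyspark.sql import functions as F

from dim_date import get_spark_session

spark = get_spark_session()
silver_path = f"{os.environ['SILVER_BUCKET'].rstrip('/')}/dim_date"

dim_date = spark.read.format("delta").load(silver_path)

# Expect one row per calendar day and a unique date_sk per row.
dim_date.agg(
    F.min("date").alias("min_date"),
    F.max("date").alias("max_date"),
    F.count("*").alias("rows"),
    F.countDistinct("date").alias("distinct_dates"),
    F.countDistinct("date_sk").alias("distinct_sk"),
).show(truncate=False)

spark.stop()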
