
Commit d516b6d

Add Bronze ingestion (Google Sheets) and Silver ETL for Dim_Downtimes
1 parent 4093a9a commit d516b6d

File tree

2 files changed: +209 −0 lines

src/bronze/get_downtimes.py

Whitespace-only changes.
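
The Bronze script itself landed with no content in this commit, so the ingestion logic is not shown. Below is a minimal sketch of what a Google Sheets pull into Bronze could look like; it is an assumption, not the committed code. The gspread client, the DOWNTIMES_SHEET_ID variable, and the worksheet name are placeholders, and it reuses a spark session and bronze_path configured as in the Silver script further down.

import os

import gspread
from pyspark.sql import functions as F

# Placeholder identifiers (assumed; the real sheet ID and tab name are not in this commit)
SHEET_ID = os.getenv("DOWNTIMES_SHEET_ID")
WORKSHEET_NAME = "downtimes"

# gspread reads the service-account JSON from its default config location
gc = gspread.service_account()
rows = gc.open_by_key(SHEET_ID).worksheet(WORKSHEET_NAME).get_all_records()

# Land the raw rows in Bronze as Delta, stamped with the lineage columns
# the Silver job later selects (ingestion_timestamp, source_file_id,
# source_sheet_name)
df_raw = (
    spark.createDataFrame(rows)
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("source_file_id", F.lit(SHEET_ID))
    .withColumn("source_sheet_name", F.lit(WORKSHEET_NAME))
)
df_raw.write.format("delta").mode("overwrite").save(bronze_path)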

src/silver/python/dim_downtimes.py

Lines changed: 209 additions & 0 deletions
@@ -0,0 +1,209 @@
from pyspark.sql import SparkSession, functions as F
import os


# =========================================
# 1. Spark session (production)
# =========================================

def get_spark_session() -> SparkSession:
    return (
        SparkSession.builder
        .appName("NAU – Dim_Downtimes (Silver)")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        # Heartbeat/timeouts tuned for RGW/CEPH
        .config("spark.network.timeout", "600s")
        .config("spark.executor.heartbeatInterval", "60s")
        # S3A / RGW
        .config("spark.hadoop.fs.s3a.access.key", os.getenv("S3_ACCESS_KEY"))
        .config("spark.hadoop.fs.s3a.secret.key", os.getenv("S3_SECRET_KEY"))
        .config("spark.hadoop.fs.s3a.endpoint", os.getenv("S3_ENDPOINT"))
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        # Fewer shuffle partitions (small dataset)
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )


spark = get_spark_session()


# =========================================
# 2. Bronze/Silver paths
# =========================================

BRONZE_BUCKET = os.getenv("BRONZE_BUCKET")
SILVER_BUCKET = os.getenv("SILVER_BUCKET")

bronze_path = f"{BRONZE_BUCKET.rstrip('/')}/external/nau_downtimes"
silver_path = f"{SILVER_BUCKET.rstrip('/')}/dim_downtimes"


# =========================================
# 3. Read Bronze (Delta)
# =========================================

df_bronze = (
    spark.read
    .format("delta")
    .load(bronze_path)
)


# =========================================
# 4. Keep only valid rows (from/to populated)
# =========================================

df_valid = df_bronze.filter(
    (F.trim("from_lisbon_time") != "") &
    (F.trim("to_lisbon_time") != "")
)


# =========================================
# 5. Normalization: trims, NULLs, booleans
# =========================================

df_norm = (
    df_valid
    .withColumn("from_lisbon_time", F.trim("from_lisbon_time"))
    .withColumn("to_lisbon_time", F.trim("to_lisbon_time"))
    # nullif() takes columns, so the empty-string sentinel must be a literal
    .withColumn("impact", F.nullif(F.trim("impact"), F.lit("")))
    .withColumn("description", F.nullif(F.trim("description"), F.lit("")))
    .withColumn("affected_applications", F.nullif(F.trim("affected_applications"), F.lit("")))
    .withColumn("expected_bool", F.col("expected") == "TRUE")
    .withColumn("detected_by_nagios_bool", F.col("detected_by_nagios") == "TRUE")
    .withColumn("detected_by_icinga_bool", F.col("detected_by_icinga") == "TRUE")
    .withColumn("detected_by_uptimerobot_bool", F.col("detected_by_uptimerobot") == "TRUE")
    .withColumn("is_lms_affected", F.col("lms_nau_edu_pt_studio_nau_edu_pt") == "TRUE")
    .withColumn("is_www_affected", F.col("www_nau_edu_pt") == "TRUE")
    .withColumn("is_partial_outage", F.col("only_some_sub_service_s_affected") == "TRUE")
)


# =========================================
# 6. Convert timestamps (H:mm and H:mm:ss)
# =========================================

df_norm = (
    df_norm
    .withColumn(
        "from_ts",
        F.coalesce(
            F.to_timestamp("from_lisbon_time", "yyyy-MM-dd H:mm:ss"),
            F.to_timestamp("from_lisbon_time", "yyyy-MM-dd H:mm")
        )
    )
    .withColumn(
        "to_ts",
        F.coalesce(
            F.to_timestamp("to_lisbon_time", "yyyy-MM-dd H:mm:ss"),
            F.to_timestamp("to_lisbon_time", "yyyy-MM-dd H:mm")
        )
    )
)

df_norm = df_norm.filter(
    F.col("from_ts").isNotNull() &
    F.col("to_ts").isNotNull()
)
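
# Hypothetical sanity check (an addition, not part of this commit): rows
# matching neither timestamp pattern are silently dropped by the filter
# above, so it can help to surface how many were lost. df_valid is the
# DataFrame from step 4, before parsing.
dropped_rows = df_valid.count() - df_norm.count()
print(f"Rows dropped for unparseable from/to timestamps: {dropped_rows}")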


# =========================================
# 7. Duration calculation + mismatch flag
# =========================================

df_norm = (
    df_norm
    .withColumn("downtime_duration_minutes_source", F.col("duration_in_minutes").cast("int"))
    .withColumn(
        "downtime_duration_minutes",
        F.floor((F.col("to_ts").cast("long") - F.col("from_ts").cast("long")) / 60).cast("int")
    )
    .withColumn(
        "downtime_has_duration_mismatch",
        F.when(
            F.col("downtime_duration_minutes_source").isNotNull() &
            (F.col("downtime_duration_minutes_source") != F.col("downtime_duration_minutes")),
            True
        ).otherwise(False)
    )
)


# =========================================
# 8. Rename columns to the Silver standard
# =========================================

df_final = (
    df_norm
    .withColumnRenamed("impact", "downtime_impact")
    .withColumnRenamed("description", "downtime_description")
    .withColumnRenamed("affected_applications", "downtime_affected_applications")
)


# =========================================
# 9. Unique hash (downtime_hash)
# =========================================

df_final = (
    df_final.withColumn(
        "downtime_hash",
        F.sha2(
            F.concat_ws(
                "||",
                F.col("from_ts").cast("string"),
                F.col("to_ts").cast("string"),
                F.coalesce(F.col("downtime_impact"), F.lit("")),
                F.coalesce(F.col("downtime_description"), F.lit("")),
                F.coalesce(F.col("downtime_affected_applications"), F.lit(""))
            ),
            256
        )
    )
)


# =========================================
# 10. Final column selection
# =========================================

df_output = df_final.select(
    "from_ts",
    "to_ts",
    "downtime_impact",
    "downtime_description",
    "downtime_affected_applications",
    "expected_bool",
    "detected_by_nagios_bool",
    "detected_by_icinga_bool",
    "detected_by_uptimerobot_bool",
    "is_lms_affected",
    "is_www_affected",
    "is_partial_outage",
    "downtime_duration_minutes_source",
    "downtime_duration_minutes",
    "downtime_has_duration_mismatch",
    "ingestion_timestamp",
    "source_file_id",
    "source_sheet_name",
    "downtime_hash"
)


# =========================================
# 11. Write Silver (Delta) – production
# =========================================

(
    df_output
    .coalesce(1)
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(silver_path)
)
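
For a quick check of the Silver output, a sketch like the one below can be run in the same session; it is an illustration added here, not part of the commit. It rereads the table, looks for duplicate downtime_hash values, and lists the rows flagged with a duration mismatch.

df_check = spark.read.format("delta").load(silver_path)

print("rows written:", df_check.count())

# downtime_hash should be unique per downtime
df_check.groupBy("downtime_hash").count().filter("count > 1").show()

# rows where the source duration disagrees with the computed one
df_check.filter("downtime_has_duration_mismatch").select(
    "from_ts", "to_ts",
    "downtime_duration_minutes_source", "downtime_duration_minutes"
).show(truncate=False)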
