Skip to content

Commit 64988ae

Browse files
olha00 final pj
1 parent 615a13c commit 64988ae

File tree

5 files changed

+162
-0
lines changed

5 files changed

+162
-0
lines changed
3.95 KB
Binary file not shown.

olha00/bronze_to_silver.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import re
2+
from pathlib import Path
3+
from pyspark.sql import SparkSession
4+
from pyspark.sql.functions import udf
5+
from pyspark.sql.types import StringType
6+
7+
# Resolve all paths relative to this script so the job runs from any CWD.
BASE_DIR = Path(__file__).resolve().parent
# Input: raw parquet produced by the landing-to-bronze job.
BRONZE_DIR = BASE_DIR / "bronze"
# Output: cleaned parquet for the silver layer.
SILVER_DIR = BASE_DIR / "silver"

# One Spark session for the whole script; stopped at the end of the file.
spark = SparkSession.builder.appName("BronzeToSilverLayer").getOrCreate()
12+
13+
def clean_text(text):
    """Strip every character except letters, digits, ',', '.', '\\', '"', "'" and space.

    Non-string input is coerced with str() first.  None is passed through
    unchanged so that null column values stay null instead of becoming the
    literal string "None" (which str(None) would produce).
    """
    if text is None:
        return None
    return re.sub(r"[^a-zA-Z0-9,.\\\"\' ]", '', str(text))
15+
16+
# Register the text cleaner as a Spark UDF returning a plain string column.
clean_text_udf = udf(clean_text, StringType())

# Make sure the silver-layer directory exists before any writes.
SILVER_DIR.mkdir(parents=True, exist_ok=True)

# (dataset name, column to sanitize) for each bronze table to promote.
_tables = [
    ("athlete_bio", "name"),
    ("athlete_event_results", "event"),
]

_cleaned = {}
for _table, _column in _tables:
    _raw = spark.read.parquet(str(BRONZE_DIR / _table))
    _df = _raw.withColumn(_column, clean_text_udf(_raw[_column]))
    _df.write.mode("overwrite").parquet(str(SILVER_DIR / _table))
    _cleaned[_table] = _df

# Quick visual sanity check of the first rows of each cleaned dataset.
for _table, _ in _tables:
    _cleaned[_table].show(3)

print(f"Bio rows: {_cleaned['athlete_bio'].count()}")
print(f"Results rows: {_cleaned['athlete_event_results'].count()}")

spark.stop()

olha00/landing_to_bronze.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import requests
2+
from pyspark.sql import SparkSession
3+
from pathlib import Path
4+
5+
# Resolve paths relative to this file so the job is CWD-independent.
BASE_DIR = Path(__file__).resolve().parent
# Destination for both the downloaded CSVs and the bronze parquet output.
BRONZE_DIR = BASE_DIR / "bronze"

# Module-level Spark session, used by main(); stopped after main() runs.
spark = SparkSession.builder.appName("LandingToBronzeLayer").getOrCreate()
9+
10+
11+
def download_data(local_file_path):
    """Download `<local_file_path>.csv` from the course mirror into BRONZE_DIR.

    Parameters
    ----------
    local_file_path : str
        Base name of the remote CSV file (without the ".csv" suffix).

    On HTTP 200 the payload is written to BRONZE_DIR/<name>.csv; any other
    status code is reported and the file is skipped (best-effort — no
    exception is raised for a bad status).
    """
    url = "https://ftp.goit.study/neoversity/"
    downloading_url = url + local_file_path + ".csv"
    print(f"Downloading: {downloading_url}")
    # Explicit timeout so a stalled connection cannot hang the DAG run forever.
    response = requests.get(downloading_url, timeout=60)

    if response.status_code == 200:
        save_path = BRONZE_DIR / f"{local_file_path}.csv"
        with open(save_path, "wb") as file:
            file.write(response.content)
        print(f"Saved: {save_path}")
    else:
        print(f"Failed: {local_file_path} (Code: {response.status_code})")
24+
25+
26+
def main():
    """Landing-to-bronze entry point: download raw CSVs, preview them, persist as parquet."""
    BRONZE_DIR.mkdir(parents=True, exist_ok=True)  # Create folder

    files = ["athlete_bio", "athlete_event_results"]

    for filename in files:
        download_data(filename)

    for filename in files:
        # NOTE(review): the scraped diff showed the literal "(unknown)" inside
        # these f-strings; the per-file name {filename} is clearly what was
        # intended and has been reconstructed here.
        csv_path = BRONZE_DIR / f"{filename}.csv"
        df = spark.read.option("header", True).csv(str(csv_path))
        print(f"Preview {filename}:")
        df.show(3)
        print(f"Rows: {df.count()}")

        df.write.mode("overwrite").parquet(str(BRONZE_DIR / filename))
        print(f"Parquet saved: {BRONZE_DIR / filename}")


if __name__ == "__main__":
    main()

    # Shut the session down only when run as a script, so importing this
    # module (e.g. for reuse of download_data) does not kill the session.
    spark.stop()

olha00/project_solution.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import os
2+
from datetime import datetime
3+
from airflow import DAG
4+
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
5+
6+
7+
# Directory holding this DAG file and the three Spark job scripts.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

default_args = {
    "owner": "airflow",
    "start_date": datetime(2025, 12, 10),
    "depends_on_past": False,
    "retries": 1,
}


def _spark_job(script_name):
    """Build the SparkSubmitOperator for one pipeline stage script.

    task_id is "ola_<script_name>" and the application is the sibling
    <script_name>.py file next to this DAG definition.
    """
    return SparkSubmitOperator(
        task_id=f"ola_{script_name}",
        application=os.path.join(BASE_DIR, f"{script_name}.py"),
        conn_id="spark-default",
        verbose=True,
    )


with DAG(
    dag_id="ola_de_fp2",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    description="ETL pipeline from landing to gold using Spark and Airflow",
) as dag:
    # Operators are created inside the DAG context so they bind to this DAG.
    landing_to_bronze = _spark_job("landing_to_bronze")
    bronze_to_silver = _spark_job("bronze_to_silver")
    silver_to_gold = _spark_job("silver_to_gold")

    # Linear medallion chain: landing -> bronze -> silver -> gold.
    landing_to_bronze >> bronze_to_silver >> silver_to_gold

olha00/silver_to_gold.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from pyspark.sql import SparkSession
2+
from pyspark.sql.functions import avg, current_timestamp
3+
from pathlib import Path
4+
5+
# Silver-to-gold job: join athlete bios with event results and aggregate
# average body stats per (sport, medal, sex, country) group.

BASE_DIR = Path(__file__).resolve().parent
SILVER_DIR = BASE_DIR / "silver"
GOLD_DIR = BASE_DIR / "gold"

spark = SparkSession.builder.appName("SilverToGoldLayer").getOrCreate()

GOLD_DIR.mkdir(parents=True, exist_ok=True)

bio = spark.read.parquet(str(SILVER_DIR / "athlete_bio"))
results = spark.read.parquet(str(SILVER_DIR / "athlete_event_results"))

# Inner join: keep only athletes present in both datasets.
joined = results.join(bio, on="athlete_id", how="inner")

# country_noc is qualified through the bio dataframe — presumably because
# the column appears in both inputs; kept exactly as in the original.
group_cols = ["sport", "medal", "sex", bio["country_noc"]]
aggregates = [
    avg("weight").alias("avg_weight"),
    avg("height").alias("avg_height"),
]

stats = joined.groupBy(*group_cols).agg(*aggregates)
# Stamp each output row with the time this aggregation was produced.
stats = stats.withColumn("timestamp", current_timestamp())

stats.show()

stats.write.mode("overwrite").parquet(str(GOLD_DIR / "avg_stats"))

spark.stop()

0 commit comments

Comments
 (0)