Skip to content

Commit aa37c72

Browse files
committed
mds6_rdd_dags fp_v2
1 parent cbbdb56 commit aa37c72

File tree

1 file changed

+46
-37
lines changed

1 file changed

+46
-37
lines changed

dags/rdd/landing_to_bronze.py

Lines changed: 46 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,66 @@
11
import sys
2+
import os
23
import logging
4+
import requests
35
from pyspark.sql import SparkSession
46

5-
6-
# -------------------- logging --------------------
# Root-logger setup: INFO level, pipe-delimited "timestamp | level | name | message".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    level=logging.INFO,
)

# Named module logger used by every function in this DAG task.
logger = logging.getLogger("landing_to_bronze")
# -------------------------------------------------
1314

15+
def download_csv(url: str, local_path: str, timeout: float = 60.0) -> str:
    """Download a CSV file from *url* and write it to *local_path*.

    Args:
        url: HTTP(S) address of the source CSV.
        local_path: Filesystem path to write to (overwritten if it exists).
        timeout: Seconds to wait for connect/read before aborting. Without
            this, ``requests`` would block indefinitely on a stalled server.

    Returns:
        The *local_path* that was written, for convenient chaining.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    logger.info(f"Downloading CSV from {url} to {local_path}")
    # stream=True avoids buffering the entire payload in memory at once.
    with requests.get(url, timeout=timeout, stream=True) as response:
        response.raise_for_status()  # fail fast on HTTP errors
        with open(local_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=1 << 20):
                f.write(chunk)
    logger.info("Download completed")
    return local_path
1424

15-
def main(table_name: str):
    """Landing-to-bronze job: download a source CSV and persist it as parquet.

    Downloads the CSV for *table_name* to a local temp file, loads it into a
    Spark DataFrame with header/inferSchema, and writes it (overwrite mode)
    to ``bronze/<table_name>``.

    Args:
        table_name: Logical table to ingest; must be one of the known source
            tables ("athlete_bio" or "athlete_event_results").

    Exits the process with status 1 when *table_name* is unknown.
    """
    logger.info(f"Starting landing_to_bronze job for table: {table_name}")

    # Supported tables and the URLs their landing CSVs are downloaded from.
    urls = {
        "athlete_bio": "https://ftp.goit.study/neoversity/athlete_bio.csv",
        "athlete_event_results": "https://ftp.goit.study/neoversity/athlete_event_results.csv"
    }

    # Validate input before paying the SparkSession startup cost.
    if table_name not in urls:
        logger.error(f"Unknown table_name: {table_name}")
        sys.exit(1)

    url = urls[table_name]

    spark = SparkSession.builder.appName(f"Landing to Bronze - {table_name}").getOrCreate()
    logger.info("SparkSession created")

    try:
        # Stage the CSV in a local temp file; Spark then reads it locally.
        tmp_dir = "/tmp"
        os.makedirs(tmp_dir, exist_ok=True)
        local_csv_path = os.path.join(tmp_dir, f"{table_name}.csv")
        download_csv(url, local_csv_path)

        logger.info("Reading CSV into Spark DataFrame")
        df = spark.read.option("header", "true").option("inferSchema", "true").csv(local_csv_path)
        # NOTE: count() triggers a full scan; kept for the diagnostic log line.
        logger.info(f"CSV loaded. Number of rows: {df.count()}")

        # Target path for the bronze layer (relative to the working dir).
        bronze_path = os.path.join("bronze", table_name)
        os.makedirs(bronze_path, exist_ok=True)

        logger.info(f"Writing DataFrame to bronze path: {bronze_path}")
        df.write.mode("overwrite").parquet(bronze_path)
        logger.info("Write completed successfully")
    finally:
        # Always release the Spark session, even if download/read/write fails.
        spark.stop()
@@ -58,10 +70,7 @@ def main(table_name: str):
5870

5971
if __name__ == "__main__":
    # CLI entry point: expect exactly one positional argument (the table name).
    args = sys.argv[1:]
    if len(args) != 1:
        logger.error("Usage: python landing_to_bronze.py <table_name>")
        sys.exit(1)
    main(args[0])

0 commit comments

Comments
 (0)