
Commit d06d054

committed 2025-05-21-04-59-55
1 parent eab7f00 commit d06d054

File tree

10 files changed (+84 / -39 lines)

.python-version

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+3.13

capstone/rainforest/etl/silver/dim_buyer.py

Lines changed: 10 additions & 6 deletions
@@ -67,19 +67,23 @@ def transform_upstream(
         # Rename common columns in appuser_data to avoid conflicts
         appuser_data = appuser_data.selectExpr(
             *[
-                f"`{col}` as appuser_{col}"
-                if col in common_columns and col != "user_id"
-                else col
+                (
+                    f"`{col}` as appuser_{col}"
+                    if col in common_columns and col != "user_id"
+                    else col
+                )
                 for col in appuser_data.columns
             ]
         )

         # Rename common columns in buyer_data to avoid conflicts
         buyer_data = buyer_data.selectExpr(
             *[
-                f"`{col}` as buyer_{col}"
-                if col in common_columns and col != "user_id"
-                else col
+                (
+                    f"`{col}` as buyer_{col}"
+                    if col in common_columns and col != "user_id"
+                    else col
+                )
                 for col in buyer_data.columns
             ]
         )
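
For context, the pattern being reformatted here (only parentheses are added; behavior is unchanged) prefixes every column that both inputs share, except the join key, so that a later join on user_id does not produce ambiguous column names. A minimal sketch of the idea, using hypothetical toy data rather than the repo's actual schemas:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rename-sketch").getOrCreate()

# Toy inputs; the column names here are made up for illustration
appuser_data = spark.createDataFrame(
    [(1, "alice", "2024-01-01")], ["user_id", "name", "created_ts"]
)
buyer_data = spark.createDataFrame(
    [(1, "gold", "2024-02-01")], ["user_id", "tier", "created_ts"]
)

# Columns appearing in both inputs would collide after a join
common_columns = set(appuser_data.columns) & set(buyer_data.columns)

appuser_data = appuser_data.selectExpr(
    *[
        (
            f"`{col}` as appuser_{col}"
            if col in common_columns and col != "user_id"
            else col
        )
        for col in appuser_data.columns
    ]
)
print(appuser_data.columns)  # ['user_id', 'name', 'appuser_created_ts']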

capstone/rainforest/etl/silver/dim_product.py

Lines changed: 21 additions & 13 deletions
@@ -70,19 +70,23 @@ def transform_upstream(
         # Rename common columns in product_data to avoid conflicts
         product_data = product_data.selectExpr(
             *[
-                f"`{col}` as product_{col}"
-                if col in common_columns and col != "brand_id"
-                else col
+                (
+                    f"`{col}` as product_{col}"
+                    if col in common_columns and col != "brand_id"
+                    else col
+                )
                 for col in product_data.columns
             ]
         )

         # Rename common columns in brand_data to avoid conflicts
         brand_data = brand_data.selectExpr(
             *[
-                f"`{col}` as brand_{col}"
-                if col in common_columns and col != "brand_id"
-                else col
+                (
+                    f"`{col}` as brand_{col}"
+                    if col in common_columns and col != "brand_id"
+                    else col
+                )
                 for col in brand_data.columns
             ]
         )
@@ -102,20 +106,24 @@ def transform_upstream(
         # Rename common columns in dim_product_data to avoid conflicts
         dim_product_data = dim_product_data.selectExpr(
             *[
-                f"`{col}` as product_{col}"
-                if col in common_columns
-                and col not in ["brand_id", "manufacturer_id"]
-                else col
+                (
+                    f"`{col}` as product_{col}"
+                    if col in common_columns
+                    and col not in ["brand_id", "manufacturer_id"]
+                    else col
+                )
                 for col in dim_product_data.columns
             ]
         )

         # Rename common columns in manufacturer_data to avoid conflicts
         manufacturer_data = manufacturer_data.selectExpr(
             *[
-                f"`{col}` as manufacturer_{col}"
-                if col in common_columns and col != "manufacturer_id"
-                else col
+                (
+                    f"`{col}` as manufacturer_{col}"
+                    if col in common_columns and col != "manufacturer_id"
+                    else col
+                )
                 for col in manufacturer_data.columns
             ]
         )

capstone/rainforest/etl/silver/dim_seller.py

Lines changed: 10 additions & 6 deletions
@@ -67,19 +67,23 @@ def transform_upstream(
         # Rename common columns in appuser_data to avoid conflicts
         appuser_data = appuser_data.selectExpr(
             *[
-                f"`{col}` as appuser_{col}"
-                if col in common_columns and col != "user_id"
-                else col
+                (
+                    f"`{col}` as appuser_{col}"
+                    if col in common_columns and col != "user_id"
+                    else col
+                )
                 for col in appuser_data.columns
             ]
         )

         # Rename common columns in seller_data to avoid conflicts
         seller_data = seller_data.selectExpr(
             *[
-                f"`{col}` as seller_{col}"
-                if col in common_columns and col != "user_id"
-                else col
+                (
+                    f"`{col}` as seller_{col}"
+                    if col in common_columns and col != "user_id"
+                    else col
+                )
                 for col in seller_data.columns
             ]
         )

capstone/rainforest/tests/conftest.py

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ def spark():
         .config("spark.dynamicAllocation.enabled", "false")
         .config("spark.ui.enabled", "false")
         .config("spark.ui.showConsoleProgress", "false")
-        .config("spark.default.parallelism", 6) # my laptop has 6 cores
+        .config("spark.default.parallelism", 6)  # my laptop has 6 cores
         .config("spark.executor.cores", "1")
         .config("spark.executor.instances", "1")
         .config("spark.sql.shuffle.partitions", "1")
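
These .config(...) calls sit inside a pytest fixture that builds a small local SparkSession for the test suite; the only change here is the comment spacing. A minimal sketch of such a fixture, assuming a local master and session scope (both assumptions, since the hunk only shows the config lines):

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # One executor and one shuffle partition keep local test runs fast
    session = (
        SparkSession.builder.master("local[*]")
        .appName("rainforest-tests")  # hypothetical app name
        .config("spark.ui.enabled", "false")
        .config("spark.default.parallelism", 6)  # my laptop has 6 cores
        .config("spark.sql.shuffle.partitions", "1")
        .getOrCreate()
    )
    yield session
    session.stop()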

data-processing-spark/1-lab-setup/containers/spark/create_buckets.py

Lines changed: 7 additions & 5 deletions
@@ -1,6 +1,7 @@
 import boto3
-from botocore.exceptions import ClientError
 from botocore.client import Config
+from botocore.exceptions import ClientError
+

 def create_s3_client(access_key, secret_key, endpoint, region):
     """
@@ -18,9 +19,10 @@ def create_s3_client(access_key, secret_key, endpoint, region):
         endpoint_url=endpoint,
         aws_access_key_id=access_key,
         aws_secret_access_key=secret_key,
-        config=Config(signature_version='s3v4')
+        config=Config(signature_version='s3v4'),
     )

+
 def create_bucket_if_not_exists(s3_client, bucket_name):
     """
     Check if an S3 bucket exists, and if not, create it.
@@ -44,6 +46,7 @@ def create_bucket_if_not_exists(s3_client, bucket_name):
     else:
         print(f"Error: {e}")

+
 # Credentials and Connection Info
 access_key = 'minio'
 secret_key = 'minio123'
@@ -53,10 +56,9 @@ def create_bucket_if_not_exists(s3_client, bucket_name):
 # Client creation and usage
 try:
     s3_client = create_s3_client(access_key, secret_key, endpoint, region)
-    bucket_name = 'tpch'# Replace with your bucket name
+    bucket_name = 'tpch'  # Replace with your bucket name
     create_bucket_if_not_exists(s3_client, bucket_name)
-    bucket_name = 'rainforest'# Replace with your bucket name
+    bucket_name = 'rainforest'  # Replace with your bucket name
     create_bucket_if_not_exists(s3_client, bucket_name)
 except:
     print("Full catch, check bucket creation script at create_buckets.py")
-
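
The existence check in create_bucket_if_not_exists relies on the common boto3 idiom: head_bucket raises a ClientError carrying an HTTP error code, and a 404 means the bucket is missing. A sketch of that idiom (the function's exact body is not shown in these hunks, so treat this as the standard pattern rather than a verbatim copy):

from botocore.exceptions import ClientError


def create_bucket_if_not_exists(s3_client, bucket_name):
    try:
        # head_bucket succeeds only if the bucket exists and is accessible
        s3_client.head_bucket(Bucket=bucket_name)
        print(f"Bucket {bucket_name} already exists")
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            # Bucket not found, so create it
            s3_client.create_bucket(Bucket=bucket_name)
            print(f"Created bucket {bucket_name}")
        else:
            print(f"Error: {e}")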

data-processing-spark/2-apache-spark-basics/architecture/resource_config.py

Lines changed: 9 additions & 2 deletions
@@ -1,8 +1,13 @@
 import pprint
+from typing import Dict

 from pyspark.sql import SparkSession


+def some_function(n_name: str) -> Dict[str, str]:
+    return None
+
+
 def run_code(spark):
     print("============================================")
     print("PRINT SPARKSESSION RESOURCE CONFIGS")
@@ -12,11 +17,13 @@ def run_code(spark):

     # Print the resource configurations
     print("Resource Configurations:")
-    pp = pprint.PrettyPrinter(indent=4)
+    pp = pprint.PrettyPrinter(
+        indent=4,
+    )
     pp.pprint(dict(conf.getAll()))


-if __name__ == '__main__':
+if __name__ == "__main__":
     spark = (
         SparkSession.builder.appName("efficient-data-processing-spark")
         .enableHiveSupport()
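
For reference, the conf object being pretty-printed is the session's SparkConf: spark.sparkContext.getConf().getAll() returns (key, value) pairs, which the script renders as a dict. A minimal standalone sketch of the same idea:

import pprint

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("config-sketch").getOrCreate()
conf = spark.sparkContext.getConf()

# Dump every resource config the session is running with
pprint.PrettyPrinter(indent=4).pprint(dict(conf.getAll()))

# Individual keys can also be read directly
print(conf.get("spark.app.name"))  # -> "config-sketch"
spark.stop()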

data-processing-spark/4-data-processing/2-app-job-stage-task/spark_app_config.py

Lines changed: 12 additions & 6 deletions
@@ -3,7 +3,9 @@

 def run_code(spark):
     print("==========================================")
-    print(f"Running a simple query with {spark.sparkContext.getConf().get('spark.app.name')}")
+    print(
+        f"Running a simple query with {spark.sparkContext.getConf().get('spark.app.name')}"
+    )
     print("==========================================")

     spark.sql(
@@ -15,19 +17,23 @@ def run_code(spark):
     """
     ).show(10)

+
 if __name__ == '__main__':

     spark = (
         SparkSession.builder.appName("Custom config")
-        .config("spark.executor.memory", "2g")
-        .config("spark.executor.cores", "3") # total cores across all executors
-        .config("spark.cores.max", "3")
-        .config("spark.memory.fraction", "0.9") # set aside 10% for user memory, rest for Spark data processing
+        .config("spark.executor.memory", "2g")
+        .config(
+            "spark.executor.cores", "3"
+        )  # total cores across all executors
+        .config("spark.cores.max", "3")
+        .config(
+            "spark.memory.fraction", "0.9"
+        )  # set aside 10% for user memory, rest for Spark data processing
         .enableHiveSupport()
         .getOrCreate()
     )
     # Set the log level
     spark.sparkContext.setLogLevel("ERROR")
     run_code(spark=spark)
     spark.stop()
-
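
As the inline comment notes, spark.memory.fraction splits the executor heap between Spark's unified (execution plus storage) memory and user memory. Spark first sets aside a fixed 300 MB reservation, then applies the fraction to the remainder, so for the 2g executor configured above the split works out roughly as follows:

heap_mb = 2048        # spark.executor.memory = "2g"
reserved_mb = 300     # fixed reservation Spark keeps for itself
fraction = 0.9        # spark.memory.fraction

unified_mb = (heap_mb - reserved_mb) * fraction       # ~1573 MB for Spark
user_mb = (heap_mb - reserved_mb) * (1 - fraction)    # ~175 MB for user code
print(f"unified: {unified_mb:.0f} MB, user: {user_mb:.0f} MB")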

main.py

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+def main():
+    print("Hello from efficient-data-processing-spark!")
+
+
+if __name__ == "__main__":
+    main()

pyproject.toml

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+[project]
+name = "efficient-data-processing-spark"
+version = "0.1.0"
+description = "Add your description here"
+readme = "README.md"
+requires-python = ">=3.13"
+dependencies = []
