Commit 3bf88a0

added source files for py testing
1 parent 67b1133 commit 3bf88a0

File tree

7 files changed: +1885 -1 lines changed


bootcamp/materials/3-spark-fundamentals/notebooks/Homework3_Draft.ipynb

Lines changed: 1797 additions & 0 deletions
Large diffs are not rendered by default.

bootcamp/materials/3-spark-fundamentals/notebooks/event_data_pyspark.ipynb

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@
     "name": "stderr",
     "output_type": "stream",
     "text": [
-     "24/12/06 20:42:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.\n"
+     "24/12/11 15:50:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.\n"
     ]
    },
    {
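For context on the changed output line: that warning is what Spark prints when SparkSession.builder.getOrCreate() finds a session already running, in which case only runtime SQL configurations from the new builder are applied. A minimal sketch of the behaviour (the app names and conf value are made up for illustration):

from pyspark.sql import SparkSession

# The first call creates the session; the second reuses it and logs
# "Using an existing Spark session; only runtime SQL configurations will take effect."
first = SparkSession.builder.appName("first").getOrCreate()
second = (
    SparkSession.builder
    .appName("second")                             # ignored: not a runtime SQL conf
    .config("spark.sql.shuffle.partitions", "4")   # applied: runtime SQL conf
    .getOrCreate()
)
assert first is second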
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
from pyspark.sql import SparkSession

# SCD query: collapse the per-year actor rows into streaks of unchanged
# quality_class / is_active, keyed by actorid, with start_year and end_year.
query = """
WITH with_previous AS (
    SELECT actor
         , actorid
         , current_year
         , quality_class
         , is_active
         , LAG(quality_class, 1) OVER (PARTITION BY actorid ORDER BY current_year) AS previous_quality_class
         , LAG(is_active, 1) OVER (PARTITION BY actorid ORDER BY current_year) AS previous_is_active
    FROM actors
    WHERE current_year < 2021
),
with_indicators AS (
    SELECT *
         , CASE
               WHEN quality_class <> previous_quality_class THEN 1
               WHEN is_active <> previous_is_active THEN 1
               ELSE 0
           END AS change_indicator
    FROM with_previous
),
with_streaks AS (
    SELECT *
         , SUM(change_indicator) OVER (PARTITION BY actorid ORDER BY current_year) AS streak_identifier
    FROM with_indicators
)
SELECT actor
     , actorid
     , quality_class
     , is_active
     , MIN(current_year) AS start_year
     , MAX(current_year) AS end_year
     , 2020 AS current_year
FROM with_streaks
GROUP BY actor
       , actorid
       , quality_class
       , is_active
       , streak_identifier
ORDER BY actor
       , streak_identifier
"""


def do_actor_scd_transformation(spark, dataframe):
    # Expose the input DataFrame to the SQL above under the name "actors".
    dataframe.createOrReplaceTempView("actors")
    return spark.sql(query)


def main():
    spark = SparkSession.builder \
        .master("local") \
        .appName("actors_scd") \
        .getOrCreate()
    output_df = do_actor_scd_transformation(spark, spark.table("actors"))
    output_df.write.mode("overwrite").insertInto("actors_scd")
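One note on the job module: main() is defined but nothing invokes it, so importing the file (as the test does) works, while running it directly is a no-op. A minimal entry-point sketch, assuming the file is meant to be executed as a script (e.g. via spark-submit):

# Assumed addition, not part of the commit: call main() when the module
# is executed directly rather than imported by the tests.
if __name__ == "__main__":
    main()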
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
from collections import namedtuple

from chispa.dataframe_comparer import assert_df_equality
from pyspark.sql import functions as F

from ..jobs.actors_scd_job import do_actor_scd_transformation

# Input rows need every column the SCD query reads, so actorid and is_active
# are included alongside actor, current_year and quality_class; the actorid
# values below are illustrative placeholders.
ActorYear = namedtuple("ActorYear", "actor actorid current_year quality_class is_active")
# Output rows mirror the query's SELECT list and column order.
ActorScd = namedtuple(
    "ActorScd",
    "actor actorid quality_class is_active start_year end_year current_year"
)


def test_scd_generation(spark):
    source_data = [
        ActorYear("Meat Loaf", "nm0001", 2018, 'Good', True),
        ActorYear("Meat Loaf", "nm0001", 2019, 'Good', True),
        ActorYear("Meat Loaf", "nm0001", 2020, 'Bad', True),
        ActorYear("Meat Loaf", "nm0001", 2021, 'Bad', True),
        ActorYear("Skid Markel", "nm0002", 2020, 'Bad', True),
        ActorYear("Skid Markel", "nm0002", 2021, 'Bad', True)
    ]
    source_df = spark.createDataFrame(source_data)

    actual_df = do_actor_scd_transformation(spark, source_df)
    # The query filters current_year < 2021, so the 2021 rows never reach the
    # output and every streak that runs into 2021 ends at 2020 instead.
    expected_data = [
        ActorScd("Meat Loaf", "nm0001", 'Good', True, 2018, 2019, 2020),
        ActorScd("Meat Loaf", "nm0001", 'Bad', True, 2020, 2020, 2020),
        ActorScd("Skid Markel", "nm0002", 'Bad', True, 2020, 2020, 2020)
    ]
    # The SQL literal `2020 AS current_year` is an INT, while Python ints map
    # to LONG, so cast the expected column to keep the schemas comparable.
    expected_df = spark.createDataFrame(expected_data) \
        .withColumn("current_year", F.col("current_year").cast("int"))
    assert_df_equality(actual_df, expected_df, ignore_nullable=True)
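The test asks pytest for a `spark` fixture, which is not shown in the files above. A minimal sketch of a conftest.py that would provide it (file name and fixture scope are assumptions):

# conftest.py (assumed): session-scoped SparkSession fixture for the tests above.
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    session = (
        SparkSession.builder
        .master("local[1]")
        .appName("actors_scd_tests")
        .getOrCreate()
    )
    yield session
    session.stop()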
