Commit 67b1133

committed: week 3
1 parent b06d53e, commit 67b1133

14 files changed: 364 additions, 265 deletions

bootcamp/materials/3-spark-fundamentals/notebooks/Caching.ipynb

Lines changed: 49 additions & 46 deletions
@@ -2,7 +2,7 @@
 "cells": [
 {
 "cell_type": "code",
-"execution_count": 5,
+"execution_count": 3,
 "id": "e9ae4c8b-4599-4fbb-a545-76b6e3bcb84d",
 "metadata": {},
 "outputs": [
@@ -12,52 +12,54 @@
 "text": [
 "== Physical Plan ==\n",
 "AdaptiveSparkPlan isFinalPlan=false\n",
-"+- ObjectHashAggregate(keys=[device_id#937, device_type#940], functions=[collect_list(user_id#907, 0, 0)])\n",
-" +- ObjectHashAggregate(keys=[device_id#937, device_type#940], functions=[partial_collect_list(user_id#907, 0, 0)])\n",
-" +- Project [device_id#937, device_type#940, user_id#907]\n",
-" +- SortMergeJoin [device_id#937], [device_id#908], Inner\n",
-" :- Sort [device_id#937 ASC NULLS FIRST], false, 0\n",
-" : +- Exchange hashpartitioning(device_id#937, 4), ENSURE_REQUIREMENTS, [plan_id=1320]\n",
-" : +- Filter isnotnull(device_id#937)\n",
-" : +- FileScan csv [device_id#937,device_type#940] Batched: false, DataFilters: [isnotnull(device_id#937)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/devices.csv], PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema: struct<device_id:int,device_type:string>\n",
-" +- Sort [device_id#908 ASC NULLS FIRST], false, 0\n",
-" +- Exchange hashpartitioning(device_id#908, 4), ENSURE_REQUIREMENTS, [plan_id=1321]\n",
-" +- Filter isnotnull(device_id#908)\n",
-" +- InMemoryTableScan [user_id#907, device_id#908], [isnotnull(device_id#908)]\n",
-" +- InMemoryRelation [user_id#907, device_id#908, event_counts#945L, host_array#946], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
-" +- ObjectHashAggregate(keys=[user_id#198, device_id#199], functions=[count(1), collect_list(distinct host#201, 0, 0)])\n",
-" +- Exchange hashpartitioning(user_id#198, device_id#199, 4), ENSURE_REQUIREMENTS, [plan_id=1338]\n",
-" +- ObjectHashAggregate(keys=[user_id#198, device_id#199], functions=[merge_count(1), partial_collect_list(distinct host#201, 0, 0)])\n",
-" +- *(2) HashAggregate(keys=[user_id#198, device_id#199, host#201], functions=[merge_count(1)])\n",
-" +- Exchange hashpartitioning(user_id#198, device_id#199, host#201, 4), ENSURE_REQUIREMENTS, [plan_id=1333]\n",
-" +- *(1) HashAggregate(keys=[user_id#198, device_id#199, host#201], functions=[partial_count(1)])\n",
-" +- *(1) Filter isnotnull(user_id#198)\n",
-" +- FileScan csv [user_id#198,device_id#199,host#201] Batched: false, DataFilters: [isnotnull(user_id#198)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int,host:string>\n",
+"+- ObjectHashAggregate(keys=[device_id#598, device_type#601], functions=[collect_list(user_id#568, 0, 0)])\n",
+" +- ObjectHashAggregate(keys=[device_id#598, device_type#601], functions=[partial_collect_list(user_id#568, 0, 0)])\n",
+" +- Project [device_id#598, device_type#601, user_id#568]\n",
+" +- SortMergeJoin [device_id#598], [device_id#569], Inner\n",
+" :- Sort [device_id#598 ASC NULLS FIRST], false, 0\n",
+" : +- Exchange hashpartitioning(device_id#598, 4), ENSURE_REQUIREMENTS, [plan_id=735]\n",
+" : +- Filter isnotnull(device_id#598)\n",
+" : +- FileScan csv [device_id#598,device_type#601] Batched: false, DataFilters: [isnotnull(device_id#598)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/devices.csv], PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema: struct<device_id:int,device_type:string>\n",
+" +- Sort [device_id#569 ASC NULLS FIRST], false, 0\n",
+" +- Exchange hashpartitioning(device_id#569, 4), ENSURE_REQUIREMENTS, [plan_id=736]\n",
+" +- Filter isnotnull(device_id#569)\n",
+" +- InMemoryTableScan [user_id#568, device_id#569], [isnotnull(device_id#569)]\n",
+" +- InMemoryRelation [user_id#568, device_id#569, event_counts#606L, host_array#607], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
+" +- AdaptiveSparkPlan isFinalPlan=false\n",
+" +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[count(1), collect_list(distinct host#20, 0, 0)])\n",
+" +- Exchange hashpartitioning(user_id#17, device_id#18, 4), ENSURE_REQUIREMENTS, [plan_id=752]\n",
+" +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[merge_count(1), partial_collect_list(distinct host#20, 0, 0)])\n",
+" +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[merge_count(1)])\n",
+" +- Exchange hashpartitioning(user_id#17, device_id#18, host#20, 4), ENSURE_REQUIREMENTS, [plan_id=748]\n",
+" +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[partial_count(1)])\n",
+" +- Filter isnotnull(user_id#17)\n",
+" +- FileScan csv [user_id#17,device_id#18,host#20] Batched: false, DataFilters: [isnotnull(user_id#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int,host:string>\n",
 "\n",
 "\n",
 "== Physical Plan ==\n",
 "AdaptiveSparkPlan isFinalPlan=false\n",
-"+- ObjectHashAggregate(keys=[user_id#907], functions=[max(event_counts#945L), collect_list(device_id#908, 0, 0)])\n",
-" +- ObjectHashAggregate(keys=[user_id#907], functions=[partial_max(event_counts#945L), partial_collect_list(device_id#908, 0, 0)])\n",
-" +- Project [user_id#907, device_id#908, event_counts#945L]\n",
-" +- SortMergeJoin [user_id#907], [user_id#953], Inner\n",
-" :- Sort [user_id#907 ASC NULLS FIRST], false, 0\n",
-" : +- Exchange hashpartitioning(user_id#907, 4), ENSURE_REQUIREMENTS, [plan_id=1374]\n",
-" : +- Filter isnotnull(user_id#907)\n",
-" : +- FileScan csv [user_id#907,device_id#908] Batched: false, DataFilters: [isnotnull(user_id#907)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int>\n",
-" +- Sort [user_id#953 ASC NULLS FIRST], false, 0\n",
-" +- Exchange hashpartitioning(user_id#953, 4), ENSURE_REQUIREMENTS, [plan_id=1375]\n",
-" +- Filter isnotnull(user_id#953)\n",
-" +- InMemoryTableScan [user_id#953, event_counts#945L], [isnotnull(user_id#953)]\n",
-" +- InMemoryRelation [user_id#953, device_id#954, event_counts#945L, host_array#946], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
-" +- ObjectHashAggregate(keys=[user_id#198, device_id#199], functions=[count(1), collect_list(distinct host#201, 0, 0)])\n",
-" +- Exchange hashpartitioning(user_id#198, device_id#199, 4), ENSURE_REQUIREMENTS, [plan_id=1392]\n",
-" +- ObjectHashAggregate(keys=[user_id#198, device_id#199], functions=[merge_count(1), partial_collect_list(distinct host#201, 0, 0)])\n",
-" +- *(2) HashAggregate(keys=[user_id#198, device_id#199, host#201], functions=[merge_count(1)])\n",
-" +- Exchange hashpartitioning(user_id#198, device_id#199, host#201, 4), ENSURE_REQUIREMENTS, [plan_id=1387]\n",
-" +- *(1) HashAggregate(keys=[user_id#198, device_id#199, host#201], functions=[partial_count(1)])\n",
-" +- *(1) Filter isnotnull(user_id#198)\n",
-" +- FileScan csv [user_id#198,device_id#199,host#201] Batched: false, DataFilters: [isnotnull(user_id#198)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int,host:string>\n",
+"+- ObjectHashAggregate(keys=[user_id#568], functions=[max(event_counts#606L), collect_list(device_id#569, 0, 0)])\n",
+" +- ObjectHashAggregate(keys=[user_id#568], functions=[partial_max(event_counts#606L), partial_collect_list(device_id#569, 0, 0)])\n",
+" +- Project [user_id#568, device_id#569, event_counts#606L]\n",
+" +- SortMergeJoin [user_id#568], [user_id#614], Inner\n",
+" :- Sort [user_id#568 ASC NULLS FIRST], false, 0\n",
+" : +- Exchange hashpartitioning(user_id#568, 4), ENSURE_REQUIREMENTS, [plan_id=788]\n",
+" : +- Filter isnotnull(user_id#568)\n",
+" : +- FileScan csv [user_id#568,device_id#569] Batched: false, DataFilters: [isnotnull(user_id#568)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int>\n",
+" +- Sort [user_id#614 ASC NULLS FIRST], false, 0\n",
+" +- Exchange hashpartitioning(user_id#614, 4), ENSURE_REQUIREMENTS, [plan_id=789]\n",
+" +- Filter isnotnull(user_id#614)\n",
+" +- InMemoryTableScan [user_id#614, event_counts#606L], [isnotnull(user_id#614)]\n",
+" +- InMemoryRelation [user_id#614, device_id#615, event_counts#606L, host_array#607], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
+" +- AdaptiveSparkPlan isFinalPlan=false\n",
+" +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[count(1), collect_list(distinct host#20, 0, 0)])\n",
+" +- Exchange hashpartitioning(user_id#17, device_id#18, 4), ENSURE_REQUIREMENTS, [plan_id=805]\n",
+" +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[merge_count(1), partial_collect_list(distinct host#20, 0, 0)])\n",
+" +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[merge_count(1)])\n",
+" +- Exchange hashpartitioning(user_id#17, device_id#18, host#20, 4), ENSURE_REQUIREMENTS, [plan_id=801]\n",
+" +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[partial_count(1)])\n",
+" +- Filter isnotnull(user_id#17)\n",
+" +- FileScan csv [user_id#17,device_id#18,host#20] Batched: false, DataFilters: [isnotnull(user_id#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int,host:string>\n",
 "\n",
 "\n"
 ]
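
Reading these plans: the InMemoryTableScan / InMemoryRelation operators mean the joins pull the aggregated events out of Spark's cache instead of re-running the aggregation, while the side that is not cached still shows a FileScan against the raw CSV; in the new output the cached aggregation is additionally wrapped in its own AdaptiveSparkPlan. A minimal sketch of how output like this can be produced (the reader options, the count() aggregation and the local[*] master are assumptions for illustration, not copied from the notebook):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("caching-demo").getOrCreate()

val events = spark.read.option("header", "true").option("inferSchema", "true")
  .csv("/home/iceberg/data/events.csv")
val devices = spark.read.option("header", "true").option("inferSchema", "true")
  .csv("/home/iceberg/data/devices.csv")

// Aggregate once, cache the result, then reuse it downstream.
val eventsAggregated = events.groupBy("user_id", "device_id").count().cache()

// The join side backed by the cache shows up as InMemoryTableScan / InMemoryRelation,
// while the devices side is still read with a FileScan.
devices.join(eventsAggregated, Seq("device_id")).explain()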
@@ -73,10 +75,10 @@
 "eventsAggregated: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: int, device_id: int ... 2 more fields]\n",
 "usersAndDevices: org.apache.spark.sql.DataFrame = [user_id: int, user_id: int ... 2 more fields]\n",
 "devicesOnEvents: org.apache.spark.sql.DataFrame = [device_id: int, device_type: string ... 3 more fields]\n",
-"res4: Array[org.apache.spark.sql.Row] = Array([-2147470439,-2147470439,3,WrappedArray(378988111, 378988111, 378988111)])\n"
+"res1: Array[org.apache.spark.sql.Row] = Array([-2147470439,-2147470439,3,WrappedArray(378988111, 378988111, 378988111)])\n"
 ]
 },
-"execution_count": 5,
+"execution_count": 3,
 "metadata": {},
 "output_type": "execute_result"
 }
@@ -107,6 +109,7 @@
 "//Caching here should be < 5 GBs or used for broadcast join\n",
 "//You need to tune executor memory otherwise it'll spill to disk and be slow\n",
 "//Don't really try using any of the other StorageLevel besides MEMORY_ONLY\n",
+"\n",
 "val eventsAggregated = spark.sql(f\"\"\"\n",
 "    SELECT user_id, \n",
 "        device_id, \n",
@@ -207,4 +210,4 @@
 },
 "nbformat": 4,
 "nbformat_minor": 5
-}
+}
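
The comments in this notebook carry the main lesson: only cache what comfortably fits in executor memory (well under 5 GB here), prefer a broadcast join for small tables, and stick to MEMORY_ONLY. A hedged sketch of what that looks like in code, where eventsAggregated and devices stand in for the notebook's DataFrames:

import org.apache.spark.sql.functions.broadcast
import org.apache.spark.storage.StorageLevel

// MEMORY_ONLY: partitions that don't fit are recomputed instead of being spilled to disk,
// so executor memory has to be sized for the cached data.
val cached = eventsAggregated.persist(StorageLevel.MEMORY_ONLY)

// For a small dimension table, broadcasting usually beats caching:
// every executor gets a full copy and the shuffle on the join key disappears.
val joined = cached.join(broadcast(devices), Seq("device_id"))

// Drop the cache once the queries that reuse it have run.
cached.unpersist()

Note that plain .cache() defaults to MEMORY_AND_DISK, which is what the StorageLevel(disk, memory, deserialized, 1 replicas) line in the plans above is showing, so an explicit persist call is needed to get MEMORY_ONLY.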

bootcamp/materials/3-spark-fundamentals/notebooks/DatasetApi.ipynb

Lines changed: 48 additions & 34 deletions
@@ -2,17 +2,37 @@
 "cells": [
 {
 "cell_type": "code",
-"execution_count": 14,
+"execution_count": 1,
 "id": "22b842be-6a82-4127-b937-ead4103a92e8",
 "metadata": {},
 "outputs": [
 {
 "data": {
 "text/plain": [
-"res13: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7f9ff709\n"
+"Intitializing Scala interpreter ..."
 ]
 },
-"execution_count": 14,
+"metadata": {},
+"output_type": "display_data"
+},
+{
+"data": {
+"text/plain": [
+"Spark Web UI available at http://0a64d2ba5c88:4042\n",
+"SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1733519375641)\n",
+"SparkSession available as 'spark'\n"
+]
+},
+"metadata": {},
+"output_type": "display_data"
+},
+{
+"data": {
+"text/plain": [
+"res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@591302c9\n"
+]
+},
+"execution_count": 1,
 "metadata": {},
 "output_type": "execute_result"
 }
@@ -23,42 +43,28 @@
 },
 {
 "cell_type": "code",
-"execution_count": 32,
+"execution_count": null,
+"id": "d8d4270e-b96d-4437-808a-994b0bb996b5",
+"metadata": {},
+"outputs": [],
+"source": [
+"# If something is nullable, you need to wrap the value type in Option[] - this helps enforce assumptions about the pipeline"
+]
+},
+{
+"cell_type": "code",
+"execution_count": 2,
 "id": "73b5384f-be28-49e3-9bcf-4b9783ba7d91",
 "metadata": {},
 "outputs": [
 {
-"ename": "org.apache.spark.SparkRuntimeException",
-"evalue": " Error while decoding: java.lang.NullPointerException",
+"ename": "<console>",
+"evalue": "80: error: illegal start of simple expression",
 "output_type": "error",
 "traceback": [
-"org.apache.spark.SparkRuntimeException: Error while decoding: java.lang.NullPointerException",
-"newInstance(class Event).",
-" at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1543)",
-" at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:178)",
-" at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:166)",
-" at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)",
-" at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)",
-" at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)",
-" at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)",
-" at scala.collection.TraversableLike.map(TraversableLike.scala:286)",
-" at scala.collection.TraversableLike.map$(TraversableLike.scala:279)",
-" at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)",
-" at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)",
-" at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)",
-" at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)",
-" at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)",
-" at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)",
-" at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)",
-" at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)",
-" at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)",
-" at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)",
-" at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)",
-" at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)",
-" at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)",
-" at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)",
-" ... 49 elided",
-"Caused by: java.lang.NullPointerException",
+"<console>:80: error: illegal start of simple expression",
+" .map( case (row: EventWithDeviceInfo) => {",
+" ^",
 ""
 ]
 }
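
The new error in this cell is a plain Scala syntax issue: a case pattern can only start a pattern-matching anonymous function inside braces, not inside parentheses, which is why .map( case (row: EventWithDeviceInfo) => { ... fails with "illegal start of simple expression". A minimal sketch of two forms that do compile; combined and the device_type field are illustrative stand-ins, not names taken from this commit:

// Assuming a Dataset[EventWithDeviceInfo] called `combined` and spark.implicits._ in scope.

// Option 1: drop the case keyword and use an ordinary lambda.
val upper1 = combined.map((row: EventWithDeviceInfo) =>
  row.copy(device_type = row.device_type.toUpperCase))

// Option 2: keep case, but switch to a brace-delimited pattern-matching function.
val upper2 = combined.map { case row: EventWithDeviceInfo =>
  row.copy(device_type = row.device_type.toUpperCase)
}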
@@ -186,6 +192,14 @@
 "combinedViaDatasets.take(5)\n"
 ]
 },
+{
+"cell_type": "code",
+"execution_count": null,
+"id": "3ce150e3-e5b7-4ece-8803-8189762625ea",
+"metadata": {},
+"outputs": [],
+"source": []
+},
 {
 "cell_type": "code",
 "execution_count": null,
@@ -226,4 +240,4 @@
 },
 "nbformat": 4,
 "nbformat_minor": 5
-}
+}
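
The added comment about wrapping nullable values in Option[] is exactly what the NullPointerException that previously appeared in this notebook was pointing at: a case class field of a primitive type (Int, Long) cannot represent a null column, so the Dataset encoder fails while decoding. A hedged sketch, with field names guessed from the events.csv schema shown in the Caching plans rather than taken from this commit:

import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// A null user_id decoded into an Int field throws NullPointerException;
// Option[Int] turns the null into None instead.
case class Event(user_id: Option[Int], device_id: Option[Int], host: String)

val events: Dataset[Event] = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/home/iceberg/data/events.csv")
  .as[Event]

// Downstream code is now forced to decide what to do with missing user_ids.
val withKnownUsers = events.filter(_.user_id.isDefined)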
