2
2
"cells" : [
3
3
{
4
4
"cell_type" : " code" ,
5
- "execution_count" : 5 ,
5
+ "execution_count" : 3 ,
6
6
"id" : " e9ae4c8b-4599-4fbb-a545-76b6e3bcb84d" ,
7
7
"metadata" : {},
8
8
"outputs" : [
12
12
"text" : [
13
13
" == Physical Plan ==\n " ,
14
14
" AdaptiveSparkPlan isFinalPlan=false\n " ,
15
- " +- ObjectHashAggregate(keys=[device_id#937, device_type#940], functions=[collect_list(user_id#907, 0, 0)])\n " ,
16
- " +- ObjectHashAggregate(keys=[device_id#937, device_type#940], functions=[partial_collect_list(user_id#907, 0, 0)])\n " ,
17
- " +- Project [device_id#937, device_type#940, user_id#907]\n " ,
18
- " +- SortMergeJoin [device_id#937], [device_id#908], Inner\n " ,
19
- " :- Sort [device_id#937 ASC NULLS FIRST], false, 0\n " ,
20
- " : +- Exchange hashpartitioning(device_id#937, 4), ENSURE_REQUIREMENTS, [plan_id=1320]\n " ,
21
- " : +- Filter isnotnull(device_id#937)\n " ,
22
- " : +- FileScan csv [device_id#937,device_type#940] Batched: false, DataFilters: [isnotnull(device_id#937)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/devices.csv], PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema: struct<device_id:int,device_type:string>\n " ,
23
- " +- Sort [device_id#908 ASC NULLS FIRST], false, 0\n " ,
24
- " +- Exchange hashpartitioning(device_id#908, 4), ENSURE_REQUIREMENTS, [plan_id=1321]\n " ,
25
- " +- Filter isnotnull(device_id#908)\n " ,
26
- " +- InMemoryTableScan [user_id#907, device_id#908], [isnotnull(device_id#908)]\n " ,
27
- " +- InMemoryRelation [user_id#907, device_id#908, event_counts#945L, host_array#946], StorageLevel(disk, memory, deserialized, 1 replicas)\n " ,
28
- " +- ObjectHashAggregate(keys=[user_id#198, device_id#199], functions=[count(1), collect_list(distinct host#201, 0, 0)])\n " ,
29
- " +- Exchange hashpartitioning(user_id#198, device_id#199, 4), ENSURE_REQUIREMENTS, [plan_id=1338]\n " ,
30
- " +- ObjectHashAggregate(keys=[user_id#198, device_id#199], functions=[merge_count(1), partial_collect_list(distinct host#201, 0, 0)])\n " ,
31
- " +- *(2) HashAggregate(keys=[user_id#198, device_id#199, host#201], functions=[merge_count(1)])\n " ,
32
- " +- Exchange hashpartitioning(user_id#198, device_id#199, host#201, 4), ENSURE_REQUIREMENTS, [plan_id=1333]\n " ,
33
- " +- *(1) HashAggregate(keys=[user_id#198, device_id#199, host#201], functions=[partial_count(1)])\n " ,
34
- " +- *(1) Filter isnotnull(user_id#198)\n " ,
35
- " +- FileScan csv [user_id#198,device_id#199,host#201] Batched: false, DataFilters: [isnotnull(user_id#198)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int,host:string>\n " ,
15
+ " +- ObjectHashAggregate(keys=[device_id#598, device_type#601], functions=[collect_list(user_id#568, 0, 0)])\n " ,
16
+ " +- ObjectHashAggregate(keys=[device_id#598, device_type#601], functions=[partial_collect_list(user_id#568, 0, 0)])\n " ,
17
+ " +- Project [device_id#598, device_type#601, user_id#568]\n " ,
18
+ " +- SortMergeJoin [device_id#598], [device_id#569], Inner\n " ,
19
+ " :- Sort [device_id#598 ASC NULLS FIRST], false, 0\n " ,
20
+ " : +- Exchange hashpartitioning(device_id#598, 4), ENSURE_REQUIREMENTS, [plan_id=735]\n " ,
21
+ " : +- Filter isnotnull(device_id#598)\n " ,
22
+ " : +- FileScan csv [device_id#598,device_type#601] Batched: false, DataFilters: [isnotnull(device_id#598)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/devices.csv], PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema: struct<device_id:int,device_type:string>\n " ,
23
+ " +- Sort [device_id#569 ASC NULLS FIRST], false, 0\n " ,
24
+ " +- Exchange hashpartitioning(device_id#569, 4), ENSURE_REQUIREMENTS, [plan_id=736]\n " ,
25
+ " +- Filter isnotnull(device_id#569)\n " ,
26
+ " +- InMemoryTableScan [user_id#568, device_id#569], [isnotnull(device_id#569)]\n " ,
27
+ " +- InMemoryRelation [user_id#568, device_id#569, event_counts#606L, host_array#607], StorageLevel(disk, memory, deserialized, 1 replicas)\n " ,
28
+ " +- AdaptiveSparkPlan isFinalPlan=false\n " ,
29
+ " +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[count(1), collect_list(distinct host#20, 0, 0)])\n " ,
30
+ " +- Exchange hashpartitioning(user_id#17, device_id#18, 4), ENSURE_REQUIREMENTS, [plan_id=752]\n " ,
31
+ " +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[merge_count(1), partial_collect_list(distinct host#20, 0, 0)])\n " ,
32
+ " +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[merge_count(1)])\n " ,
33
+ " +- Exchange hashpartitioning(user_id#17, device_id#18, host#20, 4), ENSURE_REQUIREMENTS, [plan_id=748]\n " ,
34
+ " +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[partial_count(1)])\n " ,
35
+ " +- Filter isnotnull(user_id#17)\n " ,
36
+ " +- FileScan csv [user_id#17,device_id#18,host#20] Batched: false, DataFilters: [isnotnull(user_id#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int,host:string>\n " ,
36
37
" \n " ,
37
38
" \n " ,
38
39
" == Physical Plan ==\n " ,
39
40
" AdaptiveSparkPlan isFinalPlan=false\n " ,
40
- " +- ObjectHashAggregate(keys=[user_id#907], functions=[max(event_counts#945L), collect_list(device_id#908, 0, 0)])\n " ,
41
- " +- ObjectHashAggregate(keys=[user_id#907], functions=[partial_max(event_counts#945L), partial_collect_list(device_id#908, 0, 0)])\n " ,
42
- " +- Project [user_id#907, device_id#908, event_counts#945L]\n " ,
43
- " +- SortMergeJoin [user_id#907], [user_id#953], Inner\n " ,
44
- " :- Sort [user_id#907 ASC NULLS FIRST], false, 0\n " ,
45
- " : +- Exchange hashpartitioning(user_id#907, 4), ENSURE_REQUIREMENTS, [plan_id=1374]\n " ,
46
- " : +- Filter isnotnull(user_id#907)\n " ,
47
- " : +- FileScan csv [user_id#907,device_id#908] Batched: false, DataFilters: [isnotnull(user_id#907)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int>\n " ,
48
- " +- Sort [user_id#953 ASC NULLS FIRST], false, 0\n " ,
49
- " +- Exchange hashpartitioning(user_id#953, 4), ENSURE_REQUIREMENTS, [plan_id=1375]\n " ,
50
- " +- Filter isnotnull(user_id#953)\n " ,
51
- " +- InMemoryTableScan [user_id#953, event_counts#945L], [isnotnull(user_id#953)]\n " ,
52
- " +- InMemoryRelation [user_id#953, device_id#954, event_counts#945L, host_array#946], StorageLevel(disk, memory, deserialized, 1 replicas)\n " ,
53
- " +- ObjectHashAggregate(keys=[user_id#198, device_id#199], functions=[count(1), collect_list(distinct host#201, 0, 0)])\n " ,
54
- " +- Exchange hashpartitioning(user_id#198, device_id#199, 4), ENSURE_REQUIREMENTS, [plan_id=1392]\n " ,
55
- " +- ObjectHashAggregate(keys=[user_id#198, device_id#199], functions=[merge_count(1), partial_collect_list(distinct host#201, 0, 0)])\n " ,
56
- " +- *(2) HashAggregate(keys=[user_id#198, device_id#199, host#201], functions=[merge_count(1)])\n " ,
57
- " +- Exchange hashpartitioning(user_id#198, device_id#199, host#201, 4), ENSURE_REQUIREMENTS, [plan_id=1387]\n " ,
58
- " +- *(1) HashAggregate(keys=[user_id#198, device_id#199, host#201], functions=[partial_count(1)])\n " ,
59
- " +- *(1) Filter isnotnull(user_id#198)\n " ,
60
- " +- FileScan csv [user_id#198,device_id#199,host#201] Batched: false, DataFilters: [isnotnull(user_id#198)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int,host:string>\n " ,
41
+ " +- ObjectHashAggregate(keys=[user_id#568], functions=[max(event_counts#606L), collect_list(device_id#569, 0, 0)])\n " ,
42
+ " +- ObjectHashAggregate(keys=[user_id#568], functions=[partial_max(event_counts#606L), partial_collect_list(device_id#569, 0, 0)])\n " ,
43
+ " +- Project [user_id#568, device_id#569, event_counts#606L]\n " ,
44
+ " +- SortMergeJoin [user_id#568], [user_id#614], Inner\n " ,
45
+ " :- Sort [user_id#568 ASC NULLS FIRST], false, 0\n " ,
46
+ " : +- Exchange hashpartitioning(user_id#568, 4), ENSURE_REQUIREMENTS, [plan_id=788]\n " ,
47
+ " : +- Filter isnotnull(user_id#568)\n " ,
48
+ " : +- FileScan csv [user_id#568,device_id#569] Batched: false, DataFilters: [isnotnull(user_id#568)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int>\n " ,
49
+ " +- Sort [user_id#614 ASC NULLS FIRST], false, 0\n " ,
50
+ " +- Exchange hashpartitioning(user_id#614, 4), ENSURE_REQUIREMENTS, [plan_id=789]\n " ,
51
+ " +- Filter isnotnull(user_id#614)\n " ,
52
+ " +- InMemoryTableScan [user_id#614, event_counts#606L], [isnotnull(user_id#614)]\n " ,
53
+ " +- InMemoryRelation [user_id#614, device_id#615, event_counts#606L, host_array#607], StorageLevel(disk, memory, deserialized, 1 replicas)\n " ,
54
+ " +- AdaptiveSparkPlan isFinalPlan=false\n " ,
55
+ " +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[count(1), collect_list(distinct host#20, 0, 0)])\n " ,
56
+ " +- Exchange hashpartitioning(user_id#17, device_id#18, 4), ENSURE_REQUIREMENTS, [plan_id=805]\n " ,
57
+ " +- ObjectHashAggregate(keys=[user_id#17, device_id#18], functions=[merge_count(1), partial_collect_list(distinct host#20, 0, 0)])\n " ,
58
+ " +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[merge_count(1)])\n " ,
59
+ " +- Exchange hashpartitioning(user_id#17, device_id#18, host#20, 4), ENSURE_REQUIREMENTS, [plan_id=801]\n " ,
60
+ " +- HashAggregate(keys=[user_id#17, device_id#18, host#20], functions=[partial_count(1)])\n " ,
61
+ " +- Filter isnotnull(user_id#17)\n " ,
62
+ " +- FileScan csv [user_id#17,device_id#18,host#20] Batched: false, DataFilters: [isnotnull(user_id#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int,device_id:int,host:string>\n " ,
61
63
" \n " ,
62
64
" \n "
63
65
]
73
75
" eventsAggregated: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: int, device_id: int ... 2 more fields]\n " ,
74
76
" usersAndDevices: org.apache.spark.sql.DataFrame = [user_id: int, user_id: int ... 2 more fields]\n " ,
75
77
" devicesOnEvents: org.apache.spark.sql.DataFrame = [device_id: int, device_type: string ... 3 more fields]\n " ,
76
- " res4 : Array[org.apache.spark.sql.Row] = Array([-2147470439,-2147470439,3,WrappedArray(378988111, 378988111, 378988111)])\n "
78
+ " res1 : Array[org.apache.spark.sql.Row] = Array([-2147470439,-2147470439,3,WrappedArray(378988111, 378988111, 378988111)])\n "
77
79
]
78
80
},
79
- "execution_count" : 5 ,
81
+ "execution_count" : 3 ,
80
82
"metadata" : {},
81
83
"output_type" : " execute_result"
82
84
}
107
109
" //Caching here should be < 5 GBs or used for broadcast join\n " ,
108
110
" //You need to tune executor memory otherwise it'll spill to disk and be slow\n " ,
109
111
" //Don't really try using any of the other StorageLevel besides MEMORY_ONLY\n " ,
112
+ " \n " ,
110
113
" val eventsAggregated = spark.sql(f\"\"\"\n " ,
111
114
" SELECT user_id, \n " ,
112
115
" device_id, \n " ,
207
210
},
208
211
"nbformat" : 4 ,
209
212
"nbformat_minor" : 5
210
- }
213
+ }
0 commit comments