Commit c36bef4

Author: syncmachineuser
Commit message: Updated
1 parent 5122a3e, commit c36bef4

File tree

11 files changed: +1760 -0 lines changed


docs/event_log_download.pdf

570 KB
Binary file not shown.

docs/output.png

1.53 MB

parsing_models/__init__.py

Whitespace-only changes.
Lines changed: 272 additions & 0 deletions
@@ -0,0 +1,272 @@
import collections
import json
import logging
import numpy

import os
import boto3
import gzip

from .job_model import JobModel
from .executor_model import ExecutorModel
from .dag_model import DagModel


def get_json(line):
    # Need to first strip the trailing newline, and then escape newlines (which can appear
    # in the middle of some of the JSON) so that the JSON library doesn't barf.
    return json.loads(line.strip("\n").replace("\n", "\\n"))


class ApplicationModel:
    """
    Model for a Spark application. A Spark application consists of one or more jobs,
    each of which consists of one or more stages, each of which consists of one or more tasks.

    Spark application parameters can be parsed from an event log.

    Uses parts of the trace analyzer from Kay Ousterhout: https://github.com/kayousterhout/trace-analysis
    """

    def __init__(self, eventlogpath, bucket=None, stdoutpath=None):
        # set default parameters
        self.eventlogpath = eventlogpath
        self.dag = DagModel()
        self.jobs = collections.defaultdict(JobModel)
        self.sql = collections.defaultdict(dict)
        self.accum_metrics = collections.defaultdict(dict)
        self.executors = collections.defaultdict(ExecutorModel)
        self.jobs_for_stage = {}
        self.num_executors = 0
        self.max_executors = 0
        self.executorRemovedEarly = False
        self.parallelism = None
        self.memory_per_executor = None
        self.cores_per_executor = None
        self.num_instances = None
        self.start_time = None
        self.finish_time = None
        self.platformIdentified = False
        self.platform = None
        self.spark_metadata = {}
        self.stdoutpath = stdoutpath

        # Spark's default for spark.sql.shuffle.partitions
        self.shuffle_partitions = 200

        # If bucket is None, the files are in a local directory; otherwise read from S3.
        # Read the event log.
        if bucket is None:
            if '.gz' in eventlogpath:
                f = gzip.open(eventlogpath, "rt")
            else:
                f = open(eventlogpath, "r")
            test_line = f.readline()
            # print(get_json(test_line))
        else:
            s3_resource = boto3.resource('s3')
            bucket = s3_resource.Bucket(bucket)
            key = eventlogpath
            obj = bucket.Object(key=key)
            response = obj.get()

            if '.gz' in key:
                filestring = gzip.decompress(response['Body'].read()).decode('utf-8')
            else:
                filestring = response['Body'].read().decode('utf-8')

            f = filestring.splitlines(True)
            test_line = f[0]

        try:
            get_json(test_line)
            is_json = True
            # print("Parsing file %s as JSON" % eventlogpath)
        except json.JSONDecodeError:
            is_json = False
            print("Not a JSON file. Check the event log.")

        if bucket is None:
            f.seek(0)

        hosts = []

        for line in f:
            if is_json:
                json_data = get_json(line)
                event_type = json_data["Event"]
                if event_type == "SparkListenerLogStart":
                    spark_version_dict = {"spark_version": json_data["Spark Version"]}
                    self.spark_metadata = {**self.spark_metadata, **spark_version_dict}
                elif event_type == "SparkListenerJobStart":
                    job_id = json_data["Job ID"]
                    self.jobs[job_id].submission_time = json_data['Submission Time']/1000
                    # Avoid using "Stage Infos" here, which was added in 1.2.0.
                    stage_ids = json_data["Stage IDs"]

                    # print("Stage ids: %s" % stage_ids)
                    for stage_id in stage_ids:
                        if stage_id not in self.jobs_for_stage:
                            self.jobs_for_stage[stage_id] = [job_id]
                        else:
                            self.jobs_for_stage[stage_id].append(job_id)

                elif event_type == "SparkListenerJobEnd":
                    job_id = json_data["Job ID"]
                    self.jobs[job_id].completion_time = json_data['Completion Time']/1000
                    self.jobs[job_id].result = json_data['Job Result']['Result']
                elif event_type == "SparkListenerTaskEnd":
                    stage_id = json_data["Stage ID"]
                    # Add the event to all of the jobs that depend on the stage.
                    for jid in self.jobs_for_stage[stage_id]:
                        self.jobs[jid].add_event(json_data, True)

                elif event_type == "SparkListenerStageCompleted":
                    stage_id = json_data['Stage Info']["Stage ID"]

                    # Stages may not be executed exclusively by one job.
                    for job_id in self.jobs_for_stage[stage_id]:
                        self.jobs[job_id].stages[stage_id].submission_time = json_data['Stage Info']['Submission Time']/1000
                        self.jobs[job_id].stages[stage_id].completion_time = json_data['Stage Info']['Completion Time']/1000
                        self.jobs[job_id].stages[stage_id].stage_name = json_data['Stage Info']['Stage Name']
                        self.jobs[job_id].stages[stage_id].num_tasks = json_data['Stage Info']['Number of Tasks']
                        self.jobs[job_id].stages[stage_id].stage_info = json_data['Stage Info']

                elif event_type == "SparkListenerEnvironmentUpdate":

                    curKeys = json_data["Spark Properties"].keys()

                    # This check is specific to Databricks logs.
                    if 'spark.databricks.clusterUsageTags.sparkVersion' in curKeys:
                        json_data['Spark Properties']['platform'] = 'Databricks'
                        json_data['Spark Properties']['spark_version'] = json_data['Spark Properties']['spark.databricks.clusterUsageTags.sparkVersion']

                    self.spark_metadata = {**self.spark_metadata, **json_data["Spark Properties"]}

                    ##################################
                    # Note to predictor team:
                    # Keeping these for now so nothing breaks, but the entire Spark Properties
                    # dictionary is now transferred above, so we can see what has been set and
                    # don't have to manually grab every property. We should be able to remove
                    # the section below once we make minor tweaks to the predictor.
                    ##################################

                    # if 'spark.executor.instances' in curKeys:
                    #     self.num_executors = int(json_data["Spark Properties"]["spark.executor.instances"])
                    if 'spark.default.parallelism' in curKeys:
                        self.parallelism = int(json_data["Spark Properties"]["spark.default.parallelism"])
                    if 'spark.executor.memory' in curKeys:
                        self.memory_per_executor = json_data["Spark Properties"]["spark.executor.memory"]
                    # if 'spark.executor.cores' in curKeys:
                    #     self.cores_per_executor = int(json_data["Spark Properties"]["spark.executor.cores"])
                    if 'spark.sql.shuffle.partitions' in curKeys:
                        self.shuffle_partitions = int(json_data["Spark Properties"]["spark.sql.shuffle.partitions"])

                elif event_type == "SparkListenerExecutorAdded":
                    hosts.append(json_data["Executor Info"]["Host"])
                    self.cores_per_executor = int(json_data["Executor Info"]["Total Cores"])
                    self.num_executors = self.num_executors + 1
                    self.max_executors = max(self.num_executors, self.max_executors)
                    self.executors[int(json_data['Executor ID'])] = ExecutorModel(json_data)
                    self.executors[int(json_data['Executor ID'])].removed_reason = ''

                # So far, the logs I've looked at only explicitly remove executors when there is
                # a problem, like a lost worker. Use this to flag premature executor removal.
                elif event_type == "SparkListenerExecutorRemoved":
                    self.executorRemovedEarly = True
                    self.num_executors = self.num_executors - 1
                    self.executors[int(json_data['Executor ID'])].end_time = json_data['Timestamp']
                    self.executors[int(json_data['Executor ID'])].removed_reason = json_data['Removed Reason']

                elif event_type == "SparkListenerApplicationStart":
                    self.start_time = json_data["Timestamp"]/1000

                elif event_type == "SparkListenerApplicationEnd":
                    self.finish_time = json_data["Timestamp"]/1000

                elif event_type == "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart":
                    sql_id = json_data['executionId']
                    self.sql[sql_id]['start_time'] = json_data['time']/1000
                    self.sql[sql_id]['description'] = json_data['description']
                    self.parse_all_accum_metrics(json_data)

                elif event_type == "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd":
                    sql_id = json_data['executionId']
                    self.sql[sql_id]['end_time'] = json_data['time']/1000
                    self.finish_time = json_data["time"]/1000

                elif event_type == "org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate":
                    self.parse_all_accum_metrics(json_data)

                # Populate accumulated metrics with updated values.
                elif event_type == "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates":
                    sql_id = json_data['executionId']
                    for metric in json_data['accumUpdates']:
                        accum_id = metric[0]
                        self.accum_metrics[accum_id]['value'] = metric[1]
                        self.accum_metrics[accum_id]['sql_id'] = sql_id

                elif event_type == "org.apache.spark.sql.execution.ui.SparkListenerEffectiveSQLConf":
                    #########################
                    # Note to predictor team:
                    # These are the Spark parameters for each SQL execution id.
                    # We need to decide how we want to deal with this. Right now the last one is
                    # saved, but we need to figure out whether to handle each execution id's
                    # parameters separately.
                    #########################
                    self.spark_metadata = {**self.spark_metadata, **json_data["effectiveSQLConf"]}

                # Add DAG components,
                # using stage submitted to preserve the order in which stages are submitted.
                if event_type in ['SparkListenerJobStart', 'SparkListenerStageSubmitted']:
                    self.dag.parse_dag(json_data)

            else:
                # The file will only contain information for one job.
                self.jobs[0].add_event(line, False)

        if 'platform' not in self.spark_metadata.keys():
            self.spark_metadata['platform'] = 'EMR'

        self.dag.decipher_dag()
        self.dag.add_broadcast_dependencies(self.stdoutpath)

        self.num_instances = len(numpy.unique(hosts))
        self.executors_per_instance = numpy.ceil(self.num_executors/self.num_instances)

        # print("Finished reading input data:")
        for job_id, job in self.jobs.items():
            job.initialize_job()
            # print("Job", job_id, " has stages: ", job.stages.keys())

    def output_all_job_info(self):
        for job_id, job in self.jobs.items():
            features_filename = f'{os.path.dirname(self.eventlogpath)}/e{self.num_executors}_p{self.parallelism}_mem{self.memory_per_executor}_job{job_id}'
            job.write_features(features_filename)

    def plot_task_runtime_distribution(self):
        """
        For each stage, plot the task runtime distribution and calculate how closely it
        follows a normal distribution.

        Returns True if normally distributed.
        """
        # TODO: fill in plotting and calculation
        return

    # Accumulated metrics, including broadcast data.
    def parse_all_accum_metrics(self, accum_data):
        # Search recursively for accumulated metrics (can be many layers deep).
        for k, v in accum_data.items():
            if k == 'metrics':
                for metric in v:
                    accum_id = metric['accumulatorId']
                    self.accum_metrics[accum_id]['name'] = metric['name']
                    self.accum_metrics[accum_id]['metric_type'] = metric['metricType']
            if isinstance(v, dict):
                self.parse_all_accum_metrics(v)
            if isinstance(v, list):
                for d in v:
                    if isinstance(d, dict):
                        self.parse_all_accum_metrics(d)
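
For reference, Spark event logs are newline-delimited JSON, one event object per line, keyed by "Event". A minimal sketch of the kind of lines the loop above dispatches on, showing only fields the parser actually reads (the values are illustrative):

{"Event": "SparkListenerApplicationStart", "Timestamp": 1609459200000}
{"Event": "SparkListenerJobStart", "Job ID": 0, "Submission Time": 1609459201000, "Stage IDs": [0, 1]}

And a hypothetical usage sketch. The module path and the event-log path are assumptions (the diff header above does not show this file's name); the class, attributes, and methods come from the code itself:

# Hypothetical usage: parse a local (optionally gzipped) event log and write
# per-job feature files next to it. bucket=None means the path is a local file.
from parsing_models.application_model import ApplicationModel  # assumed module path

app = ApplicationModel("logs/eventlog.gz")  # illustrative path

# Metadata gathered while replaying the events:
print(app.spark_metadata.get("platform"))        # 'Databricks' or 'EMR'
print(app.max_executors, app.cores_per_executor)

# start_time/finish_time may be None if the log lacks application start/end events.
if app.start_time and app.finish_time:
    print("wall-clock duration (s):", app.finish_time - app.start_time)

# Write one feature file per job next to the event log.
app.output_all_job_info()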
