Commit aeafe4f

Author: syncmachineuser (committed)
Commit message: Updated
1 parent 27427eb commit aeafe4f

File tree

  parsing_models/application_model.py
  parsing_models/application_model_v2.py
  parsing_models/stage_model.py
  parsing_models/utility.py
  spark_predictor

5 files changed: +65 -61 lines changed


parsing_models/application_model.py

Lines changed: 20 additions & 7 deletions
@@ -6,6 +6,7 @@
 import os
 import boto3
 import gzip
+from pprint import pprint
 
 from .job_model import JobModel
 from .executor_model import ExecutorModel
@@ -51,6 +52,11 @@ def __init__(self, eventlogpath, bucket=None, stdoutpath=None):
 
         self.shuffle_partitions = 200
 
+
+        self.cloud_platform = None
+        self.cloud_provider = None
+        self.spark_version = None
+
         # if bucket is None, then files are in local directory, else read from s3
         # read event log
         if bucket is None:
@@ -95,8 +101,9 @@ def __init__(self, eventlogpath, bucket=None, stdoutpath=None):
             json_data = get_json(line)
             event_type = json_data["Event"]
             if event_type == "SparkListenerLogStart":
-                spark_version_dict = {"spark_version": json_data["Spark Version"]}
-                self.spark_metadata = {**self.spark_metadata, **spark_version_dict}
+                #spark_version_dict = {"spark_version": json_data["Spark Version"]}
+                self.spark_version = json_data["Spark Version"]
+                self.spark_metadata = {**self.spark_metadata}
             elif event_type == "SparkListenerJobStart":
 
                 job_id = json_data["Job ID"]
@@ -134,8 +141,9 @@ def __init__(self, eventlogpath, bucket=None, stdoutpath=None):
             elif event_type == "SparkListenerStageCompleted":
 
                 # stages may not be executed exclusively from one job
-                self.finish_time = json_data['Stage Info']['Completion Time']/1000
                 stage_id = json_data['Stage Info']["Stage ID"]
+                self.finish_time = json_data['Stage Info']['Completion Time']/1000
+
                 for job_id in self.jobs_for_stage[stage_id]:
                     self.jobs[job_id].stages[stage_id].completion_time = json_data['Stage Info']['Completion Time']/1000
 
@@ -146,8 +154,10 @@ def __init__(self, eventlogpath, bucket=None, stdoutpath=None):
 
                 # This if is specifically for databricks logs
                 if 'spark.databricks.clusterUsageTags.sparkVersion' in curKeys:
-                    json_data['Spark Properties']['platform'] = 'Databricks'
-                    json_data['Spark Properties']['spark_version'] = json_data['Spark Properties']['spark.databricks.clusterUsageTags.sparkVersion']
+
+                    self.cloud_platform = 'databricks'
+                    self.cloud_provider = json_data['Spark Properties']['spark.databricks.clusterUsageTags.cloudProvider'].lower()
+                    self.spark_version = json_data['Spark Properties']['spark.databricks.clusterUsageTags.sparkVersion']
 
                 self.spark_metadata = {**self.spark_metadata, **json_data["Spark Properties"]}
 
@@ -234,8 +244,11 @@ def __init__(self, eventlogpath, bucket=None, stdoutpath=None):
                     self.jobs[0].add_event(line, False)
 
 
-        if 'platform' not in self.spark_metadata.keys():
-            self.spark_metadata['platform'] = 'EMR'
+        if self.cloud_platform == None:
+            self.cloud_platform = 'emr'
+            self.cloud_provider = 'aws'
+            #self.spark_metadata['cloud_platform'] = 'emr'
+            #self.spark_metadata['cloud_provider'] = 'aws'
 
         self.dag.decipher_dag()
        self.dag.add_broadcast_dependencies(self.stdoutpath)
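
Taken together, the application_model.py changes move platform detection off of spark_metadata and onto dedicated attributes. A minimal sketch of the resulting logic, written as a standalone helper for illustration only (the function name and the plain-dict argument are not part of the module):

def detect_platform(spark_properties):
    """Return (cloud_platform, cloud_provider, spark_version) given the
    'Spark Properties' dict from an environment-update event."""
    tags = 'spark.databricks.clusterUsageTags.'
    if tags + 'sparkVersion' in spark_properties:
        # Databricks event logs carry cluster usage tags naming the provider.
        return ('databricks',
                spark_properties[tags + 'cloudProvider'].lower(),
                spark_properties[tags + 'sparkVersion'])
    # No Databricks tags anywhere in the log: assume EMR on AWS, mirroring the
    # fallback at the end of __init__. The Spark version itself is still picked
    # up separately from the SparkListenerLogStart event.
    return ('emr', 'aws', None)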

parsing_models/application_model_v2.py

Lines changed: 44 additions & 17 deletions
@@ -11,22 +11,19 @@
 import boto3
 from collections import defaultdict
 
-logging.basicConfig(format='%(levelname)s:%(message)s')
-
 class sparkApplication():
 
     def __init__(self,
                  objfile = None,  # Previously saved object. This is the fastest and best option
                  appobj = None,   # application_model object
                  eventlog = None, # spark eventlog path,
                  stdout = None,
-                 debug = False
                  ):
 
         self.eventlog = eventlog
         self.existsSQL = False
         self.existsExecutors = False
-        self.sparkMetadata = {}
+        #self.sparkMetadata = {}
         self.metadata = {}
         self.stdout = stdout
 
@@ -157,13 +154,13 @@ def getExecutorInfo(self, appobj):
         df = defaultdict(lambda: [])
         for xid, executor in appobj.executors.items():
 
-            #print(executor.end_time)
+            # print(executor.end_time)
             # Special case for handling end_time
             if executor.end_time is not None:
                 end_time = executor.end_time/1000 - appobj.start_time
             else:
-                #print('None detected')
-                end_time = appobj.finish_time - appobj.start_time
+                # print('None detected')
+                end_time = executor.end_time
 
             df['executor_id'].append(xid)
             df['cores'] .append(executor.cores)
@@ -336,7 +333,9 @@ def getAllTaskData(self, appobj):
                 'end_time' : end_time,
                 'duration' : duration,
                 #'input_mb' : input_mb,
-                'remote_mb_read': remote_mb_read,
+
+                # Duplicate entry:
+                # 'remote_mb_read': remote_mb_read,
                 'locality' : locality,
 
                 # Disk-based performance metrics
@@ -560,11 +559,24 @@ def getAllDriverAccumData(self, appobj):
         self.accumData = df
 
     def getAllMetaData(self, appobj):
-        self.sparkMetadata = (appobj.spark_metadata)
-        self.metadata = {"app_name": appobj.app_name,
-                         "start_time": appobj.start_time}
-
 
+        #self.sparkMetadata = (appobj.spark_metadata)
+
+        self.metadata = {
+            'application_info' : {
+                'timestamp_start_ms' : int(appobj.start_time*1000),
+                'timestamp_end_ms' : int(appobj.finish_time*1000),
+                'runtime_sec' : appobj.finish_time - appobj.start_time,
+                'name' : appobj.app_name,
+                'id' : appobj.spark_metadata['spark.app.id'],
+                'spark_version' : appobj.spark_version,
+                'cloud_platform' : appobj.cloud_platform,
+                'cloud_provider' : appobj.cloud_provider
+
+            },
+            'spark_params' : appobj.spark_metadata
+        }
+
     def addMetadata(self, key=None, value=None):
 
         if (key is None) or (value is None):
@@ -640,7 +652,7 @@ def save(self, filepath=None, compress=False):
             saveDat['executors'] = self.executorData.reset_index().to_dict('list')
 
         saveDat['metadata'] = self.metadata
-        saveDat['sparkMetadata'] = self.sparkMetadata
+        #saveDat['sparkMetadata'] = self.sparkMetadata
         saveDat['metadata']['existsSQL'] = self.existsSQL
         saveDat['metadata']['existsExecutors'] = self.existsExecutors
 
@@ -703,21 +715,36 @@ def load(self, filepath=None):
         self.metadata = saveDat['metadata']
         self.existsSQL = self.metadata.pop('existsSQL')
         self.existsExecutors = self.metadata.pop('existsExecutors')
-        self.sparkMetadata = saveDat.pop('sparkMetadata')
+
+        # This is for legacy support and should be removed after it is in production for a few
+        # weeks. Introduced 3/9/2022 by SDG.
+        if 'sparkMetadata' in saveDat:
+            self.sparkMetadata = saveDat.pop('sparkMetadata')
 
+        # SPC113 - SDG
+        # Because of the way jobData is created, if there are no job Events in the eventlog then the
+        # correct fields will not exist. A second condition checking for the 'job_id' field is
+        # necessary here to ensure this method will run if this is the case.
+        #
+        # Note: stageData is initialized differently so this same issue does not exist for that
+        # structure. Furthermore, in the event that 'jobData' has no values within, 'stageData' will
+        # also have no values and an invalidLog exception will be thrown during log validation
+        # in SparkApplicaionAdvanced.
+        if ('jobData' in saveDat) and ('job_id' in saveDat['jobData']):
+            self.jobData = pd.DataFrame.from_dict(saveDat['jobData'])
+            self.jobData = self.jobData.set_index('job_id')
 
-        if 'jobData' in saveDat: self.jobData = pd.DataFrame.from_dict(saveDat['jobData'] ).set_index('job_id')
         if 'stageData' in saveDat: self.stageData = pd.DataFrame.from_dict(saveDat['stageData']).set_index('stage_id')
         if 'taskData' in saveDat: self.taskData = pd.DataFrame.from_dict(saveDat['taskData'] ).set_index('task_id')
-        if 'accumData' in saveDat:
+        if 'accumData' in saveDat:
             self.accumData = pd.DataFrame.from_dict(saveDat['accumData'] )
             if 'sql_id' in self.accumData.columns:
                 self.accumData = self.accumData.set_index('sql_id')
 
         if self.existsSQL:
             self.sqlData = pd.DataFrame.from_dict(saveDat['sqlData']).set_index('sql_id')
         if self.existsExecutors:
-            self.executorData = pd.DataFrame.from_dict(saveDat['executors']).set_index('executor_id')
+            self.executorData = pd.DataFrame.from_dict(saveDat['executors']).set_index('executor_id')
 
         logging.info('Loaded object from: %s [%.2f]' % (filepath, (time.time()-t1)))

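After these changes, getAllMetaData builds a nested metadata dictionary instead of the old flat {app_name, start_time} pair, and save()/load() no longer persist a separate sparkMetadata entry. For reference, the new structure looks roughly like the sketch below; all field values are made-up placeholders, only the keys come from the diff:

metadata = {
    'application_info': {
        'timestamp_start_ms': 1646830800000,              # int(appobj.start_time * 1000)
        'timestamp_end_ms':   1646831100000,              # int(appobj.finish_time * 1000)
        'runtime_sec':        300.0,                      # finish_time - start_time
        'name':               'example-app',              # appobj.app_name
        'id':                 'app-20220309120000-0001',  # spark.app.id
        'spark_version':      '3.2.1',
        'cloud_platform':     'databricks',               # or 'emr'
        'cloud_provider':     'aws',
    },
    'spark_params': {},   # the full appobj.spark_metadata dict goes here
}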
parsing_models/stage_model.py

Lines changed: 0 additions & 4 deletions
@@ -1,8 +1,4 @@
-import collections
-import json
 import numpy
-import matplotlib.pyplot as plt
-import seaborn as sns
 
 from .task_model import TaskModel

parsing_models/utility.py

Lines changed: 0 additions & 32 deletions
@@ -1,35 +1,3 @@
-import re
-
-def convert_to_MiB(size_string):
-    """
-    Function to convert to data sizes to MiB from string
-    e.g. "10g", "10 GB"
-    """
-    # default if no units mentioned is MiB
-    if size_string.isdigit():
-        data_size = float(size_string)
-    else:
-        pattern = "([0-9]+) *([a-zA-z]+)"
-        match = re.match(pattern, size_string.strip(' '))
-
-        data_size = float(match.group(1))
-        unit = match.group(2)
-
-        if unit == 'kiB':
-            data_size = data_size / 1024
-        elif unit == 'GiB' or unit == 'g':
-            data_size = data_size * 1024
-        elif unit == 'GB':
-            data_size = data_size * 10**9 / 1024 / 1024
-        elif unit == 'MB':
-            data_size = data_size * 10**6 / 1024 / 1024
-        elif unit == 'kB':
-            data_size = data_size * 10**3 / 1024 / 1024
-        elif unit == 'bytes' or unit == 'B':
-            data_size = data_size / 1024 / 1024
-    return data_size
-
-
 def db_to_aws_configs(appobj):
     meta = appobj.sparkMetadata
     rt = 'spark.databricks.clusterUsageTags.'

spark_predictor

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-Subproject commit 22fb7c178388a5302fbb6d29545171cd963b9237
+Subproject commit d09b5326da36328df30159a00de696b05f5e8b59
