Skip to content

Commit 0b1a7af

Browse files
authored
[PROD-429] Handle event logs with no application end events (#15)
1 parent ea5c18a commit 0b1a7af

File tree

2 files changed

+73
-71
lines changed

2 files changed

+73
-71
lines changed

README.md

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,11 @@ pip3 install https://github.com/synccomputingcode/spark_log_parser/archive/main.
1515
If you have not already done so, complete the [instructions](https://github.com/synccomputingcode/user_documentation/wiki#accessing-autotuner-input-data) to download the Apache Spark event log.
1616

1717
### Step 1: Parse the log to strip away sensitive information
18-
1. To process a log file, execute the parse.py script in the sync_parser folder, and provide a
19-
log file destination with the -l flag.
20-
18+
1. To process a log file, execute the spark-log-parser command with a log file path and a directory in which to store the result, like so:
2119
```shell
22-
spark-log-parser -l <log file location> -r <results directory>
20+
spark-log-parser -l <log file location> -r <result directory>
2321
```
24-
25-
The parsed file `parsed-<log file name>` will appear in the results directory.
26-
22+
The parsed file `parsed-<log file name>` will appear in the result directory.
2723

2824
2. Send Sync Computing the parsed log
2925

Lines changed: 70 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,70 @@
1-
from .exceptions import LazyEventValidationException
2-
from .exceptions import ParserErrorMessages as MSGS
3-
4-
import logging
5-
6-
logger = logging.getLogger("EventDataValidation")
7-
8-
class EventDataValidation:
    """
    Validate the existence of certain Spark Listener Events.

    Inspects an already-parsed Spark application for expected-but-missing
    listener events (job end, stage submitted/completed), accumulating a
    description of every problem found in `self.message`.
    """

    def __init__(self, app=None, debug=False):
        # app: parsed Spark application; expected to expose a `jobs` mapping of
        #      job id -> job, where each job exposes a `stages` mapping.
        self.app = app
        self.debug = debug  # When 'True' disables exception raises for debugging
        self.message = ''

    def validate(self):
        """
        Run the validation methods. If one or more errors exist then log the
        error, then throw an exception. Logging is used here so that the
        problem is still indicated when in debug mode.

        Raises:
            LazyEventValidationException: if any validation failed and
                `debug` is False.
        """
        self.validate_job_events()
        self.validate_stage_events()

        # Truthiness replaces the original `len(self.message) > 0`.
        if self.message:
            logger.error(self.message)
            if not self.debug:
                raise LazyEventValidationException(error_message=self.message)

    def validate_job_events(self):
        """
        Look for missing job events.

        4/20/2022: Currently only the JobComplete is detected, because a missing
        JobStart event will result in a more urgent exception being thrown in SparkApplication
        """
        # A job without `completion_time` never saw its SparkListenerJobEnd.
        miss_job_ends = [
            jid for jid, job in self.app.jobs.items()
            if not hasattr(job, 'completion_time')
        ]

        # The original assigned the same f-string to an unused local `msg`
        # before appending; the dead assignment is removed here.
        if miss_job_ends:
            self.message += f'{MSGS.MISSING_EVENT_JOB_END}{miss_job_ends}. '

    def validate_stage_events(self):
        """
        Look for missing stage events.

        4/20/2022: Currently only the StageSubmitted is detected, because a
        missing StageCompleted event will result in a more urgent exception being thrown in
        SparkApplication
        """
        # Job ids are not needed here, so iterate values() only.
        miss_stage_completes = [
            sid
            for job in self.app.jobs.values()
            for sid, stage in job.stages.items()
            if not hasattr(stage, 'completion_time')
        ]

        if miss_stage_completes:
            self.message += f'{MSGS.MISSING_EVENT_STAGE_COMPLETE}{miss_stage_completes}. '
1+
import logging
2+
3+
from .exceptions import LazyEventValidationException
4+
from .exceptions import ParserErrorMessages as MSGS
5+
6+
logger = logging.getLogger("EventDataValidation")
7+
8+
9+
class EventDataValidation:
    """
    Validate the existence of certain Spark Listener Events.

    Inspects an already-parsed Spark application for expected-but-missing
    listener events (application/stage/SQL completion, job end, stage
    submitted/completed), accumulating a description of every problem found
    in `self.message`.
    """

    def __init__(self, app=None, debug=False):
        # app: parsed Spark application; expected to expose `finish_time` and a
        #      `jobs` mapping of job id -> job (each job has a `stages` mapping).
        self.app = app
        self.debug = debug  # When 'True' disables exception raises for debugging
        self.message = ""

    def validate(self):
        """
        Run the validation methods. If one or more errors exist then log the
        error, then throw an exception. Logging is used here so that the
        problem is still indicated when in debug mode.

        Raises:
            LazyEventValidationException: if any validation failed and
                `debug` is False.
        """
        # A missing finish time means the event log ended before the
        # application completed, so completion events cannot be trusted.
        if not self.app.finish_time:
            self.message += (
                MSGS.MISSING_EVENT_GENERIC_MESSAGE + "'Application / Stage / SQL Completion'. "
            )

        self.validate_job_events()
        self.validate_stage_events()

        # Truthiness replaces `len(self.message) > 0`.
        if self.message:
            logger.error(self.message)
            if not self.debug:
                raise LazyEventValidationException(error_message=self.message)

    def validate_job_events(self):
        """
        Look for missing job events.

        4/20/2022: Currently only the JobComplete is detected, because a missing
        JobStart event will result in a more urgent exception being thrown in SparkApplication
        """
        # A job without `completion_time` never saw its SparkListenerJobEnd.
        miss_job_ends = [
            jid for jid, job in self.app.jobs.items()
            if not hasattr(job, "completion_time")
        ]

        if miss_job_ends:
            self.message += f"{MSGS.MISSING_EVENT_JOB_END}{miss_job_ends}. "

    def validate_stage_events(self):
        """
        Look for missing stage events.

        4/20/2022: Currently only the StageSubmitted is detected, because a
        missing StageCompleted event will result in a more urgent exception being thrown in
        SparkApplication
        """
        # Job ids are not needed here, so iterate values() only (the original
        # unpacked an unused `jid` from items()).
        miss_stage_completes = [
            sid
            for job in self.app.jobs.values()
            for sid, stage in job.stages.items()
            if not hasattr(stage, "completion_time")
        ]

        if miss_stage_completes:
            self.message += f"{MSGS.MISSING_EVENT_STAGE_COMPLETE}{miss_stage_completes}. "

0 commit comments

Comments
 (0)