Skip to content

Commit a974926

Browse files
Authored by Sean Gorsky (gorskysd) and a co-author
* Decoupled extractor and eventlog builder for better integration with backend * Fixed bug where rollover_id diff was not sorted correctly * Fixed cli * incremented version * Formatting * More formatting * more formatting * Modify cli to allow paths with special characters * Added bypass to account for missing SubmissionTime in StageSubmitted events * Incremented version * Better structuring * Fixed bad_eventlog tests * Returned size threshold to 5GB and cleaned up if logic * Incremented version and set higher size threshold for cli use * Fixed error message on tests * more test text changes Co-authored-by: Sean Gorsky <[email protected]>
1 parent de7cbfb commit a974926

File tree

7 files changed

+33
-24
lines changed

7 files changed

+33
-24
lines changed

spark_log_parser/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
"""Tools for providing Spark event log"""
2+
__version__ = "0.1.3"
23

3-
__version__ = "0.1.2"

spark_log_parser/cli.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
logging.captureWarnings(True)
1111

1212
from spark_log_parser.eventlog import EventLogBuilder # noqa: E402
13-
from spark_log_parser.extractor import Extractor # noqa: E402
13+
from spark_log_parser.extractor import Extractor, ExtractThresholds # noqa: E402
1414
from spark_log_parser.parsing_models.application_model_v2 import sparkApplication # noqa: E402
1515

1616
logger = logging.getLogger("spark_log_parser")
@@ -42,8 +42,10 @@ def main():
4242

4343
with tempfile.TemporaryDirectory() as work_dir:
4444

45-
log_path = unquote(args.log_file.resolve().as_uri())
46-
event_log_paths = Extractor(log_path, work_dir).extract()
45+
event_log_paths = Extractor(
46+
unquote(args.log_file.resolve().as_uri()), work_dir, thresholds=ExtractThresholds(size=20000000000)
47+
).extract()
48+
4749
event_log = EventLogBuilder(event_log_paths, work_dir).build()
4850
app = sparkApplication(eventlog=str(event_log))
4951

spark_log_parser/eventlog.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def __init__(
1515
self.event_log_paths = self._validate_event_log_paths(event_log_paths)
1616
self.work_dir = self._validate_work_dir(work_dir)
1717

18+
1819
def _validate_event_log_paths(self, event_log_paths: list[Path] | list[str]) -> list[Path]:
1920
return [Path(x) for x in event_log_paths]
2021

spark_log_parser/parsing_models/application_model.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@ def __init__(self, eventlogpath, bucket=None, stdoutpath=None, debug=False): #
143143
missing_event=f"Job Start for Stage {stage_id}"
144144
)
145145

146+
if "Submission Time" not in json_data["Stage Info"]:
147+
# PROD-426 Submission Time key may be missing from stages that
148+
# don't get submitted. There is usually a StageCompleted event
149+
# shortly after.
150+
continue
151+
146152
for job_id in self.jobs_for_stage[stage_id]:
147153
self.jobs[job_id].stages[stage_id].submission_time = (
148154
json_data["Stage Info"]["Submission Time"] / 1000

tests/test_eventlog.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ def test_simple_emr_log():
2323
event_log_path = Path("tests", "logs", "emr.zip").resolve()
2424
event = get_event(event_log_path)
2525

26-
assert all(key in event for key in ["Event", "Spark Version"]), "All keys are present"
27-
assert event["Event"] == "SparkListenerLogStart", "Expected first event is present"
26+
assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
27+
assert event["Event"] == "SparkListenerLogStart", "First event is not as expected"
2828

2929

3030
def test_simple_databricks_log():
@@ -33,7 +33,7 @@ def test_simple_databricks_log():
3333
assert all(
3434
key in event
3535
for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
36-
), "All keys are present"
36+
), "Not all keys are present"
3737

3838

3939
def test_raw_databricks_log():
@@ -43,9 +43,10 @@ def test_raw_databricks_log():
4343
assert all(
4444
key in event
4545
for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
46-
), "All keys are present"
46+
), "Not all keys are present"
47+
48+
assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
4749

48-
assert event["Event"] == "DBCEventLoggingListenerMetadata", "Expected first event is present"
4950

5051

5152
def test_log_in_dir():
@@ -55,9 +56,9 @@ def test_log_in_dir():
5556
assert all(
5657
key in event
5758
for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
58-
), "All keys are present"
59+
), "Not all keys are present"
5960

60-
assert event["Event"] == "DBCEventLoggingListenerMetadata", "Expected first event is present"
61+
assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
6162

6263

6364
class RolloverLog(unittest.TestCase):
@@ -100,11 +101,11 @@ def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_tota
100101
"Rollover Number",
101102
"SparkContext Id",
102103
]
103-
), "All keys are present"
104+
), "Not all keys are present"
104105
assert (
105106
rollover_count == event["Rollover Number"]
106-
), "Contiguous monotonically increasing IDs"
107+
), "Rollover IDs are not contiguous and monotonically increasing"
107108
rollover_count += 1
108109

109-
assert rollover_count == log_file_total, "All log parts are present"
110-
assert i + 1 == log_entry_total, "All events are present"
110+
assert rollover_count == log_file_total, "Not all log parts are present"
111+
assert i + 1 == log_entry_total, "Not all events are present"

tests/test_eventlog_s3.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,8 @@ def test_emr_log_from_s3(event_log_url, event_log_file_archive, event_log_s3_dir
7171
with open(event_log) as log_fobj:
7272
event = json.loads(log_fobj.readline())
7373

74-
assert all(key in event for key in ["Event", "Spark Version"]), "All keys are present"
75-
76-
assert event["Event"] == "SparkListenerLogStart", "Expected first event is present"
74+
assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
75+
assert event["Event"] == "SparkListenerLogStart", "Expected first event is not present"
7776

7877

7978
@pytest.mark.parametrize(
@@ -158,13 +157,13 @@ def test_databricks_log_from_s3_dir(event_log_url, event_log_file_archive, event
158157
"Rollover Number",
159158
"SparkContext Id",
160159
]
161-
), "All keys are present"
160+
), "Not all keys are present"
162161
assert (
163162
rollover_count == event["Rollover Number"]
164-
), "Contiguous monotonically increasing IDs"
163+
), "Rollover IDs are not contiguous and monotonically increasing"
165164
rollover_count += 1
166165
except Exception as exc:
167166
raise ValueError("Problem with line %d: %s" % (i, event_str), exc)
168167

169-
assert rollover_count == 3, "All log parts are present"
170-
assert i + 1 == 16945, "All events are present"
168+
assert rollover_count == 3, "Not all log parts are present"
169+
assert i + 1 == 16945, "Not all events are present"

tests/test_parse.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def test_simple_databricks_log():
3737
"stageData",
3838
"taskData",
3939
]
40-
), "All keys are present"
40+
), "Not all keys are present"
4141

4242
assert (
4343
parsed["metadata"]["application_info"]["name"] == "Databricks Shell"
@@ -60,7 +60,7 @@ def test_simple_emr_log():
6060
"stageData",
6161
"taskData",
6262
]
63-
), "All keys are present"
63+
), "Not all keys are present"
6464

6565
assert (
6666
parsed["metadata"]["application_info"]["name"] == "Text Similarity"

0 commit comments

Comments (0)