Skip to content

Commit bb82e49

Browse files
gorskysd (Sean Gorsky)
and co-authors authored
Prod 399 version 2 (#16)
* Added ability to pass through parsed logs. Added more detail to ValueError messages.
* Changed kwargs to be better aligned and more informative.
* Incremented version.
* PR cleanup.
* More tidying.
* Cleaned up cli.py to still copy parsed logs to destination.

Co-authored-by: Sean Gorsky <[email protected]>
1 parent 0b1a7af commit bb82e49

File tree

10 files changed

+132
-97
lines changed

10 files changed

+132
-97
lines changed

spark_log_parser/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
"""Tools for providing Spark event log"""
2-
__version__ = "0.1.4"
3-
2+
__version__ = "0.1.5"

spark_log_parser/cli.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import logging
33
import sys
44
import tempfile
5+
import json
6+
import shutil
57
from pathlib import Path
68
from urllib.parse import unquote
79

@@ -40,22 +42,28 @@ def main():
4042
print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
4143
print("--Processing log file: " + str(args.log_file))
4244

43-
with tempfile.TemporaryDirectory() as work_dir:
44-
45-
event_log_paths = Extractor(
46-
unquote(args.log_file.resolve().as_uri()), work_dir, thresholds=ExtractThresholds(size=20000000000)
47-
).extract()
48-
49-
event_log = EventLogBuilder(event_log_paths, work_dir).build()
50-
app = sparkApplication(eventlog=str(event_log))
51-
5245
if args.log_file.suffixes:
5346
result_path = args.result_dir.joinpath(
5447
"parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
5548
)
5649
else:
5750
result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)
5851

59-
app.save(str(result_path))
52+
with tempfile.TemporaryDirectory() as work_dir:
53+
54+
event_log_paths = Extractor(
55+
unquote(args.log_file.resolve().as_uri()),
56+
work_dir,
57+
thresholds=ExtractThresholds(size=20000000000),
58+
).extract()
59+
60+
event_log, parsed = EventLogBuilder(event_log_paths, work_dir).build()
61+
62+
if not parsed:
63+
app = sparkApplication(spark_eventlog_path=str(event_log))
64+
app.save(str(result_path))
65+
else:
66+
print("--Input log was already parsed")
67+
shutil.copyfile(event_log, str(result_path) + ".json")
6068

6169
print(f"--Result saved to: {result_path}.json")

spark_log_parser/eventlog.py

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@ def __init__(
1111
event_log_paths: list[Path] | list[str],
1212
work_dir: Path | str,
1313
):
14-
1514
self.event_log_paths = self._validate_event_log_paths(event_log_paths)
1615
self.work_dir = self._validate_work_dir(work_dir)
1716

18-
1917
def _validate_event_log_paths(self, event_log_paths: list[Path] | list[str]) -> list[Path]:
2018
return [Path(x) for x in event_log_paths]
2119

@@ -26,41 +24,58 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:
2624

2725
return work_dir_path
2826

29-
def build(self) -> Path:
27+
def build(self) -> tuple[Path, bool]:
3028

3129
if not self.event_log_paths:
3230
raise ValueError("No files found")
3331

34-
self.event_log = self._get_event_log(self.event_log_paths)
32+
self.event_log, self.parsed = self._get_event_log(self.event_log_paths)
3533

36-
return self.event_log
34+
return self.event_log, self.parsed
35+
36+
def _get_event_log(self, paths: list[Path]) -> tuple[Path, bool]:
3737

38-
def _get_event_log(self, paths: list[Path]) -> Path:
3938
log_files = []
4039
rollover_dat = []
40+
parsed = False
4141
for path in paths:
42-
with open(path) as fobj:
43-
try:
42+
try: # Test if it is a raw log
43+
with open(path) as fobj:
4444
line = json.loads(fobj.readline())
45+
if "Event" in line:
46+
log_files.append(path)
47+
if line["Event"] == "DBCEventLoggingListenerMetadata":
48+
rollover_dat.append(
49+
(line["Rollover Number"], line["SparkContext Id"], path)
50+
)
51+
else:
52+
raise ValueError
53+
54+
except ValueError:
55+
try: # Test if it is a parsed log
56+
with open(path) as fobj:
57+
data = json.load(fobj)
58+
if "jobData" in data:
59+
log_files.append(path)
60+
parsed = True
4561
except ValueError:
4662
continue
47-
if "Event" in line:
48-
log_files.append(path)
49-
if line["Event"] == "DBCEventLoggingListenerMetadata":
50-
rollover_dat.append(
51-
(line["Rollover Number"], line["SparkContext Id"], path)
52-
)
63+
64+
if len(log_files) > 1 and parsed:
65+
raise ValueError("A parsed log file was submitted with other log files")
5366

5467
if rollover_dat:
5568
if len(log_files) > len(rollover_dat):
56-
raise ValueError("No rollover properties found in log file")
69+
raise ValueError(
70+
"Rollover logs were detected, but not all files had rollover properties"
71+
)
5772

58-
return self._concat(rollover_dat)
73+
return self._concat(rollover_dat), False
5974

6075
if len(log_files) > 1:
61-
raise ValueError("No rollover properties found in log file")
76+
raise ValueError("Multiple files detected without log rollover properties")
6277

63-
return log_files[0]
78+
return log_files[0], parsed
6479

6580
def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
6681
rollover_df = (

spark_log_parser/parsing_models/application_model_v2.py

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,42 +17,38 @@
1717
class sparkApplication:
1818
def __init__(
1919
self,
20-
objfile=None, # Previously saved object. This is the fastest and best option
21-
appobj=None, # application_model object
22-
eventlog=None, # spark eventlog path,
20+
spark_eventlog_parsed_path=None,
21+
spark_eventlog_path=None,
2322
stdout=None,
2423
debug=False,
2524
):
2625

27-
self.eventlog = eventlog
26+
self.spark_eventlog_path = spark_eventlog_path
27+
self.spark_eventlog_parsed_path = spark_eventlog_parsed_path
2828
self.existsSQL = False
2929
self.existsExecutors = False
30-
# self.sparkMetadata = {}
3130
self.metadata = {}
3231
self.stdout = stdout
3332
self.debug = debug
3433

35-
if objfile is not None: # Load a previously saved sparkApplication Model
36-
self.load(filepath=objfile)
34+
if self.spark_eventlog_parsed_path is not None:
35+
self.load(filepath=self.spark_eventlog_parsed_path)
3736

38-
if (appobj is not None) or (eventlog is not None): # Load an application_model or eventlog
37+
elif self.spark_eventlog_path is not None:
3938

40-
if eventlog is not None:
41-
t0 = time.time()
42-
if "s3://" in eventlog:
43-
path = eventlog.replace("s3://", "").split("/")
44-
bucket = path[0]
45-
path = "/".join(path[1:])
46-
else:
47-
path = eventlog
48-
bucket = None
49-
50-
appobj = ApplicationModel(
51-
eventlogpath=path, bucket=bucket, stdoutpath=stdout, debug=debug
52-
)
53-
logging.info("Loaded object from spark eventlog [%.2fs]" % (time.time() - t0))
39+
t0 = time.time()
40+
if "s3://" in self.spark_eventlog_path:
41+
path = self.spark_eventlog_path.replace("s3://", "").split("/")
42+
bucket = path[0]
43+
path = "/".join(path[1:])
5444
else:
55-
logging.info("Loaded object from ApplicationModel object")
45+
path = self.spark_eventlog_path
46+
bucket = None
47+
48+
appobj = ApplicationModel(
49+
eventlogpath=path, bucket=bucket, stdoutpath=stdout, debug=debug
50+
)
51+
logging.info("Loaded object from spark eventlog [%.2fs]" % (time.time() - t0))
5652

5753
self.validate_app(appobj, self.debug)
5854

@@ -651,9 +647,11 @@ def save(self, filepath=None, compress=False):
651647

652648
def save_to_local(self, saveDat, filepath, compress):
653649
if filepath is None:
654-
if self.eventlog is None:
650+
if self.spark_eventlog_path is None:
655651
raise Exception('No input eventlog found. Must specify "filepath".')
656-
inputFile = os.path.basename(os.path.normpath(self.eventlog)).replace(".gz", "")
652+
inputFile = os.path.basename(os.path.normpath(self.spark_eventlog_path)).replace(
653+
".gz", ""
654+
)
657655
filepath = inputFile + "-sync"
658656

659657
if compress is False:

tests/logs/bad/mixed_parsed.zip

3.91 MB
Binary file not shown.
607 KB
Binary file not shown.

tests/test_bad_eventlog.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ def test_multiple_context_ids(self):
2525

2626
def test_missing_dbc_event(self):
2727
event_log = Path("tests", "logs", "bad", "missing-dbc-event.zip").resolve()
28-
self.check_value_error(event_log, "No rollover properties found in log file")
28+
self.check_value_error(
29+
event_log, "Rollover logs were detected, but not all files had rollover properties"
30+
)
2931

3032
def test_duplicate_log_part(self):
3133
event_log = Path("tests", "logs", "bad", "duplicate-part.tgz").resolve()
@@ -39,6 +41,10 @@ def test_missing_first_part(self):
3941
event_log = Path("tests", "logs", "bad", "missing-first-part.zip").resolve()
4042
self.check_value_error(event_log, "Rollover log file appears to be missing")
4143

44+
def test_mixed_parsed(self):
45+
event_log = Path("tests", "logs", "bad", "mixed_parsed.zip").resolve()
46+
self.check_value_error(event_log, "A parsed log file was submitted with other log files")
47+
4248
def test_only_non_first_part(self):
4349
with tempfile.TemporaryDirectory() as temp_dir:
4450
with ZipFile(Path("tests", "logs", "bad", "missing-first-part.zip")) as zfile:

tests/test_eventlog.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,54 +11,58 @@ def get_event(event_log_path):
1111

1212
with tempfile.TemporaryDirectory() as temp_dir:
1313
event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
14-
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
14+
event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
1515

1616
with open(event_log) as log_fobj:
1717
event = json.loads(log_fobj.readline())
1818

19-
return event
19+
return event, parsed
2020

2121

2222
def test_simple_emr_log():
2323
event_log_path = Path("tests", "logs", "emr.zip").resolve()
24-
event = get_event(event_log_path)
24+
event, parsed = get_event(event_log_path)
2525

2626
assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
2727
assert event["Event"] == "SparkListenerLogStart", "First event is not as expected"
28+
assert not parsed
2829

2930

3031
def test_simple_databricks_log():
3132
event_log_path = Path("tests", "logs", "databricks.zip").resolve()
32-
event = get_event(event_log_path)
33+
event, parsed = get_event(event_log_path)
34+
3335
assert all(
3436
key in event
3537
for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
3638
), "Not all keys are present"
39+
assert not parsed
3740

3841

3942
def test_raw_databricks_log():
4043
event_log_path = Path("tests", "logs", "databricks.json").resolve()
41-
event = get_event(event_log_path)
44+
event, parsed = get_event(event_log_path)
4245

4346
assert all(
4447
key in event
4548
for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
4649
), "Not all keys are present"
4750

4851
assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
49-
52+
assert not parsed
5053

5154

5255
def test_log_in_dir():
5356
event_log_path = Path("tests", "logs", "log_in_dir", "databricks.json.gz").resolve()
54-
event = get_event(event_log_path)
57+
event, parsed = get_event(event_log_path)
5558

5659
assert all(
5760
key in event
5861
for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
5962
), "Not all keys are present"
6063

6164
assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
65+
assert not parsed
6266

6367

6468
class RolloverLog(unittest.TestCase):
@@ -82,7 +86,7 @@ def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_tota
8286
event_log_paths = extractor.Extractor(
8387
event_log_path.resolve().as_uri(), temp_dir
8488
).extract()
85-
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
89+
event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
8690

8791
with open(event_log) as log_fobj:
8892
event = json.loads(log_fobj.readline())
@@ -109,3 +113,9 @@ def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_tota
109113

110114
assert rollover_count == log_file_total, "Not all log parts are present"
111115
assert i + 1 == log_entry_total, "Not all events are present"
116+
assert not parsed
117+
118+
119+
if __name__ == "__main__":
120+
121+
test_simple_emr_log()

tests/test_eventlog_s3.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,14 @@ def test_emr_log_from_s3(event_log_url, event_log_file_archive, event_log_s3_dir
6666

6767
with tempfile.TemporaryDirectory() as temp_dir, stubber:
6868
event_log_paths = extractor.Extractor(event_log_url, temp_dir, s3).extract()
69-
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
69+
event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
7070

7171
with open(event_log) as log_fobj:
7272
event = json.loads(log_fobj.readline())
7373

7474
assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
7575
assert event["Event"] == "SparkListenerLogStart", "Expected first event is not present"
76+
assert not parsed
7677

7778

7879
@pytest.mark.parametrize(
@@ -135,7 +136,7 @@ def test_databricks_log_from_s3_dir(event_log_url, event_log_file_archive, event
135136

136137
with stubber:
137138
event_log_paths = extractor.Extractor(event_log_url, temp_dir, s3).extract()
138-
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
139+
event_log, _ = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
139140

140141
stubber.assert_no_pending_responses()
141142

0 commit comments

Comments (0)