Skip to content

Commit de7cbfb

Browse files
gorskysd (Sean Gorsky) and co-authors authored
Bugfix/prod 399 triggered prediction error on public api production at 2022 07 29 t 16 07 15 529 z (#12)
* Decoupled extractor and eventlog builder for better integration with backend
* Fixed bug where rollover_id diff was not sorted correctly
* Fixed cli
* Incremented version
* Formatting
* More formatting
* More formatting
* Modified cli to allow paths with special characters
* Fixed bad_eventlog tests
* Fixed type hints

Co-authored-by: Sean Gorsky <[email protected]>
1 parent 2c25284 commit de7cbfb

File tree

8 files changed

+88
-82
lines changed

8 files changed

+88
-82
lines changed

spark_log_parser/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
"""Tools for providing Spark event log"""
22

3-
__version__ = "0.1.1"
3+
__version__ = "0.1.2"

spark_log_parser/cli.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
import sys
44
import tempfile
55
from pathlib import Path
6+
from urllib.parse import unquote
67

78
import spark_log_parser
89

910
logging.captureWarnings(True)
1011

1112
from spark_log_parser.eventlog import EventLogBuilder # noqa: E402
13+
from spark_log_parser.extractor import Extractor # noqa: E402
1214
from spark_log_parser.parsing_models.application_model_v2 import sparkApplication # noqa: E402
1315

1416
logger = logging.getLogger("spark_log_parser")
@@ -39,7 +41,10 @@ def main():
3941
print("--Processing log file: " + str(args.log_file))
4042

4143
with tempfile.TemporaryDirectory() as work_dir:
42-
event_log = EventLogBuilder(args.log_file.resolve().as_uri(), work_dir).build()
44+
45+
log_path = unquote(args.log_file.resolve().as_uri())
46+
event_log_paths = Extractor(log_path, work_dir).extract()
47+
event_log = EventLogBuilder(event_log_paths, work_dir).build()
4348
app = sparkApplication(eventlog=str(event_log))
4449

4550
if args.log_file.suffixes:

spark_log_parser/eventlog.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,22 @@
11
import json
22
import tempfile
33
from pathlib import Path
4-
from urllib.parse import ParseResult
54

65
import pandas as pd
76

8-
from spark_log_parser.extractor import Extractor, ExtractThresholds
9-
107

118
class EventLogBuilder:
129
def __init__(
1310
self,
14-
source_url: ParseResult | str,
11+
event_log_paths: list[Path] | list[str],
1512
work_dir: Path | str,
16-
s3_client=None,
17-
extract_thresholds=ExtractThresholds(),
1813
):
19-
self.source_url = source_url
14+
15+
self.event_log_paths = self._validate_event_log_paths(event_log_paths)
2016
self.work_dir = self._validate_work_dir(work_dir)
21-
self.s3_client = s3_client
22-
self.extractor = Extractor(
23-
self.source_url, self.work_dir, self.s3_client, extract_thresholds
24-
)
17+
18+
def _validate_event_log_paths(self, event_log_paths: list[Path] | list[str]) -> list[Path]:
19+
return [Path(x) for x in event_log_paths]
2520

2621
def _validate_work_dir(self, work_dir: Path | str) -> Path:
2722
work_dir_path = work_dir if isinstance(work_dir, Path) else Path(work_dir)
@@ -31,12 +26,11 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:
3126
return work_dir_path
3227

3328
def build(self) -> Path:
34-
paths = self.extractor.extract()
3529

36-
if not paths:
30+
if not self.event_log_paths:
3731
raise ValueError("No files found")
3832

39-
self.event_log = self._get_event_log(paths)
33+
self.event_log = self._get_event_log(self.event_log_paths)
4034

4135
return self.event_log
4236

@@ -68,9 +62,11 @@ def _get_event_log(self, paths: list[Path]) -> Path:
6862
return log_files[0]
6963

7064
def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
71-
rollover_df = pd.DataFrame(
72-
rollover_dat, columns=["rollover_index", "context_id", "path"]
73-
).sort_values("rollover_index")
65+
rollover_df = (
66+
pd.DataFrame(rollover_dat, columns=["rollover_index", "context_id", "path"])
67+
.sort_values("rollover_index")
68+
.reset_index()
69+
)
7470

7571
if not len(rollover_df.context_id.unique()) == 1:
7672
raise ValueError("Not all rollover log files have the same Spark context ID")
394 KB
Binary file not shown.

tests/test_bad_eventlog.py

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,46 +3,41 @@
33
from pathlib import Path
44
from zipfile import ZipFile
55

6-
from spark_log_parser.eventlog import EventLogBuilder
6+
from spark_log_parser import eventlog, extractor
77

88

99
class BadEventLog(unittest.TestCase):
10-
def test_multiple_context_ids(self):
11-
event_log = Path("tests", "logs", "bad", "non-unique-context-id.zip").resolve()
10+
def check_value_error(self, event_log_path, msg):
1211

1312
with tempfile.TemporaryDirectory() as temp_dir:
14-
with self.assertRaises(
15-
ValueError, msg="Not all rollover files have the same Spark context ID"
16-
):
17-
EventLogBuilder(event_log.as_uri(), temp_dir).build()
13+
with self.assertRaises(ValueError) as cm:
14+
15+
event_log_paths = extractor.Extractor(event_log_path.as_uri(), temp_dir).extract()
16+
eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
17+
18+
assert str(cm.exception) == msg # , "Exception message matches"
19+
20+
def test_multiple_context_ids(self):
21+
event_log = Path("tests", "logs", "bad", "non-unique-context-id.zip").resolve()
22+
self.check_value_error(
23+
event_log, "Not all rollover log files have the same Spark context ID"
24+
)
1825

1926
def test_missing_dbc_event(self):
2027
event_log = Path("tests", "logs", "bad", "missing-dbc-event.zip").resolve()
21-
22-
with tempfile.TemporaryDirectory() as temp_dir:
23-
with self.assertRaises(ValueError, msg="Expected DBC event not found"):
24-
EventLogBuilder(event_log.as_uri(), temp_dir).build()
28+
self.check_value_error(event_log, "No rollover properties found in log file")
2529

2630
def test_duplicate_log_part(self):
2731
event_log = Path("tests", "logs", "bad", "duplicate-part.tgz").resolve()
28-
29-
with tempfile.TemporaryDirectory() as temp_dir:
30-
with self.assertRaises(ValueError, msg="Duplicate rollover file detected"):
31-
EventLogBuilder(event_log.as_uri(), temp_dir).build()
32+
self.check_value_error(event_log, "Duplicate rollover log file detected")
3233

3334
def test_missing_log_part(self):
3435
event_log = Path("tests", "logs", "bad", "missing-part.zip").resolve()
35-
36-
with tempfile.TemporaryDirectory() as temp_dir:
37-
with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
38-
EventLogBuilder(event_log.as_uri(), temp_dir).build()
36+
self.check_value_error(event_log, "Rollover log file appears to be missing")
3937

4038
def test_missing_first_part(self):
4139
event_log = Path("tests", "logs", "bad", "missing-first-part.zip").resolve()
42-
43-
with tempfile.TemporaryDirectory() as temp_dir:
44-
with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
45-
EventLogBuilder(event_log.as_uri(), temp_dir).build()
40+
self.check_value_error(event_log, "Rollover log file appears to be missing")
4641

4742
def test_only_non_first_part(self):
4843
with tempfile.TemporaryDirectory() as temp_dir:
@@ -51,10 +46,8 @@ def test_only_non_first_part(self):
5146
[zinfo for zinfo in zfile.infolist() if not zinfo.is_dir()][0], temp_dir
5247
)
5348

54-
with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
55-
EventLogBuilder(Path(temp_dir).as_uri(), temp_dir).build()
49+
self.check_value_error(Path(temp_dir), "Rollover log file appears to be missing")
5650

5751
def test_empty_log_dir(self):
5852
with tempfile.TemporaryDirectory() as temp_dir:
59-
with self.assertRaises(ValueError, msg="No log files found"):
60-
EventLogBuilder(Path(temp_dir).as_uri(), temp_dir).build()
53+
self.check_value_error(Path(temp_dir), "No files found")

tests/test_eventlog.py

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,34 +4,32 @@
44
import zipfile
55
from pathlib import Path
66

7-
from spark_log_parser import eventlog
7+
from spark_log_parser import eventlog, extractor
88

99

10-
def test_simple_emr_log():
11-
event_log_path = Path("tests", "logs", "emr.zip").resolve()
10+
def get_event(event_log_path):
1211

1312
with tempfile.TemporaryDirectory() as temp_dir:
14-
event_log = eventlog.EventLogBuilder(
15-
source_url=event_log_path.as_uri(), work_dir=temp_dir
16-
).build()
13+
event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
14+
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
1715

1816
with open(event_log) as log_fobj:
1917
event = json.loads(log_fobj.readline())
2018

21-
assert all(key in event for key in ["Event", "Spark Version"]), "All keys are present"
19+
return event
20+
21+
22+
def test_simple_emr_log():
23+
event_log_path = Path("tests", "logs", "emr.zip").resolve()
24+
event = get_event(event_log_path)
2225

26+
assert all(key in event for key in ["Event", "Spark Version"]), "All keys are present"
2327
assert event["Event"] == "SparkListenerLogStart", "Expected first event is present"
2428

2529

2630
def test_simple_databricks_log():
2731
event_log_path = Path("tests", "logs", "databricks.zip").resolve()
28-
29-
with tempfile.TemporaryDirectory() as temp_dir:
30-
event_log = eventlog.EventLogBuilder(event_log_path.as_uri(), temp_dir).build()
31-
32-
with open(event_log) as log_fobj:
33-
event = json.loads(log_fobj.readline())
34-
32+
event = get_event(event_log_path)
3533
assert all(
3634
key in event
3735
for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
@@ -40,12 +38,19 @@ def test_simple_databricks_log():
4038

4139
def test_raw_databricks_log():
4240
event_log_path = Path("tests", "logs", "databricks.json").resolve()
41+
event = get_event(event_log_path)
4342

44-
with tempfile.TemporaryDirectory() as temp_dir:
45-
event_log = eventlog.EventLogBuilder(event_log_path.as_uri(), temp_dir).build()
43+
assert all(
44+
key in event
45+
for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
46+
), "All keys are present"
4647

47-
with open(event_log) as log_fobj:
48-
event = json.loads(log_fobj.readline())
48+
assert event["Event"] == "DBCEventLoggingListenerMetadata", "Expected first event is present"
49+
50+
51+
def test_log_in_dir():
52+
event_log_path = Path("tests", "logs", "log_in_dir", "databricks.json.gz").resolve()
53+
event = get_event(event_log_path)
4954

5055
assert all(
5156
key in event
@@ -71,9 +76,12 @@ def test_databricks_messy_rollover_log_dir(self):
7176
zfile.extractall(temp_dir_path)
7277
self.validate_log(temp_dir_path, 3, 16945)
7378

74-
def validate_log(self, event_log: Path, log_file_total: int, log_entry_total: int):
79+
def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_total: int):
7580
with tempfile.TemporaryDirectory() as temp_dir:
76-
event_log = eventlog.EventLogBuilder(event_log.as_uri(), temp_dir).build()
81+
event_log_paths = extractor.Extractor(
82+
event_log_path.resolve().as_uri(), temp_dir
83+
).extract()
84+
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
7785

7886
with open(event_log) as log_fobj:
7987
event = json.loads(log_fobj.readline())

tests/test_eventlog_s3.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import pytest
1111
from botocore.stub import ANY, Stubber
1212

13-
from spark_log_parser import eventlog
13+
from spark_log_parser import eventlog, extractor
1414

1515

1616
@pytest.mark.parametrize(
@@ -65,7 +65,8 @@ def test_emr_log_from_s3(event_log_url, event_log_file_archive, event_log_s3_dir
6565
chunk = fobj.read(chunk_size)
6666

6767
with tempfile.TemporaryDirectory() as temp_dir, stubber:
68-
event_log = eventlog.EventLogBuilder(event_log_url, temp_dir, s3).build()
68+
event_log_paths = extractor.Extractor(event_log_url, temp_dir, s3).extract()
69+
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
6970

7071
with open(event_log) as log_fobj:
7172
event = json.loads(log_fobj.readline())
@@ -134,7 +135,8 @@ def test_databricks_log_from_s3_dir(event_log_url, event_log_file_archive, event
134135
chunk = zobj.read(chunk_size)
135136

136137
with stubber:
137-
event_log = eventlog.EventLogBuilder(event_log_url, temp_dir, s3).build()
138+
event_log_paths = extractor.Extractor(event_log_url, temp_dir, s3).extract()
139+
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
138140

139141
stubber.assert_no_pending_responses()
140142

tests/test_parse.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,30 @@
22
import tempfile
33
from pathlib import Path
44

5-
from spark_log_parser import eventlog
5+
from spark_log_parser import eventlog, extractor
66
from spark_log_parser.parsing_models.application_model_v2 import sparkApplication
77

88

9-
def test_simple_databricks_log():
10-
event_log_path = Path("tests", "logs", "databricks.zip").resolve()
9+
def get_parsed_log(event_log_path):
1110

1211
with tempfile.TemporaryDirectory() as temp_dir:
13-
event_log = eventlog.EventLogBuilder(event_log_path.as_uri(), temp_dir).build()
12+
event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
13+
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
1414

1515
result_path = str(Path(temp_dir, "result"))
1616
sparkApplication(eventlog=str(event_log)).save(result_path)
1717

1818
with open(result_path + ".json") as result_fobj:
1919
parsed = json.load(result_fobj)
2020

21+
return parsed
22+
23+
24+
def test_simple_databricks_log():
25+
event_log_path = Path("tests", "logs", "databricks.zip").resolve()
26+
27+
parsed = get_parsed_log(event_log_path)
28+
2129
assert all(
2230
key in parsed
2331
for key in [
@@ -39,14 +47,7 @@ def test_simple_databricks_log():
3947
def test_simple_emr_log():
4048
event_log_path = Path("tests", "logs", "emr.zip").resolve()
4149

42-
with tempfile.TemporaryDirectory() as temp_dir:
43-
event_log = eventlog.EventLogBuilder(event_log_path.as_uri(), temp_dir).build()
44-
45-
result_path = str(Path(temp_dir, "result"))
46-
sparkApplication(eventlog=str(event_log)).save(result_path)
47-
48-
with open(str(result_path) + ".json") as result_fobj:
49-
parsed = json.load(result_fobj)
50+
parsed = get_parsed_log(event_log_path)
5051

5152
assert all(
5253
key in parsed
@@ -70,7 +71,8 @@ def test_emr_missing_sql_events():
7071
event_log_path = Path("tests", "logs", "emr_missing_sql_events.zip").resolve()
7172

7273
with tempfile.TemporaryDirectory() as temp_dir:
73-
event_log = eventlog.EventLogBuilder(event_log_path.as_uri(), temp_dir).build()
74+
event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
75+
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
7476
obj = sparkApplication(eventlog=str(event_log))
7577

7678
assert list(obj.sqlData.index.values) == [0, 2, 3, 5, 6, 7, 8]

0 commit comments

Comments (0)