Commit d4525b9

[PROD-399] Handle edge case of single rollover file with index > 0 (#9)
1 parent 8e9543c commit d4525b9

File tree: 4 files changed (+75, -40 lines)

spark_log_parser/__main__.py

Lines changed: 15 additions & 8 deletions
@@ -1,6 +1,5 @@
 import argparse
 import logging
-import os
 import sys
 import tempfile
 from pathlib import Path
@@ -12,17 +11,25 @@
 
 logger = logging.getLogger("spark_log_parser")
 
+
 parser = argparse.ArgumentParser("spark_log_parser")
-parser.add_argument("-l", "--log-file", required=True, type=Path, help="path to event log")
 parser.add_argument(
-    "-r", "--result-dir", required=True, help="path to directory in which to save parsed logs"
+    "-l", "--log-file", required=True, type=Path, help="path to event log file or directory"
+)
+parser.add_argument(
+    "-r",
+    "--result-dir",
+    required=True,
+    type=Path,
+    help="path to directory in which to save the parsed log",
 )
 args = parser.parse_args()
 
-if not os.path.isdir(args.result_dir):
+if not args.result_dir.is_dir():
     logger.error("%s is not a directory", args.result_dir)
     sys.exit(1)
 
+
 print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
 print("--Processing log file: " + str(args.log_file))
 
@@ -31,12 +38,12 @@
 app = sparkApplication(eventlog=str(event_log))
 
 if args.log_file.suffixes:
-    result_path = os.path.join(
-        args.result_dir, "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
+    result_path = args.result_dir.joinpath(
+        "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
    )
 else:
-    result_path = os.path.join(args.result_dir, "parsed-" + args.log_file.name)
+    result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)
 
-app.save(result_path)
+app.save(str(result_path))
 
 print(f"--Result saved to: {result_path}.json")

spark_log_parser/eventlog.py

Lines changed: 44 additions & 32 deletions
@@ -23,51 +23,63 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:
         return work_dir_path
 
     def build(self) -> Path:
-        event_logs = self.extractor.extract()
+        paths = self.extractor.extract()
 
-        self.event_log = self._concat(event_logs)
+        if not paths:
+            raise ValueError("No files found")
 
-        return self.event_log
+        self.event_log = self._get_event_log(paths)
 
-    def _concat(self, event_logs: list[Path]) -> Path:
-        if len(event_logs) == 1:
-            return event_logs[0]
+        return self.event_log
 
-        dat = []
-        for log in event_logs:
-            with open(log) as log_file:
+    def _get_event_log(self, paths: list[Path]) -> Path:
+        log_files = []
+        rollover_dat = []
+        for path in paths:
+            with open(path) as fobj:
                 try:
-                    line = json.loads(log_file.readline())
+                    line = json.loads(fobj.readline())
                 except ValueError:
-                    continue  # Maybe a Databricks pricing file
-            if line["Event"] == "DBCEventLoggingListenerMetadata":
-                dat.append((line["Rollover Number"], line["SparkContext Id"], log))
-            else:
-                raise ValueError("Expected DBC event not found")
+                    continue
+            if "Event" in line:
+                log_files.append(path)
+                if line["Event"] == "DBCEventLoggingListenerMetadata":
+                    rollover_dat.append(
+                        (line["Rollover Number"], line["SparkContext Id"], path)
+                    )
+
+        if rollover_dat:
+            if len(log_files) > len(rollover_dat):
+                raise ValueError("No rollover properties found in log file")
+
+            return self._concat(rollover_dat)
+
+        if len(log_files) > 1:
+            raise ValueError("No rollover properties found in log file")
+
+        return log_files[0]
+
+    def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
+        rollover_df = pd.DataFrame(
+            rollover_dat, columns=["rollover_index", "context_id", "path"]
+        ).sort_values("rollover_index")
+
+        if not len(rollover_df.context_id.unique()) == 1:
+            raise ValueError("Not all rollover log files have the same Spark context ID")
 
-        df = pd.DataFrame(dat, columns=["rollover_index", "context_id", "path"]).sort_values(
-            "rollover_index"
-        )
+        diffs = rollover_df.rollover_index.diff()
 
-        self._validate_rollover_logs(df)
+        if any(diffs > 1) or rollover_df.rollover_index[0] > 0:
+            raise ValueError("Rollover log file appears to be missing")
+
+        if any(diffs < 1):
+            raise ValueError("Duplicate rollover log file detected")
 
         event_log = Path(tempfile.mkstemp(suffix="-concatenated.json", dir=str(self.work_dir))[1])
         with open(event_log, "w") as fobj:
-            for path in df.path:
+            for path in rollover_df.path:
                 with open(path) as part_fobj:
                     for line in part_fobj:
                         fobj.write(line)
 
         return event_log
-
-    def _validate_rollover_logs(self, df: pd.DataFrame):
-        if not len(df.context_id.unique()) == 1:
-            raise ValueError("Not all rollover files have the same Spark context ID")
-
-        diffs = df.rollover_index.diff()[1:]
-
-        if any(diffs > 1) or df.rollover_index[0] > 0:
-            raise ValueError("Rollover file appears to be missing")
-
-        if any(diffs < 1):
-            raise ValueError("Duplicate rollover file detected")
Binary file changed (1.19 KB, not shown)

tests/test_bad_eventlog.py

Lines changed: 16 additions & 0 deletions
@@ -1,6 +1,7 @@
 import tempfile
 import unittest
 from pathlib import Path
+from zipfile import ZipFile
 
 from spark_log_parser.eventlog import EventLogBuilder
 
@@ -42,3 +43,18 @@ def test_missing_first_part(self):
         with tempfile.TemporaryDirectory() as temp_dir:
             with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
                 EventLogBuilder(event_log.as_uri(), temp_dir).build()
+
+    def test_only_non_first_part(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with ZipFile(Path("tests", "logs", "bad", "missing-first-part.zip")) as zfile:
+                zfile.extract(
+                    [zinfo for zinfo in zfile.infolist() if not zinfo.is_dir()][0], temp_dir
+                )
+
+            with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
+                EventLogBuilder(Path(temp_dir).as_uri(), temp_dir).build()
+
+    def test_empty_log_dir(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with self.assertRaises(ValueError, msg="No log files found"):
+                EventLogBuilder(Path(temp_dir).as_uri(), temp_dir).build()
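
The new tests pin down both failure modes: a directory containing only a non-first rollover part, and an empty log directory. A quick way to exercise them is standard unittest discovery (a generic sketch, not a script shipped with this repo):

import unittest

# Discover and run everything under tests/, including the new
# test_only_non_first_part and test_empty_log_dir cases.
suite = unittest.defaultTestLoader.discover("tests")
unittest.TextTestRunner(verbosity=2).run(suite)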
