
Commit 8e9543c

[PROD-325] Added support for Databricks rollover logs & refactored log handling (#8)
1 parent edef99d commit 8e9543c

19 files changed: +1933 −31 lines

makefile

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+FILES := $(shell git diff --name-only --diff-filter=AM $$(git merge-base origin/main HEAD) -- \*.py)
+
+
+.PHONY: test
+test:
+	pytest
+
+.PHONY: lint
+lint:
+	flake8 --filename ./$(FILES) --max-complexity=10 --ignore=E501,W503
+
+.PHONY: format
+format:
+ifneq ("$(FILES)"," ")
+	black $(FILES)
+	isort $(FILES)
+endif
+
+.PHONY: tidy
+tidy: format lint
+

parse.sh

Lines changed: 1 addition & 5 deletions
@@ -13,13 +13,11 @@ function print_usage() {
 }
 
 results_dir="$HOME/results"
-args=()
 
 while getopts l:r:h name; do
 	case $name in
 	l)
 		event_log="$OPTARG"
-		args=("${args[@]}" -l "$OPTARG")
 		;;
 	r)
 		results_dir="$OPTARG"
@@ -41,6 +39,4 @@ if [[ -z $event_log ]]; then
 	exit 1
 fi
 
-args=("${args[@]}" -r "$results_dir")
-
-exec python3 -m spark_log_parser "${args[@]}"
+exec python3 -m spark_log_parser -l "$event_log" -r "$results_dir"

pyproject.toml

Lines changed: 20 additions & 5 deletions
@@ -6,7 +6,7 @@ build-backend = "flit.buildapi"
 name = "spark_log_parser"
 authors = [{"name" = "Sync Computing"}]
 readme = "README.md"
-requires-python = ">=3.6.1"
+requires-python = ">=3.10.3"
 classifiers = [
     "Intended Audience :: Information Technology",
     "Intended Audience :: System Administrators",
@@ -16,20 +16,26 @@ classifiers = [
     "Programming Language :: Python",
     "Programming Language :: Python :: 3",
     "Programming Language :: Python :: 3 :: Only",
-    "Programming Language :: Python :: 3.6",
-    "Programming Language :: Python :: 3.7",
-    "Programming Language :: Python :: 3.8",
-    "Programming Language :: Python :: 3.9",
     "Programming Language :: Python :: 3.10",
 ]
 dependencies = [
     "boto3==1.20.24",
     "numpy==1.21.4",
     "pandas==1.3.5",
     "ujson==5.3.0",
+    "pydantic==1.9.0",
+    "requests==2.26.0",
 ]
 dynamic = ["version", "description"]
 
+[project.optional-dependencies]
+test = [
+    "pytest==7.1.2",
+    "black==22.6.0",
+    "isort==5.10.1",
+    "flake8==4.0.1",
+]
+
 [project.urls]
 Home = "https://github.com/synccomputingcode/spark_log_parser"
 
@@ -38,3 +44,12 @@ line-length = 100
 
 [tool.isort]
 profile = "black"
+line_length = 100
+
+[tool.pytest.ini_options]
+pythonpath = [
+    "."
+]
+filterwarnings = [
+    "ignore::UserWarning"
+]
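
The new test extra pins pytest, black, isort and flake8, so something like pip install ".[test]" pulls in everything the makefile targets need, and pythonpath = ["."] lets tests import spark_log_parser straight from a repository checkout without installing it. A minimal sketch of such a test (not part of the commit; the file path and test name are invented, and it targets the EventLogBuilder added in spark_log_parser/eventlog.py further down):

# tests/test_eventlog.py (hypothetical path) -- a sketch against the new pytest settings.
from pathlib import Path

import pytest

from spark_log_parser.eventlog import EventLogBuilder


def test_work_dir_must_be_a_directory(tmp_path: Path):
    # EventLogBuilder validates work_dir before doing anything else,
    # so a non-existent directory should raise ValueError.
    with pytest.raises(ValueError):
        EventLogBuilder("file:///tmp/eventlog.json", tmp_path / "missing")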

spark_log_parser/__main__.py

Lines changed: 26 additions & 21 deletions
@@ -1,37 +1,42 @@
+import argparse
 import logging
+import os
+import sys
+import tempfile
+from pathlib import Path
 
 logging.captureWarnings(True)
 
-from spark_log_parser.parsing_models.application_model_v2 import sparkApplication
-
-import os
-import argparse
-import sys
+from spark_log_parser.eventlog import EventLogBuilder  # noqa: E402
+from spark_log_parser.parsing_models.application_model_v2 import sparkApplication  # noqa: E402
 
 logger = logging.getLogger("spark_log_parser")
-
-parser = argparse.ArgumentParser("spark_log_parser")
-parser.add_argument("-l", "--log-file", required=True, help="path to event log")
-parser.add_argument("-r", "--result-dir", required=True, help="path to directory in which to save parsed logs")
-args = parser.parse_args()
-
-print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
 
-log_path = os.path.abspath(args.log_file)
+parser = argparse.ArgumentParser("spark_log_parser")
+parser.add_argument("-l", "--log-file", required=True, type=Path, help="path to event log")
+parser.add_argument(
+    "-r", "--result-dir", required=True, help="path to directory in which to save parsed logs"
+)
+args = parser.parse_args()
 
 if not os.path.isdir(args.result_dir):
     logger.error("%s is not a directory", args.result_dir)
     sys.exit(1)
 
-print("\n--Processing log file: " + log_path)
+print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
+print("--Processing log file: " + str(args.log_file))
 
-log_name = os.path.basename(log_path)
-result_path = os.path.join(args.result_dir, "parsed-" + log_name)
+with tempfile.TemporaryDirectory() as work_dir:
+    event_log = EventLogBuilder(args.log_file.resolve().as_uri(), work_dir).build()
+    app = sparkApplication(eventlog=str(event_log))
 
-if os.path.exists(result_path):
-    os.remove(result_path)
+    if args.log_file.suffixes:
+        result_path = os.path.join(
+            args.result_dir, "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
+        )
+    else:
+        result_path = os.path.join(args.result_dir, "parsed-" + args.log_file.name)
 
-appobj = sparkApplication(eventlog=log_path)
-appobj.save(result_path)
+    app.save(result_path)
 
-print(f"--Log directory saved to: {result_path}")
+    print(f"--Result saved to: {result_path}.json")
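
The refactored entry point now derives the result name by stripping every extension from the event log's file name before prepending "parsed-"; the final print suggests the saved artifact then gets a ".json" extension. A small sketch of that derivation (not part of the commit; the file names are invented), using pathlib the same way __main__.py now does:

from pathlib import Path

for name in ["eventlog.zip", "eventlog.json.gz", "eventlog"]:
    p = Path(name)
    # Mirror the suffix handling above: drop all suffixes if there are any.
    stem = p.name[: -len("".join(p.suffixes))] if p.suffixes else p.name
    print(name, "->", "parsed-" + stem)

# eventlog.zip     -> parsed-eventlog
# eventlog.json.gz -> parsed-eventlog
# eventlog         -> parsed-eventlog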

spark_log_parser/eventlog.py

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
+import json
+import tempfile
+from pathlib import Path
+from urllib.parse import ParseResult
+
+import pandas as pd
+
+from spark_log_parser.extractor import Extractor
+
+
+class EventLogBuilder:
+    def __init__(self, source_url: ParseResult | str, work_dir: Path | str, s3_client=None):
+        self.source_url = source_url
+        self.work_dir = self._validate_work_dir(work_dir)
+        self.s3_client = s3_client
+        self.extractor = Extractor(self.source_url, self.work_dir, self.s3_client)
+
+    def _validate_work_dir(self, work_dir: Path | str) -> Path:
+        work_dir_path = work_dir if isinstance(work_dir, Path) else Path(work_dir)
+        if not work_dir_path.is_dir():
+            raise ValueError("Path is not a directory")
+
+        return work_dir_path
+
+    def build(self) -> Path:
+        event_logs = self.extractor.extract()
+
+        self.event_log = self._concat(event_logs)
+
+        return self.event_log
+
+    def _concat(self, event_logs: list[Path]) -> Path:
+        if len(event_logs) == 1:
+            return event_logs[0]
+
+        dat = []
+        for log in event_logs:
+            with open(log) as log_file:
+                try:
+                    line = json.loads(log_file.readline())
+                except ValueError:
+                    continue  # Maybe a Databricks pricing file
+                if line["Event"] == "DBCEventLoggingListenerMetadata":
+                    dat.append((line["Rollover Number"], line["SparkContext Id"], log))
+                else:
+                    raise ValueError("Expected DBC event not found")
+
+        df = pd.DataFrame(dat, columns=["rollover_index", "context_id", "path"]).sort_values(
+            "rollover_index"
+        )
+
+        self._validate_rollover_logs(df)
+
+        event_log = Path(tempfile.mkstemp(suffix="-concatenated.json", dir=str(self.work_dir))[1])
+        with open(event_log, "w") as fobj:
+            for path in df.path:
+                with open(path) as part_fobj:
+                    for line in part_fobj:
+                        fobj.write(line)
+
+        return event_log
+
+    def _validate_rollover_logs(self, df: pd.DataFrame):
+        if not len(df.context_id.unique()) == 1:
+            raise ValueError("Not all rollover files have the same Spark context ID")
+
+        diffs = df.rollover_index.diff()[1:]
+
+        if any(diffs > 1) or df.rollover_index[0] > 0:
+            raise ValueError("Rollover file appears to be missing")
+
+        if any(diffs < 1):
+            raise ValueError("Duplicate rollover file detected")
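
For Databricks rollover logs, _concat reads the first line of each part, expects a DBCEventLoggingListenerMetadata event carrying "Rollover Number" and "SparkContext Id", orders the parts by rollover number, validates the sequence, and concatenates them into one temporary file. A standalone sketch of that bookkeeping (not part of the commit; the context ID and file names are invented, and the checks mirror _validate_rollover_logs rather than call it):

import pandas as pd

# Each tuple stands in for one parsed metadata line: (Rollover Number, SparkContext Id, path).
parts = [
    (1, "8643226577140463892", "eventlog--part-2"),
    (0, "8643226577140463892", "eventlog--part-1"),
    (2, "8643226577140463892", "eventlog--part-3"),
]
df = pd.DataFrame(parts, columns=["rollover_index", "context_id", "path"]).sort_values(
    "rollover_index"
)

# One Spark context, indices starting at 0 with no gaps or repeats.
assert len(df.context_id.unique()) == 1
diffs = df.rollover_index.diff()[1:]
assert not any(diffs > 1) and df.rollover_index.iloc[0] == 0
assert not any(diffs < 1)

print(list(df.path))  # concatenation order: part-1, part-2, part-3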
