Commit feb5ebd

Added console_script, updated version and added configurable extraction thresholds (#10)

1 parent d4525b9 · commit feb5ebd

File tree: 8 files changed (+93, -105 lines)


README.md

Lines changed: 1 addition & 1 deletion

````diff
@@ -19,7 +19,7 @@ If you have not already done so, complete the [instructions](https://github.com/
 log file destination with the -l flag.
 
 ```shell
-python3 -m spark_log_parser -l <log file location> -r <results directory>
+spark-log-parser -l <log file location> -r <results directory>
 ```
 
 The parsed file `parsed-<log file name>` will appear in the results directory.
````

parse.sh

Lines changed: 0 additions & 42 deletions
This file was deleted.

pyproject.toml

Lines changed: 3 additions & 0 deletions

```diff
@@ -36,6 +36,9 @@ test = [
     "flake8==4.0.1",
 ]
 
+[project.scripts]
+spark-log-parser = "spark_log_parser.cli:main"
+
 [project.urls]
 Home = "https://github.com/synccomputingcode/spark_log_parser"
 
```
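
The new `[project.scripts]` table is what makes pip generate a `spark-log-parser` executable at install time. The wrapper it writes is tool-specific, but as a rough sketch (an approximation, not the literal script) it behaves like:

```python
# Approximate behavior of the pip-generated spark-log-parser wrapper:
# load the declared entry point and exit with its return value.
import sys

from spark_log_parser.cli import main

if __name__ == "__main__":
    sys.exit(main())
```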

spark_log_parser/__init__.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,3 +1,3 @@
 """Tools for providing Spark event log"""
 
-__version__ = "0.0.1"
+__version__ = "0.1.0"
```

spark_log_parser/__main__.py

Lines changed: 2 additions & 48 deletions

```diff
@@ -1,49 +1,3 @@
-import argparse
-import logging
-import sys
-import tempfile
-from pathlib import Path
+from spark_log_parser.cli import main
 
-logging.captureWarnings(True)
-
-from spark_log_parser.eventlog import EventLogBuilder  # noqa: E402
-from spark_log_parser.parsing_models.application_model_v2 import sparkApplication  # noqa: E402
-
-logger = logging.getLogger("spark_log_parser")
-
-
-parser = argparse.ArgumentParser("spark_log_parser")
-parser.add_argument(
-    "-l", "--log-file", required=True, type=Path, help="path to event log file or directory"
-)
-parser.add_argument(
-    "-r",
-    "--result-dir",
-    required=True,
-    type=Path,
-    help="path to directory in which to save the parsed log",
-)
-args = parser.parse_args()
-
-if not args.result_dir.is_dir():
-    logger.error("%s is not a directory", args.result_dir)
-    sys.exit(1)
-
-
-print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
-print("--Processing log file: " + str(args.log_file))
-
-with tempfile.TemporaryDirectory() as work_dir:
-    event_log = EventLogBuilder(args.log_file.resolve().as_uri(), work_dir).build()
-    app = sparkApplication(eventlog=str(event_log))
-
-    if args.log_file.suffixes:
-        result_path = args.result_dir.joinpath(
-            "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
-        )
-    else:
-        result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)
-
-    app.save(str(result_path))
-
-print(f"--Result saved to: {result_path}.json")
+main()
```

spark_log_parser/cli.py

Lines changed: 54 additions & 0 deletions

```diff
@@ -0,0 +1,54 @@
+import argparse
+import logging
+import sys
+import tempfile
+from pathlib import Path
+
+import spark_log_parser
+
+logging.captureWarnings(True)
+
+from spark_log_parser.eventlog import EventLogBuilder  # noqa: E402
+from spark_log_parser.parsing_models.application_model_v2 import sparkApplication  # noqa: E402
+
+logger = logging.getLogger("spark_log_parser")
+
+
+def main():
+    parser = argparse.ArgumentParser("spark-log-parser")
+    parser.add_argument(
+        "-l", "--log-file", required=True, type=Path, help="path to event log file or directory"
+    )
+    parser.add_argument(
+        "-r",
+        "--result-dir",
+        required=True,
+        type=Path,
+        help="path to directory in which to save the parsed log",
+    )
+    parser.add_argument(
+        "--version", action="version", version="%(prog)s " + spark_log_parser.__version__
+    )
+    args = parser.parse_args()
+
+    if not args.result_dir.is_dir():
+        logger.error("%s is not a directory", args.result_dir)
+        sys.exit(1)
+
+    print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
+    print("--Processing log file: " + str(args.log_file))
+
+    with tempfile.TemporaryDirectory() as work_dir:
+        event_log = EventLogBuilder(args.log_file.resolve().as_uri(), work_dir).build()
+        app = sparkApplication(eventlog=str(event_log))
+
+        if args.log_file.suffixes:
+            result_path = args.result_dir.joinpath(
+                "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
+            )
+        else:
+            result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)
+
+        app.save(str(result_path))
+
+    print(f"--Result saved to: {result_path}.json")
```

spark_log_parser/eventlog.py

Lines changed: 11 additions & 3 deletions

```diff
@@ -5,15 +5,23 @@
 
 import pandas as pd
 
-from spark_log_parser.extractor import Extractor
+from spark_log_parser.extractor import Extractor, ExtractThresholds
 
 
 class EventLogBuilder:
-    def __init__(self, source_url: ParseResult | str, work_dir: Path | str, s3_client=None):
+    def __init__(
+        self,
+        source_url: ParseResult | str,
+        work_dir: Path | str,
+        s3_client=None,
+        extract_thresholds=ExtractThresholds(),
+    ):
         self.source_url = source_url
         self.work_dir = self._validate_work_dir(work_dir)
         self.s3_client = s3_client
-        self.extractor = Extractor(self.source_url, self.work_dir, self.s3_client)
+        self.extractor = Extractor(
+            self.source_url, self.work_dir, self.s3_client, extract_thresholds
+        )
 
     def _validate_work_dir(self, work_dir: Path | str) -> Path:
         work_dir_path = work_dir if isinstance(work_dir, Path) else Path(work_dir)
```
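
With the new `extract_thresholds` parameter, callers can loosen or tighten the extraction limits per build instead of relying on class-wide constants. A minimal sketch, assuming a local event log archive at a hypothetical path:

```python
import tempfile
from pathlib import Path

from spark_log_parser.eventlog import EventLogBuilder
from spark_log_parser.extractor import ExtractThresholds

log_file = Path("app-event-log.tgz")  # hypothetical archive path

with tempfile.TemporaryDirectory() as work_dir:
    # Allow up to 500 archive entries and 10 GB of extracted data.
    thresholds = ExtractThresholds(entries=500, size=10_000_000_000)
    event_log = EventLogBuilder(
        log_file.resolve().as_uri(), work_dir, extract_thresholds=thresholds
    ).build()
```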

spark_log_parser/extractor.py

Lines changed: 21 additions & 10 deletions

```diff
@@ -8,6 +8,13 @@
 from urllib.parse import ParseResult, urlparse
 
 import requests
+from pydantic import BaseModel
+
+
+class ExtractThresholds(BaseModel):
+    entries = 100
+    size = 5000000000
+    ratio = 100
 
 
 class Extractor:
@@ -18,17 +25,21 @@ class Extractor:
     ALLOWED_SCHEMES = {"https", "s3", "file"}
     FILE_SKIP_PATTERNS = [".DS_Store".lower(), "__MACOSX".lower(), "/."]
 
-    THRESHOLD_ENTRIES = 100
-    THRESHOLD_SIZE = 5000000000
-    THRESHOLD_RATIO = 100
-
-    def __init__(self, source_url: ParseResult | str, work_dir: Path | str, s3_client=None):
+    def __init__(
+        self,
+        source_url: ParseResult | str,
+        work_dir: Path | str,
+        s3_client=None,
+        thresholds=ExtractThresholds(),
+    ):
         self.source_url = self._validate_url(source_url)
         self.work_dir = self._validate_work_dir(work_dir)
         self.s3_client = self._validate_s3_client(s3_client)
         self.file_total = 0
         self.size_total = 0
+
+        self.thresholds = thresholds
 
     def _validate_url(self, url: ParseResult | str) -> ParseResult:
         parsed_url = url if isinstance(url, ParseResult) else urlparse(url)
         if parsed_url.scheme not in self.ALLOWED_SCHEMES:
@@ -204,13 +215,13 @@ def _add_to_stats_and_verify(self, size, count=1):
         self.file_total += count
 
         ratio = size / self.size_total
-        if ratio > self.THRESHOLD_RATIO:
+        if ratio > self.thresholds.ratio:
             raise AssertionError("Encountered suspicious compression ratio in the archive")
 
-        if self.size_total > self.THRESHOLD_SIZE:
+        if self.size_total > self.thresholds.size:
             raise AssertionError("The archive is too big")
 
-        if self.file_total > self.THRESHOLD_ENTRIES:
+        if self.file_total > self.thresholds.entries:
             raise AssertionError("Too many files in the archive")
 
     def _remove_from_stats(self, size, count=1):
@@ -236,9 +247,9 @@ def _download(self):
         for content in result["Contents"]:
             s3_content_count += 1
             s3_content_size += content["Size"]
-            if s3_content_count > self.THRESHOLD_ENTRIES:
+            if s3_content_count > self.thresholds.entries:
                 raise AssertionError("Too many objects at %s" % self.source_url)
-            if s3_content_size > self.THRESHOLD_SIZE:
+            if s3_content_size > self.thresholds.size:
                 raise AssertionError(
                     "Size limit exceeded while downloading from %s" % self.source_url
                 )
```
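
Moving the `THRESHOLD_*` class constants into a pydantic model means individual limits can be overridden by keyword while the others keep their defaults. A minimal sketch of the override semantics, assuming pydantic v1, where untyped class attributes with defaults become model fields:

```python
from spark_log_parser.extractor import ExtractThresholds

defaults = ExtractThresholds()  # entries=100, size=5000000000, ratio=100
relaxed = ExtractThresholds(size=10_000_000_000)  # raise only the size cap

assert relaxed.entries == defaults.entries == 100
assert relaxed.ratio == 100
assert relaxed.size == 10_000_000_000
```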
