-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcidr_freeze_parser.py
More file actions
executable file
·195 lines (159 loc) · 5.56 KB
/
cidr_freeze_parser.py
File metadata and controls
executable file
·195 lines (159 loc) · 5.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#!/usr/bin/env python3
import os
import shutil
import sys
import time
from collections import defaultdict
from dump_file import parse_dump_file
from rules import process_custom
import freeze_rules
# Every known freeze rule, gathered by adding together the get_rules() result
# of each freeze_rules submodule (in this order). match_stack() tries each
# rule against a parsed dump.
FRAME_SEQ_TO_TICKET = (
    freeze_rules.fixed.get_rules()
    + freeze_rules.actions.get_rules()
    + freeze_rules.commit.get_rules()
    + freeze_rules.lazyReparse.get_rules()
    + freeze_rules.misc.get_rules()
    + freeze_rules.plugin.get_rules()
    + freeze_rules.resolve.get_rules()
    + freeze_rules.clangd.get_rules()
    + freeze_rules.debugger.get_rules()
    + freeze_rules.tests.get_rules()
)
def print_usage():
    """
    Print a one-line usage hint that names this script file.
    """
    script_name = os.path.basename(__file__)
    print(f"Usage: {script_name} [thread dumps file]")
def extract_edt_call_stack(lines):
    """
    Pull the EDT stack frames out of a full thread dump.

    The EDT section starts after the first line containing "AWT-EventQueue".
    From there every line starting with "\\tat " is a frame; the first blank
    line seen after at least one frame ends the stack.

    :param lines: List[str] param. List of all file lines.
    :return: list of EDT stack trace calls (empty if no EDT section is found)
    """
    remaining = iter(lines)

    # Phase 1: skip ahead to the EDT thread header. The header line itself is
    # consumed and never inspected for frames.
    for line in remaining:
        if "AWT-EventQueue" in line:
            break

    # Phase 2: collect frames from the lines that follow the header. If the
    # header was never found the iterator is already exhausted.
    frames = []
    for line in remaining:
        if line.startswith("\tat "):
            frames.append(line)
        elif frames and not line.strip():
            # A blank line only terminates the stack once frames have started;
            # blank lines between the header and the first frame are ignored.
            break
    return frames
def match_stack(stack):
    """
    Search a parsed dump for known freezes described in FRAME_SEQ_TO_TICKET.

    Every rule is tried; each truthy result of rule.is_matched(stack) is kept.
    If no rule matches, process_custom(stack) is consulted as a fallback.

    :param stack: parsed dump to match, or None
    :return: set of messages; empty when stack is None or nothing matched
    """
    if stack is None:
        return set()

    found = {
        message
        for message in (rule.is_matched(stack) for rule in FRAME_SEQ_TO_TICKET)
        if message
    }
    if found:
        return found

    # No predefined rule fired: fall back to the custom processing hook.
    fallback = process_custom(stack)
    return set() if fallback is None else {fallback}
class ThreadDumpInfo:
    """
    Result of processing one thread dump file.

    Plain value holder: the three constructor arguments are stored unchanged
    as attributes of the same names.
    """

    def __init__(self, file_name, messages, lines):
        """
        :param file_name: name of the dump file this result belongs to
        :param messages: messages matched for the dump (empty if none matched)
        :param lines: lines kept for the report of an unmatched dump
        """
        self.file_name, self.messages, self.lines = file_name, messages, lines
def process_thread_dump(file_name, lines):
    """
    Analyse the lines of one thread dump.

    The EDT stack lines are extracted first (they are kept for the report),
    then the whole dump is parsed and matched against the known freeze rules.

    :param file_name: name of the file the lines came from
    :param lines: List[str] param
    :return: ThreadDumpInfo
    """
    edt_frames = extract_edt_call_stack(lines)
    matched = match_stack(parse_dump_file(lines))
    return ThreadDumpInfo(file_name=file_name, messages=matched, lines=edt_frames)
def process_file(file_name):
    """
    Read a thread dump file and process its lines.

    A failure while reading or decoding the content is not fatal: the file is
    then processed as if it were empty. A failure to open the file is not
    caught here.

    :param file_name: path of the thread dump file
    :return: ThreadDumpInfo
    """
    with open(file_name) as handle:
        try:
            content = list(handle)
        except (OSError, UnicodeError):
            # Unreadable content: an empty dump ends up reported as "UNKNOWN".
            content = []
    return process_thread_dump(file_name, content)
def get_summary(infos):
    """
    Build the text report for a batch of processed thread dumps.

    The report has a ticket histogram (see tickets_to_string) followed by one
    entry per dump: "<file>: <messages>" for matched dumps, or
    "<file>: UNKNOWN" followed by the dump's kept lines for unmatched ones.

    :param infos: iterable of ThreadDumpInfo
    :return: the report as a single string
    """
    all_tickets = defaultdict(int)
    detailed = []
    unknown_count = 0
    for info in infos:
        if info.messages:
            # messages is a set; sort it so the listing does not change from
            # run to run with string hash randomisation.
            detailed.append(
                info.file_name + ": " + ", ".join(sorted(info.messages)) + "\n")
            for ticket in info.messages:
                all_tickets[ticket] += 1
        else:
            unknown_count += 1
            # Unmatched dumps carry their kept lines so they can be triaged
            # straight from the report.
            detailed.append(
                info.file_name + ": UNKNOWN\n\n" + "".join(info.lines) + "\n")
    return "All found tickets:\n{}\nUnknown traces ({}):\n{}".format(
        tickets_to_string(all_tickets),
        unknown_count,
        "".join(detailed))
def tickets_to_string(all_tickets):
    """
    Render a ticket histogram, one "<ticket>: <count>" line per ticket.

    Lines are ordered by count, highest first; tickets with equal counts are
    ordered by name, descending.

    :param all_tickets: mapping of ticket message to number of occurrences
    :return: the rendered lines joined with newlines ("" for an empty mapping)
    """
    ranked = sorted(
        all_tickets.items(),
        key=lambda entry: (entry[1], entry[0]),
        reverse=True,
    )
    return "\n".join(f" {ticket}: {count}" for ticket, count in ranked)
def collect_files(arg):
    """
    Yield the thread dump files designated by one command-line argument.

    A file argument yields itself; a directory is walked recursively. Files
    whose name starts with "." are never yielded, and directories whose path
    contains ".git" are skipped. A directory whose path contains
    'threadDumps-freeze-20' contributes a single representative file.

    :param arg: path of a file or a directory
    :return: generator of file paths
    :raises ValueError: if arg is neither an existing file nor a directory
    """
    if os.path.isfile(arg):
        if not os.path.basename(arg).startswith("."):
            yield arg
    elif os.path.isdir(arg):
        for folder_name, _subfolders, filenames in os.walk(arg):
            if ".git" in folder_name:
                continue
            visible = [f for f in filenames if not f.startswith(".")]
            if 'threadDumps-freeze-20' in folder_name:
                # assume all freezes in this folder have the same cause, so one
                # dump is enough. Take the smallest visible name: os.walk gives
                # no ordering guarantee, and the first listed entry may be a
                # hidden file such as .DS_Store.
                if visible:
                    yield folder_name + '/' + min(visible)
            else:
                for f in visible:
                    yield folder_name + '/' + f
    else:
        raise ValueError("Invalid file or folder: " + str(arg))
def parse_args_and_process_files(given_filenames):
    """
    Expand the given arguments into files, process each one and summarise.

    All arguments are expanded before any file is processed, so an invalid
    argument is reported before work starts.

    :param given_filenames: iterable of file or directory paths
    :return: the summary report as a string
    """
    dump_paths = []
    for argument in given_filenames:
        dump_paths.extend(collect_files(argument))
    processed = list(map(process_file, dump_paths))
    return get_summary(processed)
def write_to_otuput_dir(summary):
    """
    Save the summary to a timestamped file in the "out" directory that sits
    next to this script, creating the directory when it does not exist yet.

    :param summary: report text to write
    """
    target_dir = os.path.join(os.path.dirname(__file__), "out")
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    stamp = time.strftime("%Y%m%d-%H%M%S")
    target_path = os.path.join(target_dir, f"result-{stamp}.txt")
    with open(target_path, "w") as report:
        # writelines accepts any iterable of strings, so a single string is
        # written out character by character, unchanged.
        report.writelines(summary)
def split_reports(filename, out_dir):
    """
    Split a combined report file into one dump file per report.

    The input is cut on '==========' separators and read as alternating
    (identifier, dump) pairs; a trailing unpaired piece is dropped. Each dump
    is written to "<out_dir>/<identifier>.txt" with the Freeze marker replaced
    by an EDT thread header.

    out_dir is deleted first if it already exists, then recreated.

    :param filename: path of the combined report file
    :param out_dir: directory that receives the split files
    """
    with open(filename) as source:
        pieces = source.read().split('==========')

    # Start from a clean directory so stale files from a previous run vanish.
    if os.path.exists(out_dir):
        shutil.rmtree(out_dir)
    os.makedirs(out_dir)

    # Feeding the same iterator to zip twice walks the pieces two at a time.
    cursor = iter(pieces)
    for report_id, dump in zip(cursor, cursor):
        rewritten = dump.replace(
            'com.intellij.diagnostic.Freeze',
            'AWT-EventQueue-0\njava.lang.Thread.State: RUNNABLE')
        with open(f'{out_dir}/{report_id.strip()}.txt', 'w+') as out_file:
            out_file.write(rewritten)
def main():
    """
    Command-line entry point.

    Without arguments the usage hint is printed. With '--split' as the first
    argument, the next two arguments (report file and output directory) are
    passed to split_reports and nothing else is done. Otherwise every argument
    except '-o' is treated as a dump file or directory; the summary is printed
    and, if '-o' is present, also written to the output directory.

    :raises SystemExit: if '--split' is not followed by two arguments
    """
    if len(sys.argv) < 2:
        print_usage()
        return

    program_arguments = sys.argv[1:]
    if program_arguments[0] == '--split':
        if len(program_arguments) < 3:
            # Exit with a message rather than failing with an IndexError on
            # the missing positional arguments.
            sys.exit("Usage: {} --split [reports file] [output dir]".format(
                os.path.basename(__file__)))
        split_reports(program_arguments[1], program_arguments[2])
        return

    # '-o' is a flag, not a path: keep it away from file collection.
    dump_paths = [a for a in program_arguments if a != '-o']
    summary = parse_args_and_process_files(dump_paths)
    print(summary)
    if '-o' in program_arguments:
        write_to_otuput_dir(summary)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()