forked from elastic/ecs
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathecs_helpers.py
More file actions
279 lines (214 loc) · 8.66 KB
/
ecs_helpers.py
File metadata and controls
279 lines (214 loc) · 8.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import glob
import os
import yaml
import git
import pathlib
from typing import (
Any,
Dict,
List,
Optional,
OrderedDict,
Set,
Union,
)
import warnings
from collections import OrderedDict
from copy import deepcopy
from ecs_types import (
Field,
FieldEntry,
FieldNestedEntry,
)
# Dictionary helpers
def dict_copy_keys_ordered(dct: Field, copied_keys: List[str]) -> Field:
    """Build an OrderedDict holding the entries of `dct` named in `copied_keys`.

    :param dct: mapping to copy entries from; it is not modified.
    :param copied_keys: key names to copy, in the order they should appear
        in the result. Names that are absent from `dct` are skipped.
    :return: a new OrderedDict with the selected entries.
    """
    return OrderedDict(
        (name, dct[name]) for name in copied_keys if name in dct
    )
def dict_copy_existing_keys(source: Field, destination: Field, keys: List[str]) -> None:
    """Copy the entries named in `keys` from `source` into `destination`.

    :param source: mapping to read values from.
    :param destination: mapping updated in place; existing entries with the
        same key are overwritten.
    :param keys: key names to copy. Names missing from `source` are ignored.
    """
    present = (name for name in keys if name in source)
    for name in present:
        destination[name] = source[name]
def dict_sorted_by_keys(dct: FieldNestedEntry, sort_keys: Union[str, List[str]]) -> List[FieldNestedEntry]:
    """Return the values of `dct` sorted by one or more of their own keys.

    :param dct: mapping whose values are themselves mappings.
    :param sort_keys: a key name, or a list of key names, looked up in every
        value of `dct`. Earlier names take precedence when ordering.
    :return: list of the values of `dct` in ascending order of the criteria.
        Values whose criteria are all equal keep their original relative order.
    :raises KeyError: if a value of `dct` lacks one of `sort_keys`.
    """
    if not isinstance(sort_keys, list):
        sort_keys = [sort_keys]

    def criteria(nested):
        return [nested[sort_key] for sort_key in sort_keys]

    # Sort on the criteria only. Including the nested mapping itself in the
    # compared sequence makes ties fall through to comparing two dicts,
    # which raises TypeError.
    return sorted(dct.values(), key=criteria)
def ordered_dict_insert(
    dct: Field,
    new_key: str, new_value: Union[str, bool],
    before_key: Optional[str] = None,
    after_key: Optional[str] = None
) -> None:
    """Insert `new_key: new_value` into `dct` at a chosen position, in place.

    :param dct: ordered mapping to modify; it is cleared and refilled.
    :param new_key: key to insert.
    :param new_value: value stored under `new_key`.
    :param before_key: if given and present, the new entry is placed right
        before this key.
    :param after_key: if given and present, the new entry is placed right
        after this key.

    Only the first matching anchor is used. When neither anchor matches, the
    new entry is appended at the end.
    """
    entries = []
    pending = True
    for existing_key, existing_value in dct.items():
        if pending and before_key is not None and existing_key == before_key:
            entries.append((new_key, new_value))
            pending = False
        entries.append((existing_key, existing_value))
        if pending and after_key is not None and existing_key == after_key:
            entries.append((new_key, new_value))
            pending = False
    if pending:
        entries.append((new_key, new_value))

    # Building the OrderedDict from pairs keeps the first position and the
    # last value of any repeated key, same as assigning them one by one.
    reordered = OrderedDict(entries)
    dct.clear()
    for existing_key, existing_value in reordered.items():
        dct[existing_key] = existing_value
def safe_merge_dicts(a: Dict[Any, Any], b: Dict[Any, Any]) -> Dict[Any, Any]:
    """Merge two dictionaries into a new one, refusing duplicate keys.

    :param a: first dictionary; it is deep-copied, so the result shares no
        nested objects with it.
    :param b: second dictionary; its values are added to the result as-is.
    :return: a new dictionary with the entries of both inputs.
    :raises ValueError: if a key of `b` is already present in `a`.
    """
    merged = deepcopy(a)
    for key, value in b.items():
        if key in merged:
            raise ValueError('Duplicate key found when merging dictionaries: {0}'.format(key))
        merged[key] = value
    return merged
def fields_subset(subset, fields):
    """Return the entries of `fields` selected by `subset`, recursively.

    :param subset: mapping of field name to an options mapping. The only
        supported option is 'fields'. When it is missing or equal to '*', the
        whole entry is retained; when it is a dict, it is applied recursively
        to the entry's own 'fields'.
    :param fields: mapping of field name to field entry. It is not modified.
    :return: a new dict with the retained entries. Entries retained whole are
        the same objects as in `fields`; entries whose 'fields' were narrowed
        are shallow copies.
    :raises ValueError: if an options mapping contains anything but 'fields'.
    :raises KeyError: if `subset` names a field that `fields` does not have.
    """
    retained_fields = {}
    allowed_options = ['fields']
    for key, val in subset.items():
        for option in val:
            if option not in allowed_options:
                raise ValueError('Unsupported option found in subset: {}'.format(option))
        # A missing fields key is shorthand for including all subfields
        if 'fields' not in val or val['fields'] == '*':
            retained_fields[key] = fields[key]
        elif isinstance(val['fields'], dict):
            # Shallow-copy the field so we get all its options, then replace
            # 'fields' with the right subset. Assigning into fields[key]
            # directly would prune the caller's input as a side effect.
            entry = fields[key].copy()
            entry['fields'] = fields_subset(val['fields'], fields[key]['fields'])
            retained_fields[key] = entry
    return retained_fields
def yaml_ordereddict(dumper, data):
    """Represent an OrderedDict as a plain YAML mapping, keeping its order.

    :param dumper: YAML dumper; its `represent_data` is called on every key
        and value of `data`.
    :param data: the mapping to represent.
    :return: a `yaml.nodes.MappingNode` tagged as a standard YAML map.
    """
    # Almost sure it's unnecessary with Python 3.
    pairs = [
        (dumper.represent_data(item_key), dumper.represent_data(item_value))
        for item_key, item_value in data.items()
    ]
    return yaml.nodes.MappingNode(u'tag:yaml.org,2002:map', pairs)


yaml.add_representer(OrderedDict, yaml_ordereddict)
def dict_clean_string_values(dict: Dict[Any, Any]) -> None:
    """Strip leading and trailing whitespace from every string value, in place.

    :param dict: mapping to clean. Only top-level values that are `str` are
        touched; other values are left as they are.
    """
    for key, value in dict.items():
        if not isinstance(value, str):
            continue
        # Re-assigning an existing key does not change the dict's size, so it
        # is safe to do while iterating.
        dict[key] = value.strip()
# File helpers
YAML_EXT = {'yml', 'yaml'}


def is_yaml(path: str) -> bool:
    """Tell whether `path` carries one of the YAML extensions in YAML_EXT.

    The path is split on '.', and every segment after the first is compared
    against YAML_EXT, so a match anywhere after the first dot counts, not only
    the final extension.

    :param path: file path or glob pattern to inspect.
    :return: True if any dot-separated segment after the first is in YAML_EXT.
    """
    dotted_segments = path.split('.')[1:]
    return any(segment in YAML_EXT for segment in dotted_segments)
def safe_list(o: Union[str, List[str]]) -> List[str]:
    """Return `o` as a list.

    :param o: either a list, returned unchanged (same object), or a string,
        which is split on commas.
    :return: list of strings.
    """
    return o if isinstance(o, list) else o.split(',')
def glob_yaml_files(paths: List[str]) -> List[str]:
    """Expand paths, wildcards or folders into a sorted list of YAML files.

    :param paths: a list of entries, or a comma-separated string of entries.
        An entry recognised by `is_yaml` is globbed as given; any other entry
        is treated as a folder and searched for files with each YAML extension.
    :return: sorted list of every matched path.
    """
    matches: List[str] = []
    for entry in safe_list(paths):
        if is_yaml(entry):
            patterns = [entry]
        else:
            patterns = [os.path.join(entry, '*.' + ext) for ext in YAML_EXT]
        for pattern in patterns:
            matches.extend(glob.glob(pattern))
    return sorted(matches)
def get_tree_by_ref(ref: str) -> git.objects.tree.Tree:
    """Return the tree of commit `ref` in the git repository at the current directory.

    :param ref: reference passed to `Repo.commit` to select the commit.
    :return: the `tree` attribute of the resolved commit.
    """
    return git.Repo(os.getcwd()).commit(ref).tree
def path_exists_in_git_tree(tree: git.objects.tree.Tree, file_path: str) -> bool:
    """Tell whether `file_path` can be looked up in `tree`.

    :param tree: object indexed with `file_path`.
    :param file_path: path to look up.
    :return: False if the lookup raises KeyError, True otherwise.
    """
    try:
        tree[file_path]
    except KeyError:
        return False
    else:
        return True
def usage_doc_files() -> List[str]:
    """List the usage doc file names found under `../../docs/reference`.

    The directory is resolved relative to this module's location.

    :return: names (not full paths) of the regular files matching
        `ecs-*-usage.md`, or an empty list if the directory does not exist.
    """
    docs_dir = pathlib.Path(
        os.path.join(os.path.dirname(__file__), '../../docs/reference')
    )
    if not docs_dir.is_dir():
        return []
    return [
        entry.name
        for entry in docs_dir.glob('ecs-*-usage.md')
        if entry.is_file()
    ]
def ecs_files() -> List[str]:
    """Return the sorted list of schema files to load.

    The files are the `*.yml` entries of `../../schemas`, resolved relative to
    this module's location.
    """
    module_dir = os.path.dirname(__file__)
    return sorted(glob.glob(os.path.join(module_dir, '../../schemas/*.yml')))
def make_dirs(path: str) -> None:
    """Create directory `path`, including any missing parents.

    An already existing directory is not an error.

    :param path: directory to create.
    :raises OSError: if the directory cannot be created; a message is printed
        before the error is re-raised.
    """
    try:
        os.makedirs(path, exist_ok=True)
    except OSError as err:
        message = 'Unable to create output directory: {}'.format(err)
        print(message)
        raise err
def yaml_dump(
    filename: str,
    data: Dict[str, FieldNestedEntry],
    preamble: Optional[str] = None
) -> None:
    """Write `data` as YAML to `filename`, optionally preceded by `preamble`.

    :param filename: path of the file to write; it is opened in 'w' mode, so
        existing content is replaced.
    :param data: structure passed to `yaml.dump`.
    :param preamble: text written verbatim before the YAML document; skipped
        when empty or None.
    """
    dump_options = {'default_flow_style': False, 'allow_unicode': True}
    with open(filename, 'w') as stream:
        if preamble:
            stream.write(preamble)
        yaml.dump(data, stream, **dump_options)
def yaml_load(filename: str) -> Any:
    """Parse the YAML file `filename` with `yaml.safe_load`.

    :param filename: path of the YAML file to read.
    :return: whatever the document contains, as built by `yaml.safe_load`
        (for example a dict or a list). The result is not limited to a set
        of strings.
    """
    with open(filename) as f:
        return yaml.safe_load(f.read())
# List helpers
def list_subtract(original: List[Any], subtracted: List[Any]) -> List[Any]:
    """Subtract two lists: original - subtracted.

    :param original: list to filter; its order and duplicates are preserved.
    :param subtracted: items to leave out.
    :return: new list with the items of `original` not found in `subtracted`.
    """
    remaining = []
    for item in original:
        if item in subtracted:
            continue
        remaining.append(item)
    return remaining
def list_extract_keys(lst: List[Field], key_name: str) -> List[str]:
    """Collect the value stored under `key_name` in each dictionary of `lst`.

    :param lst: list of dictionaries.
    :param key_name: key to read from every dictionary.
    :return: the values, in the order of `lst`.
    :raises KeyError: if a dictionary lacks `key_name`.
    """
    return [entry[key_name] for entry in lst]
# Helpers for the deeply nested fields structure
def is_intermediate(field: FieldEntry) -> bool:
    """Tell whether a field is an intermediate field rather than a "real" one.

    :param field: entry whose 'field_details' mapping is inspected.
    :return: the value of `field['field_details']['intermediate']` when that
        key exists, otherwise False.
    """
    details = field['field_details']
    return details['intermediate'] if 'intermediate' in details else False
def remove_top_level_reusable_false(ecs_nested: Dict[str, FieldNestedEntry]) -> Dict[str, FieldNestedEntry]:
    """Return a copy of `ecs_nested` without field sets marked reusable.top_level: False.

    :param ecs_nested: mapping of field set name to field set; not modified.
    :return: new dict with the same field set objects, in the same order,
        minus those whose 'reusable' entry is non-empty and has a falsy
        'top_level'.
    """
    kept: Dict[str, FieldNestedEntry] = {}
    for fieldset_name, fieldset in ecs_nested.items():
        reusable = fieldset.get('reusable', None)
        # Field sets with no (or empty) 'reusable' are always kept.
        if not reusable or reusable['top_level']:
            kept[fieldset_name] = fieldset
    return kept
# Warning helper
def strict_warning(msg: str) -> None:
    """Emit a warning for an operation that would raise in `--strict` mode.

    :param msg: custom text, displayed ahead of the boilerplate explaining
        that strict mode would turn this into an exception.
    """
    # stacklevel=3 attributes the warning two frames above this helper.
    warnings.warn(
        f"{msg}\n\nThis will cause an exception when running in strict mode.\nWarning check:",
        stacklevel=3,
    )