Skip to content
This repository was archived by the owner on Jul 16, 2024. It is now read-only.

Commit f2b3bff

Browse files
author
James Bell
authored
Merge pull request #74 from lemoney/release/2.3.0
Release/2.3.0
2 parents 378d11f + f67d131 commit f2b3bff

File tree

4 files changed

+147
-48
lines changed

4 files changed

+147
-48
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
setup(
66
name='tgt_grease',
7-
version='2.2.6',
7+
version='2.3.0',
88
license="MIT",
99
description='Modern distributed automation engine built with love by Target',
1010
long_description="""

tgt_grease/core/Types/Command.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from datetime import datetime
44
import sys
55
import os
6+
import traceback
67

78

89
class Command(object):
@@ -107,12 +108,13 @@ def safe_execute(self, context=None):
107108
except BaseException:
108109
self.exec_data['execVal'] = False
109110
exc_type, exc_obj, exc_tb = sys.exc_info()
111+
tb = traceback.format_exception(exc_type, exc_obj, exc_tb)
110112
self.ioc.getLogger().error(
111113
"Failed to execute [{0}] execute got exception!".format(self.__class__.__name__),
112114
additional={
113-
'file': os.path.split(exc_tb.tb_frame.f_code.co_filename)[1],
114-
'type': exc_type,
115-
'line': exc_tb.tb_lineno
115+
'file': os.path.split(str(str(tb[2]).split('"')[1]))[1],
116+
'type': str(exc_type),
117+
'line': str(str(tb[2]).split(",")[1]).split(' ')[2]
116118
}
117119
)
118120
except:

tgt_grease/core/Types/tests/test_command.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from unittest import TestCase
22
from tgt_grease.core.Types import Command
33
from tgt_grease.core import GreaseContainer
4+
import sys
45

56

67
class TestCmd(Command):
@@ -70,7 +71,33 @@ def test_failed_cmd(self):
7071
self.assertFalse(cmd.getRetVal())
7172

7273
def test_except_cmd(self):
74+
def msg(m, additional=None):
75+
if additional is None:
76+
additional = {}
77+
self.assertEqual(
78+
m,
79+
"Failed to execute [TestCmdExcept] execute got exception!"
80+
)
81+
if sys.version_info[0] < 3:
82+
self.assertDictEqual(
83+
additional,
84+
{
85+
'file': 'test_command.py',
86+
'type': "<type 'exceptions.AttributeError'>",
87+
'line': '34'
88+
}
89+
)
90+
else:
91+
self.assertDictEqual(
92+
additional,
93+
{
94+
'file': 'test_command.py',
95+
'type': "<class 'AttributeError'>",
96+
'line': '34'
97+
}
98+
)
7399
cmd = TestCmdExcept()
100+
cmd.ioc._logger.error = msg
74101
cmd.safe_execute({})
75102
self.assertFalse(cmd.getExecVal())
76103
self.assertFalse(cmd.getRetVal())

tgt_grease/enterprise/Model/Scanning.py

Lines changed: 114 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from .BaseSource import BaseSourceClass
55
from .DeDuplication import Deduplication
66
from .CentralScheduling import Scheduling
7+
import threading
8+
from psutil import cpu_percent, virtual_memory
79
from uuid import uuid4
810

911

@@ -54,60 +56,128 @@ def Parse(self, source=None, config=None):
5456
"""
5557
self.ioc.getLogger().trace("Starting Parse of Environment", trace=True)
5658
Configuration = self.generate_config_set(source=source, config=config)
57-
for conf in Configuration:
59+
ScanPool = []
60+
lenConfigs = len(Configuration)
61+
i = 0
62+
while i < lenConfigs:
63+
# ensure we don't swamp the system resources
64+
cpu = cpu_percent(interval=.1)
65+
mem = virtual_memory().percent
66+
if \
67+
cpu >= int(self.ioc.getConfig().get('NodeInformation', 'ResourceMax')) or \
68+
mem >= int(self.ioc.getConfig().get('NodeInformation', 'ResourceMax')):
69+
self.ioc.getLogger().trace("Scan sleeping; System resource maximum reached", verbose=True)
70+
# remove variables
71+
del cpu
72+
del mem
73+
continue
74+
conf = Configuration[i]
75+
i += 1
76+
# ensure no kafka prototypes come into sourcing
5877
if conf.get('source') == 'kafka':
5978
continue
79+
# ensure there is an execution environment
80+
server, _ = self.scheduler.determineExecutionServer(conf.get('exe_env', 'general'))
81+
if not server:
82+
self.ioc.getLogger().warning(
83+
'configuration skipped -- execution environment offline',
84+
additional={
85+
'execution_environment': conf.get('exe_env', 'general'),
86+
'configuration': conf.get('name')
87+
},
88+
notify=True
89+
)
90+
continue
6091
inst = self.impTool.load(conf.get('source', str(uuid4())))
6192
if not isinstance(inst, BaseSourceClass):
6293
self.ioc.getLogger().error("Invalid Source [{0}]".format(conf.get('source')), notify=False)
6394
del inst
6495
continue
6596
else:
66-
try:
67-
# If mock mode enabled
68-
if self.ioc.getConfig().get('Sourcing', 'mock'):
69-
data = inst.mock_data(conf)
70-
# else actually do sourcing
71-
else:
72-
if inst.parse_source(conf):
73-
# deduplicate data
74-
data = self.dedup.Deduplicate(
75-
data=inst.get_data(),
76-
source=conf.get('source'),
77-
configuration=conf.get('name', str(uuid4())),
78-
threshold=inst.deduplication_strength,
79-
expiry_hours=inst.deduplication_expiry,
80-
expiry_max=inst.deduplication_expiry_max,
81-
collection='Dedup_Sourcing',
82-
field_set=inst.field_set
83-
)
84-
else:
85-
self.ioc.getLogger().warning(
86-
"Source [{0}] parsing failed".format(conf.get('source')),
87-
notify=False
88-
)
89-
data = []
90-
if len(data) > 0:
91-
if self.scheduler.scheduleDetection(conf.get('source'), conf.get('name'), data):
92-
self.ioc.getLogger().info(
93-
"Data scheduled for detection from source [{0}]".format(conf.get('source')),
94-
trace=True
95-
)
96-
del inst
97-
continue
98-
else:
99-
self.ioc.getLogger().error("Scheduling failed for source document!", notify=False)
100-
del inst
101-
continue
102-
else:
103-
self.ioc.getLogger().trace("Length of data was empty; was not scheduled", trace=True)
104-
del inst
105-
continue
106-
except BaseException as e:
107-
self.ioc.getLogger().error("Failed parsing message got exception! Configuration [{0}] Got [{1}]".format(conf, e))
108-
continue
97+
t = threading.Thread(
98+
target=self.ParseSource,
99+
args=(
100+
self.ioc,
101+
inst,
102+
conf,
103+
self.dedup,
104+
self.scheduler,
105+
),
106+
name="GREASE SOURCING THREAD [{0}]".format(conf.get('name'))
107+
)
108+
t.daemon = True
109+
t.start()
110+
ScanPool.append(t)
111+
# wait for threads to finish out
112+
while len(ScanPool) > 0:
113+
self.ioc.getLogger().trace("Total current scan threads [{0}]".format(len(ScanPool)), trace=True)
114+
threads_final = []
115+
for thread in ScanPool:
116+
if thread.isAlive():
117+
threads_final.append(thread)
118+
ScanPool = threads_final
119+
self.ioc.getLogger().trace("Total current scan threads [{0}]".format(len(ScanPool)), trace=True)
120+
self.ioc.getLogger().trace("Scanning Complete".format(len(ScanPool)), trace=True)
109121
return True
110122

123+
@staticmethod
124+
def ParseSource(ioc, source, configuration, deduplication, scheduler):
125+
"""Parses an individual source and attempts to schedule it
126+
127+
Args:
128+
ioc (GreaseContainer): IoC Instance
129+
source (BaseSourceClass): Source to parse
130+
configuration (dict): Prototype configuration to use
131+
deduplication (Deduplication): Dedup engine instance
132+
scheduler (Scheduling): Central Scheduling instance
133+
134+
Returns:
135+
None: Meant to be run in a thread
136+
137+
"""
138+
try:
139+
# If mock mode enabled
140+
if ioc.getConfig().get('Sourcing', 'mock'):
141+
data = source.mock_data(configuration)
142+
# else actually do sourcing
143+
else:
144+
if source.parse_source(configuration):
145+
# deduplicate data
146+
data = deduplication.Deduplicate(
147+
data=source.get_data(),
148+
source=configuration.get('source'),
149+
configuration=configuration.get('name', str(uuid4())),
150+
threshold=source.deduplication_strength,
151+
expiry_hours=source.deduplication_expiry,
152+
expiry_max=source.deduplication_expiry_max,
153+
collection='Dedup_Sourcing',
154+
field_set=source.field_set
155+
)
156+
else:
157+
ioc.getLogger().warning(
158+
"Source [{0}] parsing failed".format(configuration.get('source')),
159+
notify=False
160+
)
161+
data = []
162+
if len(data) > 0:
163+
if scheduler.scheduleDetection(configuration.get('source'), configuration.get('name'), data):
164+
ioc.getLogger().info(
165+
"Data scheduled for detection from source [{0}]".format(configuration.get('source')),
166+
trace=True
167+
)
168+
del source
169+
else:
170+
ioc.getLogger().error("Scheduling failed for source document!", notify=False)
171+
del source
172+
else:
173+
ioc.getLogger().trace("Length of data was empty; was not scheduled", trace=True)
174+
del source
175+
except BaseException as e:
176+
ioc.getLogger().error(
177+
"Failed parsing message got exception! Configuration [{0}] Got [{1}]".format(configuration, e)
178+
)
179+
del source
180+
111181
def generate_config_set(self, source=None, config=None):
112182
"""Examines configuration and returns list of configs to parse
113183

0 commit comments

Comments
 (0)