
Commit ca9e618

Timetravel WIP
1 parent 40cdbb7 commit ca9e618

File tree

1 file changed: +128 -100 lines changed


runner.py

Lines changed: 128 additions & 100 deletions
@@ -128,14 +128,14 @@ def __init__(self,
         # transient variables that are created by the runner itself
         # these are accessed and processed on cleanup and then reset
         # They are __ as they should not be changed because this could break the state of the runner
-        self.__stdout_logs = {}
+        self.__stdout_logs = OrderedDict()
         self.__containers = {}
         self.__networks = []
         self.__ps_to_kill = []
         self.__ps_to_read = []
         self.__metric_providers = []
         self.__notes_helper = Notes()
-        self.__phases = {}
+        self.__phases = OrderedDict()
         self.__start_measurement = None
         self.__end_measurement = None
         self.__services_to_pause_phase = {}
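
A note on the OrderedDict switch above: plain dicts already preserve insertion order since Python 3.7, so the change presumably signals intent — `__phases` is popped and re-filled when a flow is restarted (see the run_flows hunk below), while key lookup still gives the duplicate checking a plain dict would. A minimal sketch of the behavior being relied on (names illustrative, not repo code):

    from collections import OrderedDict

    phases = OrderedDict()

    def start_phase(name):
        if name in phases:  # duplicate check works exactly like a plain dict
            raise RuntimeError(f'Phase {name} already started')
        phases[name] = {'name': name}

    start_phase('[BOOT]')
    start_phase('[RUNTIME]')

    phases.pop('[RUNTIME]')   # a failed flow's phase is dropped on retry ...
    start_phase('[RUNTIME]')  # ... and re-inserted at the end, keeping order
    print(list(phases))       # ['[BOOT]', '[RUNTIME]']
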
@@ -974,17 +974,6 @@ def setup_services(self):
             if ps.stderr:
                 self.add_to_log(container_name, f"stderr {ps.stderr}", d_command)

-        # Obsolete warnings. But left in, cause reasoning for NotImplementedError still holds
-        # elif el['type'] == 'Dockerfile':
-        #     raise NotImplementedError('Green Metrics Tool can currently not consume Dockerfiles. \
-        #         This will be a premium feature, as it creates a lot of server usage and thus slows down \
-        #         Tests per Minute for our server.')
-        # elif el['type'] == 'Docker-Compose':
-        #     raise NotImplementedError('Green Metrics Tool will not support that, because we wont support \
-        #         all features from docker compose, like for instance volumes and binding arbitrary directories')
-        # else:
-        #     raise RuntimeError('Unknown type detected in setup: ', el.get('type', None))
-
         print(TerminalColors.HEADER, '\nCurrent known containers: ', self.__containers, TerminalColors.ENDC)

     def get_logs(self):
@@ -1069,87 +1058,120 @@ def end_phase(self, phase):
     def run_flows(self):
         config = GlobalConfig().config
         # run the flows
-        for el in self._usage_scenario['flow']:
-            print(TerminalColors.HEADER, '\nRunning flow: ', el['name'], TerminalColors.ENDC)
-
-            self.start_phase(el['name'].replace('[', '').replace(']',''), transition=False)
+        ps_to_kill_tmp = []
+        ps_to_read_tmp = []
+        flow_id = 0
+        flows_len = len(self._usage_scenario['flow'])
+        while flow_id < flows_len:
+            flow = self._usage_scenario['flow'][flow_id]
+            ps_to_kill_tmp = []
+            ps_to_read_tmp = []
+            print(TerminalColors.HEADER, '\nRunning flow: ', flow['name'], TerminalColors.ENDC)

-            for inner_el in el['commands']:
-                if 'note' in inner_el:
-                    self.__notes_helper.add_note({'note': inner_el['note'], 'detail_name': el['container'], 'timestamp': int(time.time_ns() / 1_000)})
+            try:
+                self.start_phase(flow['name'].replace('[', '').replace(']',''), transition=False)

-                if inner_el['type'] == 'console':
-                    print(TerminalColors.HEADER, '\nConsole command', inner_el['command'], 'on container', el['container'], TerminalColors.ENDC)
+                for cmd_obj in flow['commands']:
+                    if 'note' in cmd_obj:
+                        self.__notes_helper.add_note({'note': cmd_obj['note'], 'detail_name': flow['container'], 'timestamp': int(time.time_ns() / 1_000)})

-                    docker_exec_command = ['docker', 'exec']
+                    if cmd_obj['type'] == 'console':
+                        print(TerminalColors.HEADER, '\nConsole command', cmd_obj['command'], 'on container', flow['container'], TerminalColors.ENDC)

-                    docker_exec_command.append(el['container'])
-                    if shell := inner_el.get('shell', False):
-                        docker_exec_command.append(shell)
-                        docker_exec_command.append('-c')
-                        docker_exec_command.append(inner_el['command'])
-                    else:
-                        for cmd in inner_el['command'].split():
-                            docker_exec_command.append(cmd)
-
-                    # Note: In case of a detach wish in the usage_scenario.yml:
-                    # We are NOT using the -d flag from docker exec, as this prohibits getting the stdout.
-                    # Since Popen always make the process asynchronous we can leverage this to emulate a detached
-                    # behavior
-
-                    stderr_behaviour = stdout_behaviour = subprocess.DEVNULL
-                    if inner_el.get('log-stdout', False):
-                        stdout_behaviour = subprocess.PIPE
-                    if inner_el.get('log-stderr', True):
-                        stderr_behaviour = subprocess.PIPE
-
-
-                    if inner_el.get('detach', False) is True:
-                        print('Process should be detached. Running asynchronously and detaching ...')
-                        #pylint: disable=consider-using-with
-                        ps = subprocess.Popen(
-                            docker_exec_command,
-                            stderr=stderr_behaviour,
-                            stdout=stdout_behaviour,
-                            encoding='UTF-8',
-                        )
-                        if stderr_behaviour == subprocess.PIPE:
-                            os.set_blocking(ps.stderr.fileno(), False)
-                        if stdout_behaviour == subprocess.PIPE:
-                            os.set_blocking(ps.stdout.fileno(), False)
+                        docker_exec_command = ['docker', 'exec']

-                        self.__ps_to_kill.append({'ps': ps, 'cmd': inner_el['command'], 'ps_group': False})
-                    else:
-                        print(f"Process should be synchronous. Alloting {config['measurement']['flow-process-runtime']}s runtime ...")
-                        ps = subprocess.run(
-                            docker_exec_command,
-                            stderr=stderr_behaviour,
-                            stdout=stdout_behaviour,
-                            encoding='UTF-8',
-                            check=False, # cause it will be checked later and also ignore-errors checked
-                            timeout=config['measurement']['flow-process-runtime'],
-                        )
+                        docker_exec_command.append(flow['container'])
+                        if shell := cmd_obj.get('shell', False):
+                            docker_exec_command.append(shell)
+                            docker_exec_command.append('-c')
+                            docker_exec_command.append(cmd_obj['command'])
+                        else:
+                            for cmd in cmd_obj['command'].split():
+                                docker_exec_command.append(cmd)
+
+                        # Note: In case of a detach wish in the usage_scenario.yml:
+                        # We are NOT using the -d flag from docker exec, as this prohibits getting the stdout.
+                        # Since Popen always make the process asynchronous we can leverage this to emulate a detached
+                        # behavior
+
+                        stderr_behaviour = stdout_behaviour = subprocess.DEVNULL
+                        if cmd_obj.get('log-stdout', False):
+                            stdout_behaviour = subprocess.PIPE
+                        if cmd_obj.get('log-stderr', True):
+                            stderr_behaviour = subprocess.PIPE
+
+
+                        if cmd_obj.get('detach', False) is True:
+                            print('Process should be detached. Running asynchronously and detaching ...')
+                            #pylint: disable=consider-using-with
+                            ps = subprocess.Popen(
+                                docker_exec_command,
+                                stderr=stderr_behaviour,
+                                stdout=stdout_behaviour,
+                                encoding='UTF-8',
+                            )
+                            if stderr_behaviour == subprocess.PIPE:
+                                os.set_blocking(ps.stderr.fileno(), False)
+                            if stdout_behaviour == subprocess.PIPE:
+                                os.set_blocking(ps.stdout.fileno(), False)
+
+                            ps_to_kill_tmp.append({'ps': ps, 'cmd': cmd_obj['command'], 'ps_group': False})
+                        else:
+                            print(f"Process should be synchronous. Alloting {config['measurement']['flow-process-runtime']}s runtime ...")
+                            ps = subprocess.run(
+                                docker_exec_command,
+                                stderr=stderr_behaviour,
+                                stdout=stdout_behaviour,
+                                encoding='UTF-8',
+                                check=False, # cause it will be checked later and also ignore-errors checked
+                                timeout=config['measurement']['flow-process-runtime'],
+                            )
+
+                        ps_to_read_tmp.append({
+                            'cmd': docker_exec_command,
+                            'ps': ps,
+                            'container_name': flow['container'],
+                            'read-notes-stdout': cmd_obj.get('read-notes-stdout', False),
+                            'ignore-errors': cmd_obj.get('ignore-errors', False),
+                            'read-sci-stdout': cmd_obj.get('read-sci-stdout', False),
+                            'detail_name': flow['container'],
+                            'detach': cmd_obj.get('detach', False),
+                        })

-                    self.__ps_to_read.append({
-                        'cmd': docker_exec_command,
-                        'ps': ps,
-                        'container_name': el['container'],
-                        'read-notes-stdout': inner_el.get('read-notes-stdout', False),
-                        'ignore-errors': inner_el.get('ignore-errors', False),
-                        'read-sci-stdout': inner_el.get('read-sci-stdout', False),
-                        'detail_name': el['container'],
-                        'detach': inner_el.get('detach', False),
-                    })

+                    else:
+                        raise RuntimeError('Unknown command type in flow: ', cmd_obj['type'])

-                else:
-                    raise RuntimeError('Unknown command type in flow: ', inner_el['type'])
+                    if self._debugger.active:
+                        self._debugger.pause('Waiting to start next command in flow')

-                if self._debugger.active:
-                    self._debugger.pause('Waiting to start next command in flow')
+                self.end_phase(flow['name'].replace('[', '').replace(']',''))
+                self.__ps_to_kill += ps_to_kill_tmp
+                self.__ps_to_read += ps_to_read_tmp # will otherwise be discarded, bc they confuse execption handling
+                self.check_process_returncodes()
+                flow_id += 1
+            except BaseException as exc:
+                print('Exception occured: ', exc)
+                print(TerminalColors.OKCYAN, '\nWhat do you want to do?\n1 -- Restart current flow\n2 -- Restart all flows\n3 -- Reload containers and restart flows\n0 / CTRL+C -- Abort', TerminalColors.ENDC)
+                value = sys.stdin.readline().strip()
+
+                self.__ps_to_read = [] # clear, so we do not read old processes
+                if ps_to_kill_tmp:
+                    print('Trying to kill detached processes of current flow')
+                    process_helpers.kill_ps(ps_to_kill_tmp)
+
+                if value == '0':
+                    raise KeyboardInterrupt("Manual abort")
+                if value == '1':
+                    self.__phases.pop(flow['name'])
+                if value == '2':
+                    self.__phases = OrderedDict()
+                if value == '3':
+                    self.cleanup(inline=True)
+                    self.setup_networks()
+                    self.setup_services()
+                    flow_id = 0

-            self.end_phase(el['name'].replace('[', '').replace(']',''))
-            self.check_process_returncodes()

     # this function should never be called twice to avoid double logging of metrics
     def stop_metric_providers(self):
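
The rewritten run_flows above replaces the plain for-loop with an index-driven while-loop so the cursor can be rewound after a failure: process handles are staged in ps_to_kill_tmp / ps_to_read_tmp and only merged into self.__ps_to_kill / self.__ps_to_read once the flow's phase has ended and its return codes were checked. On an exception the staged detached processes are killed and the user picks whether to retry the flow, all flows, or rebuild the containers first. A stripped-down sketch of that control flow (the step callables are illustrative, not the repo's API):

    import sys

    def run_steps(steps):
        # steps: list of zero-argument callables, one per flow
        i = 0
        while i < len(steps):
            try:
                steps[i]()  # run the current flow
                i += 1      # advance only after the flow succeeded
            except BaseException as exc:  # pylint: disable=broad-except
                print('Exception occurred:', exc)
                print('1 -- restart current / 2 -- restart all / 0 -- abort')
                choice = sys.stdin.readline().strip()
                if choice == '0':
                    raise KeyboardInterrupt('Manual abort') from exc
                if choice == '2':
                    i = 0
                # on '1' (or any other input) i is unchanged and the flow reruns
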
@@ -1244,12 +1266,12 @@ def store_phases(self):
         # internally PostgreSQL stores JSON ordered. This means our name-indexed dict will get
         # re-ordered. Therefore we change the structure and make it a list now.
         # We did not make this before, as we needed the duplicate checking of dicts
-        self.__phases = list(self.__phases.values())
+        phases = list(self.__phases.values())
         DB().query("""
             UPDATE runs
             SET phases=%s
             WHERE id = %s
-            """, params=(json.dumps(self.__phases), self._run_id))
+            """, params=(json.dumps(phases), self._run_id))

     def read_container_logs(self):
         print(TerminalColors.HEADER, '\nCapturing container logs', TerminalColors.ENDC)
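
The store_phases change above does two things at once: since JSON object keys may come back re-ordered from the database (the dict was only ever needed for duplicate checking), an order-preserving list is what gets stored; and converting into a local variable instead of assigning back onto self.__phases keeps the attribute a dict, which the restart options in run_flows now rely on (pop / re-init). Roughly, as a sketch (illustrative, not the repo's DB layer):

    import json
    from collections import OrderedDict

    phases = OrderedDict()
    phases['[BOOT]'] = {'name': '[BOOT]', 'start': 1, 'end': 2}
    phases['[RUNTIME]'] = {'name': '[RUNTIME]', 'start': 3, 'end': 4}

    # A JSON array keeps element order where a JSON object may not,
    # so convert to a list - but in a local, leaving the dict untouched.
    payload = json.dumps(list(phases.values()))
    print(payload)  # [{"name": "[BOOT]", "start": 1, "end": 2}, {"name": "[RUNTIME]", "start": 3, "end": 4}]
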
@@ -1297,44 +1319,50 @@ def save_stdout_logs(self):
             """, params=(logs_as_str, self._run_id))


-    def cleanup(self):
+    def cleanup(self, inline=False):
         #https://github.com/green-coding-berlin/green-metrics-tool/issues/97
         print(TerminalColors.OKCYAN, '\nStarting cleanup routine', TerminalColors.ENDC)

-        print('Stopping metric providers')
-        for metric_provider in self.__metric_providers:
-            metric_provider.stop_profiling()
+        if inline is False:
+            print('Stopping metric providers')
+            for metric_provider in self.__metric_providers:
+                metric_provider.stop_profiling()
+            self.__metric_providers = []
+

         print('Stopping containers')
         for container_id in self.__containers:
             subprocess.run(['docker', 'rm', '-f', container_id], check=True, stderr=subprocess.DEVNULL)
+        self.__containers = {}

         print('Removing network')
         for network_name in self.__networks:
             # no check=True, as the network might already be gone. We do not want to fail here
             subprocess.run(['docker', 'network', 'rm', network_name], stderr=subprocess.DEVNULL, check=False)
+        self.__networks = []

-        if not self._no_file_cleanup:
+        if inline is False or not self._no_file_cleanup:
             print('Removing files')
             subprocess.run(['rm', '-Rf', self._tmp_folder], stderr=subprocess.DEVNULL, check=True)

-        self.remove_docker_images()
+        if inline is False:
+            self.remove_docker_images()

         process_helpers.kill_ps(self.__ps_to_kill)
-        print(TerminalColors.OKBLUE, '-Cleanup gracefully completed', TerminalColors.ENDC)
-
-        self.__notes_helper = Notes()
-        self.__containers = {}
-        self.__networks = []
         self.__ps_to_kill = []
         self.__ps_to_read = []
-        self.__metric_providers = []
-        self.__phases = {}
-        self.__start_measurement = None
+
+        if inline is False:
+            self.__start_measurement = None
+            self.__notes_helper = Notes()
+
+        self.__phases = OrderedDict()
         self.__end_measurement = None
         self.__join_default_network = False
         #self.__filename = self._original_filename # # we currently do not use this variable

+        print(TerminalColors.OKBLUE, '-Cleanup gracefully completed', TerminalColors.ENDC)
+
     def run(self):
         '''
         The run function is just a wrapper for the intended sequential flow of a GMT run.
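
cleanup(inline=True) in the hunk above is what restart option 3 calls mid-run: containers and networks are always torn down, and their bookkeeping is now reset right where each resource is freed, while metric providers, docker images, the measurement start time, and the notes helper survive an inline cleanup so the measurement can continue. Schematically (simplified stand-ins, not the repo's actual helpers):

    def cleanup(state, inline=False):
        if not inline:
            state['metric_providers'] = []   # full teardown: stop profiling too
        state['containers'] = {}             # always: containers and networks
        state['networks'] = []               # are rebuilt on a restart anyway
        if not inline:
            state['start_measurement'] = None
            state['notes'] = []

    state = {'metric_providers': ['psu'], 'containers': {'c1': 1},
             'networks': ['gmt-net'], 'start_measurement': 42, 'notes': ['n']}
    cleanup(state, inline=True)
    print(state['metric_providers'], state['start_measurement'])  # ['psu'] 42
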
