Skip to content

Commit 4b0e27b

Browse files
committed
update AUTOMATION ML
1 parent 24be237 commit 4b0e27b

File tree

3 files changed

+166
-12
lines changed

3 files changed

+166
-12
lines changed

accounts/utils/__init__.py

Whitespace-only changes.

accounts/utils/xml_to_yaml.py

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import xml.etree.ElementTree as ET
2+
3+
4+
def workflow_type_parser(root):
5+
workflow_definition_element = root.find('.//{http://www.dke.de/CAEX}InternalElement[@Name="Workflow_definition"]')
6+
if workflow_definition_element is not None:
7+
value_element = workflow_definition_element.find('.//{http://www.dke.de/CAEX}Value')
8+
if value_element is not None:
9+
workflow_definition_value = value_element.text
10+
return workflow_definition_value
11+
raise Exception("Workflow_definition element not found")
12+
13+
def inputs_parser(root):
14+
namespace = {"caex": "http://www.dke.de/CAEX"}
15+
inputs_element = root.find('.//caex:InternalElement[@Name="Inputs"]', namespaces=namespace)
16+
inputs_data = {}
17+
18+
if inputs_element is not None:
19+
for internal_element in inputs_element.findall('.//caex:InternalElement', namespaces=namespace):
20+
internal_element_name = internal_element.get('Name')
21+
attribute_source = internal_element.find('.//caex:Attribute[@Name="server"]/caex:Value', namespaces=namespace)
22+
attribute_destination = internal_element.find('.//caex:Attribute[@Name="path"]/caex:Value', namespaces=namespace)
23+
24+
if attribute_source is not None and attribute_destination is not None:
25+
source_value = attribute_source.text if attribute_source.text is not None else ''
26+
destination_value = attribute_destination.text if attribute_destination.text is not None else ''
27+
28+
inputs_data[internal_element_name] = [
29+
{'server': source_value},
30+
{'path': destination_value}
31+
]
32+
33+
return inputs_data
34+
35+
36+
37+
def outputs_parser(root):
38+
namespace = {"caex": "http://www.dke.de/CAEX"}
39+
outputs_element = root.find('.//caex:InternalElement[@Name="Outputs"]', namespaces=namespace)
40+
outputs_data = {}
41+
42+
if outputs_element is not None:
43+
for internal_element in outputs_element.findall('.//caex:InternalElement', namespaces=namespace):
44+
internal_element_name = internal_element.get('Name')
45+
attribute_source = internal_element.find('.//caex:Attribute[@Name="path"]/caex:Value', namespaces=namespace)
46+
attribute_destination = internal_element.find('.//caex:Attribute[@Name="server"]/caex:Value', namespaces=namespace)
47+
48+
if attribute_source is not None and attribute_destination is not None:
49+
source_value = attribute_source.text if attribute_source.text is not None else ''
50+
destination_value = attribute_destination.text if attribute_destination.text is not None else ''
51+
52+
# Use the dynamic value as the key for outputs_data
53+
outputs_data[internal_element_name] = [
54+
{'path': source_value},
55+
{'server': destination_value},
56+
{'overwrite': True} # Assuming you always want to set overwrite to True
57+
]
58+
59+
return outputs_data
60+
61+
62+
def phases_parser(root):
63+
namespace = {"caex": "http://www.dke.de/CAEX"}
64+
phases_element = root.find('.//caex:InternalElement[@Name="Phases"]', namespaces=namespace)
65+
phases_data = {}
66+
67+
if phases_element is not None:
68+
for phase_element in phases_element.findall('.//caex:InternalElement[@Name="Phase"]', namespaces=namespace):
69+
phase_info = []
70+
# Extract information for the Phase
71+
namePhase = phase_element.find('.//caex:Attribute[@Name="name"]/caex:Value',
72+
namespaces=namespace).text
73+
74+
sequences=(phase_element.find('.//caex:Attribute[@Name="sequence"]/caex:Value',
75+
namespaces=namespace).text).split(">>")
76+
77+
for seq in sequences:
78+
for software_element in phase_element.findall('.//caex:InternalElement[@Name="Software"]',
79+
namespaces=namespace):
80+
if seq==software_element.find('.//caex:Attribute[@Name="type"]/caex:Value',namespaces=namespace).text:
81+
software_info = {}
82+
software_info['type'] = software_element.find('.//caex:Attribute[@Name="type"]/caex:Value',
83+
namespaces=namespace).text
84+
parameters_data = {}
85+
for parameter_element in software_element.findall('.//caex:InternalElement[@Name="Parameters"]',
86+
namespaces=namespace):
87+
for attribute_element in parameter_element.findall('.//caex:Attribute', namespaces=namespace):
88+
attribute_name = attribute_element.get('Name')
89+
attribute_value = attribute_element.find('./caex:Value', namespaces=namespace).text
90+
parameters_data[attribute_name] = attribute_value
91+
92+
software_info['arguments'] = parameters_data
93+
phase_info.append(software_info)
94+
phases_data[namePhase]=phase_info
95+
return phases_data
96+
97+
98+
def parameters_parser(root):
99+
namespace = {"caex": "http://www.dke.de/CAEX"}
100+
outputs_element = root.find('.//caex:InternalElement[@Name="Parameters"]', namespaces=namespace)
101+
parameters_data = {}
102+
103+
if outputs_element is not None:
104+
for internal_element in outputs_element.findall('.//caex:InternalElement', namespaces=namespace):
105+
internal_element_name = internal_element.get('Name')
106+
attribute_param = internal_element.find('.//caex:Attribute[@Name="param"]/caex:Value',
107+
namespaces=namespace)
108+
109+
if attribute_param is not None :
110+
param_value = attribute_param.text if attribute_param.text is not None else ''
111+
parameters_data[internal_element_name]=param_value
112+
return parameters_data
113+
114+
def workflow_parser(xml_file):
115+
tree = ET.parse(xml_file)
116+
root = tree.getroot()
117+
118+
workflow_data = {'workflow_type': None, 'phases': [], 'outputs': {}, 'inputs': {}, 'parameters': {}}
119+
120+
workflow_data['workflow_type'] = workflow_type_parser(root)
121+
workflow_data['inputs'] = inputs_parser(root)
122+
workflow_data['outputs'] = outputs_parser(root)
123+
workflow_data['phases'] = phases_parser(root)
124+
workflow_data['parameters'] = parameters_parser(root)
125+
return workflow_data
126+
127+
128+
def execution(path):
129+
try:
130+
result = workflow_parser(path)
131+
return result
132+
except Exception as e:
133+
print(f"Error: {e}")
134+
135+

accounts/views.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import configuration as cfg
2424
from rest_framework.authtoken.models import Token
2525
from login_register_project import settings
26+
from accounts.utils import xml_to_yaml
2627

2728
log = logging.getLogger(__name__)
2829

@@ -530,20 +531,31 @@ def __init__(self, request, name, numNodes, name_sim, execTime, qos, checkpoint_
530531
self.dOPTION = dOPTION
531532

532533
def run(self):
534+
extension = get_file_extension((self.name))
535+
if extension == ".yaml":
536+
workflow = read_and_write_yaml(self.name)
537+
else:
538+
workflow =xml_to_yaml.execution("documents/" + self.name)
539+
log.info("BEFORE OPENING")
540+
log.info(workflow.get("workflow_type"))
541+
log.info("DESCRIPTION WORKFLOW")
542+
log.info(workflow)
533543
machine_found = Machine.objects.get(id=self.request.session['machine_chosen'])
534544
fqdn = machine_found.fqdn
535545
machine_folder = extract_substring(fqdn)
536-
workflow = read_and_write(self.name)
537546
userMachine = machine_found.user
538547
workflow_name = workflow.get("workflow_type")
548+
log.info("workflow_name")
549+
log.info(workflow_name)
539550
principal_folder = machine_found.wdir
540551
wdirPath, nameWdir = wdir_folder(principal_folder)
541552
cmd1 = "source /etc/profile; mkdir -p " + principal_folder + "/" + nameWdir + "/workflows/; echo " + str(
542553
workflow) + " > " + principal_folder + "/" + nameWdir + "/workflows/" + str(
543554
self.name) + "; cd " + principal_folder + "; BACKUPDIR=$(ls -td ./*/ | head -1); echo EXECUTION_FOLDER:$BACKUPDIR;"
544555
ssh = connection(self.request.session["content"], machine_found.id)
545556
stdin, stdout, stderr = ssh.exec_command(cmd1)
546-
557+
log.info("COMMAND 1")
558+
log.info(cmd1)
547559
execution_folder = wdirPath + "/execution"
548560
workflow_folder = wdirPath + "/workflows"
549561

@@ -1106,13 +1118,13 @@ def __init__(self, request, connectionID):
11061118
threading.Thread.__init__(self)
11071119
self.request = request
11081120
self.timeout = 120 * 60
1109-
self.connectionID=connectionID
1121+
self.connectionID = connectionID
11101122

11111123
def run(self):
11121124
timeout_start = time.time()
11131125
while time.time() < timeout_start + self.timeout:
1114-
conn=Connection.objects.get(idConn_id=self.connectionID)
1115-
if conn.status=="Disconnect":
1126+
conn = Connection.objects.get(idConn_id=self.connectionID)
1127+
if conn.status == "Disconnect":
11161128
break
11171129
boolException = update_table(self.request)
11181130
if not boolException:
@@ -1123,7 +1135,6 @@ def run(self):
11231135
return
11241136

11251137

1126-
11271138
def update_table(request):
11281139
machine_found = Machine.objects.get(id=request.session['machine_chosen'])
11291140
machineID = machine_found.id
@@ -1138,6 +1149,8 @@ def update_table(request):
11381149
values = str(stdout).split()
11391150

11401151
if str(values[4]) == "COMPLETED" and executionE.status != "COMPLETED":
1152+
Execution.objects.filter(jobID=executionE.jobID).update(status=values[4], time=values[3],
1153+
nodes=int(values[2]))
11411154
ftp_folder_path = executionE.results_ftp_path
11421155
results_path = "results"
11431156
local_folder_path = os.path.join(executionE.wdir, results_path)
@@ -1147,7 +1160,7 @@ def update_table(request):
11471160
nodes=int(values[2]))
11481161
executionTimeout = Execution.objects.all().filter(author=request.user, autorestart=True, status="TIMEOUT")
11491162
for executionT in executionTimeout:
1150-
executionT.status="CONTINUE"
1163+
executionT.status = "CONTINUE"
11511164
checkpointing(executionT.jobID, request, executionT.machine_id)
11521165
return True
11531166

@@ -1191,14 +1204,15 @@ def stopExecution(eIDstop, request):
11911204
{'form': form, 'executions': executions, 'executionsDone': executionsDone,
11921205
'executionsFailed': executionsFailed, 'executionsTimeout': executionTimeout})
11931206

1207+
11941208
def checkpointing(jobIDCheckpoint, request, machine_id):
11951209
ssh = connection(request.session['content'], machine_id)
11961210
checkpointID = Execution.objects.all().get(author=request.user, jobID=jobIDCheckpoint)
11971211
command = "source /etc/profile; cd " + checkpointID.wdir + "; source checkpoint_script.sh;"
11981212
stdin, stdout, stderr = ssh.exec_command(command)
11991213
stdout = stdout.readlines()
12001214
s = "Submitted batch job"
1201-
execTime=checkpointID.execution_time
1215+
execTime = checkpointID.execution_time
12021216
while (len(stdout) == 0):
12031217
import time
12041218
time.sleep(1)
@@ -1233,7 +1247,7 @@ def checkpointing(jobIDCheckpoint, request, machine_id):
12331247
checkpointID = Execution.objects.all().get(author=request.user, jobID=jobIDCheckpoint)
12341248
checkpointID.status = "CONTINUE"
12351249
checkpointID.save()
1236-
#monitor_checkpoint(request.session['jobID'], request, execTime, machine_id)
1250+
# monitor_checkpoint(request.session['jobID'], request, execTime, machine_id)
12371251
return
12381252

12391253

@@ -1255,8 +1269,8 @@ def checkpointing_noAutorestart(jobIDCheckpoint, request):
12551269
request.session['jobID'] = jobID
12561270
form = Execution()
12571271
form.jobID = jobID
1258-
form.eID= uuid.uuid4()
1259-
form.machine_id= checkpointID.machine_id
1272+
form.eID = uuid.uuid4()
1273+
form.machine_id = checkpointID.machine_id
12601274
form.user = checkpointID.user
12611275
form.author = request.user
12621276
form.nodes = checkpointID.nodes
@@ -1332,7 +1346,7 @@ def create_workflow(request):
13321346
return render(request, 'accounts/create_workflow.html', {'form': form})
13331347

13341348

1335-
def read_and_write(name):
1349+
def read_and_write_yaml(name):
13361350
with open("documents/" + str(name)) as file:
13371351
try:
13381352
workflow = yaml.safe_load(file)
@@ -1349,6 +1363,11 @@ def ssh_keys_result(request):
13491363
return render('accounts/dashboard')
13501364

13511365

1366+
def get_file_extension(file_path):
1367+
_, extension = os.path.splitext(file_path)
1368+
return extension
1369+
1370+
13521371
def ssh_keys_generation(request): # method to generate the ssh keys of a specific machine
13531372
if request.method == 'POST':
13541373
form = Key_Gen_Form(request.POST)

0 commit comments

Comments
 (0)