
Commit e9bc28c

PNDA-4461: Deployment Manager - deploy Flink applications
1 parent 5d54d14 commit e9bc28c

File tree: 5 files changed (+214, -20 lines)
  api/src/main/resources/plugins/base_common.py
  api/src/main/resources/plugins/flink.py
  api/src/main/resources/plugins/flink.systemd.service.py.tpl
  api/src/main/resources/plugins/flink.systemd.service.tpl
  api/src/main/resources/plugins/sparkStreaming.py

api/src/main/resources/plugins/base_common.py (new file: 49 additions & 0 deletions)

"""
Name: base_common.py
Purpose: Common methods for flink.py and sparkStreaming.py plugins.
Author: PNDA team

Created: 20/04/2018

Copyright (c) 2016 Cisco and/or its affiliates.

This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
and/or its affiliated entities, under various laws including copyright, international treaties, patent,
and/or contract. Any use of the material herein must be in accordance with the terms of the License.
All rights not expressly granted by the License are reserved.

Unless required by applicable law or agreed to separately in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied.
"""

# pylint: disable=C0103

import json
import logging
import deployer_utils
from plugins.base_creator import Creator


class Common(Creator):

    def destroy_component(self, application_name, create_data):
        logging.debug("destroy_component: %s %s", application_name, json.dumps(create_data))
        self._control_component(create_data['ssh'])

    def start_component(self, application_name, create_data):
        logging.debug("start_component: %s %s", application_name, json.dumps(create_data))
        self._control_component(create_data['start_cmds'])

    def stop_component(self, application_name, create_data):
        logging.debug("stop_component: %s %s", application_name, json.dumps(create_data))
        self._control_component(create_data['stop_cmds'])

    def _control_component(self, cmds):
        key_file = self._environment['cluster_private_key']
        root_user = self._environment['cluster_root_user']
        target_host = 'localhost'
        deployer_utils.exec_ssh(target_host, root_user, key_file, cmds)
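For orientation, the sketch below (not part of this commit) shows the contract Common assumes: a plugin's create_component returns a dict keyed by 'ssh', 'start_cmds' and 'stop_cmds', and the lifecycle methods above simply replay the chosen command list over SSH. The stubbed exec_ssh and the sample commands are hypothetical stand-ins; the real class inherits _environment from base_creator.Creator and delegates to deployer_utils.exec_ssh.

# Minimal, self-contained sketch of the lifecycle contract assumed by Common.
# The stubbed exec_ssh and the sample create_data dict are hypothetical;
# the real plugin builds these lists in create_component (see flink.py below).

def exec_ssh(target_host, root_user, key_file, cmds):
    # Stand-in for deployer_utils.exec_ssh: just show what would be run.
    for cmd in cmds:
        print('[%s@%s] %s' % (root_user, target_host, cmd.strip()))

create_data = {
    'ssh': ['sudo service platform_app-example-comp stop\n',           # used by destroy_component
            'sudo rm -rf /opt/platform_app/example/comp\n'],
    'start_cmds': ['sudo systemctl daemon-reload\n',                    # used by start_component
                   'sudo service platform_app-example-comp start\n'],
    'stop_cmds': ['sudo service platform_app-example-comp stop\n'],     # used by stop_component
}

# start_component(...) effectively does:
exec_ssh('localhost', 'root', '/path/to/key.pem', create_data['start_cmds'])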
api/src/main/resources/plugins/flink.py (new file: 137 additions & 0 deletions)

"""
Name: flink.py
Purpose: Submits flink batch/streaming jobs based on application package
Author: PNDA team

Created: 02/03/2018

Copyright (c) 2016 Cisco and/or its affiliates.

This software is licensed to you under the terms of the Apache License, Version 2.0 (the "License").
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

The code, technical concepts, and all information contained herein, are the property of Cisco Technology, Inc.
and/or its affiliated entities, under various laws including copyright, international treaties, patent,
and/or contract. Any use of the material herein must be in accordance with the terms of the License.
All rights not expressly granted by the License are reserved.

Unless required by applicable law or agreed to separately in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied.
"""

# pylint: disable=C0103

import json
import os
import logging
import platform
from shutil import copy
import deployer_utils
from plugins.base_common import Common


class FlinkCreator(Common):

    def validate_component(self, component):
        errors = []
        file_list = component['component_detail']
        if 'application.properties' not in file_list:
            errors.append('missing file application.properties')

        return errors

    def get_component_type(self):
        return 'flink'

    def create_component(self, staged_component_path, application_name, user_name, component, properties):
        logging.debug("create_component: %s %s %s %s", application_name, user_name, json.dumps(component), properties)
        distro = platform.dist()
        usesSystemd = distro[0] in ('redhat', 'centos')
        remote_component_tmp_path = '%s/%s/%s' % (
            '/tmp/%s' % self._namespace, application_name, component['component_name'])
        remote_component_install_path = '%s/%s/%s' % (
            '/opt/%s' % self._namespace, application_name, component['component_name'])
        service_name = '%s-%s-%s' % (self._namespace, application_name, component['component_name'])

        key_file = self._environment['cluster_private_key']
        root_user = self._environment['cluster_root_user']
        target_host = 'localhost'

        if 'component_flink_version' not in properties:
            properties['component_flink_version'] = '1.4'
        if 'component_flink_config_args' not in properties:
            properties['component_flink_config_args'] = '-yn 1'
        if 'component_py_files' not in properties:
            properties['component_py_files'] = ''
        if 'component_flink_job_type' not in properties:
            properties['component_flink_job_type'] = 'streaming'
        if 'component_application_args' not in properties:
            properties['component_application_args'] = ''

        java_app = None
        if 'component_main_jar' in properties and 'component_main_class' in properties:
            java_app = True
        elif 'component_main_py' in properties:
            java_app = False
            flink_lib_dir = properties['environment_flink_lib_dir']
            for jar in os.listdir(flink_lib_dir):
                if os.path.isfile(os.path.join(flink_lib_dir, jar)) and 'flink-python' in jar:
                    properties['flink_python_jar'] = '%s/%s' % (flink_lib_dir, jar)
        else:
            raise Exception('properties.json must contain "main_jar or main_py" for %s flink %s' % (application_name, component['component_name']))

        this_dir = os.path.dirname(os.path.realpath(__file__))
        copy(os.path.join(this_dir, 'yarn-kill.py'), staged_component_path)
        if usesSystemd:
            service_script = 'flink.systemd.service.tpl' if java_app else 'flink.systemd.service.py.tpl'
            service_script_install_path = '/usr/lib/systemd/system/%s.service' % service_name
        if properties['component_flink_job_type'] == 'batch':
            properties['respawn'] = '# Restart=always'
            properties['respawn_limit'] = '# RestartSec=2'
        else:
            properties['respawn'] = 'Restart=always'
            properties['respawn_limit'] = 'RestartSec=2'

        copy(os.path.join(this_dir, service_script), staged_component_path)

        self._fill_properties(os.path.join(staged_component_path, service_script), properties)
        self._fill_properties(os.path.join(staged_component_path, 'application.properties'), properties)
        self._fill_properties(os.path.join(staged_component_path, 'yarn-kill.py'), properties)

        mkdircommands = []
        mkdircommands.append('mkdir -p %s' % remote_component_tmp_path)
        mkdircommands.append('sudo mkdir -p %s' % remote_component_install_path)
        deployer_utils.exec_ssh(target_host, root_user, key_file, mkdircommands)

        os.system("scp -i %s -o StrictHostKeyChecking=no %s %s@%s:%s"
                  % (key_file, staged_component_path + '/*', root_user, target_host, remote_component_tmp_path))

        commands = []
        commands.append('sudo cp %s/%s %s' % (remote_component_tmp_path, service_script, service_script_install_path))
        commands.append('sudo cp %s/* %s' % (remote_component_tmp_path, remote_component_install_path))
        commands.append('sudo chmod a+x %s/yarn-kill.py' % (remote_component_install_path))

        if 'component_main_jar' in properties:
            commands.append('cd %s && sudo jar uf %s application.properties' % (remote_component_install_path, properties['component_main_jar']))
        commands.append('sudo rm -rf %s' % (remote_component_tmp_path))
        deployer_utils.exec_ssh(target_host, root_user, key_file, commands)

        undo_commands = []
        undo_commands.append('sudo service %s stop\n' % service_name)
        undo_commands.append('sudo rm -rf %s\n' % remote_component_install_path)
        undo_commands.append('sudo rm %s\n' % service_script_install_path)
        logging.debug("uninstall commands: %s", undo_commands)

        start_commands = []
        if usesSystemd:
            start_commands.append('sudo systemctl daemon-reload\n')
        start_commands.append('sudo service %s start\n' % service_name)
        logging.debug("start commands: %s", start_commands)

        stop_commands = []
        stop_commands.append('sudo service %s stop\n' % service_name)
        logging.debug("stop commands: %s", stop_commands)

        return {'ssh': undo_commands, 'start_cmds': start_commands, 'stop_cmds': stop_commands}
api/src/main/resources/plugins/flink.systemd.service.py.tpl (new file: 13 additions & 0 deletions)

[Unit]
Description=PNDA Application: ${component_application}-${component_name}

[Service]
Type=simple
User=${application_user}
WorkingDirectory=/opt/${environment_namespace}/${component_application}/${component_name}/
ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
Environment=FLINK_VERSION=${component_flink_version}
ExecStart=/usr/bin/flink run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} -v ${flink_python_jar} ${component_main_py} ${component_application_args}
${respawn}
${respawn_limit}
api/src/main/resources/plugins/flink.systemd.service.tpl (new file: 13 additions & 0 deletions)

[Unit]
Description=PNDA Application: ${component_application}-${component_name}

[Service]
Type=simple
User=${application_user}
WorkingDirectory=/opt/${environment_namespace}/${component_application}/${component_name}/
ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
Environment=FLINK_VERSION=${component_flink_version}
ExecStart=/usr/bin/flink run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} --class ${component_main_class} ${component_main_jar} ${component_application_args}
${respawn}
${respawn_limit}
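The ${...} placeholders in both templates are filled in by Creator._fill_properties, which is not part of this diff, so the sketch below only approximates the substitution with string.Template over an abbreviated copy of the jar-based template; all substituted values are invented.

# Approximate rendering of the jar-based unit template with made-up values.
# The real substitution is done by Creator._fill_properties (not in this diff);
# string.Template is used here only because the placeholders use ${...} syntax.
from string import Template

TPL = """[Unit]
Description=PNDA Application: ${component_application}-${component_name}

[Service]
Type=simple
User=${application_user}
Environment=FLINK_VERSION=${component_flink_version}
ExecStart=/usr/bin/flink run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} --class ${component_main_class} ${component_main_jar} ${component_application_args}
${respawn}
${respawn_limit}
"""

print(Template(TPL).safe_substitute(
    component_application='example-app',
    component_name='flink-comp',
    application_user='pnda',
    component_flink_version='1.4',
    component_flink_config_args='-yn 1',
    component_job_name='example-app-flink-comp',
    component_main_class='com.example.MyFlinkJob',
    component_main_jar='my-flink-job.jar',
    component_application_args='',
    respawn='Restart=always',
    respawn_limit='RestartSec=2',
))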

api/src/main/resources/plugins/sparkStreaming.py

Lines changed: 2 additions & 20 deletions

@@ -28,10 +28,10 @@
 import platform
 from shutil import copy
 import deployer_utils
-from plugins.base_creator import Creator
+from plugins.base_common import Common


-class SparkStreamingCreator(Creator):
+class SparkStreamingCreator(Common):

     def validate_component(self, component):
         errors = []
@@ -49,24 +49,6 @@ def validate_component(self, component):
     def get_component_type(self):
         return 'sparkStreaming'

-    def destroy_component(self, application_name, create_data):
-        logging.debug("destroy_component: %s %s", application_name, json.dumps(create_data))
-        self._control_component(create_data['ssh'])
-
-    def start_component(self, application_name, create_data):
-        logging.debug("start_component: %s %s", application_name, json.dumps(create_data))
-        self._control_component(create_data['start_cmds'])
-
-    def stop_component(self, application_name, create_data):
-        logging.debug("stop_component: %s %s", application_name, json.dumps(create_data))
-        self._control_component(create_data['stop_cmds'])
-
-    def _control_component(self, cmds):
-        key_file = self._environment['cluster_private_key']
-        root_user = self._environment['cluster_root_user']
-        target_host = 'localhost'
-        deployer_utils.exec_ssh(target_host, root_user, key_file, cmds)
-
     def create_component(self, staged_component_path, application_name, user_name, component, properties):
         logging.debug("create_component: %s %s %s %s", application_name, user_name, json.dumps(component), properties)
         distro = platform.dist()
