Skip to content

Commit d863f41

Browse files
authored
Merge pull request #91 from dharaneeshvrd/PNDA-4796
PNDA-4796: Flink-stop added to stop the flink applications
2 parents 03f711a + 3611e4c commit d863f41

File tree

4 files changed

+36
-7
lines changed

4 files changed

+36
-7
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/usr/bin/env python
2+
3+
import commands
4+
import json
5+
import requests
6+
7+
COMMAND_OUTPUT = commands.getoutput('yarn application -list')
8+
9+
IS_RUNNING = False
10+
11+
for line in COMMAND_OUTPUT.splitlines():
12+
fields = line.split('\t')
13+
if len(fields) >= 6:
14+
app = fields[1].strip()
15+
state = fields[5].strip()
16+
if app == '${component_job_name}':
17+
IS_RUNNING = True
18+
yarn_app_id = fields[0].strip()
19+
tracking_url = fields[8].strip()
20+
break
21+
if IS_RUNNING:
22+
URL = '%s/jobs' % tracking_url
23+
FLINK_JOB_LIST = requests.get(URL)
24+
FLINK_JOB_LIST = json.loads(FLINK_JOB_LIST.text)
25+
FLINK_JOBID = FLINK_JOB_LIST['jobs-running'][0]
26+
STATUS, OUT = commands.getstatusoutput('flink stop %s -yid %s' % (FLINK_JOBID, yarn_app_id))
27+
if STATUS != 0:
28+
commands.getoutput('flink cancel %s -yid %s' % (FLINK_JOBID, yarn_app_id))
29+
commands.getoutput('yarn application -kill %s' % yarn_app_id)

api/src/main/resources/plugins/flink.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def create_component(self, staged_component_path, application_name, user_name, c
7979
raise Exception('properties.json must contain "main_jar or main_py" for %s flink %s' % (application_name, component['component_name']))
8080

8181
this_dir = os.path.dirname(os.path.realpath(__file__))
82-
copy(os.path.join(this_dir, 'yarn-kill.py'), staged_component_path)
82+
copy(os.path.join(this_dir, 'flink-stop.py'), staged_component_path)
8383
service_script = 'flink.systemd.service.tpl' if java_app else 'flink.systemd.service.py.tpl'
8484
service_script_install_path = '/usr/lib/systemd/system/%s.service' % service_name
8585
if 'component_respawn_type' not in properties:
@@ -98,7 +98,7 @@ def create_component(self, staged_component_path, application_name, user_name, c
9898

9999
self._fill_properties(os.path.join(staged_component_path, service_script), properties)
100100
self._fill_properties(os.path.join(staged_component_path, 'application.properties'), properties)
101-
self._fill_properties(os.path.join(staged_component_path, 'yarn-kill.py'), properties)
101+
self._fill_properties(os.path.join(staged_component_path, 'flink-stop.py'), properties)
102102

103103
mkdircommands = []
104104
mkdircommands.append('mkdir -p %s' % remote_component_tmp_path)
@@ -111,7 +111,7 @@ def create_component(self, staged_component_path, application_name, user_name, c
111111
commands = []
112112
commands.append('sudo cp %s/%s %s' % (remote_component_tmp_path, service_script, service_script_install_path))
113113
commands.append('sudo cp %s/* %s' % (remote_component_tmp_path, remote_component_install_path))
114-
commands.append('sudo chmod a+x %s/yarn-kill.py' % (remote_component_install_path))
114+
commands.append('sudo chmod a+x %s/flink-stop.py' % (remote_component_install_path))
115115

116116
if 'component_main_jar' in properties:
117117
commands.append('cd %s && sudo jar uf %s application.properties' % (remote_component_install_path, properties['component_main_jar']))

api/src/main/resources/plugins/flink.systemd.service.py.tpl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ Description=PNDA Application: ${component_application}-${component_name}
55
Type=simple
66
User=${application_user}
77
WorkingDirectory=/opt/${environment_namespace}/${component_application}/${component_name}/
8-
ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
9-
ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
8+
ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/flink-stop.py
9+
ExecStop=/opt/${environment_namespace}/${component_application}/${component_name}/flink-stop.py
1010
Environment=FLINK_VERSION=${component_flink_version}
1111
ExecStart=${environment_flink} run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} -v ${flink_python_jar} ${component_main_py} ${component_application_args}
1212
Restart=${component_respawn_type}

api/src/main/resources/plugins/flink.systemd.service.tpl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ Description=PNDA Application: ${component_application}-${component_name}
55
Type=simple
66
User=${application_user}
77
WorkingDirectory=/opt/${environment_namespace}/${component_application}/${component_name}/
8-
ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
9-
ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
8+
ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/flink-stop.py
9+
ExecStop=/opt/${environment_namespace}/${component_application}/${component_name}/flink-stop.py
1010
Environment=FLINK_VERSION=${component_flink_version}
1111
ExecStart=${environment_flink} run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} --class ${component_main_class} ${component_main_jar} ${component_application_args}
1212
Restart=${component_respawn_type}

0 commit comments

Comments
 (0)