Skip to content

Commit 3611e4c

Browse files
committed
Flink-stop added to stop the flink applications
Flink history server requires applications to be cancelled using "flink stop" or "flink cancel" command, then only applications are getting added to flink history server. PNDA-4796
1 parent 03f711a commit 3611e4c

File tree

4 files changed

+36
-7
lines changed

4 files changed

+36
-7
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/usr/bin/env python
2+
3+
import commands
4+
import json
5+
import requests
6+
7+
COMMAND_OUTPUT = commands.getoutput('yarn application -list')
8+
9+
IS_RUNNING = False
10+
11+
for line in COMMAND_OUTPUT.splitlines():
12+
fields = line.split('\t')
13+
if len(fields) >= 6:
14+
app = fields[1].strip()
15+
state = fields[5].strip()
16+
if app == '${component_job_name}':
17+
IS_RUNNING = True
18+
yarn_app_id = fields[0].strip()
19+
tracking_url = fields[8].strip()
20+
break
21+
if IS_RUNNING:
22+
URL = '%s/jobs' % tracking_url
23+
FLINK_JOB_LIST = requests.get(URL)
24+
FLINK_JOB_LIST = json.loads(FLINK_JOB_LIST.text)
25+
FLINK_JOBID = FLINK_JOB_LIST['jobs-running'][0]
26+
STATUS, OUT = commands.getstatusoutput('flink stop %s -yid %s' % (FLINK_JOBID, yarn_app_id))
27+
if STATUS != 0:
28+
commands.getoutput('flink cancel %s -yid %s' % (FLINK_JOBID, yarn_app_id))
29+
commands.getoutput('yarn application -kill %s' % yarn_app_id)

api/src/main/resources/plugins/flink.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def create_component(self, staged_component_path, application_name, user_name, c
7979
raise Exception('properties.json must contain "main_jar or main_py" for %s flink %s' % (application_name, component['component_name']))
8080

8181
this_dir = os.path.dirname(os.path.realpath(__file__))
82-
copy(os.path.join(this_dir, 'yarn-kill.py'), staged_component_path)
82+
copy(os.path.join(this_dir, 'flink-stop.py'), staged_component_path)
8383
service_script = 'flink.systemd.service.tpl' if java_app else 'flink.systemd.service.py.tpl'
8484
service_script_install_path = '/usr/lib/systemd/system/%s.service' % service_name
8585
if 'component_respawn_type' not in properties:
@@ -98,7 +98,7 @@ def create_component(self, staged_component_path, application_name, user_name, c
9898

9999
self._fill_properties(os.path.join(staged_component_path, service_script), properties)
100100
self._fill_properties(os.path.join(staged_component_path, 'application.properties'), properties)
101-
self._fill_properties(os.path.join(staged_component_path, 'yarn-kill.py'), properties)
101+
self._fill_properties(os.path.join(staged_component_path, 'flink-stop.py'), properties)
102102

103103
mkdircommands = []
104104
mkdircommands.append('mkdir -p %s' % remote_component_tmp_path)
@@ -111,7 +111,7 @@ def create_component(self, staged_component_path, application_name, user_name, c
111111
commands = []
112112
commands.append('sudo cp %s/%s %s' % (remote_component_tmp_path, service_script, service_script_install_path))
113113
commands.append('sudo cp %s/* %s' % (remote_component_tmp_path, remote_component_install_path))
114-
commands.append('sudo chmod a+x %s/yarn-kill.py' % (remote_component_install_path))
114+
commands.append('sudo chmod a+x %s/flink-stop.py' % (remote_component_install_path))
115115

116116
if 'component_main_jar' in properties:
117117
commands.append('cd %s && sudo jar uf %s application.properties' % (remote_component_install_path, properties['component_main_jar']))

api/src/main/resources/plugins/flink.systemd.service.py.tpl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ Description=PNDA Application: ${component_application}-${component_name}
55
Type=simple
66
User=${application_user}
77
WorkingDirectory=/opt/${environment_namespace}/${component_application}/${component_name}/
8-
ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
9-
ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
8+
ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/flink-stop.py
9+
ExecStop=/opt/${environment_namespace}/${component_application}/${component_name}/flink-stop.py
1010
Environment=FLINK_VERSION=${component_flink_version}
1111
ExecStart=${environment_flink} run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} -v ${flink_python_jar} ${component_main_py} ${component_application_args}
1212
Restart=${component_respawn_type}

api/src/main/resources/plugins/flink.systemd.service.tpl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ Description=PNDA Application: ${component_application}-${component_name}
55
Type=simple
66
User=${application_user}
77
WorkingDirectory=/opt/${environment_namespace}/${component_application}/${component_name}/
8-
ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
9-
ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
8+
ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/flink-stop.py
9+
ExecStop=/opt/${environment_namespace}/${component_application}/${component_name}/flink-stop.py
1010
Environment=FLINK_VERSION=${component_flink_version}
1111
ExecStart=${environment_flink} run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} --class ${component_main_class} ${component_main_jar} ${component_application_args}
1212
Restart=${component_respawn_type}

0 commit comments

Comments
 (0)