Skip to content

Commit 0cc7c5a

Browse files
authored
Merge pull request #81 from pndaproject/PNDA-4540
Add component properties to control restart of systemd jobs
2 parents d6d9cb2 + 464bb14 commit 0cc7c5a

File tree

7 files changed

+26
-20
lines changed

7 files changed

+26
-20
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,8 @@ The following varibles are only injected for Spark streaming components. They ma
531531
````
532532
component_spark_version major version of spark to use. Only applicable to HDP clusters, when using CDH PNDA does not support side-by-side Spark frameworks and whatever version is run by the spark-submit command will be used.
533533
component_spark_submit_args additional arguments to spark-submit
534+
component_respawn_type whether to restart the process when it exits. Valid values are 'always' and 'no'.
535+
component_respawn_timeout_sec used with component_respawn_type to set how long to wait (in seconds) before restarting the process when it exits.
534536
(java only) component_main_jar the jar containing the job code
535537
(python only) component_main_py the python file containing the job code
536538
(python only) component_py_files additional python files to pass to spark-submit

api/src/main/resources/plugins/flink.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import json
2626
import os
2727
import logging
28-
import platform
2928
from shutil import copy
3029
import deployer_utils
3130
from plugins.base_common import Common
@@ -46,8 +45,6 @@ def get_component_type(self):
4645

4746
def create_component(self, staged_component_path, application_name, user_name, component, properties):
4847
logging.debug("create_component: %s %s %s %s", application_name, user_name, json.dumps(component), properties)
49-
distro = platform.dist()
50-
usesSystemd = distro[0] in ('redhat', 'centos')
5148
remote_component_tmp_path = '%s/%s/%s' % (
5249
'/tmp/%s' % self._namespace, application_name, component['component_name'])
5350
remote_component_install_path = '%s/%s/%s' % (
@@ -83,15 +80,19 @@ def create_component(self, staged_component_path, application_name, user_name, c
8380

8481
this_dir = os.path.dirname(os.path.realpath(__file__))
8582
copy(os.path.join(this_dir, 'yarn-kill.py'), staged_component_path)
86-
if usesSystemd:
87-
service_script = 'flink.systemd.service.tpl' if java_app else 'flink.systemd.service.py.tpl'
88-
service_script_install_path = '/usr/lib/systemd/system/%s.service' % service_name
83+
service_script = 'flink.systemd.service.tpl' if java_app else 'flink.systemd.service.py.tpl'
84+
service_script_install_path = '/usr/lib/systemd/system/%s.service' % service_name
85+
if 'component_respawn_type' not in properties:
8986
if properties['component_flink_job_type'] == 'batch':
90-
properties['respawn'] = '# Restart=always'
91-
properties['respawn_limit'] = '# RestartSec=2'
87+
properties['component_respawn_type'] = 'no'
9288
else:
93-
properties['respawn'] = 'Restart=always'
94-
properties['respawn_limit'] = 'RestartSec=2'
89+
properties['component_respawn_type'] = 'always'
90+
91+
if 'component_respawn_timeout_sec' not in properties:
92+
if properties['component_flink_job_type'] == 'batch':
93+
properties['component_respawn_timeout_sec'] = '0'
94+
else:
95+
properties['component_respawn_timeout_sec'] = '2'
9596

9697
copy(os.path.join(this_dir, service_script), staged_component_path)
9798

@@ -124,8 +125,7 @@ def create_component(self, staged_component_path, application_name, user_name, c
124125
logging.debug("uninstall commands: %s", undo_commands)
125126

126127
start_commands = []
127-
if usesSystemd:
128-
start_commands.append('sudo systemctl daemon-reload\n')
128+
start_commands.append('sudo systemctl daemon-reload\n')
129129
start_commands.append('sudo service %s start\n' % service_name)
130130
logging.debug("start commands: %s", start_commands)
131131

api/src/main/resources/plugins/flink.systemd.service.py.tpl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_
99
ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
1010
Environment=FLINK_VERSION=${component_flink_version}
1111
ExecStart=/usr/bin/flink run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} -v ${flink_python_jar} ${component_main_py} ${component_application_args}
12-
${respawn}
13-
${respawn_limit}
12+
Restart=${component_respawn_type}
13+
RestartSec=${component_respawn_timeout_sec}

api/src/main/resources/plugins/flink.systemd.service.tpl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_
99
ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
1010
Environment=FLINK_VERSION=${component_flink_version}
1111
ExecStart=/usr/bin/flink run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} --class ${component_main_class} ${component_main_jar} ${component_application_args}
12-
${respawn}
13-
${respawn_limit}
12+
Restart=${component_respawn_type}
13+
RestartSec=${component_respawn_timeout_sec}

api/src/main/resources/plugins/sparkStreaming.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ def create_component(self, staged_component_path, application_name, user_name, c
8989
copy(os.path.join(this_dir, 'yarn-kill.py'), staged_component_path)
9090
service_script = 'systemd.service.tpl' if java_app else 'systemd.service.py.tpl'
9191
service_script_install_path = '/usr/lib/systemd/system/%s.service' % service_name
92+
if 'component_respawn_type' not in properties:
93+
properties['component_respawn_type'] = 'always'
94+
if 'component_respawn_timeout_sec' not in properties:
95+
properties['component_respawn_timeout_sec'] = '2'
9296
copy(os.path.join(this_dir, service_script), staged_component_path)
9397

9498
self._fill_properties(os.path.join(staged_component_path, service_script), properties)

api/src/main/resources/plugins/systemd.service.py.tpl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_
99
ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
1010
Environment=SPARK_MAJOR_VERSION=${component_spark_version}
1111
ExecStart=${environment_spark_submit} --driver-java-options "-Dlog4j.configuration=file:////opt/${environment_namespace}/${component_application}/${component_name}/log4j.properties" --conf 'spark.executor.extraJavaOptions=-Dlog4j.configuration=file:////opt/${environment_namespace}/${component_application}/${component_name}/log4j.properties' --name '${component_job_name}' --master yarn-cluster --py-files application.properties,${component_py_files} ${component_spark_submit_args} ${component_main_py}
12-
Restart=always
13-
RestartSec=2
12+
Restart=${component_respawn_type}
13+
RestartSec=${component_respawn_timeout_sec}

api/src/main/resources/plugins/systemd.service.tpl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_
99
ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
1010
Environment=SPARK_MAJOR_VERSION=${component_spark_version}
1111
ExecStart=${environment_spark_submit} --driver-java-options "-Dlog4j.configuration=file:////opt/${environment_namespace}/${component_application}/${component_name}/log4j.properties" --class ${component_main_class} --name '${component_job_name}' --master yarn-cluster --files log4j.properties ${component_spark_submit_args} ${component_main_jar}
12-
Restart=always
13-
RestartSec=2
12+
Restart=${component_respawn_type}
13+
RestartSec=${component_respawn_timeout_sec}

0 commit comments

Comments
 (0)