
Commit e1ec68c

Spark streaming applications work on Red Hat

Applications no longer need to submit an upstart config with the application; the deployment manager now automatically inserts either an upstart or a systemd config as required. PNDA-2700

1 parent a626083
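The heart of the change is a distribution check: on Red Hat the deployment manager installs a systemd unit, on other platforms an upstart job. A minimal sketch of that selection logic, mirroring the plugin code in the diff below (the service name here is a hypothetical example of the `<namespace>-<application>-<component>` format this commit uses):

```python
import platform

# platform.dist() returns (distname, version, id) on Python 2;
# the plugin only looks at the distribution name.
redhat = platform.dist()[0] == 'redhat'

service_name = 'ns-myapp-componentC'  # hypothetical <namespace>-<application>-<component>
if redhat:
    service_script = 'systemd.service.tpl'
    install_path = '/usr/lib/systemd/system/%s.service' % service_name
else:
    service_script = 'upstart.conf.tpl'
    install_path = '/etc/init/%s.conf' % service_name
```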

File tree

6 files changed: +100 −30 lines

CHANGELOG.md
api/src/main/resources/plugins/sparkStreaming.py
api/src/main/resources/plugins/systemd.service.tpl
api/src/main/resources/plugins/upstart.conf.tpl
api/src/main/resources/plugins/yarn-kill.py
api/src/main/resources/test_application_creator.py

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -1,6 +1,10 @@
 # Change Log
 All notable changes to this project will be documented in this file.
 
+## Unreleased
+### Changed
+- PNDA-2700: Spark streaming jobs no longer require upstart.conf or yarn-kill.py files; default ones are supplied by the deployment manager.
+
 ## [0.3.0] 2017-01-20
 ### Fixed
 - PNDA-2498: Application package data is now stored in HDFS with a reference to the path only held in the HBase record. This prevents HBase being overloaded with large packages (100MB+).
```
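For new-style components the entry point moves into properties.json: the plugin now raises an exception if main_class or main_jar is missing. A hypothetical minimal properties.json for a sparkStreaming component, shown as the dict the test fixture uses (the plugin sees these keys with a component_ prefix; all values below are placeholders):

```python
# Hypothetical properties.json contents for a new-style component.
# The plugin reads these as component_main_class, component_main_jar, etc.
properties_json = {
    "main_class": "com.example.MyStreamingJob",  # required: passed to spark-submit --class
    "main_jar": "my-streaming-job.jar",          # required: jar that receives application.properties
    "spark_submit_args": "--num-executors 2",    # optional: defaults to '' when absent
}
```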

api/src/main/resources/plugins/sparkStreaming.py

Lines changed: 49 additions & 23 deletions
```diff
@@ -25,6 +25,8 @@
 import json
 import os
 import logging
+import platform
+from shutil import copy
 
 import deployer_utils
 from plugins.base_creator import Creator
@@ -35,12 +37,8 @@ class SparkStreamingCreator(Creator):
     def validate_component(self, component):
         errors = []
         file_list = component['component_detail']
-        if 'yarn-kill.py' not in file_list:
-            errors.append('missing file yarn-kill.py')
         if 'application.properties' not in file_list:
             errors.append('missing file application.properties')
-        if 'upstart.conf' not in file_list:
-            errors.append('missing file upstart.conf')
         if 'log4j.properties' not in file_list:
             errors.append('missing file log4j.properties')
         return errors
@@ -68,20 +66,47 @@ def _control_component(self, cmds):
 
     def create_component(self, staged_component_path, application_name, component, properties):
         logging.debug("create_component: %s %s %s", application_name, json.dumps(component), properties)
-
+        distro = platform.dist()
+        redhat = distro[0] == 'redhat'
         remote_component_tmp_path = '%s/%s/%s' % (
             '/tmp/%s' % self._namespace, application_name, component['component_name'])
         remote_component_install_path = '%s/%s/%s' % (
             '/opt/%s' % self._namespace, application_name, component['component_name'])
+        service_name = '%s-%s-%s' % (self._namespace, application_name, component['component_name'])
 
         key_file = self._environment['cluster_private_key']
         root_user = self._environment['cluster_root_user']
         target_host = 'localhost'
 
-        self._fill_properties('%s/%s' % (staged_component_path, 'upstart.conf'), properties)
-        self._fill_properties('%s/%s' % (staged_component_path, 'log4j.properties'), properties)
-        self._fill_properties('%s/%s' % (staged_component_path, 'application.properties'), properties)
-        self._fill_properties('%s/%s' % (staged_component_path, 'yarn-kill.py'), properties)
+        if 'component_spark_submit_args' not in properties:
+            properties['component_spark_submit_args'] = ''
+
+        if 'upstart.conf' in component['component_detail']:
+            # old style applications, kept for backward compatibility
+            service_script = 'upstart.conf'
+            service_script_install_path = '/etc/init/%s.conf' % service_name
+        else:
+            # new style applications that don't need to provide upstart.conf or yarn-kill.py
+            if 'component_main_class' not in properties:
+                raise Exception('properties.json must contain "main_class" for %s sparkStreaming %s' % (application_name, component['component_name']))
+            if 'component_main_jar' not in properties:
+                raise Exception('properties.json must contain "main_jar" for %s sparkStreaming %s' % (application_name, component['component_name']))
+
+            this_dir = os.path.dirname(os.path.realpath(__file__))
+            copy(os.path.join(this_dir, 'yarn-kill.py'), staged_component_path)
+            distro = platform.dist()
+            if redhat:
+                service_script = 'systemd.service.tpl'
+                service_script_install_path = '/usr/lib/systemd/system/%s.service' % service_name
+            else:
+                service_script = 'upstart.conf.tpl'
+                service_script_install_path = '/etc/init/%s.conf' % service_name
+            copy(os.path.join(this_dir, service_script), staged_component_path)
+
+        self._fill_properties(os.path.join(staged_component_path, service_script), properties)
+        self._fill_properties(os.path.join(staged_component_path, 'log4j.properties'), properties)
+        self._fill_properties(os.path.join(staged_component_path, 'application.properties'), properties)
+        self._fill_properties(os.path.join(staged_component_path, 'yarn-kill.py'), properties)
 
         mkdircommands = []
         mkdircommands.append('mkdir -p %s' % remote_component_tmp_path)
@@ -99,32 +124,33 @@ def create_component(self, staged_component_path, application_name, component, properties):
             ['sudo mkdir -p %s' % remote_component_install_path,
              'sudo mv %s %s' % (remote_component_tmp_path + '/log4j.properties', remote_component_install_path + '/log4j.properties')])
 
+        if 'component_main_jar' in properties:
+            main_jar_name = properties['component_main_jar']
+        else:
+            main_jar_name = '*.jar'
+
         commands = []
-        service_name = '%s-%s-%s' % (self._namespace, application_name, component['component_name'])
-        upstart_script = '/etc/init/%s.conf' % service_name
-        commands.append('sudo cp %s/upstart.conf %s' %
-                        (remote_component_tmp_path, upstart_script))
-        commands.append('sudo cp %s/* %s' %
-                        (remote_component_tmp_path, remote_component_install_path))
-        commands.append('sudo chmod a+x %s/yarn-kill.py' %
-                        (remote_component_install_path))
-        commands.append('cd %s && sudo jar uf *.jar application.properties' %
-                        (remote_component_install_path))
+        commands.append('sudo cp %s/%s %s' % (remote_component_tmp_path, service_script, service_script_install_path))
+        commands.append('sudo cp %s/* %s' % (remote_component_tmp_path, remote_component_install_path))
+        commands.append('sudo chmod a+x %s/yarn-kill.py' % (remote_component_install_path))
+        commands.append('cd %s && sudo jar uf %s application.properties' % (remote_component_install_path, main_jar_name))
         commands.append('sudo rm -rf %s' % (remote_component_tmp_path))
         deployer_utils.exec_ssh(target_host, root_user, key_file, commands)
 
         undo_commands = []
-        undo_commands.append('sudo initctl stop %s\n' % service_name)
+        undo_commands.append('sudo service %s stop\n' % service_name)
         undo_commands.append('sudo rm -rf %s\n' % remote_component_install_path)
-        undo_commands.append('sudo rm %s\n' % upstart_script)
+        undo_commands.append('sudo rm %s\n' % service_script_install_path)
         logging.debug("uninstall commands: %s", undo_commands)
 
         start_commands = []
-        start_commands.append('sudo initctl start %s\n' % service_name)
+        if redhat:
+            start_commands.append('sudo systemctl daemon-reload\n')
+        start_commands.append('sudo service %s start\n' % service_name)
        logging.debug("start commands: %s", start_commands)
 
         stop_commands = []
-        stop_commands.append('sudo initctl stop %s\n' % service_name)
+        stop_commands.append('sudo service %s stop\n' % service_name)
         logging.debug("stop commands: %s", stop_commands)
 
         return {'ssh': undo_commands, 'start_cmds': start_commands, 'stop_cmds': stop_commands}
```
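The returned lists are replayed later by the deployment manager to start, stop, and uninstall the component. A rough usage sketch, assuming only the exec_ssh signature visible in the diff above (host, user, key file, command list); the driver function itself is hypothetical:

```python
import deployer_utils

def start_component(cmds, environment):
    # cmds is the dict returned by create_component above; on Red Hat the
    # start list begins with 'sudo systemctl daemon-reload'.
    deployer_utils.exec_ssh('localhost',
                            environment['cluster_root_user'],
                            environment['cluster_private_key'],
                            cmds['start_cmds'])
```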
api/src/main/resources/plugins/systemd.service.tpl

Lines changed: 12 additions & 0 deletions

```diff
@@ -0,0 +1,12 @@
+[Unit]
+Description=PNDA Application: ${component_application}-${component_name}
+
+[Service]
+Type=simple
+User=hdfs
+WorkingDirectory=/opt/${environment_namespace}/${component_application}/${component_name}/
+ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
+ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
+ExecStart=/usr/bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:////opt/${environment_namespace}/${component_application}/${component_name}/log4j.properties" --class ${component_main_class} --name '${component_job_name}' --master yarn-cluster --files log4j.properties ${component_spark_submit_args} ${component_main_jar}
+Restart=always
+RestartSec=2
```
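_fill_properties itself is not part of this diff; assuming it performs string.Template-style substitution of the ${...} placeholders, the deployed unit can be previewed offline (all values below are hypothetical):

```python
from string import Template

# Hypothetical property values; the real ones come from the deployment manager.
props = {
    'component_application': 'myapp',
    'component_name': 'componentC',
    'environment_namespace': 'ns',
    'component_main_class': 'com.example.MyStreamingJob',
    'component_main_jar': 'my-streaming-job.jar',
    'component_job_name': 'myapp-componentC',
    'component_spark_submit_args': '',
}

with open('systemd.service.tpl') as template_file:
    # safe_substitute leaves unknown ${...} placeholders untouched
    unit = Template(template_file.read()).safe_substitute(props)
print unit  # Python 2 print, matching the rest of this repo
```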
api/src/main/resources/plugins/upstart.conf.tpl

Lines changed: 8 additions & 0 deletions

```diff
@@ -0,0 +1,8 @@
+start on runlevel [2345]
+stop on runlevel [016]
+respawn
+respawn limit unlimited
+pre-start exec /opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
+pre-stop exec /opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
+env programDir=/opt/${environment_namespace}/${component_application}/${component_name}/
+exec sudo -u hdfs spark-submit --driver-java-options "-Dlog4j.configuration=file:///${programDir}log4j.properties" --class ${component_main_class} --name '${component_job_name}' --master yarn-cluster --files ${programDir}log4j.properties ${component_spark_submit_args} ${programDir}${component_main_jar}
```
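One detail worth noting: ${programDir} is not a deployment property. It is set by the env stanza and expanded by upstart at runtime, so deploy-time filling must pass over it. That works naturally if the filling uses safe_substitute as sketched above, which leaves unknown placeholders intact:

```python
from string import Template

line = 'exec spark-submit --files ${programDir}log4j.properties ${component_main_jar}'
filled = Template(line).safe_substitute({'component_main_jar': 'my-streaming-job.jar'})
# ${programDir} survives for upstart to expand at runtime
assert filled == 'exec spark-submit --files ${programDir}log4j.properties my-streaming-job.jar'
```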
api/src/main/resources/plugins/yarn-kill.py

Lines changed: 19 additions & 0 deletions

```diff
@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+import subprocess
+COMMAND_OUTPUT = subprocess.check_output(['yarn', 'application', '-list'])
+
+IS_RUNNING = False
+
+for line in COMMAND_OUTPUT.splitlines():
+    fields = line.split('\t')
+    if len(fields) >= 6:
+        app = fields[1].strip()
+        state = fields[5].strip()
+        if app == '${component_job_name}':
+            IS_RUNNING = True
+            yarn_app_id = fields[0].strip()
+            break
+
+if IS_RUNNING:
+    print 'app is running, killing it...'
+    subprocess.check_output(['yarn', 'application', '-kill', yarn_app_id])
```
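yarn-kill.py is itself a template: ${component_job_name} is substituted at deploy time, so each installed copy matches only its own job. The parser assumes the tab-separated rows printed by yarn application -list; a hypothetical row and the fields the script extracts:

```python
# Hypothetical 'yarn application -list' row (tab-separated columns:
# id, name, type, user, queue, state, final state, progress, tracking URL).
sample = ('application_1484832633045_0007\tmyapp-componentC\tSPARK\thdfs\t'
          'default\tRUNNING\tUNDEFINED\t10%\thttp://nm1:4040')

fields = sample.split('\t')
print fields[0].strip()  # application id, passed to 'yarn application -kill'
print fields[1].strip()  # application name, compared to ${component_job_name}
print fields[5].strip()  # state, e.g. RUNNING
```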

api/src/main/resources/test_application_creator.py

Lines changed: 8 additions & 7 deletions
```diff
@@ -54,7 +54,8 @@ class ApplicationCreatorTests(unittest.TestCase):
         "component_detail": {
             "properties.json": {
                 "property1": "1",
-                "property2": "two"
+                "main_class": "abc",
+                "main_jar": "abc.jar",
             }
         },
         "component_path": "test_package-1.0.2/sparkStreaming/componentC",
@@ -155,8 +156,9 @@ class ApplicationCreatorTests(unittest.TestCase):
     @patch('application_creator.shutil')
     @patch('application_creator.os')
     @patch('application_creator.tarfile')
+    @patch('shutil.copy')
     # pylint: disable=unused-argument
-    def test_create_application(self, tar_mock, os_mock, shutil_mock, spur_ssh,
+    def test_create_application(self, copy_mock, tar_mock, os_mock, shutil_mock, spur_ssh,
                                 hdfs_client_mock, post_mock, put_mock, exec_ssh_mock,
                                 os_sys_mock, dt_mock, hive_mock, hbase_mock):
         dt_mock.utcnow.return_value = (datetime(2013, 01, 01))
@@ -183,7 +185,7 @@ def json(self):
         exec_ssh_mock.assert_any_call('nm1', 'root_user', 'keyfile.pem', ['sudo mkdir -p /opt/ns/aname/componentC', 'sudo mv /tmp/ns/aname/componentC/log4j.properties /opt/ns/aname/componentC/log4j.properties'])
         exec_ssh_mock.assert_any_call('nm2', 'root_user', 'keyfile.pem', ['mkdir -p /tmp/ns/aname/componentC'])
         exec_ssh_mock.assert_any_call('nm2', 'root_user', 'keyfile.pem', ['sudo mkdir -p /opt/ns/aname/componentC', 'sudo mv /tmp/ns/aname/componentC/log4j.properties /opt/ns/aname/componentC/log4j.properties'])
-        exec_ssh_mock.assert_any_call('localhost', 'root_user', 'keyfile.pem', ['sudo cp /tmp/ns/aname/componentC/upstart.conf /etc/init/ns-aname-componentC.conf', 'sudo cp /tmp/ns/aname/componentC/* /opt/ns/aname/componentC', 'sudo chmod a+x /opt/ns/aname/componentC/yarn-kill.py', 'cd /opt/ns/aname/componentC && sudo jar uf *.jar application.properties', 'sudo rm -rf /tmp/ns/aname/componentC'])
+        exec_ssh_mock.assert_any_call('localhost', 'root_user', 'keyfile.pem', ['sudo cp /tmp/ns/aname/componentC/upstart.conf.tpl /etc/init/ns-aname-componentC.conf', 'sudo cp /tmp/ns/aname/componentC/* /opt/ns/aname/componentC', 'sudo chmod a+x /opt/ns/aname/componentC/yarn-kill.py', 'cd /opt/ns/aname/componentC && sudo jar uf abc.jar application.properties', 'sudo rm -rf /tmp/ns/aname/componentC'])
 
     @patch('starbase.Connection')
     @patch('pyhs2.connect')
@@ -260,8 +262,9 @@ def json(self):
     @patch('application_creator.shutil')
     @patch('application_creator.os')
     @patch('application_creator.tarfile')
+    @patch('shutil.copy')
     # pylint: disable=unused-argument
-    def test_app_name_ok(self, tar_mock, os_mock, shutil_mock, spur_ssh,
+    def test_app_name_ok(self, copy_mock, tar_mock, os_mock, shutil_mock, spur_ssh,
                          hdfs_client_mock, post_mock, put_mock, exec_ssh_mock,
                          os_sys_mock, dt_mock, hive_mock, hbase_mock):
 
@@ -310,9 +313,7 @@ def test_validate_package(self):
                 'componentA': ['missing file workflow.xml'],
                 'componentB': ['missing file workflow.xml']},
             'sparkStreaming': {
-                'componentC': ['missing file yarn-kill.py',
-                               'missing file application.properties',
-                               'missing file upstart.conf',
+                'componentC': ['missing file application.properties',
                                'missing file log4j.properties']}}
         self.assertEqual(result, expected_report)
```
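A subtlety behind the new @patch('shutil.copy') decorator: stacked @patch decorators are applied bottom-up, so the mock for the decorator closest to the function arrives as the first extra argument, which is why copy_mock leads the parameter list. A standalone illustration, separate from this test suite:

```python
from mock import patch  # unittest.mock on Python 3

@patch('os.path.realpath')  # outermost patch: becomes the second mock argument
@patch('shutil.copy')       # closest to the function: becomes the first mock argument
def demo(copy_mock, realpath_mock):
    import shutil
    shutil.copy('a', 'b')  # intercepted by the mock, nothing is copied
    copy_mock.assert_called_once_with('a', 'b')

demo()
```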