
Commit 7f34037

Merge pull request #21 from pndaproject/PNDA-2700
Spark streaming applications work on redhat
2 parents a626083 + e1ec68c

File tree: 6 files changed (+100, -30 lines)


CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -1,6 +1,10 @@
 # Change Log
 All notable changes to this project will be documented in this file.
 
+## Unreleased
+### Changed
+- PNDA-2700: Spark streaming jobs no longer require upstart.conf or yarn-kill.py files, default ones are supplied by the deployment manager.
+
 ## [0.3.0] 2017-01-20
 ### Fixed
 - PNDA-2498: Application package data is now stored in HDFS with a reference to the path only held in the HBase record. This prevents HBase being overloaded with large packages (100MB+).
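With this change, a new-style sparkStreaming component ships only its application code plus application.properties and log4j.properties, and names its entry point in properties.json. A minimal properties.json might look like the sketch below; the class and jar names are hypothetical (the test fixture later in this diff uses "abc" and "abc.jar"), and the deployment manager evidently exposes these keys to the plugin with a component_ prefix, since the plugin checks component_main_class and component_main_jar:

    {
        "main_class": "com.example.StreamingApp",
        "main_jar": "streaming-app-1.0.0.jar"
    }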

api/src/main/resources/plugins/sparkStreaming.py

Lines changed: 49 additions & 23 deletions
@@ -25,6 +25,8 @@
 import json
 import os
 import logging
+import platform
+from shutil import copy
 
 import deployer_utils
 from plugins.base_creator import Creator
@@ -35,12 +37,8 @@ class SparkStreamingCreator(Creator):
     def validate_component(self, component):
         errors = []
         file_list = component['component_detail']
-        if 'yarn-kill.py' not in file_list:
-            errors.append('missing file yarn-kill.py')
         if 'application.properties' not in file_list:
             errors.append('missing file application.properties')
-        if 'upstart.conf' not in file_list:
-            errors.append('missing file upstart.conf')
         if 'log4j.properties' not in file_list:
             errors.append('missing file log4j.properties')
         return errors
@@ -68,20 +66,47 @@ def _control_component(self, cmds):
 
     def create_component(self, staged_component_path, application_name, component, properties):
         logging.debug("create_component: %s %s %s", application_name, json.dumps(component), properties)
-
+        distro = platform.dist()
+        redhat = distro[0] == 'redhat'
         remote_component_tmp_path = '%s/%s/%s' % (
             '/tmp/%s' % self._namespace, application_name, component['component_name'])
         remote_component_install_path = '%s/%s/%s' % (
             '/opt/%s' % self._namespace, application_name, component['component_name'])
+        service_name = '%s-%s-%s' % (self._namespace, application_name, component['component_name'])
 
         key_file = self._environment['cluster_private_key']
         root_user = self._environment['cluster_root_user']
         target_host = 'localhost'
 
-        self._fill_properties('%s/%s' % (staged_component_path, 'upstart.conf'), properties)
-        self._fill_properties('%s/%s' % (staged_component_path, 'log4j.properties'), properties)
-        self._fill_properties('%s/%s' % (staged_component_path, 'application.properties'), properties)
-        self._fill_properties('%s/%s' % (staged_component_path, 'yarn-kill.py'), properties)
+        if 'component_spark_submit_args' not in properties:
+            properties['component_spark_submit_args'] = ''
+
+        if 'upstart.conf' in component['component_detail']:
+            # old style applications for backward compatibility
+            service_script = 'upstart.conf'
+            service_script_install_path = '/etc/init/%s.conf' % service_name
+        else:
+            # new style applications that don't need to provide upstart.conf or yarn-kill.py
+            if 'component_main_class' not in properties:
+                raise Exception('properties.json must contain "main_class" for %s sparkStreaming %s' % (application_name, component['component_name']))
+            if 'component_main_jar' not in properties:
+                raise Exception('properties.json must contain "main_jar" for %s sparkStreaming %s' % (application_name, component['component_name']))
+
+            this_dir = os.path.dirname(os.path.realpath(__file__))
+            copy(os.path.join(this_dir, 'yarn-kill.py'), staged_component_path)
+            distro = platform.dist()
+            if redhat:
+                service_script = 'systemd.service.tpl'
+                service_script_install_path = '/usr/lib/systemd/system/%s.service' % service_name
+            else:
+                service_script = 'upstart.conf.tpl'
+                service_script_install_path = '/etc/init/%s.conf' % service_name
+            copy(os.path.join(this_dir, service_script), staged_component_path)
+
+        self._fill_properties(os.path.join(staged_component_path, service_script), properties)
+        self._fill_properties(os.path.join(staged_component_path, 'log4j.properties'), properties)
+        self._fill_properties(os.path.join(staged_component_path, 'application.properties'), properties)
+        self._fill_properties(os.path.join(staged_component_path, 'yarn-kill.py'), properties)
 
         mkdircommands = []
         mkdircommands.append('mkdir -p %s' % remote_component_tmp_path)
@@ -99,32 +124,33 @@ def create_component(self, staged_component_path, application_name, component, properties):
             ['sudo mkdir -p %s' % remote_component_install_path,
              'sudo mv %s %s' % (remote_component_tmp_path + '/log4j.properties', remote_component_install_path + '/log4j.properties')])
 
+        if 'component_main_jar' in properties:
+            main_jar_name = properties['component_main_jar']
+        else:
+            main_jar_name = '*.jar'
+
         commands = []
-        service_name = '%s-%s-%s' % (self._namespace, application_name, component['component_name'])
-        upstart_script = '/etc/init/%s.conf' % service_name
-        commands.append('sudo cp %s/upstart.conf %s' %
-                        (remote_component_tmp_path, upstart_script))
-        commands.append('sudo cp %s/* %s' %
-                        (remote_component_tmp_path, remote_component_install_path))
-        commands.append('sudo chmod a+x %s/yarn-kill.py' %
-                        (remote_component_install_path))
-        commands.append('cd %s && sudo jar uf *.jar application.properties' %
-                        (remote_component_install_path))
+        commands.append('sudo cp %s/%s %s' % (remote_component_tmp_path, service_script, service_script_install_path))
+        commands.append('sudo cp %s/* %s' % (remote_component_tmp_path, remote_component_install_path))
+        commands.append('sudo chmod a+x %s/yarn-kill.py' % (remote_component_install_path))
+        commands.append('cd %s && sudo jar uf %s application.properties' % (remote_component_install_path, main_jar_name))
         commands.append('sudo rm -rf %s' % (remote_component_tmp_path))
         deployer_utils.exec_ssh(target_host, root_user, key_file, commands)
 
         undo_commands = []
-        undo_commands.append('sudo initctl stop %s\n' % service_name)
+        undo_commands.append('sudo service %s stop\n' % service_name)
        undo_commands.append('sudo rm -rf %s\n' % remote_component_install_path)
-        undo_commands.append('sudo rm %s\n' % upstart_script)
+        undo_commands.append('sudo rm %s\n' % service_script_install_path)
         logging.debug("uninstall commands: %s", undo_commands)
 
         start_commands = []
-        start_commands.append('sudo initctl start %s\n' % service_name)
+        if redhat:
+            start_commands.append('sudo systemctl daemon-reload\n')
+        start_commands.append('sudo service %s start\n' % service_name)
         logging.debug("start commands: %s", start_commands)
 
         stop_commands = []
-        stop_commands.append('sudo initctl stop %s\n' % service_name)
+        stop_commands.append('sudo service %s stop\n' % service_name)
         logging.debug("stop commands: %s", stop_commands)
 
         return {'ssh': undo_commands, 'start_cmds': start_commands, 'stop_cmds': stop_commands}
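The redhat switch above hinges on platform.dist(), which on the Python 2 interpreters this codebase targets returns a (distname, version, id) tuple (the function is deprecated in Python 3.5 and was removed in 3.8). A minimal standalone sketch of the selection logic, with illustrative values:

    import platform

    # platform.dist() returns (distname, version, id), e.g. ('redhat', '7.2', 'Maipo')
    # on RHEL 7 -- illustrative values, Python 2 only
    redhat = platform.dist()[0] == 'redhat'

    # hypothetical namespace-application-component name, as in the tests below
    service_name = 'ns-aname-componentC'
    if redhat:
        # systemd unit on Red Hat derivatives
        install_path = '/usr/lib/systemd/system/%s.service' % service_name
    else:
        # upstart job elsewhere, e.g. Ubuntu 14.04
        install_path = '/etc/init/%s.conf' % service_name
    print(install_path)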
api/src/main/resources/plugins/systemd.service.tpl

Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
+[Unit]
+Description=PNDA Application: ${component_application}-${component_name}
+
+[Service]
+Type=simple
+User=hdfs
+WorkingDirectory=/opt/${environment_namespace}/${component_application}/${component_name}/
+ExecStartPre=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
+ExecStopPost=/opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
+ExecStart=/usr/bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:////opt/${environment_namespace}/${component_application}/${component_name}/log4j.properties" --class ${component_main_class} --name '${component_job_name}' --master yarn-cluster --files log4j.properties ${component_spark_submit_args} ${component_main_jar}
+Restart=always
+RestartSec=2
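For illustration, this is roughly what the unit looks like once _fill_properties has resolved the placeholders, using the values from the test fixture later in this diff (namespace ns, application aname, component componentC, main_class abc, main_jar abc.jar) and a hypothetical job name; component_spark_submit_args defaults to an empty string when not supplied:

    [Unit]
    Description=PNDA Application: aname-componentC

    [Service]
    Type=simple
    User=hdfs
    WorkingDirectory=/opt/ns/aname/componentC/
    ExecStartPre=/opt/ns/aname/componentC/yarn-kill.py
    ExecStopPost=/opt/ns/aname/componentC/yarn-kill.py
    ExecStart=/usr/bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:////opt/ns/aname/componentC/log4j.properties" --class abc --name 'aname-job' --master yarn-cluster --files log4j.properties abc.jar
    Restart=always
    RestartSec=2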
api/src/main/resources/plugins/upstart.conf.tpl

Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
+start on runlevel [2345]
+stop on runlevel [016]
+respawn
+respawn limit unlimited
+pre-start exec /opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
+pre-stop exec /opt/${environment_namespace}/${component_application}/${component_name}/yarn-kill.py
+env programDir=/opt/${environment_namespace}/${component_application}/${component_name}/
+exec sudo -u hdfs spark-submit --driver-java-options "-Dlog4j.configuration=file:///${programDir}log4j.properties" --class ${component_main_class} --name '${component_job_name}' --master yarn-cluster --files ${programDir}log4j.properties ${component_spark_submit_args} ${programDir}${component_main_jar}
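Both templates rely on ${...} placeholders that _fill_properties resolves at deployment time. The implementation of _fill_properties is not part of this diff; a minimal sketch of the mechanism both templates assume, using string.Template (an assumption about the mechanism, not the deployment manager's actual code):

    # Assumed mechanism only: fill ${...} placeholders from the properties dict.
    from string import Template

    def fill_properties(path, properties):
        with open(path) as template_file:
            template = Template(template_file.read())
        # safe_substitute leaves unknown placeholders -- such as upstart's
        # runtime ${programDir} variable above -- untouched instead of raising
        with open(path, 'w') as rendered_file:
            rendered_file.write(template.safe_substitute(properties))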
api/src/main/resources/plugins/yarn-kill.py

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+import subprocess
+COMMAND_OUTPUT = subprocess.check_output(['yarn', 'application', '-list'])
+
+IS_RUNNING = False
+
+for line in COMMAND_OUTPUT.splitlines():
+    fields = line.split('\t')
+    if len(fields) >= 6:
+        app = fields[1].strip()
+        state = fields[5].strip()
+        if app == '${component_job_name}':
+            IS_RUNNING = True
+            yarn_app_id = fields[0].strip()
+            break
+
+if IS_RUNNING:
+    print 'app is running, killing it...'
+    subprocess.check_output(['yarn', 'application', '-kill', yarn_app_id])
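Note that this file is itself a template (${component_job_name} is filled in at deployment time) and is written for Python 2 (print statement). The script assumes the tab-separated row layout that stock `yarn application -list` emits, with field 0 the application id, field 1 the name, and field 5 the state. An illustrative row and how the loop above indexes it (values are hypothetical):

    # Illustrative only: typical Hadoop 2.x row layout, hypothetical values
    sample = ('application_1484000000000_0007\taname-job\tSPARK\thdfs\tdefault\t'
              'RUNNING\tUNDEFINED\t10%\thttp://rm:8088/proxy/...')
    fields = sample.split('\t')
    print(fields[0])  # application_1484000000000_0007 -> yarn_app_id
    print(fields[1])  # aname-job -> compared against ${component_job_name}
    print(fields[5])  # RUNNING -> state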

api/src/main/resources/test_application_creator.py

Lines changed: 8 additions & 7 deletions
@@ -54,7 +54,8 @@ class ApplicationCreatorTests(unittest.TestCase):
             "component_detail": {
                 "properties.json": {
                     "property1": "1",
-                    "property2": "two"
+                    "main_class": "abc",
+                    "main_jar": "abc.jar",
                 }
             },
             "component_path": "test_package-1.0.2/sparkStreaming/componentC",
@@ -155,8 +156,9 @@ class ApplicationCreatorTests(unittest.TestCase):
     @patch('application_creator.shutil')
     @patch('application_creator.os')
     @patch('application_creator.tarfile')
+    @patch('shutil.copy')
     # pylint: disable=unused-argument
-    def test_create_application(self, tar_mock, os_mock, shutil_mock, spur_ssh,
+    def test_create_application(self, copy_mock, tar_mock, os_mock, shutil_mock, spur_ssh,
                                 hdfs_client_mock, post_mock, put_mock, exec_ssh_mock,
                                 os_sys_mock, dt_mock, hive_mock, hbase_mock):
         dt_mock.utcnow.return_value = (datetime(2013, 01, 01))
@@ -183,7 +185,7 @@ def json(self):
         exec_ssh_mock.assert_any_call('nm1', 'root_user', 'keyfile.pem', ['sudo mkdir -p /opt/ns/aname/componentC', 'sudo mv /tmp/ns/aname/componentC/log4j.properties /opt/ns/aname/componentC/log4j.properties'])
         exec_ssh_mock.assert_any_call('nm2', 'root_user', 'keyfile.pem', ['mkdir -p /tmp/ns/aname/componentC'])
         exec_ssh_mock.assert_any_call('nm2', 'root_user', 'keyfile.pem', ['sudo mkdir -p /opt/ns/aname/componentC', 'sudo mv /tmp/ns/aname/componentC/log4j.properties /opt/ns/aname/componentC/log4j.properties'])
-        exec_ssh_mock.assert_any_call('localhost', 'root_user', 'keyfile.pem', ['sudo cp /tmp/ns/aname/componentC/upstart.conf /etc/init/ns-aname-componentC.conf', 'sudo cp /tmp/ns/aname/componentC/* /opt/ns/aname/componentC', 'sudo chmod a+x /opt/ns/aname/componentC/yarn-kill.py', 'cd /opt/ns/aname/componentC && sudo jar uf *.jar application.properties', 'sudo rm -rf /tmp/ns/aname/componentC'])
+        exec_ssh_mock.assert_any_call('localhost', 'root_user', 'keyfile.pem', ['sudo cp /tmp/ns/aname/componentC/upstart.conf.tpl /etc/init/ns-aname-componentC.conf', 'sudo cp /tmp/ns/aname/componentC/* /opt/ns/aname/componentC', 'sudo chmod a+x /opt/ns/aname/componentC/yarn-kill.py', 'cd /opt/ns/aname/componentC && sudo jar uf abc.jar application.properties', 'sudo rm -rf /tmp/ns/aname/componentC'])
 
     @patch('starbase.Connection')
     @patch('pyhs2.connect')
@@ -260,8 +262,9 @@ def json(self):
     @patch('application_creator.shutil')
     @patch('application_creator.os')
     @patch('application_creator.tarfile')
+    @patch('shutil.copy')
     # pylint: disable=unused-argument
-    def test_app_name_ok(self, tar_mock, os_mock, shutil_mock, spur_ssh,
+    def test_app_name_ok(self, copy_mock, tar_mock, os_mock, shutil_mock, spur_ssh,
                          hdfs_client_mock, post_mock, put_mock, exec_ssh_mock,
                          os_sys_mock, dt_mock, hive_mock, hbase_mock):
 
@@ -310,9 +313,7 @@ def test_validate_package(self):
                 'componentA': ['missing file workflow.xml'],
                 'componentB': ['missing file workflow.xml']},
             'sparkStreaming': {
-                'componentC': ['missing file yarn-kill.py',
-                               'missing file application.properties',
-                               'missing file upstart.conf',
+                'componentC': ['missing file application.properties',
                                'missing file log4j.properties']}}
         self.assertEqual(result, expected_report)

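One detail worth noting in these test changes: mock applies stacked @patch decorators bottom-up, so the bottom-most @patch('shutil.copy') supplies the first injected argument, which is why copy_mock leads the new test signatures. A self-contained sketch of that ordering (the patched targets here are real stdlib names, but the test itself is hypothetical):

    from mock import patch  # assumption: the standalone mock package, matching this Python 2 codebase

    @patch('os.path.exists')   # applied last, so its mock arrives last
    @patch('shutil.copy')      # bottom-most decorator, so its mock arrives first
    def test_ordering(copy_mock, exists_mock):
        # both targets are now replaced by MagicMock objects for the duration of the test
        pass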