Skip to content

Commit 240da0a

Browse files
authored
Merge pull request #40 from pndaproject/RELEASE_0.5.0
Release 0.5.0
2 parents f62530a + 4d09bfd commit 240da0a

14 files changed

+196
-94
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
# Change Log
22
All notable changes to this project will be documented in this file.
33

4+
## [Unreleased]
5+
6+
## [0.5.0] 2017-11-24
7+
### Added:
8+
- PNDA-3330: Change to use a default application user instead of the hdfs user.
9+
- PNDA-2445: Support for Hortonworks HDP
10+
411
## [0.4.0] 2017-05-23
512
### Added
613
- PNDA-2729: Added support for spark streaming jobs written in python (pyspark). Use `main_py` instead of `main_jar` in properties.json and specify additional files using `py_files`.
714
- PNDA-2784: Make tests pass on RedHat
15+
816
### Changed
917
- PNDA-2700: Spark streaming jobs no longer require upstart.conf or yarn-kill.py files, default ones are supplied by the deployment manager.
1018
- PNDA-2782: Disabled Ubuntu-only test

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,9 +399,9 @@ hdfspath_path_name generated from entries in hdfs.json
399399
## Environment Variables ##
400400
These can be obtained with the [environment endpoints API](#environment-endpoints-api)
401401
````
402-
environment_cloudera_manager_host 192.168.1.2
403-
environment_cloudera_manager_password admin
404-
environment_cloudera_manager_username admin
402+
environment_hadoop_manager_host 192.168.1.2
403+
environment_hadoop_manager_password admin
404+
environment_hadoop_manager_username admin
405405
environment_cluster_private_key ./dm.pem
406406
environment_cluster_root_user cloud-user
407407
environment_hbase_rest_port 20550

api/src/main/resources/deployer_utils.py

Lines changed: 114 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,26 +41,126 @@ def connect_cm(cm_api, cm_username, cm_password):
4141
return api
4242

4343

44-
def get_nameservice(cm_host, cluster_name, service_name, user_name='admin', password='admin'):
    """Return the HDFS nameservice name for a service via the Cloudera Manager REST API.

    Queries the v11 API on port 7180. Returns an empty string when the request
    fails or when no nameservice is configured (e.g. a non-HA cluster).

    :param cm_host: Cloudera Manager host name or IP
    :param cluster_name: name of the cluster to query
    :param service_name: name of the HDFS service
    :param user_name: CM API user (default 'admin')
    :param password: CM API password (default 'admin')
    """
    request_url = 'http://%s:7180/api/v11/clusters/%s/services/%s/nameservices' % (cm_host,
                                                                                   cluster_name,
                                                                                   service_name)
    result = requests.get(request_url, auth=(user_name, password))
    nameservice = ""
    if result.status_code == 200:
        response = result.json()
        # Fix: guard against an empty 'items' list - indexing [0] previously
        # raised IndexError when the service reported no nameservices.
        if response.get('items'):
            nameservice = response['items'][0]['name']
    logging.debug("Found named service %s for %s", nameservice, service_name)
    return nameservice
5656

5757

5858
def fill_hadoop_env(env):
    """Populate *env* with Hadoop endpoint details for the detected distro.

    Dispatches on env['hadoop_distro']: 'CDH' uses the Cloudera Manager
    implementation, anything else is treated as Hortonworks HDP/Ambari.
    The discovered environment is logged at debug level.
    """
    if env['hadoop_distro'] != 'CDH':
        fill_hadoop_env_hdp(env)
    else:
        fill_hadoop_env_cdh(env)
    logging.debug(env)
65+
66+
def ambari_request(ambari, uri):
    """GET a resource from the Ambari API and return the decoded JSON body.

    :param ambari: (host, username, password) tuple for the Ambari server
    :param uri: absolute URL (used as-is) or a path appended to the v1 API root
    """
    manager_host, manager_user, manager_pass = ambari
    full_uri = uri if uri.startswith("http") else 'http://%s:8080/api/v1%s' % (manager_host, uri)
    # Ambari rejects API calls that lack the X-Requested-By header.
    response = requests.get(full_uri,
                            auth=(manager_user, manager_pass),
                            headers={'X-Requested-By': manager_user})
    return response.json()
78+
79+
def get_hdfs_hdp(ambari, cluster_name):
    """Return fs.defaultFS for *cluster_name* from its desired core-site config."""
    desired = ambari_request(ambari, '/clusters/%s?fields=Clusters/desired_configs/core-site' % cluster_name)
    tag = desired['Clusters']['desired_configs']['core-site']['tag']
    # Fetch the concrete configuration version identified by the tag.
    config = ambari_request(ambari, '/clusters/%s/configurations/?type=core-site&tag=%s' % (cluster_name, tag))
    return config['items'][0]['properties']['fs.defaultFS']
84+
85+
def component_host(component_detail):
    """Return a comma-separated list of host names running this component."""
    return ','.join(host['HostRoles']['host_name']
                    for host in component_detail['host_components'])
92+
93+
def fill_hadoop_env_hdp(env):
    """Populate *env* with Hadoop service endpoints discovered from Ambari (HDP).

    Walks every service/component of the first cluster registered in Ambari and
    records host names, well-known ports and status links into *env*.
    Ports are the HDP defaults hard-coded below.
    """
    hadoop_manager_ip = env['hadoop_manager_host']
    hadoop_manager_username = env['hadoop_manager_username']
    hadoop_manager_password = env['hadoop_manager_password']
    ambari = (hadoop_manager_ip, hadoop_manager_username, hadoop_manager_password)
    cluster_name = ambari_request(ambari, '/clusters')['items'][0]['Clusters']['cluster_name']

    logging.debug('getting service list for %s', cluster_name)
    env['cm_status_links'] = {}

    env['name_node'] = get_hdfs_hdp(ambari, cluster_name)

    services = ambari_request(ambari, '/clusters/%s/services' % cluster_name)['items']
    for service in services:
        service_name = service['ServiceInfo']['service_name']
        env['cm_status_links']['%s' % service_name] = 'http://%s:8080/#/main/services/%s/summary' % (hadoop_manager_ip, service_name)
        service_components = ambari_request(ambari, service['href'] + '/components')['items']

        for component in service_components:
            component_detail = ambari_request(ambari, component['href'])
            role_name = component_detail['ServiceComponentInfo']['component_name']

            if role_name == "NAMENODE":
                # First NAMENODE host serves WebHDFS (HttpFS port).
                env['webhdfs_host'] = '%s' % component_host(component_detail).split(',')[0]
                env['webhdfs_port'] = '14000'

            elif role_name == "RESOURCEMANAGER":
                rm_host = component_host(component_detail)
                if len(rm_host.split(',')) > 1:
                    main_rm_host = rm_host.split(',')[0]
                    backup_rm_host = rm_host.split(',')[1]
                else:
                    main_rm_host = rm_host
                    backup_rm_host = None
                env['yarn_resource_manager_host'] = '%s' % main_rm_host
                env['yarn_resource_manager_port'] = '8088'
                env['yarn_resource_manager_mr_port'] = '8050'
                if backup_rm_host is not None:
                    # Fix: previously set to the full comma-separated host list
                    # (component_host(component_detail)) instead of the backup host.
                    env['yarn_resource_manager_host_backup'] = '%s' % backup_rm_host
                    env['yarn_resource_manager_port_backup'] = '8088'
                    env['yarn_resource_manager_mr_port_backup'] = '8050'

            elif role_name == "NODEMANAGER":
                env['yarn_node_managers'] = '%s' % component_host(component_detail)

            elif role_name == "ZOOKEEPER_SERVER":
                env['zookeeper_quorum'] = '%s' % component_host(component_detail)
                env['zookeeper_port'] = '2181'

            elif role_name == "HBASE_MASTER":
                env['hbase_rest_server'] = '%s' % component_host(component_detail).split(',')[0]
                env['hbase_rest_port'] = '20550'
                env['hbase_thrift_server'] = '%s' % component_host(component_detail).split(',')[0]

            elif role_name == "OOZIE_SERVER":
                env['oozie_uri'] = 'http://%s:11000/oozie' % component_host(component_detail)

            elif role_name == "HIVE_SERVER":
                env['hive_server'] = '%s' % component_host(component_detail)
                env['hive_port'] = '10000'

    logging.debug(env)
157+
158+
def fill_hadoop_env_cdh(env):
59159
# pylint: disable=E1103
60160
api = connect_cm(
61-
env['cloudera_manager_host'],
62-
env['cloudera_manager_username'],
63-
env['cloudera_manager_password'])
161+
env['hadoop_manager_host'],
162+
env['hadoop_manager_username'],
163+
env['hadoop_manager_password'])
64164

65165
for cluster_detail in api.get_all_clusters():
66166
cluster_name = cluster_detail.name
@@ -73,14 +173,14 @@ def fill_hadoop_env(env):
73173
for service in cluster.get_all_services():
74174
env['cm_status_links']['%s' % service.name] = service.serviceUrl
75175
if service.type == "HDFS":
76-
named_service = get_named_service(env['cloudera_manager_host'], cluster_name,
176+
nameservice = get_nameservice(env['hadoop_manager_host'], cluster_name,
77177
service.name,
78-
user_name=env['cloudera_manager_username'],
79-
password=env['cloudera_manager_password'])
80-
if named_service:
81-
env['name_node'] = 'hdfs://%s' % named_service
178+
user_name=env['hadoop_manager_username'],
179+
password=env['hadoop_manager_password'])
180+
if nameservice:
181+
env['name_node'] = 'hdfs://%s' % nameservice
82182
for role in service.get_all_roles():
83-
if not named_service and role.type == "NAMENODE":
183+
if not nameservice and role.type == "NAMENODE":
84184
env['name_node'] = 'hdfs://%s:8020' % api.get_host(role.hostRef.hostId).hostname
85185
if role.type == "HTTPFS":
86186
env['webhdfs_host'] = '%s' % api.get_host(role.hostRef.hostId).ipAddress
@@ -151,9 +251,6 @@ def fill_hadoop_env(env):
151251
env['hue_port'] = '8888'
152252
break
153253

154-
logging.debug(env)
155-
156-
157254
def tree(archive_filepath):
158255
file_handle = file(archive_filepath, 'rb')
159256
tar_file = tarfile.open(None, 'r', file_handle)

api/src/main/resources/dm-config.json

Lines changed: 0 additions & 24 deletions
This file was deleted.

api/src/main/resources/plugins/base_creator.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import json
3737
import string
3838
import collections
39+
import getpass
3940
import requests
4041
import hbase_descriptor
4142
import opentsdb_descriptor
@@ -166,6 +167,7 @@ def _instantiate_properties(self, application_name, component, property_override
166167
props['component_name'] = component['component_name']
167168
props['component_job_name'] = '%s-%s-job' % (props['component_application'], props['component_name'])
168169
props['component_hdfs_root'] = '/user/%s/%s' % (application_name, component['component_name'])
170+
props['application_user'] = self._get_application_user()
169171
return props
170172

171173
def _fill_properties(self, local_file, props):
@@ -327,3 +329,10 @@ def _find_yarn_app_info(self, all_yarn_applications, job_name):
327329
if result is None or self._get_yarn_start_time(app) > self._get_yarn_start_time(result):
328330
result = app
329331
return result
332+
333+
def _get_application_user(self):
    """Return the user that deployed applications should run as.

    Uses the current process user, except when running as root, in which
    case the configured default application user is returned instead.
    """
    current_user = getpass.getuser()
    if current_user != 'root':
        return current_user
    # Never start applications as root; fall back to the configured default user.
    return self._environment['application_default_user']

api/src/main/resources/plugins/jupyter.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,14 @@ def create_component(self, staged_component_path, application_name, component, p
6767
key_file = self._environment['cluster_private_key']
6868
root_user = self._environment['cluster_root_user']
6969
target_host = self._environment['jupyter_host']
70+
application_user = properties['application_user']
7071
delete_commands = []
7172

7273
mkdircommands = []
73-
remote_component_tmp_path = '%s/%s/%s' % ('/tmp/%s' % self._namespace, application_name, component['component_name'])
74+
remote_component_tmp_path = '%s/%s/%s/%s' % ('/tmp/%s' % self._namespace, application_user, application_name, component['component_name'])
75+
remote_notebook_path = '/home/%s/%s' % (application_user, self._environment['jupyter_notebook_directory'])
7476
mkdircommands.append('mkdir -p %s' % remote_component_tmp_path)
77+
mkdircommands.append('sudo -u %s mkdir -p %s' % (application_user, remote_notebook_path))
7578
deployer_utils.exec_ssh(target_host, root_user, key_file, mkdircommands)
7679

7780
file_list = component['component_detail']
@@ -82,7 +85,7 @@ def create_component(self, staged_component_path, application_name, component, p
8285
os.system("scp -i %s -o StrictHostKeyChecking=no %s/%s %s@%s:%s" %
8386
(key_file, staged_component_path, file_name, root_user, target_host, remote_component_tmp_path))
8487

85-
remote_component_install_path = '%s/%s_%s' % (self._environment['jupyter_notebook_directory'], application_name, file_name)
88+
remote_component_install_path = '%s/%s_%s' % (remote_notebook_path, application_name, file_name)
8689
deployer_utils.exec_ssh(
8790
target_host, root_user, key_file,
8891
['sudo mv %s %s' % (remote_component_tmp_path + '/*.ipynb', remote_component_install_path)])

api/src/main/resources/plugins/oozie.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434

3535
class OozieCreator(Creator):
3636

37+
OOZIE_USER_NAME = 'hdfs'
38+
3739
def validate_component(self, component):
3840
errors = []
3941
file_list = component['component_detail']
@@ -50,19 +52,19 @@ def destroy_component(self, application_name, create_data):
5052
application_name,
5153
json.dumps(create_data))
5254
# terminate oozie jobs
53-
self._kill_oozie(create_data['job_handle'])
55+
self._kill_oozie(create_data['job_handle'], create_data['application_user'])
5456

5557
# delete component from hdfs
5658
remote_path = create_data['component_hdfs_root'][1:]
5759
self._hdfs_client.remove(remote_path, recursive=True)
5860

5961
def start_component(self, application_name, create_data):
    """Start (resume) the Oozie job recorded at create time for this application."""
    logging.debug("start_component: %s %s", application_name, json.dumps(create_data))
    job_handle = create_data['job_handle']
    run_as = create_data['application_user']
    self._start_oozie(job_handle, run_as)
6264

6365
def stop_component(self, application_name, create_data):
    """Stop (suspend) the Oozie job recorded at create time for this application."""
    logging.debug("stop_component: %s %s", application_name, json.dumps(create_data))
    job_handle = create_data['job_handle']
    run_as = create_data['application_user']
    self._stop_oozie(job_handle, run_as)
6668

6769
def create_component(self, staged_component_path, application_name, component, properties):
6870
logging.debug(
@@ -87,7 +89,7 @@ def create_component(self, staged_component_path, application_name, component, p
8789
properties['deployment_end'] = end.strftime("%Y-%m-%dT%H:%MZ")
8890

8991
# insert required oozie properties
90-
properties['user.name'] = 'hdfs' # or serviceId?
92+
properties['user.name'] = properties['application_user']
9193
# Oozie ShareLib - supports actions
9294
properties['oozie.use.system.libpath'] = 'true'
9395
# platform shared libs e.g. hbase
@@ -102,12 +104,14 @@ def create_component(self, staged_component_path, application_name, component, p
102104
properties[def_path] = '%s/%s' % (self._environment['name_node'], remote_path)
103105

104106
# deploy everything to various hadoop services
105-
undeploy = self._deploy_to_hadoop(properties, staged_component_path, remote_path)
107+
undeploy = self._deploy_to_hadoop(properties, staged_component_path, remote_path, properties['application_user'])
106108

107109
# return something that can be used to undeploy later
108-
return {'job_handle': undeploy['id'], 'component_hdfs_root': properties['component_hdfs_root']}
110+
return {'job_handle': undeploy['id'],
111+
'component_hdfs_root': properties['component_hdfs_root'],
112+
'application_user': properties['application_user']}
109113

110-
def _deploy_to_hadoop(self, properties, staged_component_path, remote_path, exclude=None):
114+
def _deploy_to_hadoop(self, properties, staged_component_path, remote_path, application_user, exclude=None):
111115
if exclude is None:
112116
exclude = []
113117
exclude.extend(['hdfs.json',
@@ -125,7 +129,7 @@ def _deploy_to_hadoop(self, properties, staged_component_path, remote_path, excl
125129

126130
# submit to oozie
127131
result = self._submit_oozie(properties)
128-
self._stop_oozie(result['id'])
132+
self._stop_oozie(result['id'], application_user)
129133

130134
return result
131135

@@ -149,20 +153,20 @@ def _submit_oozie(self, job_properties):
149153

150154
return result
151155

152-
def _kill_oozie(self, job_id, oozie_user):
    """Issue a 'kill' action for the given Oozie job on behalf of *oozie_user*."""
    logging.debug("_kill_oozie: %s", job_id)
    kill_url = '%s/v1/job/%s?action=kill&user.name=%s' % (self._environment['oozie_uri'],
                                                          job_id,
                                                          oozie_user)
    requests.put(kill_url)
156160

157-
def _start_oozie(self, job_id, oozie_user):
    """Send a 'resume' followed by a 'start' action for the given Oozie job
    on behalf of *oozie_user*."""
    logging.debug("_start_oozie: %s", job_id)
    oozie_base = self._environment['oozie_uri']
    for action in ('resume', 'start'):
        requests.put('%s/v1/job/%s?action=%s&user.name=%s' % (oozie_base, job_id, action, oozie_user))
163167

164-
def _stop_oozie(self, job_id, oozie_user):
    """Issue a 'suspend' action for the given Oozie job on behalf of *oozie_user*."""
    logging.debug("_stop_oozie: %s", job_id)
    oozie_url = '%s/v1/job/%s?action=suspend&user.name=%s' % (self._environment['oozie_uri'],
                                                              job_id,
                                                              oozie_user)
    # Fix: replaced a stray Python 2 `print oozie_url` debug statement with
    # lazy debug logging, consistent with _kill_oozie/_start_oozie.
    logging.debug("suspending oozie job via %s", oozie_url)
    requests.put(oozie_url)

0 commit comments

Comments
 (0)