
Commit a940e32

Merge pull request #27 from pndaproject/hdp

Support for Hortonworks HDP

2 parents 4f06ce8 + 90097a6

File tree

6 files changed: +145 −65 lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
```diff
@@ -1,10 +1,15 @@
 # Change Log
 All notable changes to this project will be documented in this file.
 
+## [Unreleased]
+### Added:
+- PNDA-2445: Support for Hortonworks HDP
+
 ## [0.4.0] 2017-05-23
 ### Added
 - PNDA-2729: Added support for spark streaming jobs written in python (pyspark). Use `main_py` instead of `main_jar` in properties.json and specify additional files using `py_files`.
 - PNDA-2784: Make tests pass on RedHat
+
 ### Changed
 - PNDA-2700: Spark streaming jobs no longer require upstart.conf or yarn-kill.py files, default ones are supplied by the deployment manager.
 - PNDA-2782: Disabled Ubuntu-only test
```

README.md

Lines changed: 3 additions & 3 deletions
`````diff
@@ -399,9 +399,9 @@ hdfspath_path_name generated from entries in hdfs.json
 ## Environment Variables ##
 These can be obtained with the [environment endpoints API](#environment-endpoints-api)
 ````
-environment_cloudera_manager_host 192.168.1.2
-environment_cloudera_manager_password admin
-environment_cloudera_manager_username admin
+environment_hadoop_manager_host 192.168.1.2
+environment_hadoop_manager_password admin
+environment_hadoop_manager_username admin
 environment_cluster_private_key ./dm.pem
 environment_cluster_root_user cloud-user
 environment_hbase_rest_port 20550
`````
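For orientation, a minimal sketch of how a client might read the renamed `environment_hadoop_manager_*` variables via the environment endpoints API the README refers to; the deployment manager address, port, and path below are assumptions for illustration, not values confirmed by this diff:

```python
# Hypothetical client sketch: fetch the environment variables shown above
# from the deployment manager. Host, port, and endpoint path are assumed,
# not taken from this commit; see the README's environment endpoints API.
import requests

DM_API = 'http://192.168.1.2:5000'  # placeholder deployment manager address

def get_environment():
    response = requests.get('%s/environment/endpoints' % DM_API)  # assumed path
    response.raise_for_status()
    return response.json()

env = get_environment()
# After this change the manager settings are distro-neutral: the same
# hadoop_manager_* keys cover both Cloudera Manager and Ambari.
print(env.get('environment_hadoop_manager_host'))
```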

api/src/main/resources/deployer_utils.py

Lines changed: 114 additions & 17 deletions
```diff
@@ -41,26 +41,126 @@ def connect_cm(cm_api, cm_username, cm_password):
     return api
 
 
-def get_named_service(cm_host, cluster_name, service_name, user_name='admin', password='admin'):
+def get_nameservice(cm_host, cluster_name, service_name, user_name='admin', password='admin'):
     request_url = 'http://%s:7180/api/v11/clusters/%s/services/%s/nameservices' % (cm_host,
                                                                                    cluster_name,
                                                                                    service_name)
     result = requests.get(request_url, auth=(user_name, password))
-    named_service = ""
+    nameservice = ""
     if result.status_code == 200:
         response = result.json()
         if 'items' in response:
-            named_service = response['items'][0]['name']
-    logging.debug("Found named service %s for %s", named_service, service_name)
-    return named_service
+            nameservice = response['items'][0]['name']
+    logging.debug("Found named service %s for %s", nameservice, service_name)
+    return nameservice
 
 
 def fill_hadoop_env(env):
+    if env['hadoop_distro'] == 'CDH':
+        fill_hadoop_env_cdh(env)
+    else:
+        fill_hadoop_env_hdp(env)
+
+    logging.debug(env)
+
+def ambari_request(ambari, uri):
+    hadoop_manager_ip = ambari[0]
+    hadoop_manager_username = ambari[1]
+    hadoop_manager_password = ambari[2]
+    if uri.startswith("http"):
+        full_uri = uri
+    else:
+        full_uri = 'http://%s:8080/api/v1%s' % (hadoop_manager_ip, uri)
+
+    headers = {'X-Requested-By': hadoop_manager_username}
+    auth = (hadoop_manager_username, hadoop_manager_password)
+    return requests.get(full_uri, auth=auth, headers=headers).json()
+
+def get_hdfs_hdp(ambari, cluster_name):
+    core_site = ambari_request(ambari, '/clusters/%s?fields=Clusters/desired_configs/core-site' % cluster_name)
+    config_version = core_site['Clusters']['desired_configs']['core-site']['tag']
+    core_site_config = ambari_request(ambari, '/clusters/%s/configurations/?type=core-site&tag=%s' % (cluster_name, config_version))
+    return core_site_config['items'][0]['properties']['fs.defaultFS']
+
+def component_host(component_detail):
+    host_list = ''
+    for host_detail in component_detail['host_components']:
+        if len(host_list) > 0:
+            host_list += ','
+        host_list += host_detail['HostRoles']['host_name']
+    return host_list
+
+def fill_hadoop_env_hdp(env):
+
+    hadoop_manager_ip = env['hadoop_manager_host']
+    hadoop_manager_username = env['hadoop_manager_username']
+    hadoop_manager_password = env['hadoop_manager_password']
+    ambari = (hadoop_manager_ip, hadoop_manager_username, hadoop_manager_password)
+    cluster_name = ambari_request(ambari, '/clusters')['items'][0]['Clusters']['cluster_name']
+
+    logging.debug('getting service list for %s', cluster_name)
+    env['cm_status_links'] = {}
+    nameservice = None
+
+    env['name_node'] = get_hdfs_hdp(ambari, cluster_name)
+
+    services = ambari_request(ambari, '/clusters/%s/services' % cluster_name)['items']
+    for service in services:
+        service_name = service['ServiceInfo']['service_name']
+        env['cm_status_links']['%s' % service_name] = 'http://%s:8080/#/main/services/%s/summary' % (hadoop_manager_ip, service_name)
+        service_components = ambari_request(ambari, service['href'] + '/components')['items']
+
+        for component in service_components:
+            component_detail = ambari_request(ambari, component['href'])
+            role_name = component_detail['ServiceComponentInfo']['component_name']
+
+            if role_name == "NAMENODE":
+                env['webhdfs_host'] = '%s' % component_host(component_detail).split(',')[0]
+                env['webhdfs_port'] = '14000'
+
+            elif role_name == "RESOURCEMANAGER":
+                rm_host = component_host(component_detail)
+                if len(rm_host.split(',')) > 1:
+                    main_rm_host = rm_host.split(',')[0]
+                    backup_rm_host = rm_host.split(',')[1]
+                else:
+                    main_rm_host = rm_host
+                    backup_rm_host = None
+                env['yarn_resource_manager_host'] = '%s' % main_rm_host
+                env['yarn_resource_manager_port'] = '8088'
+                env['yarn_resource_manager_mr_port'] = '8050'
+                if backup_rm_host is not None:
+                    env['yarn_resource_manager_host_backup'] = '%s' % component_host(component_detail)
+                    env['yarn_resource_manager_port_backup'] = '8088'
+                    env['yarn_resource_manager_mr_port_backup'] = '8050'
+
+            elif role_name == "NODEMANAGER":
+                env['yarn_node_managers'] = '%s' % component_host(component_detail)
+
+            elif role_name == "ZOOKEEPER_SERVER":
+                env['zookeeper_quorum'] = '%s' % component_host(component_detail)
+                env['zookeeper_port'] = '2181'
+
+            elif role_name == "HBASE_MASTER":
+                env['hbase_rest_server'] = '%s' % component_host(component_detail).split(',')[0]
+                env['hbase_rest_port'] = '20550'
+                env['hbase_thrift_server'] = '%s' % component_host(component_detail).split(',')[0]
+
+            elif role_name == "OOZIE_SERVER":
+                env['oozie_uri'] = 'http://%s:11000/oozie' % component_host(component_detail)
+
+            elif role_name == "HIVE_SERVER":
+                env['hive_server'] = '%s' % component_host(component_detail)
+                env['hive_port'] = '10000'
+
+    logging.debug(env)
+
+def fill_hadoop_env_cdh(env):
     # pylint: disable=E1103
     api = connect_cm(
-        env['cloudera_manager_host'],
-        env['cloudera_manager_username'],
-        env['cloudera_manager_password'])
+        env['hadoop_manager_host'],
+        env['hadoop_manager_username'],
+        env['hadoop_manager_password'])
 
     for cluster_detail in api.get_all_clusters():
         cluster_name = cluster_detail.name
@@ -73,14 +173,14 @@ def fill_hadoop_env(env):
         for service in cluster.get_all_services():
             env['cm_status_links']['%s' % service.name] = service.serviceUrl
             if service.type == "HDFS":
-                named_service = get_named_service(env['cloudera_manager_host'], cluster_name,
+                nameservice = get_nameservice(env['hadoop_manager_host'], cluster_name,
                                                   service.name,
-                                                  user_name=env['cloudera_manager_username'],
-                                                  password=env['cloudera_manager_password'])
-                if named_service:
-                    env['name_node'] = 'hdfs://%s' % named_service
+                                              user_name=env['hadoop_manager_username'],
+                                              password=env['hadoop_manager_password'])
+                if nameservice:
+                    env['name_node'] = 'hdfs://%s' % nameservice
                 for role in service.get_all_roles():
-                    if not named_service and role.type == "NAMENODE":
+                    if not nameservice and role.type == "NAMENODE":
                         env['name_node'] = 'hdfs://%s:8020' % api.get_host(role.hostRef.hostId).hostname
                     if role.type == "HTTPFS":
                         env['webhdfs_host'] = '%s' % api.get_host(role.hostRef.hostId).ipAddress
@@ -151,9 +251,6 @@ def fill_hadoop_env(env):
             env['hue_port'] = '8888'
             break
 
-    logging.debug(env)
-
-
 def tree(archive_filepath):
     file_handle = file(archive_filepath, 'rb')
     tar_file = tarfile.open(None, 'r', file_handle)
```
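To see how the new dispatch is meant to be driven, a minimal usage sketch; the host, credentials, and `hadoop_distro` value are placeholders, and `fill_hadoop_env` mutates the dict in place as in the diff above:

```python
# Usage sketch for the dispatch introduced above. All values are
# placeholders; per the diff, any 'hadoop_distro' other than 'CDH'
# takes the Ambari (HDP) code path.
from deployer_utils import fill_hadoop_env  # assumed import path

env = {
    'hadoop_distro': 'HDP',
    'hadoop_manager_host': '10.0.0.10',
    'hadoop_manager_username': 'admin',
    'hadoop_manager_password': 'admin',
}
fill_hadoop_env(env)
print(env['name_node'])   # fs.defaultFS read from Ambari's core-site config
print(env['oozie_uri'])   # discovered from the OOZIE_SERVER component
```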

api/src/main/resources/dm-config.json

Lines changed: 0 additions & 24 deletions
This file was deleted.

api/src/main/resources/plugins/oozie.py

Lines changed: 14 additions & 12 deletions
```diff
@@ -34,6 +34,8 @@
 
 class OozieCreator(Creator):
 
+    OOZIE_USER_NAME = 'hdfs'
+
     def validate_component(self, component):
         errors = []
         file_list = component['component_detail']
@@ -50,19 +52,19 @@ def destroy_component(self, application_name, create_data):
                       application_name,
                       json.dumps(create_data))
         # terminate oozie jobs
-        self._kill_oozie(create_data['job_handle'])
+        self._kill_oozie(create_data['job_handle'], self.OOZIE_USER_NAME)
 
         # delete component from hdfs
         remote_path = create_data['component_hdfs_root'][1:]
         self._hdfs_client.remove(remote_path, recursive=True)
 
     def start_component(self, application_name, create_data):
         logging.debug("start_component: %s %s", application_name, json.dumps(create_data))
-        self._start_oozie(create_data['job_handle'])
+        self._start_oozie(create_data['job_handle'], self.OOZIE_USER_NAME)
 
     def stop_component(self, application_name, create_data):
         logging.debug("stop_component: %s %s", application_name, json.dumps(create_data))
-        self._stop_oozie(create_data['job_handle'])
+        self._stop_oozie(create_data['job_handle'], self.OOZIE_USER_NAME)
 
     def create_component(self, staged_component_path, application_name, component, properties):
         logging.debug(
@@ -87,7 +89,7 @@ def create_component(self, staged_component_path, application_name, component, p
         properties['deployment_end'] = end.strftime("%Y-%m-%dT%H:%MZ")
 
         # insert required oozie properties
-        properties['user.name'] = 'hdfs' # or serviceId?
+        properties['user.name'] = self.OOZIE_USER_NAME
         # Oozie ShareLib - supports actions
         properties['oozie.use.system.libpath'] = 'true'
         # platform shared libs e.g. hbase
@@ -125,7 +127,7 @@ def _deploy_to_hadoop(self, properties, staged_component_path, remote_path, excl
 
         # submit to oozie
         result = self._submit_oozie(properties)
-        self._stop_oozie(result['id'])
+        self._stop_oozie(result['id'], self.OOZIE_USER_NAME)
 
         return result
 
@@ -149,20 +151,20 @@ def _submit_oozie(self, job_properties):
 
         return result
 
-    def _kill_oozie(self, job_id):
+    def _kill_oozie(self, job_id, oozie_user):
         logging.debug("_kill_oozie: %s", job_id)
-        oozie_url = '%s/v1/job/%s?action=kill' % (self._environment['oozie_uri'], job_id)
+        oozie_url = '%s/v1/job/%s?action=kill&user.name=%s' % (self._environment['oozie_uri'], job_id, oozie_user)
         requests.put(oozie_url)
 
-    def _start_oozie(self, job_id):
+    def _start_oozie(self, job_id, oozie_user):
         logging.debug("_start_oozie: %s", job_id)
-        oozie_url = '%s/v1/job/%s?action=resume' % (self._environment['oozie_uri'], job_id)
+        oozie_url = '%s/v1/job/%s?action=resume&user.name=%s' % (self._environment['oozie_uri'], job_id, oozie_user)
         requests.put(oozie_url)
-        oozie_url = '%s/v1/job/%s?action=start' % (self._environment['oozie_uri'], job_id)
+        oozie_url = '%s/v1/job/%s?action=start&user.name=%s' % (self._environment['oozie_uri'], job_id, oozie_user)
         requests.put(oozie_url)
 
-    def _stop_oozie(self, job_id):
+    def _stop_oozie(self, job_id, oozie_user):
         logging.debug("_stop_oozie: %s", job_id)
-        oozie_url = '%s/v1/job/%s?action=suspend' % (self._environment['oozie_uri'], job_id)
+        oozie_url = '%s/v1/job/%s?action=suspend&user.name=%s' % (self._environment['oozie_uri'], job_id, oozie_user)
         print oozie_url
         requests.put(oozie_url)
```
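The `user.name` query parameter is Oozie's standard way of identifying the requesting user on its REST API when authentication is not enforced. A standalone sketch of the same call pattern, with a placeholder Oozie URI and job id:

```python
# Standalone sketch of the job-control calls above: suspend an Oozie
# workflow job as a given user. URI and job id are placeholders.
import requests

oozie_uri = 'http://oozie-host:11000/oozie'      # cf. env['oozie_uri'] above
job_id = '0000001-170523000000000-oozie-oozi-W'  # placeholder workflow job id

url = '%s/v1/job/%s?action=suspend&user.name=%s' % (oozie_uri, job_id, 'hdfs')
requests.put(url).raise_for_status()
```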
