22
22
23
23
import os
24
24
import tarfile
25
- import StringIO
25
+ from io import BytesIO
26
26
import logging
27
27
import traceback
28
28
import time
32
32
import spur
33
33
from pywebhdfs .webhdfs import PyWebHdfsClient
34
34
35
- from cm_api .api_client import ApiResource
36
-
37
-
38
- def connect_cm (cm_api , cm_username , cm_password ):
39
- api = ApiResource (
40
- cm_api ,
41
- version = 6 ,
42
- username = cm_username ,
43
- password = cm_password )
44
- return api
45
-
46
-
47
35
def get_nameservice (cm_host , cluster_name , service_name , user_name = 'admin' , password = 'admin' ):
48
36
request_url = 'http://%s:7180/api/v11/clusters/%s/services/%s/nameservices' % (cm_host ,
49
37
cluster_name ,
@@ -66,9 +54,11 @@ def update_hadoop_env(env):
66
54
tmp_env = dict (env )
67
55
logging .debug ('Updating environment descriptor' )
68
56
if env ['hadoop_distro' ] == 'CDH' :
69
- fill_hadoop_env_cdh ( tmp_env )
70
- else :
57
+ logging . error ( 'CDH is not a supported hadoop distribution' )
58
+ elif env [ 'hadoop_distro' ] == 'HDP' :
71
59
fill_hadoop_env_hdp (tmp_env )
60
+ else :
61
+ logging .warning ('Skipping update_hadoop_env for hadoop distro "%s"' , env ['hadoop_distro' ])
72
62
logging .debug ('Updated environment descriptor' )
73
63
for key in tmp_env :
74
64
# Dictionary get/put operations are atomic so inherently thread safe and don't need a lock
@@ -182,105 +172,6 @@ def fill_hadoop_env_hdp(env):
182
172
env ['hive_server' ] = '%s' % component_host (component_detail )
183
173
env ['hive_port' ] = '10001'
184
174
185
- def fill_hadoop_env_cdh (env ):
186
- # pylint: disable=E1103
187
- api = connect_cm (
188
- env ['hadoop_manager_host' ],
189
- env ['hadoop_manager_username' ],
190
- env ['hadoop_manager_password' ])
191
-
192
- for cluster_detail in api .get_all_clusters ():
193
- cluster_name = cluster_detail .name
194
- break
195
-
196
- logging .debug ('getting %s' , cluster_name )
197
- env ['cm_status_links' ] = {}
198
- env .pop ('yarn_node_managers' , None )
199
- env .pop ('yarn_resource_manager_host' , None )
200
- env .pop ('zookeeper_quorum' , None )
201
-
202
- cluster = api .get_cluster (cluster_name )
203
- for service in cluster .get_all_services ():
204
- env ['cm_status_links' ]['%s' % service .name ] = service .serviceUrl
205
- if service .type == "HDFS" :
206
- nameservice = get_nameservice (env ['hadoop_manager_host' ], cluster_name ,
207
- service .name ,
208
- user_name = env ['hadoop_manager_username' ],
209
- password = env ['hadoop_manager_password' ])
210
- if nameservice :
211
- env ['name_node' ] = 'hdfs://%s' % nameservice
212
- for role in service .get_all_roles ():
213
- if not nameservice and role .type == "NAMENODE" :
214
- env ['name_node' ] = 'hdfs://%s:8020' % api .get_host (role .hostRef .hostId ).hostname
215
- if role .type == "HTTPFS" :
216
- env ['webhdfs_host' ] = '%s' % api .get_host (role .hostRef .hostId ).hostname
217
- env ['webhdfs_port' ] = '14000'
218
- elif service .type == "YARN" :
219
- for role in service .get_all_roles ():
220
- if role .type == "RESOURCEMANAGER" :
221
- if 'yarn_resource_manager_host' in env :
222
- rm_instance = '_backup'
223
- else :
224
- rm_instance = ''
225
- env ['yarn_resource_manager_host%s' % rm_instance ] = '%s' % api .get_host (role .hostRef .hostId ).hostname
226
- env ['yarn_resource_manager_port%s' % rm_instance ] = '8088'
227
- env ['yarn_resource_manager_mr_port%s' % rm_instance ] = '8032'
228
- if role .type == "NODEMANAGER" :
229
- if 'yarn_node_managers' in env :
230
- env ['yarn_node_managers' ] = '%s,%s' % (env ['yarn_node_managers' ], api .get_host (role .hostRef .hostId ).hostname )
231
- else :
232
- env ['yarn_node_managers' ] = '%s' % api .get_host (
233
- role .hostRef .hostId ).hostname
234
- elif service .type == "MAPREDUCE" :
235
- for role in service .get_all_roles ():
236
- if role .type == "JOBTRACKER" :
237
- env ['job_tracker' ] = '%s:8021' % api .get_host (role .hostRef .hostId ).hostname
238
- break
239
- elif service .type == "ZOOKEEPER" :
240
- for role in service .get_all_roles ():
241
- if role .type == "SERVER" :
242
- if 'zookeeper_quorum' in env :
243
- env ['zookeeper_quorum' ] += ',%s' % api .get_host (role .hostRef .hostId ).hostname
244
- else :
245
- env ['zookeeper_quorum' ] = '%s' % api .get_host (role .hostRef .hostId ).hostname
246
- env ['zookeeper_port' ] = '2181'
247
- elif service .type == "HBASE" :
248
- for role in service .get_all_roles ():
249
- if role .type == "HBASERESTSERVER" :
250
- env ['hbase_rest_server' ] = '%s' % api .get_host (role .hostRef .hostId ).hostname
251
- env ['hbase_rest_port' ] = '20550'
252
- elif role .type == "HBASETHRIFTSERVER" :
253
- env ['hbase_thrift_server' ] = '%s' % api .get_host (role .hostRef .hostId ).hostname
254
- elif service .type == "OOZIE" :
255
- for role in service .get_all_roles ():
256
- if role .type == "OOZIE_SERVER" :
257
- env ['oozie_uri' ] = 'http://%s:11000/oozie' % api .get_host (role .hostRef .hostId ).hostname
258
- break
259
- elif service .type == "HIVE" :
260
- for role in service .get_all_roles ():
261
- if role .type == "HIVESERVER2" :
262
- env ['hive_server' ] = '%s' % api .get_host (role .hostRef .hostId ).hostname
263
- env ['hive_port' ] = '10000'
264
- break
265
- elif service .type == "IMPALA" :
266
- for role in service .get_all_roles ():
267
- if role .type == "IMPALAD" :
268
- env ['impala_host' ] = '%s' % api .get_host (role .hostRef .hostId ).hostname
269
- env ['impala_port' ] = '21050'
270
- break
271
- elif service .type == "KUDU" :
272
- for role in service .get_all_roles ():
273
- if role .type == "KUDU_MASTER" :
274
- env ['kudu_host' ] = '%s' % api .get_host (role .hostRef .hostId ).hostname
275
- env ['kudu_port' ] = '7051'
276
- break
277
- elif service .type == "HUE" :
278
- for role in service .get_all_roles ():
279
- if role .type == "HUE_SERVER" :
280
- env ['hue_host' ] = '%s' % api .get_host (role .hostRef .hostId ).hostname
281
- env ['hue_port' ] = '8888'
282
- break
283
-
284
175
def tree (archive_filepath ):
285
176
file_handle = file (archive_filepath , 'rb' )
286
177
tar_file = tarfile .open (None , 'r' , file_handle )
@@ -297,7 +188,6 @@ def tree(archive_filepath):
297
188
298
189
return root
299
190
300
-
301
191
def canonicalize (path ):
302
192
path = path .replace ('\\ ' , '/' )
303
193
path = path .replace ('//' , '/' )
@@ -353,7 +243,7 @@ def create_file(self, data, remote_file_path, permission=755):
353
243
354
244
logging .debug ('create_file: %s' , remote_file_path )
355
245
356
- sio = StringIO . StringIO (data )
246
+ sio = BytesIO (data )
357
247
358
248
self ._hdfs .create_file (
359
249
canonicalize (remote_file_path ),
@@ -420,7 +310,7 @@ def exec_ssh(host, user, key, ssh_commands):
420
310
421
311
def dict_to_props (dict_props ):
422
312
props = []
423
- for key , value in dict_props .iteritems ():
313
+ for key , value in dict_props .items ():
424
314
props .append ('%s=%s' % (key , value ))
425
315
return '\n ' .join (props )
426
316
0 commit comments