Skip to content

Commit 509edf5

Browse files
authored
Merge pull request #17 from pndaproject/PNDA-2498
Use thrift server role for happybase and store packages in hdfs
2 parents ce50953 + 8ebb7b3 commit 509edf5

13 files changed

+152
-89
lines changed

api/src/main/resources/app.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,12 +297,16 @@ def main():
297297

298298
deployer_utils.fill_hadoop_env(config['environment'])
299299

300-
package_repository = PackageRepoRestClient(config['config']["package_repository"])
300+
package_repository = PackageRepoRestClient(config['config']["package_repository"], config['config']['stage_root'])
301301
dm = deployment_manager.DeploymentManager(package_repository,
302302
package_registrar.HbasePackageRegistrar(
303-
config['environment']['hbase_rest_server']),
303+
config['environment']['hbase_thrift_server'],
304+
config['environment']['webhdfs_host'],
305+
'hdfs',
306+
config['environment']['webhdfs_port'],
307+
config['config']['stage_root']),
304308
application_registrar.HbaseApplicationRegistrar(
305-
config['environment']['hbase_rest_server']),
309+
config['environment']['hbase_thrift_server']),
306310
config['environment'],
307311
config['config'])
308312

api/src/main/resources/application_creator.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import tarfile
2424
import os
25-
import io
2625
import json
2726
import re
2827

@@ -46,14 +45,14 @@ def __init__(self, config, environment, service):
4645
environment['webhdfs_port'],
4746
'hdfs')
4847

49-
def create_application(self, package_data, package_metadata, application_name, property_overrides):
48+
def create_application(self, package_data_path, package_metadata, application_name, property_overrides):
5049

5150
logging.debug("create_application: %s", application_name)
5251

5352
if not re.match('^[a-zA-Z0-9_-]+$', application_name):
5453
raise FailedCreation('Application name %s may only contain a-z A-Z 0-9 - _' % application_name)
5554

56-
stage_path = self._stage_package(package_data)
55+
stage_path = self._stage_package(package_data_path)
5756

5857
# create each class of components in the package, aggregating any
5958
# component specific return data for destruction
@@ -164,16 +163,14 @@ def _load_creator(self, component_type):
164163

165164
return creator
166165

167-
def _stage_package(self, package_data):
166+
def _stage_package(self, package_data_path):
168167

169168
logging.debug("_stage_package")
170169

171170
if not os.path.isdir(self._config['stage_root']):
172171
os.mkdir(self._config['stage_root'])
173172

174-
file_like_object = io.BytesIO(package_data)
175-
tar = tarfile.open(fileobj=file_like_object)
173+
tar = tarfile.open(package_data_path)
176174
stage_path = "%s/%s" % (self._config['stage_root'], uuid.uuid4())
177175
tar.extractall(path=stage_path)
178-
file_like_object.close()
179176
return stage_path

api/src/main/resources/deployer_utils.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,14 @@ def fill_hadoop_env(env):
9797
env['yarn_resource_manager_mr_port%s' % rm_instance] = '8032'
9898
if role.type == "NODEMANAGER":
9999
if 'yarn_node_managers' in env:
100-
env['yarn_node_managers'] = '%s,%s' % (
101-
env['yarn_node_managers'], api.get_host(role.hostRef.hostId).hostname)
100+
env['yarn_node_managers'] = '%s,%s' % (env['yarn_node_managers'], api.get_host(role.hostRef.hostId).hostname)
102101
else:
103102
env['yarn_node_managers'] = '%s' % api.get_host(
104103
role.hostRef.hostId).hostname
105104
elif service.type == "MAPREDUCE":
106105
for role in service.get_all_roles():
107106
if role.type == "JOBTRACKER":
108-
env['job_tracker'] = '%s:8021' % api.get_host(
109-
role.hostRef.hostId).hostname
107+
env['job_tracker'] = '%s:8021' % api.get_host(role.hostRef.hostId).hostname
110108
break
111109
elif service.type == "ZOOKEEPER":
112110
for role in service.get_all_roles():
@@ -119,42 +117,37 @@ def fill_hadoop_env(env):
119117
elif service.type == "HBASE":
120118
for role in service.get_all_roles():
121119
if role.type == "HBASERESTSERVER":
122-
env['hbase_rest_server'] = '%s' % api.get_host(
123-
role.hostRef.hostId).hostname
120+
env['hbase_rest_server'] = '%s' % api.get_host(role.hostRef.hostId).hostname
124121
env['hbase_rest_port'] = '20550'
125-
break
122+
elif role.type == "HBASETHRIFTSERVER":
123+
env['hbase_thrift_server'] = '%s' % api.get_host(role.hostRef.hostId).hostname
126124
elif service.type == "OOZIE":
127125
for role in service.get_all_roles():
128126
if role.type == "OOZIE_SERVER":
129-
env['oozie_uri'] = 'http://%s:11000/oozie' % api.get_host(
130-
role.hostRef.hostId).hostname
127+
env['oozie_uri'] = 'http://%s:11000/oozie' % api.get_host(role.hostRef.hostId).hostname
131128
break
132129
elif service.type == "HIVE":
133130
for role in service.get_all_roles():
134131
if role.type == "HIVESERVER2":
135-
env['hive_server'] = '%s' % api.get_host(
136-
role.hostRef.hostId).hostname
132+
env['hive_server'] = '%s' % api.get_host(role.hostRef.hostId).hostname
137133
env['hive_port'] = '10000'
138134
break
139135
elif service.type == "IMPALA":
140136
for role in service.get_all_roles():
141137
if role.type == "IMPALAD":
142-
env['impala_host'] = '%s' % api.get_host(
143-
role.hostRef.hostId).hostname
138+
env['impala_host'] = '%s' % api.get_host(role.hostRef.hostId).hostname
144139
env['impala_port'] = '21050'
145140
break
146141
elif service.type == "KUDU":
147142
for role in service.get_all_roles():
148143
if role.type == "KUDU_MASTER":
149-
env['kudu_host'] = '%s' % api.get_host(
150-
role.hostRef.hostId).hostname
144+
env['kudu_host'] = '%s' % api.get_host(role.hostRef.hostId).hostname
151145
env['kudu_port'] = '7051'
152146
break
153147
elif service.type == "HUE":
154148
for role in service.get_all_roles():
155149
if role.type == "HUE_SERVER":
156-
env['hue_host'] = '%s' % api.get_host(
157-
role.hostRef.hostId).hostname
150+
env['hue_host'] = '%s' % api.get_host(role.hostRef.hostId).hostname
158151
env['hue_port'] = '8888'
159152
break
160153

@@ -240,6 +233,25 @@ def create_file(self, data, remote_file_path):
240233
sio,
241234
overwrite=True)
242235

236+
def append_file(self, data, remote_file_path):
237+
238+
logging.debug('append to: %s', remote_file_path)
239+
240+
self._hdfs.append_file(canonicalize(remote_file_path), data)
241+
242+
243+
def stream_file_to_disk(self, remote_file_path, local_file_path):
244+
chunk_size = 10*1024*1024
245+
offset = 0
246+
with open(local_file_path, 'wb') as dest_file:
247+
data = self._hdfs.read_file(canonicalize(remote_file_path), offset=offset, length=chunk_size)
248+
while True:
249+
dest_file.write(data)
250+
if len(data) < chunk_size:
251+
break
252+
offset += chunk_size
253+
data = self._hdfs.read_file(canonicalize(remote_file_path), offset=offset, length=chunk_size)
254+
243255
def read_file(self, remote_file_path):
244256

245257
data = self._hdfs.read_file(canonicalize(remote_file_path))

api/src/main/resources/deployment_manager.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import logging
2424
import json
25+
import os
2526
import time
2627
import datetime
2728
import threading
@@ -155,11 +156,11 @@ def _do_deploy():
155156
package_file = package + '.tar.gz'
156157
logging.info("deploy: %s", package)
157158
# download package:
158-
package_data = self._repository.get_package(package_file)
159+
package_data_path = self._repository.get_package(package_file)
159160
# put package in database:
160-
metadata = self._package_parser.get_package_metadata(package_data)
161+
metadata = self._package_parser.get_package_metadata(package_data_path)
161162
self._application_creator.validate_package(package, metadata)
162-
self._package_registrar.set_package(package, package_data)
163+
self._package_registrar.set_package(package, package_data_path)
163164
# set the operation status as complete
164165
deploy_status = {"state": PackageDeploymentState.DEPLOYED,
165166
"information": "Deployed " + package + " at " + self.utc_string()}
@@ -175,6 +176,7 @@ def _do_deploy():
175176
finally:
176177
# report final state of operation to database:
177178
self._package_registrar.set_package_deploy_status(package, deploy_status)
179+
os.remove(package_data_path)
178180

179181
# schedule work to be done in the background:
180182
self._run_asynch_package_task(package_name=package,
@@ -373,7 +375,7 @@ def create_application(self, package, application, overrides):
373375
self._assert_application_status(application, ApplicationState.NOTCREATED)
374376
self._assert_package_status(package, PackageDeploymentState.DEPLOYED)
375377
defaults = self.get_package_info(package)['defaults']
376-
package_data = self._package_registrar.get_package_data(package)
378+
package_data_path = self._package_registrar.get_package_data(package)
377379
self._application_registrar.create_application(package, application, overrides, defaults)
378380
self._mark_creating(application)
379381

@@ -383,7 +385,7 @@ def do_work():
383385
try:
384386
package_metadata = self._package_registrar.get_package_metadata(package)['metadata']
385387
create_data = self._application_creator.create_application(
386-
package_data, package_metadata, application, overrides)
388+
package_data_path, package_metadata, application, overrides)
387389
self._application_registrar.set_create_data(application, create_data)
388390
self._application_registrar.set_application_status(application, ApplicationState.CREATED)
389391
except ExceptionThatShouldBeDisplayedToCaller as ex:
@@ -398,6 +400,7 @@ def do_work():
398400
# clear inner locks:
399401
self._clear_package_progress(application)
400402
self._state_change_event_application(application)
403+
os.remove(package_data_path)
401404

402405
self.dispatcher.run_as_asynch(task=do_work)
403406

api/src/main/resources/package_parser.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import json
2424
import tarfile
25-
import io
2625
import traceback
2726
import logging
2827

@@ -42,14 +41,13 @@ def properties_from_metadata(self, metadata):
4241
properties[component_type][component_name] = component_detail['component_detail']['properties.json']
4342
return properties
4443

45-
def get_package_metadata(self, package_data):
44+
def get_package_metadata(self, package_data_path):
4645

4746
try:
4847
logging.debug("get_package_metadata")
4948
metadata = {}
5049

51-
file_like_object = io.BytesIO(package_data)
52-
tar = tarfile.open(fileobj=file_like_object)
50+
tar = tarfile.open(package_data_path)
5351
for name in sorted([member.name for member in tar.getmembers()]):
5452
name_parts = name.split('/')
5553
package_name = name_parts[0]

api/src/main/resources/package_registrar.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,27 @@
2222

2323
import logging
2424
import json
25+
2526
import happybase
2627
from Hbase_thrift import AlreadyExists
2728

2829
from package_parser import PackageParser
30+
from deployer_utils import HDFS
2931

3032

3133
class HbasePackageRegistrar(object):
3234
COLUMN_DEPLOY_STATUS = "cf:deploy_status"
3335

34-
def __init__(self, hbase_host):
36+
def __init__(self, hbase_host, hdfs_host, hdfs_user, hdfs_port, package_local_dir_path):
3537
self._hbase_host = hbase_host
38+
self._hdfs_user = hdfs_user
39+
self._hdfs_host = hdfs_host
40+
self._hdfs_port = hdfs_port
41+
self._hdfs_client = HDFS(hdfs_host, hdfs_port, hdfs_user)
3642
self._parser = PackageParser()
3743
self._table_name = 'platform_packages'
44+
self._package_hdfs_dir_path = "/user/pnda/application_packages"
45+
self._package_local_dir_path = package_local_dir_path
3846
if self._hbase_host is not None:
3947
connection = happybase.Connection(self._hbase_host)
4048
try:
@@ -45,10 +53,11 @@ def __init__(self, hbase_host):
4553
finally:
4654
connection.close()
4755

48-
def set_package(self, package_name, package_data):
49-
logging.debug("Storing %s, %s bytes", package_name, len(package_data))
50-
metadata = self._parser.get_package_metadata(package_data)
51-
key, data = self.generate_record(metadata, package_data)
56+
def set_package(self, package_name, package_data_path):
57+
logging.debug("Storing %s", package_name)
58+
metadata = self._parser.get_package_metadata(package_data_path)
59+
key, data = self.generate_record(metadata)
60+
self._write_to_hdfs(package_data_path, data['cf:package_data'])
5261
self._write_to_db(key, data)
5362

5463
def set_package_deploy_status(self, package_name, deploy_status):
@@ -62,6 +71,8 @@ def set_package_deploy_status(self, package_name, deploy_status):
6271

6372
def delete_package(self, package_name):
6473
logging.debug("Deleting %s", package_name)
74+
package_data_hdfs_path = self._read_from_db(package_name, ['cf:package_data'])['cf:package_data']
75+
self._hdfs_client.remove(package_data_hdfs_path)
6576
connection = happybase.Connection(self._hbase_host)
6677
try:
6778
table = connection.table(self._table_name)
@@ -71,10 +82,12 @@ def delete_package(self, package_name):
7182

7283
def get_package_data(self, package_name):
7384
logging.debug("Reading %s", package_name)
74-
package_data = self._read_from_db(package_name, ['cf:package_data'])
75-
if len(package_data) == 0:
85+
record = self._read_from_db(package_name, ['cf:package_data'])
86+
if len(record) == 0:
7687
return None
77-
return package_data['cf:package_data']
88+
local_package_path = "%s/%s" % (self._package_local_dir_path, package_name)
89+
self._read_from_hdfs(record['cf:package_data'], local_package_path)
90+
return local_package_path
7891

7992
def get_package_metadata(self, package_name):
8093
logging.debug("Reading %s", package_name)
@@ -114,12 +127,12 @@ def list_packages(self):
114127
connection.close()
115128
return result
116129

117-
def generate_record(self, metadata, package_data):
130+
def generate_record(self, metadata):
118131
return metadata["package_name"], {
119132
'cf:name': '-'.join(metadata["package_name"].split("-")[:-1]),
120133
'cf:version': metadata["package_name"].split("-")[-1],
121134
'cf:metadata': json.dumps(metadata),
122-
'cf:package_data': package_data
135+
'cf:package_data': "%s/%s" % (self._package_hdfs_dir_path, metadata["package_name"])
123136
}
124137

125138
def _read_from_db(self, key, columns):
@@ -131,10 +144,26 @@ def _read_from_db(self, key, columns):
131144
connection.close()
132145
return data
133146

147+
def _read_from_hdfs(self, source_hdfs_path, dest_local_path):
148+
self._hdfs_client.stream_file_to_disk(source_hdfs_path, dest_local_path)
149+
134150
def _write_to_db(self, key, data):
135151
connection = happybase.Connection(self._hbase_host)
136152
try:
137153
table = connection.table(self._table_name)
138154
table.put(key, data)
139155
finally:
140156
connection.close()
157+
158+
def _write_to_hdfs(self, source_local_path, dest_hdfs_path):
159+
with open(source_local_path, 'rb') as source_file:
160+
first = True
161+
chunk_size = 10*1024*1024
162+
data_chunk = source_file.read(chunk_size)
163+
while data_chunk:
164+
if first:
165+
self._hdfs_client.create_file(data_chunk, dest_hdfs_path)
166+
first = False
167+
else:
168+
self._hdfs_client.append_file(data_chunk, dest_hdfs_path)
169+
data_chunk = source_file.read(chunk_size)

api/src/main/resources/package_repo_rest_client.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@
2525

2626

2727
class PackageRepoRestClient(object):
28-
def __init__(self, api_url):
28+
def __init__(self, api_url, package_local_dir_path):
2929
"""
3030
A client implementation for the package repository API
3131
:param api_url: A url describing the location to make REST calls to
3232
"""
3333
self.api_url = api_url
34+
self._package_local_dir_path = package_local_dir_path
3435

3536
def put_package(self, package_name, package_data):
3637
"""
@@ -48,12 +49,15 @@ def get_package(self, package_name, expected_codes=None):
4849
"""
4950
gets a package from the repository
5051
:param package_nam:
51-
:return: the http response
52+
:return: local path to file
5253
"""
5354
if not expected_codes:
5455
expected_codes = [200]
5556
response = self.make_rest_get_request("/packages/" + package_name, expected_codes)
56-
return response.content
57+
local_path = "%s/%s" % (self._package_local_dir_path, package_name)
58+
with open(local_path, 'wb') as local_file:
59+
local_file.write(response.content)
60+
return local_path
5761

5862
def get_package_list(self, recency=None):
5963
"""

api/src/main/resources/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@ thrift==0.9.3
2626
thriftpy==0.3.9
2727
tornado==4.4.2
2828
tornado-cors==0.6.0
29-
wsgiref==0.1.2
29+
wsgiref==0.1.2

0 commit comments

Comments
 (0)