Skip to content

Commit 202be7c

Browse files
author
donaldh
authored
Merge pull request #99 from cgiraldo/containers
Added support to deploy jupyter components.
2 parents 5fe290b + 6d54dfd commit 202be7c

20 files changed

+323
-130
lines changed

Dockerfile

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,17 @@ COPY api/src/main/resources /deployment-manager/
88
WORKDIR /deployment-manager
99

1010
RUN pip3 install -r requirements.txt
11+
12+
# GIT http server to serve notebooks to jupyterhub
13+
RUN yum install -y git nginx fcgiwrap && \
14+
git config --global user.email "[email protected]" && \
15+
git config --global user.name "pnda" && \
16+
mkdir -p /data/git-repos/ && \
17+
mkdir -p /data/stage/
18+
COPY docker/nginx.conf /etc/nginx/nginx.conf
19+
COPY docker/entrypoint.sh /entrypoint.sh
20+
# PNDA platform users must transition from Linux SO users to cloud-native. For now we add a pnda user to container images.
21+
RUN useradd pnda
22+
23+
ENTRYPOINT "/entrypoint.sh"
24+

api/src/main/resources/application_creator.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def __init__(self, config, environment, service):
4747
environment['webhdfs_user'])
4848

4949
def assert_application_properties(self, override_properties, default_properties):
50-
for component_type, component_properties in default_properties.iteritems():
50+
for component_type, component_properties in default_properties.items():
5151
creator = self._load_creator(component_type)
5252
creator.assert_application_properties(override_properties.get(component_type, {}), component_properties)
5353

@@ -70,7 +70,7 @@ def create_application(self, package_data_path, package_metadata, application_na
7070
# component specific return data for destruction
7171
create_metadata = {}
7272
try:
73-
for component_type, components in package_metadata['component_types'].iteritems():
73+
for component_type, components in package_metadata['component_types'].items():
7474
creator = self._load_creator(component_type)
7575
result = creator.create_components(stage_path,
7676
application_name,
@@ -89,7 +89,7 @@ def destroy_application(self, application_name, application_create_data):
8989
logging.debug("destroy_application: %s %s", application_name, application_create_data)
9090

9191
app_hdfs_root = None
92-
for component_type, component_create_data in application_create_data.iteritems():
92+
for component_type, component_create_data in application_create_data.items():
9393
creator = self._load_creator(component_type)
9494
creator.destroy_components(application_name, component_create_data)
9595
if component_create_data and 'application_hdfs_root' in component_create_data[0]:
@@ -106,15 +106,15 @@ def start_application(self, application_name, application_create_data):
106106

107107
logging.debug("start_application: %s %s", application_name, application_create_data)
108108

109-
for component_type, component_create_data in application_create_data.iteritems():
109+
for component_type, component_create_data in application_create_data.items():
110110
creator = self._load_creator(component_type)
111111
creator.start_components(application_name, component_create_data)
112112

113113
def stop_application(self, application_name, application_create_data):
114114

115115
logging.debug("stop_application: %s %s", application_name, application_create_data)
116116

117-
for component_type, component_create_data in application_create_data.iteritems():
117+
for component_type, component_create_data in application_create_data.items():
118118
creator = self._load_creator(component_type)
119119
creator.stop_components(application_name, component_create_data)
120120

@@ -124,7 +124,7 @@ def validate_package(self, package_name, package_metadata):
124124

125125
result = {}
126126
self._validate_name(package_name, package_metadata)
127-
for component_type, component_metadata in package_metadata['component_types'].iteritems():
127+
for component_type, component_metadata in package_metadata['component_types'].items():
128128
creator = self._load_creator(component_type)
129129
validation_errors = creator.validate_components(component_metadata)
130130
if validation_errors:
@@ -152,7 +152,7 @@ def get_application_runtime_details(self, application_name, application_create_d
152152

153153
details = {}
154154
details['yarn_applications'] = {}
155-
for component_type, component_create_data in application_create_data.iteritems():
155+
for component_type, component_create_data in application_create_data.items():
156156
creator = self._load_creator(component_type)
157157
type_details = creator.get_component_runtime_details(component_create_data)
158158
details['yarn_applications'].update(type_details['yarn_applications'])

api/src/main/resources/application_detailed_summary.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def _do_generate():
7171
try:
7272
create_data = self._application_registrar.get_create_data(application)
7373
input_data = {}
74-
for component_name, component_data in create_data.iteritems():
74+
for component_name, component_data in create_data.items():
7575
input_data[component_name] = {}
7676
input_data[component_name]["component_ref"] = self._load_creator(component_name)
7777
input_data[component_name]["component_data"] = component_data

api/src/main/resources/application_registrar.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from Hbase_thrift import AlreadyExists
2727

2828
from lifecycle_states import ApplicationState
29+
from hbase_utils import encode,decode
2930

3031

3132
class HbaseApplicationRegistrar(object):
@@ -101,7 +102,7 @@ def list_applications(self):
101102
connection = happybase.Connection(self._hbase_host)
102103
try:
103104
table = connection.table(self._table_name)
104-
result = [key for key, data in table.scan(columns=['cf:status']) if data['cf:status'] != ApplicationState.NOTCREATED]
105+
result = [decode(key) for key, data in table.scan(columns=[b'cf:status']) if decode(data[b'cf:status']) != ApplicationState.NOTCREATED]
105106
finally:
106107
connection.close()
107108
return result
@@ -112,8 +113,8 @@ def list_applications_for_package(self, package_name):
112113
connection = happybase.Connection(self._hbase_host)
113114
try:
114115
table = connection.table(self._table_name)
115-
result = [key for key, data in table.scan(columns=['cf:package_name', 'cf:status'])
116-
if data['cf:package_name'] == package_name and data['cf:status'] != ApplicationState.NOTCREATED]
116+
result = [key for key, data in table.scan(columns=[b'cf:package_name', b'cf:status'])
117+
if decode(data[b'cf:package_name']) == package_name and decode(data[b'cf:status']) != ApplicationState.NOTCREATED]
117118
finally:
118119
connection.close()
119120
return result
@@ -131,15 +132,15 @@ def _read_from_db(self, key):
131132
connection = happybase.Connection(self._hbase_host)
132133
try:
133134
table = connection.table(self._table_name)
134-
data = table.row(key)
135+
data = table.row(encode(key))
135136
finally:
136137
connection.close()
137-
return data
138+
return decode(data)
138139

139140
def _write_to_db(self, key, data):
140141
connection = happybase.Connection(self._hbase_host)
141142
try:
142143
table = connection.table(self._table_name)
143-
table.put(key, data)
144+
table.put(encode(key), encode(data))
144145
finally:
145146
connection.close()

api/src/main/resources/application_summary_registrar.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,19 +85,19 @@ def get_dm_status(self, key):
8585
try:
8686
connection = happybase.Connection(self._hbase_host)
8787
table = connection.table("platform_applications")
88-
row = table.row(key, columns=['cf:status'])
89-
status = row['cf:status']
88+
row = table.row(key, columns=[b'cf:status'])
89+
status = row[b'cf:status']
9090
except TTransportException as error_message:
9191
logging.error(str(error_message))
9292
finally:
9393
connection.close()
94-
return status
94+
return status.decode()
9595

9696
def get_flink_job_id(self, key):
9797
jid = ''
9898
data = self._read_from_db(key)
9999
if data:
100-
data = json.loads(data['cf:component_data'])
100+
data = json.loads(data[b'cf:component_data'])
101101
for component in data:
102102
if 'flink' in component:
103103
tracking_url = data[component]['tracking_url']
@@ -111,9 +111,9 @@ def get_summary_data(self, application):
111111
summary_data = self._read_from_db(application)
112112
if summary_data:
113113
record[application].update({
114-
'aggregate_status': summary_data['cf:aggregate_status']
114+
'aggregate_status': summary_data[b'cf:aggregate_status']
115115
})
116-
record[application].update(json.loads(summary_data['cf:component_data']))
116+
record[application].update(json.loads(summary_data[b'cf:component_data']))
117117
else:
118118
record[application].update({
119119
'status': 'Not Available'

api/src/main/resources/deployer_utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import os
2424
import tarfile
25-
from io import StringIO
25+
from io import BytesIO
2626
import logging
2727
import traceback
2828
import time
@@ -243,7 +243,7 @@ def create_file(self, data, remote_file_path, permission=755):
243243

244244
logging.debug('create_file: %s', remote_file_path)
245245

246-
sio = StringIO.StringIO(data)
246+
sio = BytesIO(data)
247247

248248
self._hdfs.create_file(
249249
canonicalize(remote_file_path),
@@ -310,7 +310,7 @@ def exec_ssh(host, user, key, ssh_commands):
310310

311311
def dict_to_props(dict_props):
312312
props = []
313-
for key, value in dict_props.iteritems():
313+
for key, value in dict_props.items():
314314
props.append('%s=%s' % (key, value))
315315
return '\n'.join(props)
316316

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
def decode(data):
2+
if isinstance(data, bytes): return data.decode('utf-8')
3+
if isinstance(data, dict): return dict(map(decode, data.items()))
4+
if isinstance(data, tuple): return map(decode, data)
5+
if isinstance(data, list): return list(map(decode, data))
6+
return data
7+
8+
def encode(data):
9+
if isinstance(data, str): return data.encode('utf-8')
10+
if isinstance(data, dict): return dict(map(encode, data.items()))
11+
if isinstance(data, tuple): return map(encode, data)
12+
if isinstance(data, list): return list(map(encode, data))
13+
return data

api/src/main/resources/package_parser.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ def __init__(self):
3535

3636
def properties_from_metadata(self, metadata):
3737
properties = {}
38-
for component_type, components in metadata['component_types'].iteritems():
38+
for component_type, components in metadata['component_types'].items():
3939
properties[component_type] = {}
40-
for component_name, component_detail in components.iteritems():
40+
for component_name, component_detail in components.items():
4141
properties[component_type][component_name] = component_detail['component_detail']['properties.json']
4242
return properties
4343

@@ -81,8 +81,8 @@ def get_package_metadata(self, package_data_path):
8181
if len(metadata['component_types']) <= 0:
8282
raise FailedValidation("Expected to find at least one component within the package directory")
8383

84-
for component_type, components in metadata['component_types'].iteritems():
85-
for component_name, component_detail in components.iteritems():
84+
for component_type, components in metadata['component_types'].items():
85+
for component_name, component_detail in components.items():
8686
if 'properties.json' not in component_detail['component_detail']:
8787
component_detail['component_detail']['properties.json'] = {}
8888

api/src/main/resources/package_registrar.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@
3131

3232
from exceptiondef import FailedConnection
3333

34+
from hbase_utils import encode,decode
3435

3536
class HbasePackageRegistrar(object):
36-
COLUMN_DEPLOY_STATUS = "cf:deploy_status"
37+
COLUMN_DEPLOY_STATUS = 'cf:deploy_status'
3738

3839
def __init__(self, hbase_host, hdfs_host, hdfs_user, hdfs_port, package_local_dir_path):
3940
self._hbase_host = hbase_host
@@ -110,8 +111,8 @@ def get_package_metadata(self, package_name):
110111
package_name, ['cf:metadata', 'cf:name', 'cf:version'])
111112
if not package_data:
112113
return None
113-
return {"metadata": json.loads(package_data["cf:metadata"]), "name": package_data[
114-
"cf:name"], "version": package_data["cf:version"]}
114+
return {"metadata": json.loads(package_data['cf:metadata']), "name": package_data[
115+
'cf:name'], "version": package_data['cf:version']}
115116

116117
def package_exists(self, package_name):
117118
logging.debug("Checking %s", package_name)
@@ -138,7 +139,7 @@ def list_packages(self):
138139
try:
139140
connection = happybase.Connection(self._hbase_host)
140141
table = connection.table(self._table_name)
141-
result = [key for key, _ in table.scan(columns=['cf:name'])]
142+
result = [key.decode() for key, _ in table.scan(columns=['cf:name'])]
142143
except Exception as exc:
143144
logging.debug(str(exc))
144145
raise FailedConnection('Unable to connect to the HBase master')
@@ -159,10 +160,11 @@ def _read_from_db(self, key, columns):
159160
connection = happybase.Connection(self._hbase_host)
160161
try:
161162
table = connection.table(self._table_name)
162-
data = table.row(key, columns=columns)
163+
data = table.row(encode(key), columns=encode(columns))
163164
finally:
164165
connection.close()
165-
return data
166+
167+
return decode(data)
166168

167169
def _read_from_hdfs(self, source_hdfs_path, dest_local_path):
168170
self._hdfs_client.stream_file_to_disk(source_hdfs_path, dest_local_path)
@@ -171,7 +173,7 @@ def _write_to_db(self, key, data):
171173
connection = happybase.Connection(self._hbase_host)
172174
try:
173175
table = connection.table(self._table_name)
174-
table.put(key, data)
176+
table.put(encode(key), encode(data))
175177
finally:
176178
connection.close()
177179

api/src/main/resources/plugins/base_creator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ def _destroy_optional_descriptors(self, descriptors):
246246
def create_components(self, stage_path, application_name, user_name, components,
247247
components_overrides):
248248
results = []
249-
for component_name, component in components.iteritems():
249+
for component_name, component in components.items():
250250
staged_component_path = '%s/%s' % (stage_path, component['component_path'])
251251
overrides = components_overrides.get(component_name) if components_overrides is not None else {}
252252
overrides = {} if overrides is None else overrides
@@ -280,7 +280,7 @@ def stop_components(self, application_name, stop_data):
280280
def validate_components(self, components):
281281
logging.debug("validate_components: %s", components)
282282
result = {}
283-
for component_name, component in components.iteritems():
283+
for component_name, component in components.items():
284284
validation_errors = self.validate_component(component)
285285
if validation_errors:
286286
result[component_name] = validation_errors

0 commit comments

Comments
 (0)