Commit 4670089

Merge pull request #64 from pndaproject/PNDA-4383
Fixes for Oozie queue config
2 parents de4acf2 + 098f7f0

File tree: 1 file changed (+78, -22 lines)

api/src/main/resources/plugins/oozie.py

Lines changed: 78 additions & 22 deletions
@@ -22,11 +22,14 @@

 # pylint: disable=C0103

+import os
 import json
 import logging
 import datetime
 import xml.etree.ElementTree as ElementTree
 import commands
+import shutil
+import traceback
 import requests

 import deployer_utils
@@ -111,41 +114,28 @@ def create_component(self, staged_component_path, application_name, user_name, c
         properties[def_path] = '%s/%s' % (self._environment['name_node'], remote_path)

         # deploy everything to various hadoop services
-        undeploy = self._deploy_to_hadoop(properties, staged_component_path, remote_path, properties['application_user'])
+        undeploy = self._deploy_to_hadoop(component, properties, staged_component_path, remote_path, properties['application_user'])

         # return something that can be used to undeploy later
         return {'job_handle': undeploy['id'],
                 'component_hdfs_root': properties['component_hdfs_root'],
                 'application_user': properties['application_user']}

-    def _deploy_to_hadoop(self, properties, staged_component_path, remote_path, application_user, exclude=None):
-        if exclude is None:
-            exclude = []
-        exclude.extend(['hdfs.json',
-                        'hbase.json',
-                        'properties.json',
-                        'application.properties'])
-
-        # stage the component files to hdfs
-        self._hdfs_client.recursive_copy(staged_component_path, remote_path, exclude=exclude, permission=755)
-
-        # stage the instantiated job properties back to HDFS - no functional purpose,
-        # just helps developers understand what has happened
-        effective_job_properties = deployer_utils.dict_to_props(properties)
-        self._hdfs_client.create_file(effective_job_properties, '%s/application.properties' % remote_path)
-
+    def _setup_queue_config(self, component, staged_component_path, properties):
         # Add queue config into the default config if none is defined.
         if 'mapreduce.job.queuename' in properties:
             defaults = {'mapreduce.job.queuename':properties['mapreduce.job.queuename']}
             try:
-                data = self._hdfs_client.read_file('%s/config-default.xml' % remote_path)
+                with open('%s/config-default.xml' % staged_component_path, 'r') as config_default_file:
+                    data = config_default_file.read()
             except:
                 logging.debug('No config-default.xml is detected.')
                 data = None

             if data is None:
                 logging.debug('Creating config-default.xml to inject mapreduce.job.queuename property.')
-                self._hdfs_client.create_file(deployer_utils.dict_to_xml(defaults), '%s/config-default.xml' % remote_path)
+                with open('%s/config-default.xml' % staged_component_path, 'w') as config_default_file:
+                    config_default_file.write(deployer_utils.dict_to_xml(defaults))
             else:
                 prop = None
                 root = None
@@ -167,10 +157,76 @@ def _deploy_to_hadoop(self, properties, staged_component_path, remote_path, appl
                     logging.debug('adding mapred.queue.names in config-default.xml')
                     prop = ElementTree.SubElement(root, 'property')
                     ElementTree.SubElement(prop, 'name').text = 'mapreduce.job.queuename'
-                    ElementTree.SubElement(prop, 'value').text = 'dev'
+                    ElementTree.SubElement(prop, 'value').text = properties['mapreduce.job.queuename']
                     data = ElementTree.tostring(root)
-                    self._hdfs_client.remove('%s/config-default.xml' % remote_path)
-                    self._hdfs_client.create_file(data, '%s/config-default.xml' % remote_path)
+                    with open('%s/config-default.xml' % staged_component_path, 'w') as config_default_file:
+                        config_default_file.write(data)
+
+            file_list = [file_name for file_name in component['component_detail'] if os.path.isfile('%s/%s' % (staged_component_path, file_name))]
+            # find workflow.xml files
+            for afile in file_list:
+                workflow_modified = False
+                file_path = '%s/%s' % (staged_component_path, afile)
+                with open(file_path, 'r') as component_file:
+                    workflow_xml = component_file.read()
+                if 'uri:oozie:workflow' not in workflow_xml:
+                    continue
+                logging.debug("Found workflow file %s", file_path)
+                # copy config-default.xml into this directory
+                if os.path.dirname(file_path) != staged_component_path:
+                    shutil.copyfile('%s/config-default.xml' % staged_component_path, '%s/config-default.xml' % os.path.dirname(file_path))
+
+                # set the spark opts --queue so spark jobs are put in the right queue
+                spark_action_index = 0
+                while spark_action_index >= 0:
+                    spark_action_index = workflow_xml.find('<spark ', spark_action_index+1)
+                    spark_end_index = workflow_xml.find('</spark>', spark_action_index)
+                    jar_end_index = workflow_xml.find('</jar>', spark_action_index, spark_end_index)
+                    opts_index = workflow_xml.find('<spark-opts>', spark_action_index, spark_end_index)
+                    opts_end_index = workflow_xml.find('</spark-opts>', opts_index, spark_end_index)
+                    queue_opt_index = workflow_xml.find('--queue ', opts_index, opts_end_index)
+                    if jar_end_index >= 0:
+                        if opts_index < 0:
+                            # we need to add a spark-opts element
+                            split_index = jar_end_index+len('</jar>')
+                            workflow_xml = '%s%s%s' % (workflow_xml[:split_index],
+                                                       '<spark-opts>--queue ${wf:conf("mapreduce.job.queuename")}</spark-opts>',
+                                                       workflow_xml[split_index:])
+                            workflow_modified = True
+                        elif queue_opt_index < 0:
+                            # we need to add a queue opt to the existing spark-opts element
+                            split_index = opts_end_index
+                            workflow_xml = '%s%s%s' % (workflow_xml[:split_index], ' --queue ${wf:conf("mapreduce.job.queuename")}', workflow_xml[split_index:])
+                            workflow_modified = True
+
+                # write out modified workflow if changes were made
+                if workflow_modified:
+                    logging.debug("Writing out modified workflow xml to %s", file_path)
+                    with open(file_path, "w") as workflow_file:
+                        workflow_file.write(workflow_xml)
+
+    def _deploy_to_hadoop(self, component, properties, staged_component_path, remote_path, application_user, exclude=None):
+        if exclude is None:
+            exclude = []
+        exclude.extend(['hdfs.json',
+                        'hbase.json',
+                        'properties.json',
+                        'application.properties'])
+
+        # setup queue config
+        try:
+            self._setup_queue_config(component, staged_component_path, properties)
+        except Exception as ex:
+            logging.error(traceback.format_exc())
+            raise FailedCreation('Failed to set up yarn queue config: %s' % str(ex))
+
+        # stage the component files to hdfs
+        self._hdfs_client.recursive_copy(staged_component_path, remote_path, exclude=exclude, permission=755)
+
+        # stage the instantiated job properties back to HDFS - no functional purpose,
+        # just helps developers understand what has happened
+        effective_job_properties = deployer_utils.dict_to_props(properties)
+        self._hdfs_client.create_file(effective_job_properties, '%s/application.properties' % remote_path)

         # submit to oozie
         result = self._submit_oozie(properties)
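For reference, the ElementTree branch of _setup_queue_config amounts to appending a standard Hadoop property element to an existing config-default.xml before the component is copied to HDFS. Below is a minimal, self-contained sketch of that step; the sample XML and the queue name are invented for illustration and are not taken from the repository.

import xml.etree.ElementTree as ElementTree

# Invented example of an existing config-default.xml with no queue property yet;
# the real plugin reads this file from the staged component directory.
data = ('<configuration>'
        '<property><name>oozie.use.system.libpath</name><value>true</value></property>'
        '</configuration>')

properties = {'mapreduce.job.queuename': 'example-queue'}  # hypothetical queue name

root = ElementTree.fromstring(data)
prop = ElementTree.SubElement(root, 'property')
ElementTree.SubElement(prop, 'name').text = 'mapreduce.job.queuename'
ElementTree.SubElement(prop, 'value').text = properties['mapreduce.job.queuename']

# tostring() returns the serialized document (bytes in Python 3), now containing
# <property><name>mapreduce.job.queuename</name><value>example-queue</value></property>
data = ElementTree.tostring(root)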

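The workflow rewrite added in this commit works on plain strings: it finds each <spark> action and splices a --queue option into (or after) its <spark-opts>. A rough standalone sketch of the no-spark-opts case follows, using a made-up single-action workflow rather than anything from the repository.

# Made-up workflow snippet with one spark action and no <spark-opts> element;
# the real plugin reads each staged workflow file and loops over every spark action.
workflow_xml = ('<workflow-app xmlns="uri:oozie:workflow:0.5" name="example">'
                '<action name="process">'
                '<spark xmlns="uri:oozie:spark-action:0.1"><jar>example.jar</jar></spark>'
                '</action>'
                '</workflow-app>')

spark_action_index = workflow_xml.find('<spark ')
spark_end_index = workflow_xml.find('</spark>', spark_action_index)
jar_end_index = workflow_xml.find('</jar>', spark_action_index, spark_end_index)
opts_index = workflow_xml.find('<spark-opts>', spark_action_index, spark_end_index)

if jar_end_index >= 0 and opts_index < 0:
    # no <spark-opts> yet, so insert one straight after </jar>
    split_index = jar_end_index + len('</jar>')
    workflow_xml = '%s%s%s' % (workflow_xml[:split_index],
                               '<spark-opts>--queue ${wf:conf("mapreduce.job.queuename")}</spark-opts>',
                               workflow_xml[split_index:])

# workflow_xml now contains:
#   ...<jar>example.jar</jar><spark-opts>--queue ${wf:conf("mapreduce.job.queuename")}</spark-opts></spark>...

At run time Oozie resolves ${wf:conf("mapreduce.job.queuename")} from the job configuration, so the queue written into config-default.xml is the one the Spark job is submitted to.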