# pylint: disable=C0103

+ import os
import json
import logging
import datetime
import xml.etree.ElementTree as ElementTree
import commands
+ import shutil
+ import traceback
import requests

import deployer_utils
@@ -111,41 +114,28 @@ def create_component(self, staged_component_path, application_name, user_name, c
        properties[def_path] = '%s/%s' % (self._environment['name_node'], remote_path)

        # deploy everything to various hadoop services
-         undeploy = self._deploy_to_hadoop(properties, staged_component_path, remote_path, properties['application_user'])
+         undeploy = self._deploy_to_hadoop(component, properties, staged_component_path, remote_path, properties['application_user'])

        # return something that can be used to undeploy later
        return {'job_handle': undeploy['id'],
                'component_hdfs_root': properties['component_hdfs_root'],
                'application_user': properties['application_user']}

-     def _deploy_to_hadoop(self, properties, staged_component_path, remote_path, application_user, exclude=None):
-         if exclude is None:
-             exclude = []
-         exclude.extend(['hdfs.json',
-                         'hbase.json',
-                         'properties.json',
-                         'application.properties'])
-
-         # stage the component files to hdfs
-         self._hdfs_client.recursive_copy(staged_component_path, remote_path, exclude=exclude, permission=755)
-
-         # stage the instantiated job properties back to HDFS - no functional purpose,
-         # just helps developers understand what has happened
-         effective_job_properties = deployer_utils.dict_to_props(properties)
-         self._hdfs_client.create_file(effective_job_properties, '%s/application.properties' % remote_path)
-
+     def _setup_queue_config(self, component, staged_component_path, properties):
        # Add queue config into the default config if none is defined.
        if 'mapreduce.job.queuename' in properties:
            defaults = {'mapreduce.job.queuename': properties['mapreduce.job.queuename']}
            try:
-                 data = self._hdfs_client.read_file('%s/config-default.xml' % remote_path)
+                 with open('%s/config-default.xml' % staged_component_path, 'r') as config_default_file:
+                     data = config_default_file.read()
            except:
                logging.debug('No config-default.xml is detected.')
                data = None

            if data is None:
                logging.debug('Creating config-default.xml to inject mapreduce.job.queuename property.')
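+                 # dict_to_xml is assumed (it lives in deployer_utils, not shown here) to render the
+                 # defaults dict as Hadoop-style configuration XML, i.e. a <configuration> root holding
+                 # <property><name>...</name><value>...</value></property> entries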
-                 self._hdfs_client.create_file(deployer_utils.dict_to_xml(defaults), '%s/config-default.xml' % remote_path)
+                 with open('%s/config-default.xml' % staged_component_path, 'w') as config_default_file:
+                     config_default_file.write(deployer_utils.dict_to_xml(defaults))
            else:
                prop = None
                root = None
@@ -167,10 +157,76 @@ def _deploy_to_hadoop(self, properties, staged_component_path, remote_path, appl
                    logging.debug('adding mapred.queue.names in config-default.xml')
                    prop = ElementTree.SubElement(root, 'property')
                    ElementTree.SubElement(prop, 'name').text = 'mapreduce.job.queuename'
-                     ElementTree.SubElement(prop, 'value').text = 'dev'
+                     ElementTree.SubElement(prop, 'value').text = properties['mapreduce.job.queuename']
                data = ElementTree.tostring(root)
-                 self._hdfs_client.remove('%s/config-default.xml' % remote_path)
-                 self._hdfs_client.create_file(data, '%s/config-default.xml' % remote_path)
+                 with open('%s/config-default.xml' % staged_component_path, 'w') as config_default_file:
+                     config_default_file.write(data)
+
+             file_list = [file_name for file_name in component['component_detail'] if os.path.isfile('%s/%s' % (staged_component_path, file_name))]
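+             # keep only the entries listed in component['component_detail'] that exist as regular
+             # files under the staged component directory on local disk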
+             # find workflow.xml files
+             for afile in file_list:
+                 workflow_modified = False
+                 file_path = '%s/%s' % (staged_component_path, afile)
+                 with open(file_path, 'r') as component_file:
+                     workflow_xml = component_file.read()
+                 if 'uri:oozie:workflow' not in workflow_xml:
+                     continue
+                 logging.debug("Found workflow file %s", file_path)
+                 # copy config-default.xml into this directory
+                 if os.path.dirname(file_path) != staged_component_path:
+                     shutil.copyfile('%s/config-default.xml' % staged_component_path, '%s/config-default.xml' % os.path.dirname(file_path))
+
+                 # set the spark opts --queue so spark jobs are put in the right queue
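+                 # illustrative sketch of the rewrite: a spark action such as
+                 #   <spark ...><jar>app.jar</jar></spark>
+                 # gains a queue option, e.g.
+                 #   <spark ...><jar>app.jar</jar><spark-opts>--queue ${wf:conf("mapreduce.job.queuename")}</spark-opts></spark>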
+                 spark_action_index = 0
+                 while spark_action_index >= 0:
+                     spark_action_index = workflow_xml.find('<spark ', spark_action_index + 1)
+                     spark_end_index = workflow_xml.find('</spark>', spark_action_index)
+                     jar_end_index = workflow_xml.find('</jar>', spark_action_index, spark_end_index)
+                     opts_index = workflow_xml.find('<spark-opts>', spark_action_index, spark_end_index)
+                     opts_end_index = workflow_xml.find('</spark-opts>', opts_index, spark_end_index)
+                     queue_opt_index = workflow_xml.find('--queue ', opts_index, opts_end_index)
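+                     # str.find returns -1 when a tag is absent, which also ends the while loop once
+                     # no further <spark actions are found:
+                     #   jar_end_index >= 0  -> this action has a </jar> to anchor the insertion
+                     #   opts_index < 0      -> no <spark-opts> yet, so add one straight after </jar>
+                     #   queue_opt_index < 0 -> <spark-opts> exists but carries no --queue option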
+                     if jar_end_index >= 0:
+                         if opts_index < 0:
+                             # we need to add a spark-opts element
+                             split_index = jar_end_index + len('</jar>')
+                             workflow_xml = '%s%s%s' % (workflow_xml[:split_index],
+                                                        '<spark-opts>--queue ${wf:conf("mapreduce.job.queuename")}</spark-opts>',
+                                                        workflow_xml[split_index:])
+                             workflow_modified = True
+                         elif queue_opt_index < 0:
+                             # we need to add a queue opt to the existing spark-opts element
+                             split_index = opts_end_index
+                             workflow_xml = '%s%s%s' % (workflow_xml[:split_index], ' --queue ${wf:conf("mapreduce.job.queuename")}', workflow_xml[split_index:])
+                             workflow_modified = True
+
+                 # write out modified workflow if changes were made
+                 if workflow_modified:
+                     logging.debug("Writing out modified workflow xml to %s", file_path)
+                     with open(file_path, "w") as workflow_file:
+                         workflow_file.write(workflow_xml)
+
+     def _deploy_to_hadoop(self, component, properties, staged_component_path, remote_path, application_user, exclude=None):
+         if exclude is None:
+             exclude = []
+         exclude.extend(['hdfs.json',
+                         'hbase.json',
+                         'properties.json',
+                         'application.properties'])
+
+         # setup queue config
+         try:
+             self._setup_queue_config(component, staged_component_path, properties)
+         except Exception as ex:
+             logging.error(traceback.format_exc())
+             raise FailedCreation('Failed to set up yarn queue config: %s' % str(ex))
+
+         # stage the component files to hdfs
+         self._hdfs_client.recursive_copy(staged_component_path, remote_path, exclude=exclude, permission=755)
+
+         # stage the instantiated job properties back to HDFS - no functional purpose,
+         # just helps developers understand what has happened
+         effective_job_properties = deployer_utils.dict_to_props(properties)
+         self._hdfs_client.create_file(effective_job_properties, '%s/application.properties' % remote_path)

        # submit to oozie
        result = self._submit_oozie(properties)