
Commit 691addc

Merge pull request #79 from dharaneeshvrd/PNDA-4517
PNDA-4517: Oozie ssh action support for flink batch jobs
2 parents fd73b0d + de8d448

3 files changed: +72 additions, -2 deletions
api/src/main/resources/plugins/flink-oozie.execute.py.sh.tpl

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+sudo -u ${application_user} ${environment_flink} run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} -v ${flink_python_jar} ${component_staged_path}/${component_main_py} ${component_application_args}
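For illustration, once _fill_properties substitutes the placeholders, this template might render to a command along these lines (every concrete value is a hypothetical example: the user, flink binary, config args, job name, and paths are not taken from this commit):

sudo -u pnda /opt/flink/bin/flink run -m yarn-cluster -yn 2 -ynm example-app-component -v /opt/flink/lib/flink-python_2.11.jar /opt/pnda/example-app/example-component/main.py --input hdfs:///data/input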
api/src/main/resources/plugins/flink-oozie.execute.sh.tpl

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+sudo -u ${application_user} ${environment_flink} run -m yarn-cluster ${component_flink_config_args} -ynm ${component_job_name} --class ${component_main_class} ${component_staged_path}/${component_main_jar} ${component_application_args}
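The jar variant renders the same way, with --class selecting the entry point (values again hypothetical):

sudo -u pnda /opt/flink/bin/flink run -m yarn-cluster -yn 2 -ynm example-app-component --class com.example.BatchJob /opt/pnda/example-app/example-component/main.jar --input hdfs:///data/input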

api/src/main/resources/plugins/oozie.py

Lines changed: 70 additions & 2 deletions
@@ -59,6 +59,52 @@ def assert_application_properties(self, override_properties, default_properties):
             raise FailedValidation(json.dumps({"information": information}))
         properties = None
 
+    def exec_cmds(self, exec_commands):
+        key_file = self._environment['cluster_private_key']
+        root_user = self._environment['cluster_root_user']
+        target_host = 'localhost'
+        deployer_utils.exec_ssh(target_host, root_user, key_file, exec_commands)
+
+    def stage_flink_components(self, application_name, component_name, properties, staged_component_path):
+        component_install_path = '%s/%s/%s' % (
+            '/opt/%s' % self._namespace, application_name, component_name)
+        properties['component_staged_path'] = component_install_path
+
+        this_dir = os.path.dirname(os.path.realpath(__file__))
+
+        if 'component_main_jar' in properties:
+            service_script = 'flink-oozie.execute.sh.tpl'
+        elif 'component_main_py' in properties:
+            service_script = 'flink-oozie.execute.py.sh.tpl'
+            flink_lib_dir = properties['environment_flink_lib_dir']
+            for jar in os.listdir(flink_lib_dir):
+                if os.path.isfile(os.path.join(flink_lib_dir, jar)) and 'flink-python' in jar:
+                    properties['flink_python_jar'] = '%s/%s' % (flink_lib_dir, jar)
+        else:
+            raise Exception('properties.json must contain "main_jar" or "main_py" for %s flink-batch-job %s' % (application_name, component_name))
+
+        shutil.copyfile(os.path.join(this_dir, 'flink-stop.py'), '%s/lib/flink-stop.py' % staged_component_path)
+        shutil.copyfile(os.path.join(this_dir, service_script), '%s/lib/%s' % (staged_component_path, service_script))
+        self._fill_properties(os.path.join('%s/lib' % staged_component_path, "flink-stop.py"), properties)
+        self._fill_properties(os.path.join('%s/lib' % staged_component_path, service_script), properties)
+
+        mkdir_commands = []
+        mkdir_commands.append('sudo mkdir -p %s' % component_install_path)
+        self.exec_cmds(mkdir_commands)
+
+        os.system("cp -r %s %s"
+                  % (staged_component_path + '/lib/*', component_install_path))
+
+        copy_commands = []
+        copy_commands.append('sudo mv %s/%s %s/execute.sh' % (component_install_path, service_script, component_install_path))
+        copy_commands.append('sudo chmod 777 %s/execute.sh' % (component_install_path))
+        self.exec_cmds(copy_commands)
+
+        # add the flink client host and the path to the execute script to the properties
+        flink_host = "%s@%s" % (self._environment['cluster_root_user'], self._environment['flink_host'])
+        properties['flink_client'] = flink_host
+        properties['path_to_script'] = '%s/execute.sh' % component_install_path
+
     def destroy_component(self, application_name, create_data):
         logging.debug(
             "destroy_component: %s %s",
@@ -71,6 +117,12 @@ def destroy_component(self, application_name, create_data):
         remote_path = create_data['component_hdfs_root'][1:]
         self._hdfs_client.remove(remote_path, recursive=True)
 
+        # stop the flink job and delete the component from the local filesystem
+        if "flink_staged_path" in create_data:
+            destroy_commands = ["python %s/flink-stop.py" % create_data["flink_staged_path"],
+                                "sudo rm -rf %s\n" % create_data["flink_staged_path"]]
+            self.exec_cmds(destroy_commands)
+
     def start_component(self, application_name, create_data):
         logging.debug("start_component: %s %s", application_name, json.dumps(create_data))
         self._start_oozie(create_data['job_handle'], create_data['application_user'])
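destroy_component (and stop_component below) relies on the local layout produced by stage_flink_components: the staged lib/ contents are copied to the install path and the service script is renamed to execute.sh, so the directory might look roughly like this (application and component names are hypothetical, and the other staged files depend on the application):

$ ls /opt/pnda/example-app/example-component/
execute.sh  flink-stop.py  main.jar  ...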
@@ -79,6 +131,11 @@ def stop_component(self, application_name, create_data):
         logging.debug("stop_component: %s %s", application_name, json.dumps(create_data))
         self._stop_oozie(create_data['job_handle'], create_data['application_user'])
 
+        # stop the flink job
+        if "flink_staged_path" in create_data:
+            stop_commands = ["python %s/flink-stop.py" % create_data["flink_staged_path"]]
+            self.exec_cmds(stop_commands)
+
     def create_component(self, staged_component_path, application_name, user_name, component, properties):
         logging.debug(
             "create_component: %s %s %s %s",
@@ -102,6 +159,10 @@ def create_component(self, staged_component_path, application_name, user_name, component, properties):
         properties['deployment_start'] = start.strftime("%Y-%m-%dT%H:%MZ")
         properties['deployment_end'] = end.strftime("%Y-%m-%dT%H:%MZ")
 
+        # for flink jobs the code has to be staged locally, because both the ssh action and the flink client need it on the local filesystem
+        if properties.get('component_job_type', '') == 'flink':
+            self.stage_flink_components(application_name, component['component_name'], properties, staged_component_path)
+
         # insert required oozie properties
         properties['user.name'] = properties['application_user']
         # Oozie ShareLib - supports actions
@@ -131,9 +192,16 @@ def create_component(self, staged_component_path, application_name, user_name, component, properties):
         undeploy = self._deploy_to_hadoop(component, properties, staged_component_path, remote_path, properties['application_user'])
 
         # return something that can be used to undeploy later
-        return {'job_handle': undeploy['id'],
+        ret_data = {}
+
+        # if the code was staged locally for flink, add the local staged path to the return data for later operations
+        if "component_staged_path" in properties:
+            ret_data["flink_staged_path"] = properties["component_staged_path"]
+
+        ret_data.update({'job_handle': undeploy['id'],
                 'component_hdfs_root': properties['component_hdfs_root'],
-                'application_user': properties['application_user']}
+                'application_user': properties['application_user']})
+        return ret_data
 
     def _setup_queue_config(self, component, staged_component_path, properties):
         # Add queue config into the default config if none is defined.
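Net effect of the plugin changes: execute.sh is staged on the flink client host, and flink_client (root_user@flink_host) plus path_to_script are exposed as properties so the application's Oozie workflow can point an ssh action at them. That action then effectively runs something like the following (the host and paths are hypothetical):

ssh root@flink-host.example.com /opt/pnda/example-app/example-component/execute.sh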
