@@ -59,6 +59,52 @@ def assert_application_properties(self, override_properties, default_properties)
        raise FailedValidation(json.dumps({"information": information}))
        properties = None

+    def exec_cmds(self, exec_commands):
+        key_file = self._environment['cluster_private_key']
+        root_user = self._environment['cluster_root_user']
+        target_host = 'localhost'
+        deployer_utils.exec_ssh(target_host, root_user, key_file, exec_commands)
+
+    def stage_flink_components(self, application_name, component_name, properties, staged_component_path):
+        component_install_path = '%s/%s/%s' % (
+            '/opt/%s' % self._namespace, application_name, component_name)
+        properties['component_staged_path'] = component_install_path
+
+        this_dir = os.path.dirname(os.path.realpath(__file__))
+
+        if 'component_main_jar' in properties:
+            service_script = 'flink-oozie.execute.sh.tpl'
+        elif 'component_main_py' in properties:
+            service_script = 'flink-oozie.execute.py.sh.tpl'
+            flink_lib_dir = properties['environment_flink_lib_dir']
+            for jar in os.listdir(flink_lib_dir):
+                if os.path.isfile(os.path.join(flink_lib_dir, jar)) and 'flink-python' in jar:
+                    properties['flink_python_jar'] = '%s/%s' % (flink_lib_dir, jar)
+        else:
+            raise Exception('properties.json must contain "main_jar" or "main_py" for %s flink-batch-job %s' % (application_name, component_name))
+
+        shutil.copyfile(os.path.join(this_dir, 'flink-stop.py'), '%s/lib/flink-stop.py' % staged_component_path)
+        shutil.copyfile(os.path.join(this_dir, service_script), '%s/lib/%s' % (staged_component_path, service_script))
+        self._fill_properties(os.path.join('%s/lib' % staged_component_path, "flink-stop.py"), properties)
+        self._fill_properties(os.path.join('%s/lib' % staged_component_path, service_script), properties)
+
+        mkdir_commands = []
+        mkdir_commands.append('sudo mkdir -p %s' % component_install_path)
+        self.exec_cmds(mkdir_commands)
+
+        os.system("cp -r %s %s"
+                  % (staged_component_path + '/lib/*', component_install_path))
+
+        copy_commands = []
+        copy_commands.append('sudo mv %s/%s %s/execute.sh' % (component_install_path, service_script, component_install_path))
+        copy_commands.append('sudo chmod 777 %s/execute.sh' % component_install_path)
+        self.exec_cmds(copy_commands)
+
+        # add the flink client host and the script path to the properties
+        flink_host = "%s@%s" % (self._environment['cluster_root_user'], self._environment['flink_host'])
+        properties['flink_client'] = flink_host
+        properties['path_to_script'] = '%s/execute.sh' % component_install_path
+
    def destroy_component(self, application_name, create_data):
        logging.debug(
            "destroy_component: %s %s",
@@ -71,6 +117,12 @@ def destroy_component(self, application_name, create_data):
        remote_path = create_data['component_hdfs_root'][1:]
        self._hdfs_client.remove(remote_path, recursive=True)

+        # stop the flink job and delete the component from the local filesystem
+        if "flink_staged_path" in create_data:
+            destroy_commands = ["python %s/flink-stop.py" % create_data["flink_staged_path"],
+                                "sudo rm -rf %s\n" % create_data["flink_staged_path"]]
+            self.exec_cmds(destroy_commands)
+
    def start_component(self, application_name, create_data):
        logging.debug("start_component: %s %s", application_name, json.dumps(create_data))
        self._start_oozie(create_data['job_handle'], create_data['application_user'])
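
Note: create_data is the dictionary returned by create_component further down; for a flink
component it carries the extra flink_staged_path key. An illustrative example (all values
hypothetical):

    create_data = {
        'job_handle': '0000001-160101000000000-oozie-oozi-W',  # oozie job id
        'component_hdfs_root': '/user/app1/flink-job',         # hdfs staging path
        'application_user': 'app1-user',
        'flink_staged_path': '/opt/ns/app1/flink-job',         # only set for flink jobs
    }
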
@@ -79,6 +131,11 @@ def stop_component(self, application_name, create_data):
        logging.debug("stop_component: %s %s", application_name, json.dumps(create_data))
        self._stop_oozie(create_data['job_handle'], create_data['application_user'])

+        # stop the flink job
+        if "flink_staged_path" in create_data:
+            stop_commands = ["python %s/flink-stop.py" % create_data["flink_staged_path"]]
+            self.exec_cmds(stop_commands)
+
    def create_component(self, staged_component_path, application_name, user_name, component, properties):
        logging.debug(
            "create_component: %s %s %s %s",
@@ -102,6 +159,10 @@ def create_component(self, staged_component_path, application_name, user_name, c
        properties['deployment_start'] = start.strftime("%Y-%m-%dT%H:%MZ")
        properties['deployment_end'] = end.strftime("%Y-%m-%dT%H:%MZ")

+        # for flink jobs the code also needs to be staged locally, because both the ssh action and the flink client require it on the local filesystem
+        if properties.get('component_job_type', '') == 'flink':
+            self.stage_flink_components(application_name, component['component_name'], properties, staged_component_path)
+
        # insert required oozie properties
        properties['user.name'] = properties['application_user']
        # Oozie ShareLib - supports actions
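
Note: a component opts into this path through its properties. Based on the checks here
and in stage_flink_components, a flink component's properties would include something
like the following (values are illustrative):

    properties['component_job_type'] = 'flink'
    properties['component_main_jar'] = 'my-flink-job.jar'
    # or, for a python job, which additionally needs the flink lib dir on the host:
    # properties['component_main_py'] = 'my_job.py'
    # properties['environment_flink_lib_dir'] = '/opt/flink/lib'
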
@@ -131,9 +192,16 @@ def create_component(self, staged_component_path, application_name, user_name, c
        undeploy = self._deploy_to_hadoop(component, properties, staged_component_path, remote_path, properties['application_user'])

        # return something that can be used to undeploy later
-        return {'job_handle': undeploy['id'],
+        ret_data = {}
+
+        # if the code was staged locally (flink), add the local staged path to the return data for later operations
+        if "component_staged_path" in properties:
+            ret_data["flink_staged_path"] = properties["component_staged_path"]
+
+        ret_data.update({'job_handle': undeploy['id'],
                'component_hdfs_root': properties['component_hdfs_root'],
-                'application_user': properties['application_user']}
+                         'application_user': properties['application_user']})
+        return ret_data

    def _setup_queue_config(self, component, staged_component_path, properties):
        # Add queue config into the default config if none is defined.