2222import com .webank .wedatasphere .dss .linkis .node .execution .exception .LinkisJobExecutionErrorException ;
2323import com .webank .wedatasphere .dss .linkis .node .execution .job .CommonLinkisJob ;
2424import com .webank .wedatasphere .dss .linkis .node .execution .job .Job ;
25+ import com .webank .wedatasphere .dss .linkis .node .execution .job .LinkisJob ;
2526import com .webank .wedatasphere .dss .linkis .node .execution .service .LinkisURLService ;
2627import com .webank .wedatasphere .dss .linkis .node .execution .utils .LinkisJobExecutionUtils ;
2728import org .apache .linkis .filesystem .WorkspaceClientFactory ;
2829import org .apache .linkis .filesystem .request .WorkspaceClient ;
2930import org .apache .linkis .filesystem .response .ScriptFromBMLResponse ;
3031
31- import java .util .ArrayList ;
3232import java .util .HashMap ;
3333import java .util .List ;
3434import java .util .Map ;
35- import java .util .Optional ;
36- import java .util .regex .Matcher ;
3735import java .util .regex .Pattern ;
38- import org .apache .commons .lang3 .StringUtils ;
3936
4037
4138public class CodeParser implements JobParser {
@@ -44,9 +41,10 @@ public class CodeParser implements JobParser {
4441 private volatile WorkspaceClient client1X = null ;
4542 private volatile WorkspaceClient client0X = null ;
4643 private final Object clientLocker = new Object ();
44+
4745 @ Override
4846 public void parseJob (Job job ) throws Exception {
49- if (! ( job instanceof CommonLinkisJob ) ) {
47+ if (! (job instanceof CommonLinkisJob ) ) {
5048 return ;
5149 }
5250 CommonLinkisJob linkisAppConnJob = (CommonLinkisJob ) job ;
@@ -66,8 +64,11 @@ public void parseJob(Job job) throws Exception{
6664 if (null == scriptResource ) {
6765 throw new LinkisJobExecutionErrorException (90102 ,"Failed to get script resource" );
6866 }
67+ getAndSetCode (scriptResource , linkisAppConnJob );
68+ }
6969
70- Map <String , Object > executionParams = getExecutionParams (scriptResource , linkisAppConnJob );
70+ protected void getAndSetCode (BMLResource bmlResource , LinkisJob linkisAppConnJob ) {
71+ Map <String , Object > executionParams = getExecutionParams (bmlResource , linkisAppConnJob );
7172 if (executionParams .get ("executionCode" ) != null ) {
7273 String executionCode = (String ) executionParams .get ("executionCode" );
7374 linkisAppConnJob .getLogObj ().info ("************************************SUBMIT CODE************************************" );
@@ -81,9 +82,12 @@ public void parseJob(Job job) throws Exception{
8182 linkisAppConnJob .getParams ().putAll ( (Map <String , Object >)executionParams .get ("params" ));
8283 }
8384 }
85+ dealExecutionParams (linkisAppConnJob , executionParams );
8486 }
8587
86- private Map <String , Object > getExecutionParams (BMLResource bmlResource , CommonLinkisJob linkisAppConnJob ) {
88+ protected void dealExecutionParams (LinkisJob linkisAppConnJob , Map <String , Object > executionParams ) {}
89+
90+ protected Map <String , Object > getExecutionParams (BMLResource bmlResource , LinkisJob linkisAppConnJob ) {
8791 Map <String , Object > map = new HashMap <>();
8892 ScriptFromBMLResponse response = getOrCreateWorkSpaceClient (linkisAppConnJob ).requestOpenScriptFromBML (bmlResource .getResourceId (), bmlResource .getVersion (), bmlResource .getFileName ());
8993 linkisAppConnJob .getLogObj ().info ("Get execution code from workspace client,bml resource id " +bmlResource .getResourceId ()+", version is " +bmlResource .getVersion ());
@@ -92,7 +96,7 @@ private Map<String, Object> getExecutionParams(BMLResource bmlResource, CommonL
9296 return map ;
9397 }
9498
95- private WorkspaceClient getOrCreateWorkSpaceClient (CommonLinkisJob linkisAppConnJob ) {
99+ private WorkspaceClient getOrCreateWorkSpaceClient (LinkisJob linkisAppConnJob ) {
96100 Map <String , String > props = linkisAppConnJob .getJobProps ();
97101 if (LinkisJobExecutionConfiguration .isLinkis1_X (props )) {
98102 if (null == client1X ) {
@@ -119,131 +123,4 @@ private WorkspaceClient getOrCreateWorkSpaceClient(CommonLinkisJob linkisAppConn
119123 }
120124 }
121125
122- private ArrayList <String > getResourceNames (String code ){
123- ArrayList <String > bmlResourceNames = new ArrayList <String >();
124- Matcher mb = pb .matcher (code );
125- while (mb .find ()) {
126- bmlResourceNames .add (mb .group ().trim ());
127- }
128- return bmlResourceNames ;
129- }
130-
131-
132- /**
133- * 1.Find the project file used in the script
134- * 2.Find the node file used in the script
135- * 3.Recursively find the flow file used in the script
136- * 4.Replace file name with prefixed name
137- * @param resourceNames
138- * @param linkisAppConnJob
139- * @return
140- */
141- private ArrayList <BMLResource > getResourcesByNames (ArrayList <String > resourceNames , CommonLinkisJob linkisAppConnJob ) {
142-
143- ArrayList <BMLResource > bmlResourceArrayList = new ArrayList <>();
144-
145- String jobName = linkisAppConnJob .getJobName ();
146- String flowName = linkisAppConnJob .getSource ().get ("flowName" );
147- String projectName = linkisAppConnJob .getSource ().get ("projectName" );
148-
149-
150- List <BMLResource > projectResourceList = linkisAppConnJob .getProjectResourceList ();
151-
152-
153- List <BMLResource > jobResourceList = linkisAppConnJob .getJobResourceList ();
154- for (String resourceName : resourceNames ) {
155- String [] resourceNameSplit = resourceName .split ("://" );
156- String prefix = resourceNameSplit [0 ].toLowerCase ();
157- String fileName = resourceNameSplit [1 ];
158- BMLResource resource = null ;
159- String afterFileName = fileName ;
160- switch (prefix ) {
161- case "project" :
162- resource = findResource (projectResourceList , fileName );
163- afterFileName = LinkisJobExecutionConfiguration .PROJECT_PREFIX + "_" + projectName + "_" + fileName ;
164- break ;
165- case "flow" :
166- resource = findFlowResource (linkisAppConnJob , fileName , flowName );
167- break ;
168- case "node" :
169- resource = findResource (jobResourceList , fileName );
170- afterFileName = LinkisJobExecutionConfiguration .JOB_PREFIX + "_" + jobName + "_" + fileName ;
171- break ;
172- default :
173- }
174- if (null == resource ) {
175- linkisAppConnJob .getLogObj ().error ("Failed to find the " + prefix + " resource file of " + fileName );
176- throw new RuntimeException ("Failed to find the " + prefix + " resource file of " + fileName );
177- }
178- if (!afterFileName .equals (fileName )) {
179- resource .setFileName (afterFileName );
180- }
181- bmlResourceArrayList .add (resource );
182- }
183- return bmlResourceArrayList ;
184- }
185-
186-
187- /**
188- * Recursively find the flow file used in the script
189- * Recursive exit condition is top-level flow
190- *
191- */
192- private BMLResource findFlowResource (CommonLinkisJob linkisAppConnJob , String fileName , String flowName ) {
193-
194- String fullFlowName = "" ;
195- Map <String , List <BMLResource >> fLowNameAndResources = linkisAppConnJob .getFlowNameAndResources ();
196- if (fLowNameAndResources == null ){
197- return null ;
198- }
199- Optional <Map .Entry <String , List <BMLResource >>> first = fLowNameAndResources .entrySet ().stream ().filter (fLowNameAndResource -> fLowNameAndResource .getKey ().endsWith (flowName + LinkisJobExecutionConfiguration .RESOURCES_NAME )).findFirst ();
200-
201- if (first .isPresent ()){
202- fullFlowName = first .get ().getKey ();
203- BMLResource resource = findResource (first .get ().getValue (), fileName );
204- if (resource != null ) {
205- resource .setFileName (flowName + "_" + fileName );
206- return resource ;
207- }
208- }
209-
210- String firstFlow = "flow." + flowName + LinkisJobExecutionConfiguration .RESOURCES_NAME ;
211- if (firstFlow .equals (fullFlowName )) {
212- return null ;
213- }
214- //getParentFlowName:flow.flows1.test.resources return:flows1
215- String parentFlowName = StringUtils .substringAfterLast (StringUtils .substringBefore (fullFlowName , "." + flowName
216- + LinkisJobExecutionConfiguration .RESOURCES_NAME ), "." );
217- if (StringUtils .isEmpty (parentFlowName )) {
218- return null ;
219- }
220-
221- return findFlowResource (linkisAppConnJob , fileName , parentFlowName );
222- }
223-
224-
225- private String replaceCodeResourceNames (String code , ArrayList <String > resourceNameList , ArrayList <BMLResource > resourceList ){
226- if (resourceList .size () != resourceNameList .size ()){
227- throw new RuntimeException ("Failed to parsed resource file" );
228- }
229-
230- String [] names = resourceNameList .toArray (new String []{});
231-
232- String [] afterNames = new String [resourceList .size ()];
233- for (int i =0 ; i < afterNames .length ; i ++){
234- afterNames [i ] = resourceList .get (i ).getFileName ();
235- }
236- return StringUtils .replaceEach (code , names , afterNames );
237- }
238-
239- private BMLResource findResource (List <BMLResource > resourceArrayList , String fileName ){
240- if (resourceArrayList != null && !resourceArrayList .isEmpty ()) {
241- for (BMLResource resource : resourceArrayList ){
242- if (resource .getFileName ().equals (fileName )){
243- return resource ;
244- }
245- }
246- }
247- return null ;
248- }
249126}
0 commit comments