|
28 | 28 | import java.io.Writer; |
29 | 29 | import java.net.URI; |
30 | 30 | import java.nio.charset.StandardCharsets; |
| 31 | +import java.util.List; |
31 | 32 | import java.util.Properties; |
32 | 33 |
|
33 | 34 | import org.apache.hadoop.conf.Configuration; |
|
39 | 40 | import org.apache.log4j.SimpleLayout; |
40 | 41 | import org.apache.log4j.WriterAppender; |
41 | 42 | import org.apache.oozie.DagEngine; |
| 43 | +import org.apache.oozie.ForTestingActionExecutor; |
42 | 44 | import org.apache.oozie.WorkflowActionBean; |
43 | 45 | import org.apache.oozie.client.OozieClient; |
44 | 46 | import org.apache.oozie.client.WorkflowAction; |
45 | 47 | import org.apache.oozie.client.WorkflowJob; |
| 48 | +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; |
| 49 | +import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; |
46 | 50 | import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; |
| 51 | +import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor; |
47 | 52 | import org.apache.oozie.local.LocalOozie; |
48 | 53 | import org.apache.oozie.service.CallableQueueService; |
49 | 54 | import org.apache.oozie.service.ConfigurationService; |
50 | 55 | import org.apache.oozie.service.ExtendedCallableQueueService; |
51 | 56 | import org.apache.oozie.service.Services; |
| 57 | +import org.apache.oozie.service.SchemaService; |
| 58 | +import org.apache.oozie.service.LiteWorkflowStoreService; |
| 59 | +import org.apache.oozie.service.ActionService; |
| 60 | +import org.apache.oozie.service.JPAService; |
52 | 61 | import org.apache.oozie.test.XDataTestCase; |
53 | 62 | import org.apache.oozie.util.IOUtils; |
54 | 63 | import org.apache.oozie.util.XConfiguration; |
55 | 64 | import org.apache.oozie.workflow.lite.LiteWorkflowAppParser; |
56 | 65 |
|
| 66 | +import javax.persistence.EntityManager; |
| 67 | + |
57 | 68 | public class TestSignalXCommand extends XDataTestCase { |
58 | 69 |
|
59 | 70 | private Services services; |
@@ -410,4 +421,137 @@ public boolean evaluate() throws Exception { |
410 | 421 | .getStatus(), |
411 | 422 | WorkflowJob.Status.SUCCEEDED); |
412 | 423 | } |
| 424 | + |
| 425 | + /** |
| 426 | + * Test : fork parallel submit, one transition fail, and the job is failed but the other transition |
| 427 | + * always RUNNING or PREP. |
| 428 | + * verify the PreconditionException is thrown when action2 = RUNNING or PREP and job = FAIL |
| 429 | + * |
| 430 | + */ |
| 431 | + public void testForkParallelSubmitFail() throws Exception { |
| 432 | + _testForkSubmitRunFail(true); |
| 433 | + } |
| 434 | + |
| 435 | + /** |
| 436 | + * Test : fork serial submit, one transition fail, and the job is failed but the other transition |
| 437 | + * always RUNNING or PREP. |
| 438 | + * verify the PreconditionException is thrown when action2 = RUNNING or PREP and job = FAIL |
| 439 | + * |
| 440 | + */ |
| 441 | + public void testForkSerialSubmitFail() throws Exception { |
| 442 | + _testForkSubmitRunFail(false); |
| 443 | + } |
| 444 | + |
| 445 | + /** |
| 446 | + * Test : fork parallel submit, one transition fail, and the job is failed but the other transition |
| 447 | + * always RUNNING or PREP. |
| 448 | + * verify the PreconditionException is thrown when action2 = RUNNING or PREP and job = FAIL |
| 449 | + * |
| 450 | + * @param isForkParallelSubmit ("ture" or "fail") |
| 451 | + */ |
| 452 | + private void _testForkSubmitRunFail(boolean isForkParallelSubmit) throws Exception { |
| 453 | + services.destroy(); |
| 454 | + setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd"); |
| 455 | + setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR); |
| 456 | + services = new Services(); |
| 457 | + services.init(); |
| 458 | + services.get(ActionService.class).registerAndInitExecutor(ForTestingActionExecutor.class); |
| 459 | + ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, isForkParallelSubmit); |
| 460 | + |
| 461 | + String workflowUri = getTestCaseFileUri("workflow.xml"); |
| 462 | + //@formatter:off |
| 463 | + String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"wf-fork-submit\">\n" + |
| 464 | + " <start to=\"fork1\"/>\n" + |
| 465 | + " <fork name=\"fork1\">\n" + |
| 466 | + " <path start=\"action_to_be_failed\"/>\n" + |
| 467 | + " <path start=\"action_to_be_succeeded_or_killed\"/>\n" + |
| 468 | + " </fork>\n" + |
| 469 | + " <action name=\"action_to_be_failed\">\n" + |
| 470 | + " <test xmlns=\"uri:test\">\n" + |
| 471 | + " <signal-value>${wf:conf('signal-value')}</signal-value>\n" + |
| 472 | + " <external-status>${wf:conf('external-status')}</external-status>\n" + |
| 473 | + " <error>${wf:conf('error')}</error>\n" + |
| 474 | + " <avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>\n" + |
| 475 | + " <avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>\n" + |
| 476 | + " <running-mode>${wf:conf('running-mode')}</running-mode>\n" + |
| 477 | + " </test>\n" + |
| 478 | + " <ok to=\"join1\"/>\n" + |
| 479 | + " <error to=\"kill\"/>\n" + |
| 480 | + " </action>\n" + |
| 481 | + " <action name=\"action_to_be_succeeded_or_killed\">\n" + |
| 482 | + " <test xmlns=\"uri:test\">\n" + |
| 483 | + " <signal-value>based_on_action_status</signal-value>\n" + |
| 484 | + " <external-status>ok</external-status>\n" + |
| 485 | + " <error>ok</error>\n" + |
| 486 | + " <avoid-set-execution-data>true</avoid-set-execution-data>\n" + |
| 487 | + " <avoid-set-end-data>false</avoid-set-end-data>\n" + |
| 488 | + " <running-mode>async</running-mode>\n" + |
| 489 | + " </test>\n" + |
| 490 | + " <ok to=\"join1\"/>\n" + |
| 491 | + " <error to=\"kill\"/>\n" + |
| 492 | + " </action>\n" + |
| 493 | + " <join name=\"join1\" to=\"end\"/>\n" + |
| 494 | + " <kill name=\"kill\">\n" + |
| 495 | + " <message>killed</message>\n" + |
| 496 | + " </kill>\n" + |
| 497 | + " <end name=\"end\"/>\n" + |
| 498 | + "</workflow-app>"; |
| 499 | + //@Formatter:on |
| 500 | + writeToFile(appXml, workflowUri); |
| 501 | + |
| 502 | + final DagEngine engine = new DagEngine("u"); |
| 503 | + |
| 504 | + Configuration conf = new Configuration(); |
| 505 | + conf.set(OozieClient.APP_PATH, workflowUri); |
| 506 | + conf.set(OozieClient.USER_NAME, getTestUser()); |
| 507 | + conf.set(OozieClient.LOG_TOKEN, "t"); |
| 508 | + conf.set("error", "start.fail"); |
| 509 | + conf.set("external-status", "error"); |
| 510 | + conf.set("signal-value", "based_on_action_status"); |
| 511 | + |
| 512 | + final String jobId = engine.submitJob(conf, true); |
| 513 | + final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId); |
| 514 | + final JPAService jpaService = Services.get().get(JPAService.class); |
| 515 | + |
| 516 | + waitFor(30 * 1000, new Predicate() { |
| 517 | + public boolean evaluate() throws Exception { |
| 518 | + return WorkflowJob.Status.FAILED.equals(engine.getJob(jobId).getStatus()); |
| 519 | + } |
| 520 | + }); |
| 521 | + |
| 522 | + // wait for execute KillXCommand, and all actions has finished |
| 523 | + waitFor(30 * 1000, new Predicate() { |
| 524 | + public boolean evaluate() throws Exception { |
| 525 | + List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); |
| 526 | + for (WorkflowActionBean action : actions) { |
| 527 | + if (WorkflowAction.Status.PREP.equals(action.getStatus()) || |
| 528 | + WorkflowAction.Status.RUNNING.equals(action.getStatus()) ){ |
| 529 | + return false; |
| 530 | + } |
| 531 | + } |
| 532 | + return true; |
| 533 | + } |
| 534 | + }); |
| 535 | + |
| 536 | + List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor); |
| 537 | + assertEquals("action size [" + actions.size() + "] had incorrect", 4, actions.size()); |
| 538 | + |
| 539 | + for (WorkflowActionBean action : actions) { |
| 540 | + if ("action_to_be_failed".equals(action.getName())){ |
| 541 | + assertEquals("action [" + action.getName() + "] had incorrect status", |
| 542 | + WorkflowAction.Status.FAILED, action.getStatus()); |
| 543 | + } |
| 544 | + |
| 545 | + if ("action_to_be_succeeded_or_killed".equals(action.getName())){ |
| 546 | + // 1.The "action_to_be_failed" action submit fail, and the "action_to_be_succeeded_or_killed" action |
| 547 | + // has finished, so the "action_to_be_succeeded_or_killed" action should be OK |
| 548 | + // 2.The "action_to_be_failed" action submit fail, and the "action_to_be_succeeded_or_killed" action is |
| 549 | + // PREP or RUNNING, so the "action_to_be_succeeded_or_killed" action should be KILLED |
| 550 | + if (!WorkflowAction.Status.KILLED.equals(action.getStatus()) && |
| 551 | + !WorkflowAction.Status.OK.equals(action.getStatus())) { |
| 552 | + fail("Unexpected action [" + action.getName() + "] with status [" + action.getStatus() + "]"); |
| 553 | + } |
| 554 | + } |
| 555 | + } |
| 556 | + } |
413 | 557 | } |
0 commit comments