Skip to content

Commit 88c7679

Browse files
committed
IGNITE-27196 Added authorization of Compute Job cancellation
1 parent 92074fd commit 88c7679

File tree

5 files changed

+236
-71
lines changed

5 files changed

+236
-71
lines changed

modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java

Lines changed: 134 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,11 @@
4040
import org.apache.ignite.compute.ComputeJob;
4141
import org.apache.ignite.compute.ComputeJobAdapter;
4242
import org.apache.ignite.compute.ComputeJobResult;
43+
import org.apache.ignite.compute.ComputeJobResultPolicy;
44+
import org.apache.ignite.compute.ComputeJobSibling;
4345
import org.apache.ignite.compute.ComputeTask;
4446
import org.apache.ignite.compute.ComputeTaskAdapter;
47+
import org.apache.ignite.compute.ComputeTaskFuture;
4548
import org.apache.ignite.configuration.ClientConfiguration;
4649
import org.apache.ignite.configuration.ClientConnectorConfiguration;
4750
import org.apache.ignite.configuration.ConnectorConfiguration;
@@ -50,6 +53,7 @@
5053
import org.apache.ignite.internal.IgniteEx;
5154
import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTask;
5255
import org.apache.ignite.internal.processors.security.AbstractSecurityTest;
56+
import org.apache.ignite.internal.processors.security.AbstractTestSecurityPluginProvider;
5357
import org.apache.ignite.internal.processors.security.OperationSecurityContext;
5458
import org.apache.ignite.internal.processors.security.PublicAccessJob;
5559
import org.apache.ignite.internal.processors.security.SecurityContext;
@@ -71,6 +75,8 @@
7175
import org.apache.ignite.plugin.security.SecurityException;
7276
import org.apache.ignite.plugin.security.SecurityPermissionSet;
7377
import org.apache.ignite.plugin.security.SecurityPermissionSetBuilder;
78+
import org.apache.ignite.testframework.ListeningTestLogger;
79+
import org.apache.ignite.testframework.LogListener;
7480
import org.jetbrains.annotations.NotNull;
7581
import org.jetbrains.annotations.Nullable;
7682
import org.junit.Test;
@@ -92,6 +98,7 @@
9298
import static org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.NO_PERMISSIONS;
9399
import static org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.create;
94100
import static org.apache.ignite.plugin.security.SecuritySubjectType.REMOTE_CLIENT;
101+
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
95102
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
96103
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
97104

@@ -111,10 +118,10 @@ public class ComputeTaskPermissionsTest extends AbstractSecurityTest {
111118
private static final ComputeTask SYSTEM_TASK = new VerifyBackupPartitionsTask();
112119

113120
/** */
114-
private static final AtomicInteger EXECUTED_TASK_CNTR = new AtomicInteger();
121+
private static final AtomicInteger EXECUTED_JOB_CNT = new AtomicInteger();
115122

116123
/** */
117-
private static final AtomicInteger CANCELLED_TASK_CNTR = new AtomicInteger();
124+
private static final AtomicInteger CANCELLED_JOB_CNT = new AtomicInteger();
118125

119126
/** */
120127
private static final String CACHE = DEFAULT_CACHE_NAME;
@@ -128,6 +135,9 @@ public class ComputeTaskPermissionsTest extends AbstractSecurityTest {
128135
/** */
129136
public static CountDownLatch taskUnblockedLatch;
130137

138+
/** */
139+
private static ListeningTestLogger listeningLog;
140+
131141
/** {@inheritDoc} */
132142
@Override protected void beforeTestsStarted() throws Exception {
133143
super.beforeTestsStarted();
@@ -138,6 +148,8 @@ public class ComputeTaskPermissionsTest extends AbstractSecurityTest {
138148
PublicAccessSystemJob.class
139149
);
140150

151+
listeningLog = new ListeningTestLogger(log);
152+
141153
for (int idx = 0; idx < SRV_NODES_CNT; idx++)
142154
startGrid(idx, false);
143155

@@ -151,6 +163,15 @@ public class ComputeTaskPermissionsTest extends AbstractSecurityTest {
151163
grid(0).createCache(CACHE);
152164
}
153165

166+
/** {@inheritDoc} */
167+
@Override protected IgniteConfiguration getConfiguration(
168+
String instanceName,
169+
AbstractTestSecurityPluginProvider pluginProv
170+
) throws Exception {
171+
return super.getConfiguration(instanceName, pluginProv)
172+
.setGridLogger(listeningLog);
173+
}
174+
154175
/** */
155176
private IgniteEx startGrid(int idx, boolean isClient) throws Exception {
156177
String login = getTestIgniteInstanceName(idx);
@@ -373,6 +394,92 @@ private void checkTaskStart(int initiator, int executor) {
373394
checkCallable(c -> executorService(initiator, executor).invokeAny(singletonList(c), getTestTimeout(), MILLISECONDS));
374395
}
375396

397+
/** */
398+
@Test
399+
public void testJobCancelAuthorizationSucceeded() throws Exception {
400+
taskStartedLatch = new CountDownLatch(SRV_NODES_CNT);
401+
taskUnblockedLatch = new CountDownLatch(1);
402+
403+
CANCELLED_JOB_CNT.set(0);
404+
EXECUTED_JOB_CNT.set(0);
405+
406+
try {
407+
ComputeTaskFuture<Object> fut = grid(0).compute().executeAsync(CancelAllowedTask.class, null);
408+
409+
taskStartedLatch.await(getTestTimeout(), MILLISECONDS);
410+
411+
for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings())
412+
sibling.cancel();
413+
414+
fut.get(getTestTimeout());
415+
416+
assertTrue(waitForCondition(() -> SRV_NODES_CNT == CANCELLED_JOB_CNT.get(), getTestTimeout()));
417+
assertEquals(0, EXECUTED_JOB_CNT.get());
418+
}
419+
finally {
420+
taskUnblockedLatch.countDown();
421+
}
422+
}
423+
424+
/** */
425+
@Test
426+
public void testJobCancelAuthorizationFailed() throws Exception {
427+
taskStartedLatch = new CountDownLatch(SRV_NODES_CNT);
428+
taskUnblockedLatch = new CountDownLatch(1);
429+
430+
CANCELLED_JOB_CNT.set(0);
431+
EXECUTED_JOB_CNT.set(0);
432+
433+
try {
434+
ComputeTaskFuture<Object> fut = grid(0).compute().executeAsync(CancelForbiddenTask.class, null);
435+
436+
taskStartedLatch.await(getTestTimeout(), MILLISECONDS);
437+
438+
for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings()) {
439+
LogListener logLsnr = LogListener.matches("Failed to cancel Ignite Compute Task Job" +
440+
" [sesId=" + fut.getTaskSession().getId() +
441+
", jobId=" + sibling.getJobId() + ']'
442+
).build();
443+
444+
listeningLog.registerListener(logLsnr);
445+
446+
// TODO https://issues.apache.org/jira/browse/IGNITE-27195 Authorization errors during Compute Job
447+
// cancellation do not propagate from remote nodes back to the one that initiated cancellation.
448+
if (grid(0).context().job().activeJob(sibling.getJobId()) != null) {
449+
assertThrowsAnyCause(
450+
log,
451+
() -> {
452+
sibling.cancel();
453+
454+
return null;
455+
},
456+
SecurityException.class,
457+
"Authorization failed"
458+
);
459+
}
460+
else
461+
sibling.cancel();
462+
463+
logLsnr.check(getTestTimeout());
464+
}
465+
466+
assertEquals(0, EXECUTED_JOB_CNT.get());
467+
assertEquals(0, CANCELLED_JOB_CNT.get());
468+
469+
assertFalse(fut.isDone());
470+
471+
taskUnblockedLatch.countDown();
472+
473+
fut.get(getTestTimeout());
474+
475+
assertTrue(waitForCondition(() -> SRV_NODES_CNT == EXECUTED_JOB_CNT.get(), getTestTimeout()));
476+
assertEquals(0, CANCELLED_JOB_CNT.get());
477+
}
478+
finally {
479+
taskUnblockedLatch.countDown();
480+
}
481+
}
482+
376483
/** */
377484
@Test
378485
public void testSystemTaskCancel() throws Exception {
@@ -420,8 +527,8 @@ private void checkTaskCancel(
420527
taskStartedLatch = new CountDownLatch(expTaskCnt);
421528
taskUnblockedLatch = new CountDownLatch(1);
422529

423-
CANCELLED_TASK_CNTR.set(0);
424-
EXECUTED_TASK_CNTR.set(0);
530+
CANCELLED_JOB_CNT.set(0);
531+
EXECUTED_JOB_CNT.set(0);
425532

426533
try {
427534
Future<?> fut = taskStarter.get();
@@ -438,9 +545,9 @@ private void checkTaskCancel(
438545

439546
assertTrue(fut.isCancelled());
440547

441-
assertTrue(waitForCondition(() -> expTaskCnt == CANCELLED_TASK_CNTR.get(), getTestTimeout()));
548+
assertTrue(waitForCondition(() -> expTaskCnt == CANCELLED_JOB_CNT.get(), getTestTimeout()));
442549

443-
assertEquals(0, EXECUTED_TASK_CNTR.get());
550+
assertEquals(0, EXECUTED_JOB_CNT.get());
444551
}
445552
else {
446553
assertThrowsWithCause(() -> fut.cancel(true), expE);
@@ -449,7 +556,7 @@ private void checkTaskCancel(
449556

450557
taskUnblockedLatch.countDown();
451558

452-
assertTrue(waitForCondition(() -> expTaskCnt == EXECUTED_TASK_CNTR.get(), getTestTimeout()));
559+
assertTrue(waitForCondition(() -> expTaskCnt == EXECUTED_JOB_CNT.get(), getTestTimeout()));
453560
}
454561
}
455562
}
@@ -516,16 +623,16 @@ private <T> void checkTask(Class<T> cls, BiConsumerX<T, Object> consumer) {
516623

517624
/** */
518625
private void assertCompleted(RunnableX r, int expCnt) {
519-
EXECUTED_TASK_CNTR.set(0);
626+
EXECUTED_JOB_CNT.set(0);
520627

521628
r.run();
522629

523-
assertEquals(expCnt, EXECUTED_TASK_CNTR.get());
630+
assertEquals(expCnt, EXECUTED_JOB_CNT.get());
524631
}
525632

526633
/** */
527634
private void assertFailed(RunnableX r) {
528-
EXECUTED_TASK_CNTR.set(0);
635+
EXECUTED_JOB_CNT.set(0);
529636

530637
try {
531638
r.run();
@@ -547,7 +654,7 @@ private void assertFailed(RunnableX r) {
547654

548655
fail();
549656

550-
assertEquals(0, EXECUTED_TASK_CNTR.get());
657+
assertEquals(0, EXECUTED_JOB_CNT.get());
551658
}
552659

553660
/** */
@@ -629,15 +736,15 @@ private static class PublicAccessSystemTask extends AbstractTask {
629736
private abstract static class AbstractRunnable implements IgniteRunnable {
630737
/** {@inheritDoc} */
631738
@Override public void run() {
632-
EXECUTED_TASK_CNTR.incrementAndGet();
739+
EXECUTED_JOB_CNT.incrementAndGet();
633740
}
634741
}
635742

636743
/** */
637744
private abstract static class AbstractCallable implements IgniteCallable<AtomicInteger> {
638745
/** {@inheritDoc} */
639746
@Override public AtomicInteger call() throws Exception {
640-
EXECUTED_TASK_CNTR.incrementAndGet();
747+
EXECUTED_JOB_CNT.incrementAndGet();
641748

642749
return new AtomicInteger(0);
643750
}
@@ -647,7 +754,7 @@ private abstract static class AbstractCallable implements IgniteCallable<AtomicI
647754
private abstract static class AbstractClosure implements IgniteClosure<Boolean, Boolean> {
648755
/** {@inheritDoc} */
649756
@Override public Boolean apply(Boolean o) {
650-
EXECUTED_TASK_CNTR.incrementAndGet();
757+
EXECUTED_JOB_CNT.incrementAndGet();
651758

652759
return null;
653760
}
@@ -681,9 +788,19 @@ private abstract static class AbstractTask extends ComputeTaskAdapter<Object, Ob
681788

682789
/** {@inheritDoc} */
683790
@Override public @Nullable Object reduce(List<ComputeJobResult> results) throws IgniteException {
791+
for (ComputeJobResult res : results) {
792+
if (!res.isCancelled() && res.getException() != null)
793+
throw res.getException();
794+
}
795+
684796
return null;
685797
}
686798

799+
/** {@inheritDoc} */
800+
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
801+
return ComputeJobResultPolicy.WAIT;
802+
}
803+
687804
/** */
688805
protected ComputeJob job() {
689806
return new TestJob();
@@ -707,7 +824,7 @@ private static class TestJob implements ComputeJob {
707824

708825
/** {@inheritDoc} */
709826
@Override public Object execute() {
710-
EXECUTED_TASK_CNTR.incrementAndGet();
827+
EXECUTED_JOB_CNT.incrementAndGet();
711828

712829
return null;
713830
}
@@ -725,14 +842,14 @@ private static class HangingJob extends ComputeJobAdapter {
725842
taskUnblockedLatch.await(5_000, MILLISECONDS);
726843
}
727844
catch (InterruptedException e) {
728-
CANCELLED_TASK_CNTR.incrementAndGet();
845+
CANCELLED_JOB_CNT.incrementAndGet();
729846

730847
Thread.currentThread().interrupt();
731848

732849
throw new IgniteException(e);
733850
}
734851

735-
EXECUTED_TASK_CNTR.incrementAndGet();
852+
EXECUTED_JOB_CNT.incrementAndGet();
736853

737854
return null;
738855
}

modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public GridDeployment deployment() {
7373
}
7474

7575
/** {@inheritDoc} */
76-
@Override public GridTaskSessionInternal session() {
76+
@Override public GridTaskSessionImpl session() {
7777
return ses;
7878
}
7979

0 commit comments

Comments
 (0)