Skip to content

Commit 5d1742c

Browse files
authored
IGNITE-24175 Show internal jobs in system view (#11798)
1 parent f4ff01c commit 5d1742c

File tree

3 files changed

+244
-48
lines changed

3 files changed

+244
-48
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ public class GridJobProcessor extends GridProcessorAdapter {
180180
/** Collision SPI is not available: {@link GridCollisionManager#enabled()} {@code == false}. */
181181
private final boolean jobAlwaysActivate;
182182

183+
/** Jobs currently being executed synchronously via {@code runSync}, keyed by job ID; reported as active in the jobs system view. */
184+
private volatile ConcurrentMap<IgniteUuid, GridJobWorker> syncRunningJobs;
185+
183186
/** */
184187
private volatile ConcurrentMap<IgniteUuid, GridJobWorker> activeJobs;
185188

@@ -338,6 +341,8 @@ public GridJobProcessor(GridKernalContext ctx) {
338341

339342
metricsUpdateFreq = ctx.config().getMetricsUpdateFrequency();
340343

344+
syncRunningJobs = new ConcurrentHashMap<>();
345+
341346
activeJobs = initJobsMap(jobAlwaysActivate);
342347

343348
passiveJobs = jobAlwaysActivate ? null : new JobsMap(1024, 0.75f, 256);
@@ -371,11 +376,11 @@ public GridJobProcessor(GridKernalContext ctx) {
371376
ctx.systemView().registerInnerCollectionView(JOBS_VIEW, JOBS_VIEW_DESC,
372377
new ComputeJobViewWalker(),
373378
passiveJobs == null ?
374-
Arrays.asList(activeJobs, cancelledJobs) :
375-
Arrays.asList(activeJobs, passiveJobs, cancelledJobs),
379+
Arrays.asList(activeJobs, syncRunningJobs, cancelledJobs) :
380+
Arrays.asList(activeJobs, syncRunningJobs, passiveJobs, cancelledJobs),
376381
ConcurrentMap::entrySet,
377382
(map, e) -> {
378-
ComputeJobState state = map == activeJobs ? ComputeJobState.ACTIVE :
383+
ComputeJobState state = (map == activeJobs || map == syncRunningJobs) ? ComputeJobState.ACTIVE :
379384
(map == passiveJobs ? ComputeJobState.PASSIVE : ComputeJobState.CANCELED);
380385

381386
return new ComputeJobView(e.getKey(), e.getValue(), state);
@@ -428,6 +433,8 @@ public GridJobProcessor(GridKernalContext ctx) {
428433
/** {@inheritDoc} */
429434
@Override public void stop(boolean cancel) {
430435
// Clear collections.
436+
syncRunningJobs = new ConcurrentHashMap<>();
437+
431438
activeJobs = initJobsMap(jobAlwaysActivate);
432439

433440
activeJobsMetric.reset();
@@ -798,10 +805,16 @@ public void cancelJob(@Nullable final IgniteUuid sesId, @Nullable final IgniteUu
798805
cancelPassiveJob(job);
799806
}
800807
}
808+
801809
for (GridJobWorker job : activeJobs.values()) {
802810
if (idsMatch.test(job))
803811
cancelActiveJob(job, sys);
804812
}
813+
814+
for (GridJobWorker job : syncRunningJobs.values()) {
815+
if (idsMatch.test(job))
816+
cancelJob(job, sys);
817+
}
805818
}
806819
else {
807820
if (!jobAlwaysActivate) {
@@ -813,8 +826,16 @@ public void cancelJob(@Nullable final IgniteUuid sesId, @Nullable final IgniteUu
813826

814827
GridJobWorker activeJob = activeJobs.get(jobId);
815828

816-
if (activeJob != null && idsMatch.test(activeJob))
829+
if (activeJob != null && idsMatch.test(activeJob)) {
817830
cancelActiveJob(activeJob, sys);
831+
832+
return;
833+
}
834+
835+
activeJob = syncRunningJobs.get(jobId);
836+
837+
if (activeJob != null && idsMatch.test(activeJob))
838+
cancelJob(activeJob, sys);
818839
}
819840
}
820841
finally {
@@ -1364,7 +1385,7 @@ public void processJobExecuteRequest(ClusterNode node, final GridJobExecuteReque
13641385
// This is an internal job and can be executed inside busy lock
13651386
// since job is expected to be short.
13661387
// This is essential for proper stop without races.
1367-
job.run();
1388+
runSync(job);
13681389

13691390
// No execution outside lock.
13701391
job = null;
@@ -1441,6 +1462,23 @@ else if (jobAlwaysActivate) {
14411462
job.run();
14421463
}
14431464

1465+
/**
1466+
* Runs the job synchronously in the calling thread, keeping it registered in {@link #syncRunningJobs}
* for the duration of the run so that it is reported by the jobs system view and can be found by
* {@code cancelJob}.
*
1467+
* @param job Job to expose in the system view and run synchronously.
1468+
*/
1469+
private void runSync(GridJobWorker job) {
1470+
IgniteUuid jobId = job.getJobId();
1471+
1472+
// Register before running so the job is visible while it executes.
syncRunningJobs.put(jobId, job);
1473+
1474+
try {
1475+
job.run();
1476+
}
1477+
finally {
// Always unregister, even if the run throws, so no stale entry stays in the map.
1478+
syncRunningJobs.remove(jobId);
1479+
}
1480+
}
1481+
14441482
/**
14451483
* Callback from job worker to set current task session for execution.
14461484
*

modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java

Lines changed: 118 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.lang.reflect.Field;
2121
import java.sql.Connection;
2222
import java.util.Arrays;
23-
import java.util.Collections;
2423
import java.util.HashMap;
2524
import java.util.HashSet;
2625
import java.util.Iterator;
@@ -35,6 +34,8 @@
3534
import java.util.concurrent.atomic.AtomicInteger;
3635
import java.util.concurrent.atomic.AtomicLong;
3736
import java.util.function.Consumer;
37+
import java.util.function.Function;
38+
import java.util.stream.Collectors;
3839
import javax.cache.Cache;
3940
import javax.cache.expiry.CreatedExpiryPolicy;
4041
import javax.cache.expiry.Duration;
@@ -92,6 +93,7 @@
9293
import org.apache.ignite.internal.processors.metric.impl.PeriodicHistogramMetricImpl;
9394
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext;
9495
import org.apache.ignite.internal.processors.service.DummyService;
96+
import org.apache.ignite.internal.processors.task.GridInternal;
9597
import org.apache.ignite.internal.util.GridTestClockTimer;
9698
import org.apache.ignite.internal.util.StripedExecutor;
9799
import org.apache.ignite.internal.util.typedef.F;
@@ -103,6 +105,7 @@
103105
import org.apache.ignite.lang.IgniteClosure;
104106
import org.apache.ignite.lang.IgnitePredicate;
105107
import org.apache.ignite.lang.IgniteRunnable;
108+
import org.apache.ignite.lang.IgniteUuid;
106109
import org.apache.ignite.services.ServiceConfiguration;
107110
import org.apache.ignite.spi.systemview.view.BaselineNodeAttributeView;
108111
import org.apache.ignite.spi.systemview.view.BaselineNodeView;
@@ -114,6 +117,7 @@
114117
import org.apache.ignite.spi.systemview.view.ClientConnectionAttributeView;
115118
import org.apache.ignite.spi.systemview.view.ClientConnectionView;
116119
import org.apache.ignite.spi.systemview.view.ClusterNodeView;
120+
import org.apache.ignite.spi.systemview.view.ComputeJobView;
117121
import org.apache.ignite.spi.systemview.view.ComputeTaskView;
118122
import org.apache.ignite.spi.systemview.view.ConfigurationView;
119123
import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
@@ -180,6 +184,7 @@
180184
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.SETS_VIEW;
181185
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.STAMPED_VIEW;
182186
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.VOLATILE_DATA_REGION_NAME;
187+
import static org.apache.ignite.internal.processors.job.GridJobProcessor.JOBS_VIEW;
183188
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl.DISTRIBUTED_METASTORE_VIEW;
184189
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
185190
import static org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLI_CONN_ATTR_VIEW;
@@ -209,6 +214,12 @@ public class SystemViewSelfTest extends GridCommonAbstractTest {
209214
/** */
210215
public static final String TEST_TRANSFORMER = "TestTransformer";
211216

217+
/** Counted down by each {@code UserJob} when its execution starts; re-created by the test before every task run. */
218+
private static CountDownLatch jobStartedLatch;
219+
220+
/** Running {@code UserJob}s wait on this latch; the test counts it down to let them finish. */
221+
private static CountDownLatch releaseJobLatch;
222+
212223
/** {@inheritDoc} */
213224
@Override protected void beforeTestsStarted() throws Exception {
214225
super.beforeTestsStarted();
@@ -598,63 +609,80 @@ public void testComputeAffinityCall() throws Exception {
598609
/** */
599610
@Test
600611
public void testComputeTask() throws Exception {
601-
CyclicBarrier barrier = new CyclicBarrier(2);
612+
doTestComputeTask(false);
613+
}
602614

603-
try (IgniteEx g1 = startGrid(0)) {
604-
SystemView<ComputeTaskView> tasks = g1.context().systemView().view(TASKS_VIEW);
615+
/** Runs the compute task system view check for a task annotated as internal. */
616+
@Test
617+
public void testInternalComputeTask() throws Exception {
618+
doTestComputeTask(true);
619+
}
620+
621+
/** */
622+
private void doTestComputeTask(boolean internal) throws Exception {
623+
int gridCnt = 3;
605624

625+
IgniteEx g1 = startGrids(gridCnt);
626+
627+
try {
606628
IgniteCache<Integer, Integer> cache = g1.createCache("test-cache");
607629

608630
cache.put(1, 1);
609631

610-
g1.compute().executeAsync(new ComputeTask<Object, Object>() {
611-
@Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
612-
@Nullable Object arg) throws IgniteException {
613-
return Collections.singletonMap(new ComputeJob() {
614-
@Override public void cancel() {
615-
// No-op.
616-
}
632+
for (int i = 0; i < gridCnt; i++) {
633+
IgniteEx grid = grid(i);
617634

618-
@Override public Object execute() throws IgniteException {
619-
return 1;
620-
}
621-
}, subgrid.get(0));
622-
}
635+
SystemView<ComputeTaskView> tasks = grid.context().systemView().view(TASKS_VIEW);
623636

624-
@Override public ComputeJobResultPolicy result(ComputeJobResult res,
625-
List<ComputeJobResult> rcvd) throws IgniteException {
626-
try {
627-
barrier.await();
628-
barrier.await();
629-
}
630-
catch (InterruptedException | BrokenBarrierException e) {
631-
throw new RuntimeException(e);
632-
}
637+
jobStartedLatch = new CountDownLatch(3);
638+
releaseJobLatch = new CountDownLatch(1);
633639

634-
return null;
635-
}
640+
IgniteInternalFuture<Object> fut
641+
= runAsync(() -> grid.compute().execute(internal ? new InternalTask() : new UserTask(), 1));
642+
643+
assertTrue(jobStartedLatch.await(30_000, TimeUnit.MILLISECONDS));
644+
645+
try {
646+
assertEquals(1, tasks.size());
647+
648+
ComputeTaskView t = tasks.iterator().next();
636649

637-
@Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
638-
return 1;
650+
assertEquals("Expecting to see " + (internal ? "internal" : "user") + " task", internal, t.internal());
651+
assertNull(t.affinityCacheName());
652+
assertEquals(-1, t.affinityPartitionId());
653+
assertTrue(t.taskClassName().startsWith(getClass().getName()));
654+
assertTrue(t.taskName().startsWith(getClass().getName()));
655+
assertEquals(grid.localNode().id(), t.taskNodeId());
656+
assertEquals("0", t.userVersion());
657+
658+
checkJobs(gridCnt, internal, t.sessionId());
659+
}
660+
finally {
661+
releaseJobLatch.countDown();
639662
}
640-
}, 1);
641663

642-
barrier.await();
664+
fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
665+
}
666+
}
667+
finally {
668+
stopAllGrids();
669+
}
670+
}
643671

644-
assertEquals(1, tasks.size());
672+
/** */
673+
private void checkJobs(int gridCnt, boolean internal, IgniteUuid sesId) {
674+
for (int i = 0; i < gridCnt; i++) {
675+
SystemView<ComputeJobView> jobs = grid(i).context().systemView().view(JOBS_VIEW);
645676

646-
ComputeTaskView t = tasks.iterator().next();
677+
assertTrue("Expecting to see " + (internal ? "internal" : "user") + " job", jobs.size() > 0);
647678

648-
assertFalse(t.internal());
649-
assertNull(t.affinityCacheName());
650-
assertEquals(-1, t.affinityPartitionId());
651-
assertTrue(t.taskClassName().startsWith(getClass().getName()));
652-
assertTrue(t.taskName().startsWith(getClass().getName()));
653-
assertEquals(g1.localNode().id(), t.taskNodeId());
654-
assertEquals("0", t.userVersion());
679+
ComputeJobView job = jobs.iterator().next();
655680

656-
barrier.await();
681+
assertEquals(sesId, job.sessionId());
682+
assertEquals("Expecting to see " + (internal ? "internal" : "user") + " job", internal, job.isInternal());
657683
}
684+
685+
releaseJobLatch.countDown();
658686
}
659687

660688
/** */
@@ -2592,4 +2620,53 @@ public TestRunnable(CountDownLatch latch, int idx) {
25922620
return getClass().getSimpleName() + idx;
25932621
}
25942622
}
2623+
2624+
/**
* Test task that maps one blocking {@link UserJob} to every node of the subgrid, so the test can
* inspect system views while the jobs are still running.
*/
2625+
private static class UserTask implements ComputeTask<Object, Object> {
2626+
/** {@inheritDoc} */
2627+
@Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
2628+
List<ClusterNode> subgrid,
2629+
@Nullable Object arg
2630+
) throws IgniteException {
2631+
// A fresh job instance per node: job -> node it is mapped to.
return subgrid.stream().collect(Collectors.toMap(k -> new UserJob(), Function.identity()));
2632+
}
2633+
2634+
/** {@inheritDoc} */
2635+
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
2636+
return ComputeJobResultPolicy.WAIT;
2637+
}
2638+
2639+
/** {@inheritDoc} */
2640+
@Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
2641+
return 1;
2642+
}
2643+
2644+
/** Job that signals its start and then blocks until the test releases it. */
2645+
private static class UserJob implements ComputeJob {
2646+
/** {@inheritDoc} */
2647+
@Override public void cancel() {
2648+
// No-op.
2649+
}
2650+
2651+
/** {@inheritDoc} */
2652+
@Override public Object execute() throws IgniteException {
2653+
// Tell the test that this job has started executing.
jobStartedLatch.countDown();
2654+
2655+
try {
2656+
// Block until the test releases the job; the assertion fails if that takes more than 30 seconds.
assertTrue(releaseJobLatch.await(30_000, TimeUnit.MILLISECONDS));
2657+
}
2658+
catch (InterruptedException e) {
2659+
// NOTE(review): the thread's interrupt status is not restored before rethrowing - confirm this is intended.
throw new RuntimeException(e);
2660+
}
2661+
2662+
return 1;
2663+
}
2664+
}
2665+
}
2666+
2667+
/** Same task as {@link UserTask}, but annotated with {@link GridInternal}; used by the internal-task variant of the test. */
2668+
@GridInternal
2669+
public static class InternalTask extends UserTask {
2670+
// No-op.
2671+
}
25952672
}

0 commit comments

Comments
 (0)