Skip to content

Commit 8783197

Browse files
ravxzptupitsyn
andauthored
IGNITE-27341 .ΝΕΤ: Propagate compute task names to Java (#12609)
Ensure that the correct .NET task or closure name is visible in system views and internal APIs. Co-authored-by: Pavel Tupitsyn <[email protected]>
1 parent e29c66b commit 8783197

File tree

8 files changed

+230
-3
lines changed

8 files changed

+230
-3
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
4949
/** Done flag. */
5050
protected boolean done;
5151

52+
/** Platform task name. */
53+
protected String taskName;
54+
5255
/**
5356
* Constructor.
5457
*
@@ -60,6 +63,11 @@ protected PlatformAbstractTask(PlatformContext ctx, long taskPtr) {
6063
this.taskPtr = taskPtr;
6164
}
6265

66+
/** @return Task name. */
67+
public String taskName() {
68+
return taskName;
69+
}
70+
6371
/** {@inheritDoc} */
6472
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
6573
assert rcvd.isEmpty() : "Should not cache result in Java for interop task";

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,5 +79,8 @@ public PlatformBalancingMultiClosureTask(PlatformContext ctx, long taskPtr) {
7979
*/
8080
public void jobs(Collection<PlatformJob> jobs) {
8181
this.jobs = jobs;
82+
83+
if (!F.isEmpty(jobs))
84+
this.taskName = jobs.iterator().next().name();
8285
}
8386
}

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,5 +77,8 @@ public PlatformBalancingSingleClosureTask(PlatformContext ctx, long taskPtr) {
7777
*/
7878
public void job(PlatformJob job) {
7979
this.job = job;
80+
81+
if (job != null)
82+
this.taskName = job.name();
8083
}
8184
}

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,8 @@ public PlatformBroadcastingMultiClosureTask(PlatformContext ctx, long taskPtr) {
8484
*/
8585
public void jobs(Collection<PlatformJob> jobs) {
8686
this.jobs = jobs;
87+
88+
if (!F.isEmpty(jobs))
89+
this.taskName = jobs.iterator().next().name();
8790
}
8891
}

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,8 @@ public PlatformBroadcastingSingleClosureTask(PlatformContext ctx, long taskPtr)
8181
*/
8282
public void job(PlatformJob job) {
8383
this.job = job;
84+
85+
if (job != null)
86+
this.taskName = job.name();
8487
}
8588
}

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,6 @@ public final class PlatformFullTask extends PlatformAbstractTask {
5757
/** Cluster group. */
5858
private final ClusterGroup grp;
5959

60-
/** Platform task name. */
61-
private final String taskName;
62-
6360
/** {@code true} if distribution of the session attributes should be enabled. */
6461
private final boolean taskSesFullSupport;
6562

modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
7373
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
7474
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
75+
import org.apache.ignite.internal.processors.platform.compute.PlatformAbstractTask;
7576
import org.apache.ignite.internal.processors.platform.compute.PlatformFullTask;
7677
import org.apache.ignite.internal.processors.task.monitor.ComputeGridMonitor;
7778
import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatus;
@@ -641,6 +642,13 @@ else if (task != null) {
641642
}
642643
}
643644

645+
if (task instanceof PlatformAbstractTask) {
646+
String taskName0 = ((PlatformAbstractTask)task).taskName();
647+
648+
if (taskName0 != null)
649+
taskName = taskName0;
650+
}
651+
644652
assert taskName != null;
645653

646654
if (log.isDebugEnabled())
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
namespace Apache.Ignite.Core.Tests.Compute
19+
{
20+
using System;
21+
using System.Collections.Generic;
22+
using System.Linq;
23+
using System.Threading;
24+
using Apache.Ignite.Core.Cache.Query;
25+
using Apache.Ignite.Core.Compute;
26+
using NUnit.Framework;
27+
using static TestUtils;
28+
29+
/// <summary>
30+
/// Tests for <see cref="IComputeTaskSession"/>
31+
/// </summary>
32+
public class ComputeTaskNameTest : TestBase
33+
{
34+
/// <summary>
35+
/// .Net compute task name is propagated into platform task.
36+
/// </summary>
37+
[Test]
38+
public void TaskNameTakenFromPlatformTask()
39+
{
40+
// Call task asynchronously with delay
41+
var task = new LongTask(3000);
42+
var cts = new CancellationTokenSource();
43+
Ignite.GetCompute().ExecuteAsync(task, 123, cts.Token);
44+
45+
try
46+
{
47+
// Check task in system views via SQL
48+
var res = Ignite
49+
.GetOrCreateCache<string, string>("test")
50+
.Query(new SqlFieldsQuery("SELECT TASK_NAME, TASK_CLASS_NAME FROM SYS.TASKS", null))
51+
.GetAll()
52+
.Single();
53+
54+
Assert.AreEqual("Apache.Ignite.Core.Tests.Compute.ComputeTaskNameTest+LongTask", res[0]);
55+
Assert.AreEqual("org.apache.ignite.internal.processors.platform.compute.PlatformFullTask", res[1]);
56+
}
57+
finally
58+
{
59+
cts.Cancel();
60+
}
61+
}
62+
63+
/// <summary>
64+
/// .Net compute closure name is propagated into platform task.
65+
/// </summary>
66+
[Test]
67+
public void ClosureNameTakenFromPlatformTask()
68+
{
69+
// Call task asynchronously with delay
70+
var clo = new LongClosure(3000);
71+
var cts = new CancellationTokenSource();
72+
Ignite.GetCompute().CallAsync(clo, cts.Token);
73+
74+
try
75+
{
76+
// Check task in system views via SQL
77+
var res = Ignite
78+
.GetOrCreateCache<string, string>("test")
79+
.Query(new SqlFieldsQuery("SELECT TASK_NAME, TASK_CLASS_NAME FROM SYS.TASKS", null))
80+
.GetAll()
81+
.Single();
82+
83+
Assert.AreEqual("Apache.Ignite.Core.Tests.Compute.LongClosure", res[0]);
84+
Assert.AreEqual("org.apache.ignite.internal.processors.platform.compute.PlatformBalancingSingleClosureTask", res[1]);
85+
}
86+
finally
87+
{
88+
cts.Cancel();
89+
}
90+
}
91+
92+
/// <summary>
93+
/// .Net broadcast compute closure name is propagated into platform task.
94+
/// </summary>
95+
[Test]
96+
public void BroadcastClosureNameTakenFromPlatformTask()
97+
{
98+
// Call task asynchronously with delay
99+
var clo = new LongClosure(3000);
100+
var cts = new CancellationTokenSource();
101+
Ignite.GetCompute().BroadcastAsync(clo, cts.Token);
102+
103+
try
104+
{
105+
// Check task in system views via SQL
106+
var res = Ignite
107+
.GetOrCreateCache<string, string>("test")
108+
.Query(new SqlFieldsQuery("SELECT TASK_NAME, TASK_CLASS_NAME FROM SYS.TASKS", null))
109+
.GetAll()
110+
.Single();
111+
112+
Assert.AreEqual("Apache.Ignite.Core.Tests.Compute.LongClosure", res[0]);
113+
Assert.AreEqual("org.apache.ignite.internal.processors.platform.compute.PlatformBroadcastingSingleClosureTask", res[1]);
114+
}
115+
finally
116+
{
117+
cts.Cancel();
118+
}
119+
}
120+
121+
/// <summary>
122+
/// Creates a task that executes <see cref="LongJob"/>.
123+
/// </summary>
124+
[ComputeTaskSessionFullSupport]
125+
private class LongTask : ComputeTaskSplitAdapter<int, string, string>
126+
{
127+
/// Delay time in milliseconds.
128+
private readonly int _delay;
129+
130+
/// <summary>
131+
/// Constructor.
132+
/// </summary>
133+
/// <param name="delay">Execution delay time in milliseconds</param>
134+
public LongTask(int delay)
135+
{
136+
_delay = delay;
137+
}
138+
139+
/// <inheritdoc />
140+
public override string Reduce(IList<IComputeJobResult<string>> results) => results[0].Data;
141+
142+
/// <inheritdoc />
143+
protected override ICollection<IComputeJob<string>> Split(int gridSize, int attrValue)
144+
{
145+
return new List<IComputeJob<string>> { new LongJob(_delay) };
146+
}
147+
}
148+
149+
/// <summary>
150+
/// Implements delayed job execution.
151+
/// </summary>
152+
private class LongJob : ComputeJobAdapter<string>
153+
{
154+
/// Delay time in milliseconds.
155+
private readonly int _delay;
156+
157+
/// <summary>
158+
/// Constructor.
159+
/// </summary>
160+
/// <param name="delay">Execution delay time in milliseconds</param>
161+
public LongJob(int delay)
162+
{
163+
_delay = delay;
164+
}
165+
166+
/// <inheritdoc />
167+
public override string Execute()
168+
{
169+
Thread.Sleep(_delay);
170+
171+
return "OK";
172+
}
173+
}
174+
}
175+
176+
/// <summary>
177+
/// Implements closure with delayed execution.
178+
/// </summary>
179+
[Serializable]
180+
public class LongClosure : IComputeFunc<String>
181+
{
182+
/// Delay time in milliseconds.
183+
private readonly int _delay;
184+
185+
/// <summary>
186+
///
187+
/// </summary>
188+
/// <param name="s"></param>
189+
public LongClosure(int delay)
190+
{
191+
_delay = delay;
192+
}
193+
194+
/** <inheritDoc /> */
195+
public string Invoke()
196+
{
197+
Thread.Sleep(_delay);
198+
199+
return "OK";
200+
}
201+
}
202+
}

0 commit comments

Comments
 (0)