Skip to content

Commit 3534e59

Browse files
IGNITE-27389 SQL Calcite: Reduce contention by QueryBlockingTaskExecutor - Fixes #12589.
Signed-off-by: Aleksey Plekhanov <[email protected]>
1 parent 2de7856 commit 3534e59

File tree

3 files changed

+53
-36
lines changed

3 files changed

+53
-36
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@
3333
* Query task executor based on queue with query blocking.
3434
*/
3535
public class QueryBlockingTaskExecutor extends AbstractQueryTaskExecutor {
36-
/** */
37-
private final QueryTasksQueue tasksQueue = new QueryTasksQueue();
38-
3936
/** */
4037
private IgniteThreadPoolExecutor executor;
4138

@@ -57,6 +54,8 @@ public QueryBlockingTaskExecutor(GridKernalContext ctx) {
5754
@Override public void onStart(GridKernalContext ctx) {
5855
super.onStart(ctx);
5956

57+
QueryTasksQueue tasksQueue = new QueryTasksQueue(ctx.config().getQueryThreadPoolSize());
58+
6059
executor = new IgniteThreadPoolExecutor(
6160
THREAD_PREFIX,
6261
ctx.igniteInstanceName(),

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,18 @@ private static class Node {
6969
/** Set of blocked (currently running) queries. */
7070
private final Set<QueryKey> blockedQrys = new HashSet<>();
7171

72+
/** All tasks blocked by currently running queries. Can be false-negative. */
73+
private boolean allTasksBlocked;
74+
75+
/** Count of not parked and not currently busy task processing threads. */
76+
private int freeThreadsCnt;
77+
7278
/**
7379
* Creates a {@code LinkedBlockingQueue}.
7480
*/
75-
QueryTasksQueue() {
81+
QueryTasksQueue(int threadsCnt) {
7682
last = head = new Node(null);
83+
freeThreadsCnt = threadsCnt;
7784
}
7885

7986
/** Queue size. */
@@ -83,34 +90,43 @@ public int size() {
8390

8491
/** Add a task to the queue. */
8592
public void addTask(QueryAwareTask task) {
93+
Node node = new Node(task);
94+
8695
lock.lock();
8796

8897
try {
8998
assert last.next == null : "Unexpected last.next: " + last.next;
9099

91-
last = last.next = new Node(task);
100+
last = last.next = node;
92101

93-
cnt.getAndIncrement();
102+
int tasksCnt = cnt.incrementAndGet();
94103

95-
notEmpty.signal();
104+
// Do not wake up new threads if it's enough free threads to process the new task.
105+
if (tasksCnt > freeThreadsCnt && !(allTasksBlocked && freeThreadsCnt > 0))
106+
notEmpty.signal();
107+
108+
allTasksBlocked = false;
96109
}
97110
finally {
98111
lock.unlock();
99112
}
100113
}
101114

102115
/** Poll task and block query. */
103-
public QueryAwareTask pollTaskAndBlockQuery(long timeout, TimeUnit unit) throws InterruptedException {
116+
public @Nullable QueryAwareTask pollTaskAndBlockQuery(long nanos) throws InterruptedException {
104117
lock.lockInterruptibly();
105118

106119
try {
120+
freeThreadsCnt--;
121+
107122
QueryAwareTask res;
108123

109-
long nanos = unit.toNanos(timeout);
124+
while (cnt.get() == 0 || allTasksBlocked || (allTasksBlocked = (res = dequeue()) == null)) {
125+
if (nanos <= 0L) {
126+
freeThreadsCnt++;
110127

111-
while (cnt.get() == 0 || (res = dequeue()) == null) {
112-
if (nanos <= 0L)
113128
return null;
129+
}
114130

115131
nanos = notEmpty.awaitNanos(nanos);
116132
}
@@ -121,6 +137,11 @@ public QueryAwareTask pollTaskAndBlockQuery(long timeout, TimeUnit unit) throws
121137

122138
return res;
123139
}
140+
catch (Throwable e) {
141+
freeThreadsCnt++;
142+
143+
throw e;
144+
}
124145
finally {
125146
lock.unlock();
126147
}
@@ -141,8 +162,7 @@ private QueryAwareTask dequeue() {
141162

142163
unlink(pred, cur);
143164

144-
if (cnt.decrementAndGet() > 0)
145-
notEmpty.signal();
165+
cnt.getAndDecrement();
146166

147167
return res;
148168
}
@@ -160,8 +180,9 @@ public void unblockQuery(QueryKey qryKey) {
160180

161181
assert removed;
162182

163-
if (cnt.get() > 0)
164-
notEmpty.signal();
183+
allTasksBlocked = false;
184+
185+
freeThreadsCnt++;
165186
}
166187
finally {
167188
lock.unlock();
@@ -290,11 +311,11 @@ public BlockingQueue<Runnable> blockingQueue() {
290311
}
291312

292313
@Override public @NotNull Runnable take() throws InterruptedException {
293-
return pollTaskAndBlockQuery(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
314+
return pollTaskAndBlockQuery(Long.MAX_VALUE);
294315
}
295316

296317
@Override public @Nullable Runnable poll(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
297-
return pollTaskAndBlockQuery(0, TimeUnit.NANOSECONDS);
318+
return pollTaskAndBlockQuery(unit.toNanos(timeout));
298319
}
299320

300321
@Override public Runnable remove() {

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ public class QueryTasksQueueTest extends GridCommonAbstractTest {
3131
@Test
3232
public void testQueryBlockingUnblocking() throws Exception {
3333
long waitTimeout = 10_000L;
34+
long waitTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(waitTimeout);
3435

35-
QueryTasksQueue queue = new QueryTasksQueue();
36+
QueryTasksQueue queue = new QueryTasksQueue(1);
3637
UUID qryId1 = UUID.randomUUID();
3738
UUID qryId2 = UUID.randomUUID();
3839
QueryKey qryKey1 = new QueryKey(qryId1, 0);
@@ -46,36 +47,34 @@ public void testQueryBlockingUnblocking() throws Exception {
4647
queue.addTask(new TestQueryAwareTask(qryKey1));
4748
queue.addTask(new TestQueryAwareTask(qryKey3));
4849

49-
QueryAwareTask task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
50+
QueryAwareTask task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
5051
assertEquals(qryKey1, task.queryKey());
5152

52-
task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
53+
task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
5354
assertEquals(qryKey2, task.queryKey());
5455

55-
task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
56+
task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
5657
assertEquals(qryKey3, task.queryKey());
5758

5859
// Test threads parking and unparking.
5960
QueryAwareTask[] res = new TestQueryAwareTask[1];
6061

6162
Runnable pollAndStoreResult = () -> {
6263
try {
63-
res[0] = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
64+
res[0] = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
6465
}
6566
catch (InterruptedException e) {
6667
throw new RuntimeException(e);
6768
}
6869
};
6970

70-
// Unparking on unblock query.
71+
// Query unblock check.
7172
Thread thread1 = new Thread(pollAndStoreResult);
7273

73-
thread1.start();
74-
75-
assertTrue(GridTestUtils.waitForCondition(() -> thread1.getState() == Thread.State.TIMED_WAITING, waitTimeout));
76-
7774
queue.unblockQuery(qryKey2);
7875

76+
thread1.start();
77+
7978
thread1.join(waitTimeout);
8079

8180
assertFalse(thread1.isAlive());
@@ -104,18 +103,16 @@ public void testQueryBlockingUnblocking() throws Exception {
104103
queue.unblockQuery(qryKey2);
105104
queue.unblockQuery(qryKey3);
106105

107-
task = queue.pollTaskAndBlockQuery(waitTimeout, TimeUnit.MILLISECONDS);
106+
task = queue.pollTaskAndBlockQuery(waitTimeoutNanos);
108107
assertEquals(qryKey1, task.queryKey());
109108

110-
// Unparking on unblock query second time.
109+
// Query unblock check second time.
111110
Thread thread3 = new Thread(pollAndStoreResult);
112111

113-
thread3.start();
114-
115-
assertTrue(GridTestUtils.waitForCondition(() -> thread3.getState() == Thread.State.TIMED_WAITING, waitTimeout));
116-
117112
queue.unblockQuery(qryKey1);
118113

114+
thread3.start();
115+
119116
thread3.join(waitTimeout);
120117

121118
assertFalse(thread3.isAlive());
@@ -128,7 +125,7 @@ public void testQueryBlockingUnblocking() throws Exception {
128125
/** */
129126
@Test
130127
public void testToArray() {
131-
QueryTasksQueue queue = new QueryTasksQueue();
128+
QueryTasksQueue queue = new QueryTasksQueue(1);
132129

133130
QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0);
134131
QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1);
@@ -169,7 +166,7 @@ public void testToArray() {
169166
/** */
170167
@Test
171168
public void testDrainTo() {
172-
QueryTasksQueue queue = new QueryTasksQueue();
169+
QueryTasksQueue queue = new QueryTasksQueue(1);
173170

174171
QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0);
175172
QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1);
@@ -210,7 +207,7 @@ public void testDrainTo() {
210207
/** */
211208
@Test
212209
public void testRemove() {
213-
QueryTasksQueue queue = new QueryTasksQueue();
210+
QueryTasksQueue queue = new QueryTasksQueue(1);
214211

215212
QueryKey qryKey1 = new QueryKey(UUID.randomUUID(), 0);
216213
QueryKey qryKey2 = new QueryKey(UUID.randomUUID(), 1);

0 commit comments

Comments
 (0)