Skip to content

Commit 6313ea0

Browse files
IGNITE-27891 SQL Calcite: Fix concurrent memory tracker reset and quota exceed - Fixes #12758.
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
1 parent 3f31281 commit 6313ea0

File tree

9 files changed

+113
-79
lines changed

9 files changed

+113
-79
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,8 +426,6 @@ public void execute(RunnableX task, Consumer<Throwable> onError) {
426426
}
427427
catch (Throwable e) {
428428
onError.accept(e);
429-
430-
throw new IgniteException("Unexpected exception", e);
431429
}
432430
});
433431
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ExecutionNodeMemoryTracker.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,15 @@ public static <T> RowTracker<T> create(MemoryTracker qryMemoryTracker, long rowO
5858
long size = sizeCalculator.sizeOf(obj);
5959

6060
size += rowOverhead;
61-
long newAllocated = allocated + size;
6261

63-
if (newAllocated > prevReported) {
64-
long newReported = (newAllocated + (BATCH_SIZE - 1)) & -BATCH_SIZE; // Align to batch size.
65-
qryMemoryTracker.onMemoryAllocated(newReported - prevReported);
62+
allocated += size;
63+
64+
if (allocated > prevReported) {
65+
long newReported = (allocated + (BATCH_SIZE - 1)) & -BATCH_SIZE; // Align to batch size.
66+
long diff = newReported - prevReported;
6667
prevReported = newReported;
68+
qryMemoryTracker.onMemoryAllocated(diff);
6769
}
68-
69-
allocated = newAllocated;
7070
}
7171

7272
/** {@inheritDoc} */
@@ -77,23 +77,25 @@ public static <T> RowTracker<T> create(MemoryTracker qryMemoryTracker, long rowO
7777
size = Math.min(size, allocated);
7878

7979
if (size > 0) {
80-
long newAllocated = allocated - size;
80+
allocated -= size;
8181

82-
if (newAllocated <= prevReported - BATCH_SIZE) {
83-
long newReported = (newAllocated + (BATCH_SIZE - 1)) & -BATCH_SIZE; // Align to batch size.
84-
qryMemoryTracker.onMemoryReleased(prevReported - newReported);
82+
if (allocated <= prevReported - BATCH_SIZE) {
83+
long newReported = (allocated + (BATCH_SIZE - 1)) & -BATCH_SIZE; // Align to batch size.
84+
long diff = prevReported - newReported;
8585
prevReported = newReported;
86+
qryMemoryTracker.onMemoryReleased(diff);
8687
}
87-
88-
allocated = newAllocated;
8988
}
9089
}
9190

9291
/** {@inheritDoc} */
9392
@Override public void reset() {
94-
if (prevReported > 0)
93+
if (prevReported > 0) {
9594
qryMemoryTracker.onMemoryReleased(prevReported);
9695

96+
prevReported = 0;
97+
}
98+
9799
allocated = 0;
98100
}
99101
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/GlobalMemoryTracker.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,8 @@ public GlobalMemoryTracker(long quota) {
3939

4040
/** {@inheritDoc} */
4141
@Override public void onMemoryAllocated(long size) {
42-
if (allocated.addAndGet(size) > quota) {
43-
allocated.addAndGet(-size);
44-
42+
if (allocated.addAndGet(size) > quota)
4543
throw new IgniteException("Global memory quota for SQL queries exceeded [quota=" + quota + ']');
46-
}
4744
}
4845

4946
/** {@inheritDoc} */

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/QueryMemoryTracker.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,10 @@ public static MemoryTracker create(MemoryTracker parent, long quota) {
5050
/** {@inheritDoc} */
5151
@Override public void onMemoryAllocated(long size) {
5252
try {
53-
if (allocated.addAndGet(size) > quota && quota > 0)
54-
throw new IgniteException("Query quota exceeded [quota=" + quota + ']');
55-
5653
parent.onMemoryAllocated(size);
5754
}
58-
catch (Exception e) {
59-
// Undo changes in case of quota exceeded.
60-
release(size);
61-
62-
throw e;
55+
finally {
56+
allocate(size);
6357
}
6458
}
6559

@@ -71,7 +65,13 @@ public static MemoryTracker create(MemoryTracker parent, long quota) {
7165
parent.onMemoryReleased(released);
7266
}
7367

74-
/** Release size, but no more than currently allocated. */
68+
/** Allocate size for current query. */
69+
private void allocate(long size) {
70+
if (allocated.addAndGet(size) > quota && quota > 0)
71+
throw new IgniteException("Query quota exceeded [quota=" + quota + ']');
72+
}
73+
74+
/** Release size for current query, but no more than currently allocated. */
7575
private long release(long size) {
7676
long wasAllocated;
7777
long released;

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RemoteException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class RemoteException extends RuntimeException {
3939
* @param fragmentId Fragment ID.
4040
*/
4141
public RemoteException(UUID nodeId, UUID queryId, long fragmentId, Throwable cause) {
42-
super("Remote query execution", cause);
42+
super("Remote query execution: " + cause.getMessage(), cause);
4343
this.nodeId = nodeId;
4444
this.queryId = queryId;
4545
this.fragmentId = fragmentId;

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.ignite.cache.query.FieldsQueryCursor;
2525
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
2626
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
27+
import org.apache.ignite.internal.processors.query.IgniteSQLException;
2728
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
2829
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
2930
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
@@ -90,7 +91,13 @@ public ListFieldsQueryCursor(
9091
});
9192
}
9293
catch (IgniteCheckedException e) {
93-
throw U.convertException(e);
94+
throw new IgniteSQLException(e.getMessage(), U.convertException(e));
95+
}
96+
catch (IgniteSQLException e) {
97+
throw e;
98+
}
99+
catch (Exception e) {
100+
throw new IgniteSQLException(e.getMessage(), e);
94101
}
95102
finally {
96103
qryMemoryTracker.reset();

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/MemoryTrackerTest.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,29 @@ public void testRemoveOverflow() {
136136
/** */
137137
@Test
138138
public void testConcurrentModification() throws Exception {
139-
MemoryTracker globalTracker = new GlobalMemoryTracker(10_000_000L);
140-
MemoryTracker qryTracker = new QueryMemoryTracker(globalTracker, 1_000_000L);
139+
MemoryTracker globalTracker = new GlobalMemoryTracker(8_000L);
140+
141+
MemoryTracker[] qryTrackers = new MemoryTracker[2];
142+
143+
for (int i = 0; i < qryTrackers.length; i++)
144+
qryTrackers[i] = new QueryMemoryTracker(globalTracker, 5_000L);
145+
141146
AtomicBoolean stop = new AtomicBoolean();
142147

143148
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> {
144149
while (!stop.get()) {
145-
qryTracker.onMemoryAllocated(1_000L);
146-
qryTracker.onMemoryReleased(1_000L);
150+
MemoryTracker qryTracker = qryTrackers[ThreadLocalRandom.current().nextInt(qryTrackers.length)];
151+
152+
try {
153+
qryTracker.onMemoryAllocated(1_000L);
154+
}
155+
catch (Exception ignore) {
156+
// No-op.
157+
}
158+
finally {
159+
// Release a little bit more than allocated, to test inaccuracy of row size calculation.
160+
qryTracker.onMemoryReleased(1_001L);
161+
}
147162

148163
if (ThreadLocalRandom.current().nextInt(10) == 0)
149164
qryTracker.reset();
@@ -158,7 +173,9 @@ public void testConcurrentModification() throws Exception {
158173

159174
fut.get();
160175

161-
assertEquals(0L, qryTracker.allocated());
176+
for (MemoryTracker qryTracker : qryTrackers)
177+
assertEquals(0L, qryTracker.allocated());
178+
162179
assertEquals(0L, globalTracker.allocated());
163180
}
164181

@@ -173,12 +190,20 @@ public void testQuotaExceed() {
173190
RowTracker<Object[]> rowTracker1 = new ExecutionNodeMemoryTracker<>(qryTracker1, 1_000L);
174191
RowTracker<Object[]> rowTracker2 = new ExecutionNodeMemoryTracker<>(qryTracker2, 1_000L);
175192

176-
GridTestUtils.assertThrows(log, () -> rowTracker1.onRowAdded(new Object[1]), IgniteException.class,
193+
Object[] row = new Object[1];
194+
195+
GridTestUtils.assertThrows(log, () -> rowTracker1.onRowAdded(row), IgniteException.class,
177196
"Global memory quota");
178197

179-
GridTestUtils.assertThrows(log, () -> rowTracker2.onRowAdded(new Object[1]), IgniteException.class,
198+
GridTestUtils.assertThrows(log, () -> rowTracker2.onRowAdded(row), IgniteException.class,
180199
"Global memory quota");
181200

201+
assertEquals(499_000L + ExecutionNodeMemoryTracker.BATCH_SIZE, qryTracker1.allocated());
202+
assertEquals(499_000L + ExecutionNodeMemoryTracker.BATCH_SIZE, qryTracker2.allocated());
203+
204+
rowTracker1.onRowRemoved(row);
205+
rowTracker2.onRowRemoved(row);
206+
182207
assertEquals(499_000L, qryTracker1.allocated());
183208
assertEquals(499_000L, qryTracker2.allocated());
184209

@@ -188,15 +213,15 @@ public void testQuotaExceed() {
188213
assertEquals(0L, qryTracker1.allocated());
189214
assertEquals(899_000L, qryTracker2.allocated());
190215

191-
rowTracker1.onRowAdded(new Object[1]);
216+
rowTracker1.onRowAdded(row);
192217

193218
assertEquals(ExecutionNodeMemoryTracker.BATCH_SIZE, qryTracker1.allocated());
194219

195-
GridTestUtils.assertThrows(log, () -> rowTracker2.onRowAdded(new Object[1]), IgniteException.class,
220+
GridTestUtils.assertThrows(log, () -> rowTracker2.onRowAdded(row), IgniteException.class,
196221
"Query quota");
197222

198-
assertEquals(899_000L, qryTracker2.allocated());
199-
assertEquals(899_000L + ExecutionNodeMemoryTracker.BATCH_SIZE, globalTracker.allocated());
223+
assertEquals(899_000L + ExecutionNodeMemoryTracker.BATCH_SIZE, qryTracker2.allocated());
224+
assertEquals(899_000L + ExecutionNodeMemoryTracker.BATCH_SIZE * 2, globalTracker.allocated());
200225
}
201226

202227
/** */

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,6 @@ protected boolean destroyCachesAfterTest() {
9494
assertTrue("Not finished queries found on client", waitForCondition(
9595
() -> queryProcessor(client).queryRegistry().runningQueries().isEmpty(), 1_000L));
9696

97-
waitForCondition(() -> {
98-
for (Ignite ign : G.allGrids()) {
99-
if (!queryProcessor(ign).mailboxRegistry().inboxes().isEmpty())
100-
return false;
101-
}
102-
103-
return true;
104-
}, INBOX_INITIALIZATION_TIMEOUT * 2);
105-
10697
for (Ignite ign : G.allGrids()) {
10798
if (destroyCachesAfterTest()) {
10899
for (String cacheName : ign.cacheNames())
@@ -118,8 +109,8 @@ protected boolean destroyCachesAfterTest() {
118109
assertEquals("Tracked memory must be 0 after test [ignite=" + ign.name() + ']',
119110
0, execSvc.memoryTracker().allocated());
120111

121-
assertEquals("Count of inboxes must be 0 after test [ignite=" + ign.name() + ']',
122-
0, qryProc.mailboxRegistry().inboxes().size());
112+
assertTrue("Not closed inbox found [ignite=" + ign.name() + ']',
113+
waitForCondition(() -> qryProc.mailboxRegistry().inboxes().isEmpty(), INBOX_INITIALIZATION_TIMEOUT * 2));
123114

124115
assertEquals("Count of outboxes must be 0 after test [ignite=" + ign.name() + ']',
125116
0, qryProc.mailboxRegistry().outboxes().size());

0 commit comments

Comments
 (0)