Skip to content

Commit 675dc53

Browse files
committed
Call allocation listener on BaseAllocator#wrapForeignAllocation
1 parent 0db5c93 commit 675dc53

File tree

5 files changed

+189
-106
lines changed

5 files changed

+189
-106
lines changed

java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,41 @@ private void childClosed(final BaseAllocator childAllocator) {
233233
listener.onChildRemoved(this, childAllocator);
234234
}
235235

236+
@Override
237+
public ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) {
238+
assertOpen();
239+
final long size = allocation.getSize();
240+
listener.onPreAllocation(size);
241+
AllocationOutcome outcome = this.allocateBytes(size);
242+
if (!outcome.isOk()) {
243+
if (listener.onFailedAllocation(size, outcome)) {
244+
// Second try, in case the listener can do something about it
245+
outcome = this.allocateBytes(size);
246+
}
247+
if (!outcome.isOk()) {
248+
throw new OutOfMemoryException(createErrorMsg(this, size,
249+
size), outcome.getDetails());
250+
}
251+
}
252+
try {
253+
final AllocationManager manager = new ForeignAllocationManager(this, allocation);
254+
final BufferLedger ledger = manager.associate(this);
255+
final ArrowBuf buf =
256+
new ArrowBuf(ledger, /*bufferManager=*/null, size, allocation.memoryAddress());
257+
buf.writerIndex(size);
258+
listener.onAllocation(size);
259+
return buf;
260+
} catch (Throwable t) {
261+
try {
262+
releaseBytes(size);
263+
allocation.release0();
264+
} catch (Throwable e) {
265+
t.addSuppressed(e);
266+
}
267+
throw t;
268+
}
269+
}
270+
236271
@Override
237272
public ArrowBuf buffer(final long initialRequestSize) {
238273
assertOpen();

java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -248,22 +248,5 @@ default RoundingPolicy getRoundingPolicy() {
248248
*
249249
* @param allocation The underlying allocation.
250250
*/
251-
default ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) {
252-
try {
253-
forceAllocate(allocation.getSize());
254-
final AllocationManager manager = new ForeignAllocationManager(this, allocation);
255-
final BufferLedger ledger = manager.associate(this);
256-
final ArrowBuf buf =
257-
new ArrowBuf(ledger, /*bufferManager=*/null, allocation.getSize(), allocation.memoryAddress());
258-
buf.writerIndex(allocation.getSize());
259-
return buf;
260-
} catch (Throwable t) {
261-
try {
262-
allocation.release0();
263-
} catch (Throwable e) {
264-
t.addSuppressed(e);
265-
}
266-
throw t;
267-
}
268-
}
251+
ArrowBuf wrapForeignAllocation(ForeignAllocation allocation);
269252
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.memory;
19+
20+
// Allocation listener
21+
// It counts the number of times it has been invoked, and how much memory allocation it has seen
22+
// When set to 'expand on fail', it attempts to expand the associated allocator's limit
23+
final class CountingAllocationListener implements AllocationListener {
24+
private int numPreCalls;
25+
private int numCalls;
26+
private int numReleaseCalls;
27+
private int numChildren;
28+
private long totalMem;
29+
private boolean expandOnFail;
30+
BufferAllocator expandAlloc;
31+
long expandLimit;
32+
33+
CountingAllocationListener() {
34+
this.numCalls = 0;
35+
this.numChildren = 0;
36+
this.totalMem = 0;
37+
this.expandOnFail = false;
38+
this.expandAlloc = null;
39+
this.expandLimit = 0;
40+
}
41+
42+
@Override
43+
public void onPreAllocation(long size) {
44+
numPreCalls++;
45+
}
46+
47+
@Override
48+
public void onAllocation(long size) {
49+
numCalls++;
50+
totalMem += size;
51+
}
52+
53+
@Override
54+
public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
55+
if (expandOnFail) {
56+
expandAlloc.setLimit(expandLimit);
57+
return true;
58+
}
59+
return false;
60+
}
61+
62+
63+
@Override
64+
public void onRelease(long size) {
65+
numReleaseCalls++;
66+
}
67+
68+
@Override
69+
public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
70+
++numChildren;
71+
}
72+
73+
@Override
74+
public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
75+
--numChildren;
76+
}
77+
78+
void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
79+
this.expandOnFail = true;
80+
this.expandAlloc = expandAlloc;
81+
this.expandLimit = expandLimit;
82+
}
83+
84+
int getNumPreCalls() {
85+
return numPreCalls;
86+
}
87+
88+
int getNumReleaseCalls() {
89+
return numReleaseCalls;
90+
}
91+
92+
int getNumCalls() {
93+
return numCalls;
94+
}
95+
96+
int getNumChildren() {
97+
return numChildren;
98+
}
99+
100+
long getTotalMem() {
101+
return totalMem;
102+
}
103+
}

java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java

Lines changed: 3 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -438,100 +438,15 @@ public ArrowBuf empty() {
438438
}).build());
439439
}
440440

441-
// Allocation listener
442-
// It counts the number of times it has been invoked, and how much memory allocation it has seen
443-
// When set to 'expand on fail', it attempts to expand the associated allocator's limit
444-
private static final class TestAllocationListener implements AllocationListener {
445-
private int numPreCalls;
446-
private int numCalls;
447-
private int numReleaseCalls;
448-
private int numChildren;
449-
private long totalMem;
450-
private boolean expandOnFail;
451-
BufferAllocator expandAlloc;
452-
long expandLimit;
453-
454-
TestAllocationListener() {
455-
this.numCalls = 0;
456-
this.numChildren = 0;
457-
this.totalMem = 0;
458-
this.expandOnFail = false;
459-
this.expandAlloc = null;
460-
this.expandLimit = 0;
461-
}
462-
463-
@Override
464-
public void onPreAllocation(long size) {
465-
numPreCalls++;
466-
}
467-
468-
@Override
469-
public void onAllocation(long size) {
470-
numCalls++;
471-
totalMem += size;
472-
}
473-
474-
@Override
475-
public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
476-
if (expandOnFail) {
477-
expandAlloc.setLimit(expandLimit);
478-
return true;
479-
}
480-
return false;
481-
}
482-
483-
484-
@Override
485-
public void onRelease(long size) {
486-
numReleaseCalls++;
487-
}
488-
489-
@Override
490-
public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
491-
++numChildren;
492-
}
493-
494-
@Override
495-
public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
496-
--numChildren;
497-
}
498-
499-
void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
500-
this.expandOnFail = true;
501-
this.expandAlloc = expandAlloc;
502-
this.expandLimit = expandLimit;
503-
}
504-
505-
int getNumPreCalls() {
506-
return numPreCalls;
507-
}
508-
509-
int getNumReleaseCalls() {
510-
return numReleaseCalls;
511-
}
512-
513-
int getNumCalls() {
514-
return numCalls;
515-
}
516-
517-
int getNumChildren() {
518-
return numChildren;
519-
}
520-
521-
long getTotalMem() {
522-
return totalMem;
523-
}
524-
}
525-
526441
@Test
527442
public void testRootAllocator_listeners() throws Exception {
528-
TestAllocationListener l1 = new TestAllocationListener();
443+
CountingAllocationListener l1 = new CountingAllocationListener();
529444
assertEquals(0, l1.getNumPreCalls());
530445
assertEquals(0, l1.getNumCalls());
531446
assertEquals(0, l1.getNumReleaseCalls());
532447
assertEquals(0, l1.getNumChildren());
533448
assertEquals(0, l1.getTotalMem());
534-
TestAllocationListener l2 = new TestAllocationListener();
449+
CountingAllocationListener l2 = new CountingAllocationListener();
535450
assertEquals(0, l2.getNumPreCalls());
536451
assertEquals(0, l2.getNumCalls());
537452
assertEquals(0, l2.getNumReleaseCalls());
@@ -590,7 +505,7 @@ public void testRootAllocator_listeners() throws Exception {
590505

591506
@Test
592507
public void testRootAllocator_listenerAllocationFail() throws Exception {
593-
TestAllocationListener l1 = new TestAllocationListener();
508+
CountingAllocationListener l1 = new CountingAllocationListener();
594509
assertEquals(0, l1.getNumCalls());
595510
assertEquals(0, l1.getTotalMem());
596511
// Test attempts to allocate too much from a child whose limit is set to half of the max

java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestForeignAllocation.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,53 @@ public void wrapForeignAllocation() {
5454
assertEquals(0, allocator.getAllocatedMemory());
5555
}
5656

57+
@Test
58+
public void wrapForeignAllocationWithAllocationListener() {
59+
final long bufferSize = 16;
60+
61+
final CountingAllocationListener listener = new CountingAllocationListener();
62+
try (BufferAllocator listenedAllocator =
63+
allocator.newChildAllocator("child", listener, 0L, allocator.getLimit())) {
64+
UnsafeForeignAllocation allocation = new UnsafeForeignAllocation(bufferSize);
65+
try {
66+
assertEquals(0, listenedAllocator.getAllocatedMemory());
67+
ArrowBuf buf = listenedAllocator.wrapForeignAllocation(allocation);
68+
assertEquals(bufferSize, buf.capacity());
69+
buf.close();
70+
assertTrue(allocation.released);
71+
} finally {
72+
allocation.release0();
73+
}
74+
assertEquals(0, listenedAllocator.getAllocatedMemory());
75+
}
76+
assertEquals(1, listener.getNumPreCalls());
77+
assertEquals(1, listener.getNumCalls());
78+
assertEquals(1, listener.getNumReleaseCalls());
79+
assertEquals(16, listener.getTotalMem());
80+
}
81+
82+
@Test(expected = OutOfMemoryException.class)
83+
public void wrapForeignAllocationFailedWithAllocationListener() {
84+
final long bufferSize = 16;
85+
final long limit = bufferSize - 1;
86+
87+
final CountingAllocationListener listener = new CountingAllocationListener();
88+
try (BufferAllocator listenedAllocator =
89+
allocator.newChildAllocator("child", listener, 0L, limit)) {
90+
UnsafeForeignAllocation allocation = new UnsafeForeignAllocation(bufferSize);
91+
try {
92+
assertEquals(0, listenedAllocator.getAllocatedMemory());
93+
ArrowBuf buf = listenedAllocator.wrapForeignAllocation(allocation);
94+
assertEquals(bufferSize, buf.capacity());
95+
buf.close();
96+
assertTrue(allocation.released);
97+
} finally {
98+
allocation.release0();
99+
}
100+
assertEquals(0, listenedAllocator.getAllocatedMemory());
101+
}
102+
}
103+
57104
private static class UnsafeForeignAllocation extends ForeignAllocation {
58105
boolean released = false;
59106

0 commit comments

Comments
 (0)