Skip to content

Commit 5025d5c

Browse files
ARROW-1618: Reduce Heap Usage (Phase 1)
1 parent 1b8cabd commit 5025d5c

File tree

1 file changed

+37
-44
lines changed

1 file changed

+37
-44
lines changed

java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java

Lines changed: 37 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
140140
return existingLedger;
141141
}
142142

143-
final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
143+
final BufferLedger ledger = new BufferLedger(allocator);
144144
if (retain) {
145145
ledger.inc();
146146
}
@@ -151,54 +151,41 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
151151
}
152152
}
153153

154-
155154
/**
156155
* The way that a particular BufferLedger communicates back to the AllocationManager that it
157156
* now longer needs to hold
158157
* a reference to particular piece of memory.
158+
* Can only be called when you already hold the writeLock.
159159
*/
160-
private class ReleaseListener {
161-
162-
private final BufferAllocator allocator;
163-
164-
public ReleaseListener(BufferAllocator allocator) {
165-
this.allocator = allocator;
166-
}
167-
168-
/**
169-
* Can only be called when you already hold the writeLock.
170-
*/
171-
public void release() {
172-
allocator.assertOpen();
160+
private void release(final BufferLedger ledger) {
161+
final BaseAllocator allocator = ledger.getAllocator();
162+
allocator.assertOpen();
173163

174-
final BufferLedger oldLedger = map.remove(allocator);
175-
oldLedger.allocator.dissociateLedger(oldLedger);
164+
final BufferLedger oldLedger = map.remove(allocator);
165+
oldLedger.allocator.dissociateLedger(oldLedger);
176166

177-
if (oldLedger == owningLedger) {
178-
if (map.isEmpty()) {
179-
// no one else owns, lets release.
180-
oldLedger.allocator.releaseBytes(size);
181-
underlying.release();
182-
amDestructionTime = System.nanoTime();
183-
owningLedger = null;
184-
} else {
185-
// we need to change the owning allocator. we've been removed so we'll get whatever is
186-
// top of list
187-
BufferLedger newLedger = map.values().iterator().next();
188-
189-
// we'll forcefully transfer the ownership and not worry about whether we exceeded the
190-
// limit
191-
// since this consumer can't do anything with this.
192-
oldLedger.transferBalance(newLedger);
193-
}
167+
if (oldLedger == owningLedger) {
168+
if (map.isEmpty()) {
169+
// no one else owns, lets release.
170+
oldLedger.allocator.releaseBytes(size);
171+
underlying.release();
172+
amDestructionTime = System.nanoTime();
173+
owningLedger = null;
194174
} else {
195-
if (map.isEmpty()) {
196-
throw new IllegalStateException("The final removal of a ledger should be connected to " +
197-
"the owning ledger.");
198-
}
175+
// we need to change the owning allocator. we've been removed so we'll get whatever is
176+
// top of list
177+
BufferLedger newLedger = map.values().iterator().next();
178+
179+
// we'll forcefully transfer the ownership and not worry about whether we exceeded the
180+
// limit
181+
// since this consumer can't do anything with this.
182+
oldLedger.transferBalance(newLedger);
183+
}
184+
} else {
185+
if (map.isEmpty()) {
186+
throw new IllegalStateException("The final removal of a ledger should be connected to " +
187+
"the owning ledger.");
199188
}
200-
201-
202189
}
203190
}
204191

@@ -221,16 +208,22 @@ public class BufferLedger {
221208
// correctly
222209
private final long lCreationTime = System.nanoTime();
223210
private final BaseAllocator allocator;
224-
private final ReleaseListener listener;
225211
private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog
226212
(BaseAllocator.DEBUG_LOG_LENGTH,
227213
"BufferLedger[%d]", 1)
228214
: null;
229215
private volatile long lDestructionTime = 0;
230216

231-
private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
217+
private BufferLedger(BaseAllocator allocator) {
232218
this.allocator = allocator;
233-
this.listener = listener;
219+
}
220+
221+
/**
222+
* Get the allocator for this ledger
223+
* @return allocator
224+
*/
225+
private BaseAllocator getAllocator() {
226+
return allocator;
234227
}
235228

236229
/**
@@ -339,7 +332,7 @@ public int decrement(int decrement) {
339332
outcome = bufRefCnt.addAndGet(-decrement);
340333
if (outcome == 0) {
341334
lDestructionTime = System.nanoTime();
342-
listener.release();
335+
release(this);
343336
}
344337
}
345338

0 commit comments

Comments
 (0)