
Commit 7c92351

original-brownbear authored and srowen committed
[MINOR][CORE] Cleanup dead code and duplication in Mem. Management
## What changes were proposed in this pull request?

* Removed the method `org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`. It became unused once 85b0a15 (SPARK-15962) introduced word alignment for unsafe arrays.
* Cleaned up duplicate code in memory management and the unsafe sorters.
* The change extracting the exception paths is more than cosmetic, since it definitely reduces the size the affected methods compile to.

## How was this patch tested?

* The build still passes after removing the method, and grepping the codebase for `alignToWords` shows no remaining references to it.
* The deduplicated code is covered by existing tests.

Author: Armin <[email protected]>

Closes apache#19254 from original-brownbear/cleanup-mem-consumer.
1 parent a11db94 · commit 7c92351
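The point above about compiled method size refers to a common JVM pattern: HotSpot decides whether to inline a method largely by its bytecode size, so moving a rarely taken throw path into its own method keeps the hot caller small and inline-friendly. Below is a minimal standalone sketch of that pattern; the class and helper names are illustrative, not Spark's actual code.

```java
// Sketch of the "extract the exception path" refactoring: the cold
// message-building and throw logic moves into a private helper so the
// hot method's bytecode stays short.
final class Allocator {
  private long used;

  long allocate(long required) {
    long granted = tryReserve(required); // hot path: runs on every call
    if (granted < required) {
      failAllocation(granted, required); // cold path: now a single call site
    }
    used += granted;
    return granted;
  }

  // String concatenation and throw compile to a fair amount of bytecode;
  // keeping them out of allocate() is what shrinks the hot method.
  private static void failAllocation(long got, long required) {
    throw new OutOfMemoryError(
      "Unable to acquire " + required + " bytes of memory, got " + got);
  }

  private static long tryReserve(long required) {
    return required; // stand-in for a real memory-manager call
  }
}
```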

File tree

4 files changed (+37 −61 lines)


core/src/main/java/org/apache/spark/memory/MemoryConsumer.java

Lines changed: 12 additions & 14 deletions
```diff
@@ -89,13 +89,7 @@ public LongArray allocateArray(long size) {
     long required = size * 8L;
     MemoryBlock page = taskMemoryManager.allocatePage(required, this);
     if (page == null || page.size() < required) {
-      long got = 0;
-      if (page != null) {
-        got = page.size();
-        taskMemoryManager.freePage(page, this);
-      }
-      taskMemoryManager.showMemoryUsage();
-      throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+      throwOom(page, required);
     }
     used += required;
     return new LongArray(page);
@@ -116,13 +110,7 @@ public void freeArray(LongArray array) {
   protected MemoryBlock allocatePage(long required) {
     MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
     if (page == null || page.size() < required) {
-      long got = 0;
-      if (page != null) {
-        got = page.size();
-        taskMemoryManager.freePage(page, this);
-      }
-      taskMemoryManager.showMemoryUsage();
-      throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+      throwOom(page, required);
     }
     used += page.size();
     return page;
@@ -152,4 +140,14 @@ public void freeMemory(long size) {
     taskMemoryManager.releaseExecutionMemory(size, this);
     used -= size;
   }
+
+  private void throwOom(final MemoryBlock page, final long required) {
+    long got = 0;
+    if (page != null) {
+      got = page.size();
+      taskMemoryManager.freePage(page, this);
+    }
+    taskMemoryManager.showMemoryUsage();
+    throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+  }
 }
```
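One subtlety the extracted `throwOom` keeps intact: when the task memory manager grants a page smaller than requested, that partial page is freed before the error is thrown, so the reservation does not leak. A hedged sketch of that contract, with illustrative types standing in for Spark's `TaskMemoryManager` and `MemoryBlock`:

```java
// Illustrative stand-ins, not Spark's real API.
interface PageManager {
  void freePage(Page page);
  void showMemoryUsage();
}

final class Page {
  final long size;
  Page(long size) { this.size = size; }
}

final class OomPath {
  // Mirrors throwOom above: release a too-small page before failing,
  // then dump memory usage to help diagnose the OOM.
  static void throwOom(PageManager mgr, Page page, long required) {
    long got = 0;
    if (page != null) {   // allocation partially succeeded
      got = page.size;
      mgr.freePage(page); // return the partial page before propagating
    }
    mgr.showMemoryUsage();
    throw new OutOfMemoryError(
      "Unable to acquire " + required + " bytes of memory, got " + got);
  }
}
```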

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 10 additions & 14 deletions
```diff
@@ -283,13 +283,7 @@ private void advanceToNextPage() {
       } else {
         currentPage = null;
         if (reader != null) {
-          // remove the spill file from disk
-          File file = spillWriters.removeFirst().getFile();
-          if (file != null && file.exists()) {
-            if (!file.delete()) {
-              logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
-            }
-          }
+          handleFailedDelete();
         }
         try {
           Closeables.close(reader, /* swallowIOException = */ false);
@@ -307,13 +301,7 @@ private void advanceToNextPage() {
     public boolean hasNext() {
       if (numRecords == 0) {
         if (reader != null) {
-          // remove the spill file from disk
-          File file = spillWriters.removeFirst().getFile();
-          if (file != null && file.exists()) {
-            if (!file.delete()) {
-              logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
-            }
-          }
+          handleFailedDelete();
         }
       }
       return numRecords > 0;
@@ -403,6 +391,14 @@ public long spill(long numBytes) throws IOException {
     public void remove() {
       throw new UnsupportedOperationException();
     }
+
+    private void handleFailedDelete() {
+      // remove the spill file from disk
+      File file = spillWriters.removeFirst().getFile();
+      if (file != null && file.exists() && !file.delete()) {
+        logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
+      }
+    }
   }

   /**
```
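Besides deduplicating the two call sites, `handleFailedDelete` collapses the three nested `if` statements into one short-circuit condition. A small self-contained sketch showing the two forms are equivalent (only `java.io.File` is a real API here; the class and logging are illustrative):

```java
import java.io.File;

final class SpillFileCleanup {
  // Before: nested ifs guarding the delete.
  static void deleteNested(File file) {
    if (file != null && file.exists()) {
      if (!file.delete()) {
        System.err.println("Was unable to delete spill file " + file.getAbsolutePath());
      }
    }
  }

  // After: one condition; && short-circuits left to right, so delete()
  // is only attempted on a non-null, existing file, exactly as before.
  static void deleteFlat(File file) {
    if (file != null && file.exists() && !file.delete()) {
      System.err.println("Was unable to delete spill file " + file.getAbsolutePath());
    }
  }
}
```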

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 15 additions & 17 deletions
```diff
@@ -219,15 +219,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
         new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics,
           inMemSorter.numRecords());
       spillWriters.add(spillWriter);
-      final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator();
-      while (sortedRecords.hasNext()) {
-        sortedRecords.loadNext();
-        final Object baseObject = sortedRecords.getBaseObject();
-        final long baseOffset = sortedRecords.getBaseOffset();
-        final int recordLength = sortedRecords.getRecordLength();
-        spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
-      }
-      spillWriter.close();
+      spillIterator(inMemSorter.getSortedIterator(), spillWriter);
     }

     final long spillSize = freeMemory();
@@ -488,6 +480,18 @@ public UnsafeSorterIterator getSortedIterator() throws IOException {
     }
   }

+  private static void spillIterator(UnsafeSorterIterator inMemIterator,
+      UnsafeSorterSpillWriter spillWriter) throws IOException {
+    while (inMemIterator.hasNext()) {
+      inMemIterator.loadNext();
+      final Object baseObject = inMemIterator.getBaseObject();
+      final long baseOffset = inMemIterator.getBaseOffset();
+      final int recordLength = inMemIterator.getRecordLength();
+      spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
+    }
+    spillWriter.close();
+  }
+
   /**
    * An UnsafeSorterIterator that support spilling.
    */
@@ -503,6 +507,7 @@ class SpillableIterator extends UnsafeSorterIterator {
       this.numRecords = inMemIterator.getNumRecords();
     }

+    @Override
    public int getNumRecords() {
      return numRecords;
    }
@@ -521,14 +526,7 @@ public long spill() throws IOException {
       // Iterate over the records that have not been returned and spill them.
       final UnsafeSorterSpillWriter spillWriter =
         new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
-      while (inMemIterator.hasNext()) {
-        inMemIterator.loadNext();
-        final Object baseObject = inMemIterator.getBaseObject();
-        final long baseOffset = inMemIterator.getBaseOffset();
-        final int recordLength = inMemIterator.getRecordLength();
-        spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
-      }
-      spillWriter.close();
+      spillIterator(inMemIterator, spillWriter);
       spillWriters.add(spillWriter);
       nextUpstream = spillWriter.getReader(serializerManager);
```

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java

Lines changed: 0 additions & 16 deletions
```diff
@@ -109,22 +109,6 @@ public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
     Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
   }

-  // Do word alignment for this row and grow the row buffer if needed.
-  // todo: remove this after we make unsafe array data word align.
-  public void alignToWords(int numBytes) {
-    final int remainder = numBytes & 0x07;
-
-    if (remainder > 0) {
-      final int paddingBytes = 8 - remainder;
-      holder.grow(paddingBytes);
-
-      for (int i = 0; i < paddingBytes; i++) {
-        Platform.putByte(holder.buffer, holder.cursor, (byte) 0);
-        holder.cursor++;
-      }
-    }
-  }
-
   public void write(int ordinal, boolean value) {
     final long offset = getFieldOffset(ordinal);
     Platform.putLong(holder.buffer, offset, 0L);
```
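For context on the deletion: the removed `alignToWords` padded the row buffer with zero bytes up to the next 8-byte boundary, which 85b0a15 (SPARK-15962) made unnecessary by word-aligning unsafe array data at the source. A self-contained sketch of just the padding arithmetic, in an illustrative class:

```java
final class WordAlignment {
  // Bytes of zero padding needed to round numBytes up to a multiple of 8;
  // (numBytes & 0x07) equals numBytes % 8 for non-negative values.
  static int paddingToWordBoundary(int numBytes) {
    int remainder = numBytes & 0x07;
    return remainder == 0 ? 0 : 8 - remainder;
  }

  public static void main(String[] args) {
    System.out.println(paddingToWordBoundary(13)); // 3 (13 -> 16)
    System.out.println(paddingToWordBoundary(16)); // 0 (already aligned)
  }
}
```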
