Skip to content

Commit 094aa59

Browse files
misha-clouderasquito
authored andcommitted
[SPARK-24801][CORE] Avoid memory waste by empty byte[] arrays in SaslEncryption$EncryptedMessage
## What changes were proposed in this pull request? Initialize SaslEncryption$EncryptedMessage.byteChannel lazily, so that empty, not yet used instances of ByteArrayWritableChannel referenced by this field don't use up memory. I analyzed a heap dump from Yarn Node Manager where this code is used, and found that there are over 40,000 of the above objects in memory, each with a big empty byte[] array. The reason they are all there is because of Netty queued up a large number of messages in memory before transferTo() is called. There is a small number of netty ChannelOutboundBuffer objects, and then collectively , via linked lists starting from their flushedEntry data fields, they end up referencing over 40K ChannelOutboundBuffer$Entry objects, which ultimately reference EncryptedMessage objects. ## How was this patch tested? Ran all the tests locally. Author: Misha Dmitriev <[email protected]> Closes apache#21811 from countmdm/misha/spark-24801.
1 parent fa09d91 commit 094aa59

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,14 @@ static class EncryptedMessage extends AbstractFileRegion {
135135
private final boolean isByteBuf;
136136
private final ByteBuf buf;
137137
private final FileRegion region;
138+
private final int maxOutboundBlockSize;
138139

139140
/**
140141
* A channel used to buffer input data for encryption. The channel has an upper size bound
141142
* so that if the input is larger than the allowed buffer, it will be broken into multiple
142-
* chunks.
143+
* chunks. Made non-final to enable lazy initialization, which saves memory.
143144
*/
144-
private final ByteArrayWritableChannel byteChannel;
145+
private ByteArrayWritableChannel byteChannel;
145146

146147
private ByteBuf currentHeader;
147148
private ByteBuffer currentChunk;
@@ -157,7 +158,7 @@ static class EncryptedMessage extends AbstractFileRegion {
157158
this.isByteBuf = msg instanceof ByteBuf;
158159
this.buf = isByteBuf ? (ByteBuf) msg : null;
159160
this.region = isByteBuf ? null : (FileRegion) msg;
160-
this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
161+
this.maxOutboundBlockSize = maxOutboundBlockSize;
161162
}
162163

163164
/**
@@ -292,6 +293,9 @@ public long transferTo(final WritableByteChannel target, final long position)
292293
}
293294

294295
private void nextChunk() throws IOException {
296+
if (byteChannel == null) {
297+
byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
298+
}
295299
byteChannel.reset();
296300
if (isByteBuf) {
297301
int copied = byteChannel.write(buf.nioBuffer());

0 commit comments

Comments
 (0)