Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/main/java/com/microsoft/sqlserver/jdbc/BufferPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.microsoft.sqlserver.jdbc;

import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;

public class BufferPool {
private final int bufferSize;
private final ConcurrentLinkedQueue<byte[]> pool = new ConcurrentLinkedQueue<>();

BufferPool(int bufferSize) {
this.bufferSize = bufferSize;
}

public byte[] rent() {
byte[] buffer = pool.poll();
return buffer == null ? new byte[bufferSize] : buffer;
}

public void release(byte[] buffer) {
if (buffer.length == bufferSize) {
pool.offer(buffer);
Arrays.fill(buffer, (byte) 0); // Clear the array for re-use
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to clear the buffer first, then put it pool. You lose ownership once added to pool and another thread might take the buffer, you can't touch it any more.

}
// If the buffer size does not match, we do not store it in the pool
}
}
42 changes: 42 additions & 0 deletions src/main/java/com/microsoft/sqlserver/jdbc/ByteBufferManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.microsoft.sqlserver.jdbc;

import java.util.concurrent.ConcurrentHashMap;

public class ByteBufferManager {

private static int[] poolSizes = { 1024, 2048, 4096, 8192, 16384, 32768, 65536 };
private final static ConcurrentHashMap<Integer, BufferPool> pools = new ConcurrentHashMap<>();

public ByteBufferManager() {
for (int size : poolSizes) {
pools.put(size, new BufferPool(size));
}
}

public byte[] rentBytes(int size) {
int poolSize = getPoolSize(size);

if (poolSize == -1) {
return new byte[size];

Check warning on line 20 in src/main/java/com/microsoft/sqlserver/jdbc/ByteBufferManager.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/microsoft/sqlserver/jdbc/ByteBufferManager.java#L20

Added line #L20 was not covered by tests
}
BufferPool pool = pools.computeIfAbsent(poolSize, BufferPool::new);
return pool.rent();
}

private static int getPoolSize(int size) {
if (size <= 0) return 1024;

int nextPower = Integer.highestOneBit(size - 1) << 1;
return nextPower <= 65536 ? nextPower : -1;
}

// Return a previously rented byte array
public void release(byte[] array) {
int size = array.length;
BufferPool pool = pools.get(size);
if (pool != null) {
pool.release(array);
}
// else discard — no pooling for this size
}
}
17 changes: 13 additions & 4 deletions src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6668,18 +6668,25 @@ final class TDSPacket {
final byte[] payload;
int payloadLength;
volatile TDSPacket next;
private final ByteBufferManager byteBufferManager;

final public String toString() {
return "TDSPacket(SPID:" + Util.readUnsignedShortBigEndian(header, TDS.PACKET_HEADER_SPID) + " Seq:"
+ header[TDS.PACKET_HEADER_SEQUENCE_NUM] + ")";
}

TDSPacket(int size) {
payload = new byte[size];
TDSPacket(int size, ByteBufferManager byteBufferManager) {
this.byteBufferManager = byteBufferManager;
// payload = new byte[size];
payload = byteBufferManager.rentBytes(size);
payloadLength = 0;
next = null;
}

void releasePayload() {
byteBufferManager.release(payload);
}

final boolean isEOM() {
return TDS.STATUS_BIT_EOM == (header[TDS.PACKET_HEADER_MESSAGE_STATUS] & TDS.STATUS_BIT_EOM);
}
Expand Down Expand Up @@ -6738,7 +6745,8 @@ final SQLServerConnection getConnection() {
return con;
}

private transient TDSPacket currentPacket = new TDSPacket(0);
private transient ByteBufferManager byteBufferManager = new ByteBufferManager();
private transient TDSPacket currentPacket = new TDSPacket(0, byteBufferManager);
private transient TDSPacket lastPacket = currentPacket;
private int payloadOffset = 0;
private int packetNum = 0;
Expand Down Expand Up @@ -6854,6 +6862,7 @@ private boolean nextPacket() throws SQLServerException {
if (logger.isLoggable(Level.FINEST))
logger.finest(toString() + " Moving to next packet -- unlinking consumed packet");

consumedPacket.releasePayload();
consumedPacket.next = null;
}
currentPacket = nextPacket;
Expand All @@ -6879,7 +6888,7 @@ final boolean readPacket() throws SQLServerException {
assert tdsChannel.numMsgsRcvd < tdsChannel.numMsgsSent : "numMsgsRcvd:" + tdsChannel.numMsgsRcvd
+ " should be less than numMsgsSent:" + tdsChannel.numMsgsSent;

TDSPacket newPacket = new TDSPacket(con.getTDSPacketSize());
TDSPacket newPacket = new TDSPacket(con.getTDSPacketSize(), byteBufferManager);
if ((null != command) &&
// if cancelQueryTimeout is set, we should wait for the total amount of
// queryTimeout + cancelQueryTimeout to
Expand Down
Loading