Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.ebean.datasource.pool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* A buffer especially designed for Busy PooledConnections.
Expand Down Expand Up @@ -82,9 +84,10 @@ boolean remove(PooledConnection pc) {
}

/**
* Close connections that should be considered leaked.
* Remove connections that should be considered leaked.
*/
void closeBusyConnections(long leakTimeMinutes) {
List<PooledConnection> removeBusyConnections(long leakTimeMinutes) {
List<PooledConnection> busyConnections = null;
long olderThanTime = System.currentTimeMillis() - (leakTimeMinutes * 60000);
Log.debug("Closing busy connections using leakTimeMinutes {0}", leakTimeMinutes);
for (int i = 0; i < slots.length; i++) {
Expand All @@ -98,20 +101,14 @@ void closeBusyConnections(long leakTimeMinutes) {
} else {
slots[i] = null;
--size;
closeBusyConnection(pc);
if (busyConnections == null) {
busyConnections = new ArrayList<>();
}
busyConnections.add(pc);
}
}
}
}

private void closeBusyConnection(PooledConnection pc) {
try {
Log.warn("DataSource closing busy connection? {0}", pc.fullDescription());
System.out.println("CLOSING busy connection: " + pc.fullDescription());
pc.closeConnectionFully(false);
} catch (Exception ex) {
Log.error("Error when closing potentially leaked connection " + pc.description(), ex);
}
return busyConnections;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public SQLException dataSourceDownReason() {

private void notifyDataSourceIsDown(SQLException reason) {
if (dataSourceUp.get()) {
reset();
reset(false);
notifyDown(reason);
}
}
Expand All @@ -316,7 +316,7 @@ private void notifyDown(SQLException reason) {

private void notifyDataSourceIsUp() {
if (!dataSourceUp.get()) {
reset();
reset(true);
notifyUp();
}
}
Expand Down Expand Up @@ -553,7 +553,7 @@ void returnConnectionForceClose(PooledConnection pooledConnection, boolean testP
}

void removeClosedConnection(PooledConnection pooledConnection) {
queue.returnPooledConnection(pooledConnection, true);
queue.returnPooledConnection(pooledConnection, true, false);
}

/**
Expand All @@ -564,13 +564,13 @@ private void returnTheConnection(PooledConnection pooledConnection, boolean forc
if (poolListener != null && !forceClose) {
poolListener.onBeforeReturnConnection(pooledConnection);
}
queue.returnPooledConnection(pooledConnection, forceClose);
queue.returnPooledConnection(pooledConnection, forceClose, true);
}

void returnConnectionReset(PooledConnection pooledConnection) {
queue.returnPooledConnection(pooledConnection, true);
queue.returnPooledConnection(pooledConnection, true, false);
Log.warn("Resetting DataSource on read-only failure [{0}]", name);
reset();
reset(false);
}

/**
Expand Down Expand Up @@ -599,9 +599,9 @@ PooledConnection createConnectionForQueue(int connId) throws SQLException {
* <li>Busy connections are closed when they are returned to the pool.</li>
* </ul>
*/
private void reset() {
private void reset(boolean logErrors) {
heartbeatPoolExhaustedCount = 0;
queue.reset(leakTimeMinutes);
queue.reset(leakTimeMinutes, logErrors);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,22 @@ void closeAll(boolean logErrors) {

/**
* Trim any inactive connections that have not been used since usedSince.
* <p>
* The connections are returned to close.
*/
int trim(int minSize, long usedSince, long createdSince) {
int trimCount = 0;
List<PooledConnection> trim(int minSize, long usedSince, long createdSince) {
List<PooledConnection> trimmedConnections = null;
ListIterator<PooledConnection> iterator = freeBuffer.listIterator(minSize);
while (iterator.hasNext()) {
PooledConnection pooledConnection = iterator.next();
if (pooledConnection.shouldTrim(usedSince, createdSince)) {
iterator.remove();
pooledConnection.closeConnectionFully(true);
trimCount++;
if (trimmedConnections == null) {
trimmedConnections = new ArrayList<>();
}
trimmedConnections.add(pooledConnection);
}
}
return trimCount;
return trimmedConnections;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.ebean.datasource.pool.ConnectionPool.Status;

import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -87,12 +88,7 @@ private PoolStatus createStatus() {

@Override
public String toString() {
lock.lock();
try {
return createStatus().toString();
} finally {
lock.unlock();
}
return status(false).toString();
}

PoolStatus status(boolean reset) {
Expand Down Expand Up @@ -147,31 +143,40 @@ void ensureMinimumConnections() throws SQLException {
/**
* Return a PooledConnection.
*/
void returnPooledConnection(PooledConnection c, boolean forceClose) {
void returnPooledConnection(PooledConnection c, boolean forceClose, boolean logErrors) {
boolean closeConnection = false;
lock.lock();
try {
if (!busyList.remove(c)) {
Log.error("Connection [{0}] not found in BusyList?", c);
}
if (forceClose || c.shouldTrimOnReturn(lastResetTime, maxAgeMillis)) {
c.closeConnectionFully(false);
closeConnection = true; // close outside lock
} else {
freeList.add(c);
notEmpty.signal();
notEmpty.signal(); // notify _obtainConnectionWaitLoop, that there are new connections in the freelist
}
} finally {
lock.unlock();
}
if (closeConnection) {
c.closeConnectionFully(logErrors);
}
}

private PooledConnection extractFromFreeList() {
/**
* Returns one connection from the free list and put it into the busy list or null if no connections are free.
*
* @throws StaleConnectionException there was a connection in the freeList, but it is stale.
* The connection has to be closed outside the lock.
*/
private PooledConnection extractFromFreeList() throws StaleConnectionException {
if (freeList.isEmpty()) {
return null;
}
final PooledConnection c = freeList.remove();
if (validateStaleMillis > 0 && staleEviction(c)) {
c.closeConnectionFully(false);
return null;
throw new StaleConnectionException(c);
}
registerBusyConnection(c);
return c;
Expand All @@ -192,15 +197,24 @@ private boolean stale(PooledConnection c) {
}

PooledConnection obtainConnection() throws SQLException {
try {
PooledConnection pc = _obtainConnection();
pc.resetForUse();
return pc;

} catch (InterruptedException e) {
// restore the interrupted status as we throw SQLException
Thread.currentThread().interrupt();
throw new SQLException("Interrupted getting connection from pool", e);
var start = System.nanoTime();
while (true) {
try {
PooledConnection pc = _obtainConnection();
pc.resetForUse();
final var elapsed = System.nanoTime() - start;
totalAcquireNanos += elapsed;
maxAcquireNanos = Math.max(maxAcquireNanos, elapsed);
return pc;

} catch (InterruptedException e) {
// restore the interrupted status as we throw SQLException
Thread.currentThread().interrupt();
throw new SQLException("Interrupted getting connection from pool", e);
} catch (StaleConnectionException e) {
e.getConnection().closeConnectionFully(true);
// try again...
}
}
}

Expand All @@ -215,8 +229,8 @@ private int registerBusyConnection(PooledConnection connection) {
return busySize;
}

private PooledConnection _obtainConnection() throws InterruptedException, SQLException {
var start = System.nanoTime();
private PooledConnection _obtainConnection() throws InterruptedException, SQLException, StaleConnectionException {

lock.lockInterruptibly();
try {
if (doingShutdown) {
Expand Down Expand Up @@ -246,9 +260,6 @@ private PooledConnection _obtainConnection() throws InterruptedException, SQLExc
waitingThreads--;
}
} finally {
final var elapsed = System.nanoTime() - start;
totalAcquireNanos += elapsed;
maxAcquireNanos = Math.max(maxAcquireNanos, elapsed);
lock.unlock();
}
}
Expand All @@ -270,7 +281,7 @@ private PooledConnection createConnection() throws SQLException {
/**
* Got into a loop waiting for connections to be returned to the pool.
*/
private PooledConnection _obtainConnectionWaitLoop() throws SQLException, InterruptedException {
private PooledConnection _obtainConnectionWaitLoop() throws SQLException, InterruptedException, StaleConnectionException {
long nanos = MILLIS_TIME_UNIT.toNanos(waitTimeoutMillis);
for (; ; ) {
if (nanos <= 0) {
Expand Down Expand Up @@ -307,7 +318,7 @@ PoolStatus shutdown(boolean closeBusyConnections) {
try {
doingShutdown = true;
PoolStatus status = createStatus();
closeFreeConnections(true);
freeList.closeAll(true);

if (!closeBusyConnections) {
// connections close on return to pool
Expand All @@ -332,14 +343,14 @@ PoolStatus shutdown(boolean closeBusyConnections) {
* <p>
* This is typically done when a database down event occurs.
*/
void reset(long leakTimeMinutes) {
void reset(long leakTimeMinutes, boolean logErrors) {
lock.lock();
try {
PoolStatus status = createStatus();
Log.info("Resetting DataSource [{0}] {1}", name, status);
lastResetTime = System.currentTimeMillis();

closeFreeConnections(false);
freeList.closeAll(logErrors);
closeBusyConnections(leakTimeMinutes);

String busyInfo = getBusyConnectionInformation();
Expand All @@ -353,17 +364,12 @@ void reset(long leakTimeMinutes) {
}

void trim(long maxInactiveMillis, long maxAgeMillis) {
lock.lock();
try {
if (trimInactiveConnections(maxInactiveMillis, maxAgeMillis)) {
try {
ensureMinimumConnections();
} catch (SQLException e) {
Log.error("Error trying to ensure minimum connections", e);
}
if (trimInactiveConnections(maxInactiveMillis, maxAgeMillis)) {
try {
ensureMinimumConnections();
} catch (SQLException e) {
Log.error("Error trying to ensure minimum connections", e);
}
} finally {
lock.unlock();
}
}

Expand All @@ -372,33 +378,32 @@ void trim(long maxInactiveMillis, long maxAgeMillis) {
*/
private boolean trimInactiveConnections(long maxInactiveMillis, long maxAgeMillis) {
final long createdSince = (maxAgeMillis == 0) ? 0 : System.currentTimeMillis() - maxAgeMillis;
final int trimmedCount;
if (freeList.size() > minSize) {
// trim on maxInactive and maxAge
long usedSince = System.currentTimeMillis() - maxInactiveMillis;
trimmedCount = freeList.trim(minSize, usedSince, createdSince);
} else if (createdSince > 0) {
// trim only on maxAge
trimmedCount = freeList.trim(0, createdSince, createdSince);
} else {
trimmedCount = 0;
}
if (trimmedCount > 0 && Log.isLoggable(DEBUG)) {
Log.debug("DataSource [{0}] trimmed [{1}] inactive connections. New size[{2}]", name, trimmedCount, totalConnections());
}
return trimmedCount > 0 && freeList.size() < minSize;
}

/**
* Close all the connections that are in the free list.
*/
private void closeFreeConnections(boolean logErrors) {
final List<PooledConnection> trimmed;
lock.lock();
try {
freeList.closeAll(logErrors);
if (freeList.size() > minSize) {
// trim on maxInactive and maxAge
long usedSince = System.currentTimeMillis() - maxInactiveMillis;
trimmed = freeList.trim(minSize, usedSince, createdSince);
} else if (createdSince > 0) {
// trim only on maxAge
trimmed = freeList.trim(0, createdSince, createdSince);
} else {
trimmed = null;
}
} finally {
lock.unlock();
}
if (trimmed != null) {
if (Log.isLoggable(DEBUG)) {
Log.debug("DataSource [{0}] trimmed [{1}] inactive connections. New size[{2}]", name, trimmed.size(), totalConnections());
}
for (PooledConnection pc : trimmed) {
pc.closeConnectionFully(true);
}
return freeList.size() < minSize;
}
return false;
}

/**
Expand All @@ -412,12 +417,23 @@ private void closeFreeConnections(boolean logErrors) {
* closed and put back into the pool.
*/
void closeBusyConnections(long leakTimeMinutes) {
List<PooledConnection> busyConnections;
lock.lock();
try {
busyList.closeBusyConnections(leakTimeMinutes);
busyConnections = busyList.removeBusyConnections(leakTimeMinutes);
} finally {
lock.unlock();
}
if (busyConnections != null) {
for (PooledConnection pc : busyConnections) {
try {
Log.warn("DataSource closing busy connection? {0}", pc.fullDescription());
pc.closeConnectionFully(true);
} catch (Exception ex) {
Log.error("Error when closing potentially leaked connection " + pc.description(), ex);
}
}
}
}

String getBusyConnectionInformation() {
Expand Down
Loading
Loading