Skip to content

Commit 1c40da1

Browse files
authored
Merge pull request #135 from ebean-orm/feature/Timer-VirtualThread
Internal heartbeat Timer, use Virtual Thread for Java 21
2 parents 535ca43 + 9391eb1 commit 1c40da1

File tree

9 files changed

+294
-18
lines changed

9 files changed

+294
-18
lines changed

blackbox-tests/pom.xml

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>io.ebean</groupId>
8+
<artifactId>ebean-datasource-parent</artifactId>
9+
<version>9.6</version>
10+
</parent>
11+
12+
<artifactId>blackbox-tests</artifactId>
13+
14+
<properties>
15+
<maven.compiler.release>21</maven.compiler.release>
16+
</properties>
17+
18+
<dependencies>
19+
<dependency>
20+
<groupId>io.ebean</groupId>
21+
<artifactId>ebean-datasource</artifactId>
22+
<version>9.6</version>
23+
</dependency>
24+
25+
<dependency>
26+
<groupId>io.avaje</groupId>
27+
<artifactId>junit</artifactId>
28+
<version>1.5</version>
29+
<scope>test</scope>
30+
</dependency>
31+
32+
<dependency>
33+
<groupId>io.ebean</groupId>
34+
<artifactId>ebean-test-containers</artifactId>
35+
<version>7.8</version>
36+
<scope>test</scope>
37+
</dependency>
38+
39+
<dependency>
40+
<groupId>org.postgresql</groupId>
41+
<artifactId>postgresql</artifactId>
42+
<version>42.7.2</version>
43+
<scope>test</scope>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>ch.qos.logback</groupId>
48+
<artifactId>logback-classic</artifactId>
49+
<version>1.5.17</version>
50+
<scope>test</scope>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>org.slf4j</groupId>
55+
<artifactId>slf4j-jdk-platform-logging</artifactId>
56+
<version>2.0.17</version>
57+
<scope>test</scope>
58+
</dependency>
59+
60+
<dependency>
61+
<groupId>io.avaje</groupId>
62+
<artifactId>avaje-slf4j-jpl</artifactId>
63+
<version>1.2</version>
64+
<scope>test</scope>
65+
</dependency>
66+
67+
</dependencies>
68+
</project>
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package org.example.tests;
2+
3+
import io.ebean.datasource.DataSourceBuilder;
4+
import io.ebean.datasource.DataSourcePool;
5+
import io.ebean.test.containers.PostgresContainer;
6+
import org.junit.jupiter.api.BeforeAll;
7+
import org.junit.jupiter.api.Test;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.sql.Connection;
12+
import java.sql.SQLException;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import java.util.Properties;
16+
17+
class Java21TrimAndShutdownTest {
18+
19+
private static Logger log = LoggerFactory.getLogger(Java21TrimAndShutdownTest.class);
20+
21+
@BeforeAll
22+
static void before() {
23+
PostgresContainer.builder("15")
24+
.port(9999)
25+
.containerName("pool_test")
26+
.dbName("app")
27+
.user("db_owner")
28+
.build()
29+
.startWithDropCreate();
30+
}
31+
32+
@Test
33+
void test() throws InterruptedException, SQLException {
34+
Properties clientInfo = new Properties();
35+
clientInfo.setProperty("ApplicationName", "my-test");
36+
37+
DataSourcePool pool = DataSourceBuilder.create()
38+
.url("jdbc:postgresql://127.0.0.1:9999/app")
39+
.username("db_owner")
40+
.password("test")
41+
.clientInfo(clientInfo)
42+
.maxInactiveTimeSecs(2)
43+
.heartbeatFreqSecs(1)
44+
.trimPoolFreqSecs(1)
45+
.build();
46+
47+
List<Connection> connectionList = new ArrayList<>();
48+
for (int i = 0; i < 50; i++) {
49+
connectionList.add(pool.getConnection());
50+
}
51+
52+
// close them slowly to allow multiple trims
53+
for (Connection connection : connectionList) {
54+
connection.rollback();
55+
connection.close();
56+
Thread.sleep(200);
57+
}
58+
59+
log.info("----------- Sleep allowing trim -------------");
60+
Thread.sleep(9_000);
61+
log.info("----------- Shutdown pool -------------");
62+
pool.shutdown();
63+
}
64+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<configuration scan="true" scanPeriod="10 seconds">
2+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
3+
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
4+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
5+
</encoder>
6+
</appender>
7+
8+
<root level="INFO">
9+
<appender-ref ref="STDOUT"/>
10+
</root>
11+
12+
<logger name="java.lang" level="WARN"/>
13+
<logger name="io.ebean" level="INFO"/>
14+
<logger name="io.avaje.config" level="TRACE"/>
15+
<logger name="io.ebean.docker" level="DEBUG"/>
16+
<logger name="io.ebean.test" level="TRACE"/>
17+
<logger name="io.ebean.datasource" level="TRACE"/>
18+
19+
</configuration>

ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626
*/
2727
final class ConnectionPool implements DataSourcePool {
2828

29+
@FunctionalInterface
30+
interface Heartbeat {
31+
32+
void stop();
33+
}
34+
2935
private static final String APPLICATION_NAME = "ApplicationName";
3036
private final ReentrantLock heartbeatLock = new ReentrantLock(false);
3137
private final ReentrantLock notifyLock = new ReentrantLock(false);
@@ -80,7 +86,7 @@ final class ConnectionPool implements DataSourcePool {
8086
private final int waitTimeoutMillis;
8187
private final int pstmtCacheSize;
8288
private final PooledConnectionQueue queue;
83-
private Timer heartBeatTimer;
89+
private Heartbeat heartbeat;
8490
private int heartbeatPoolExhaustedCount;
8591
private final ExecutorService executor;
8692

@@ -161,13 +167,6 @@ void pstmtCacheMetrics(PstmtCache pstmtCache) {
161167
pscRem.add(pstmtCache.removeCount());
162168
}
163169

164-
final class HeartBeatRunnable extends TimerTask {
165-
@Override
166-
public void run() {
167-
heartBeat();
168-
}
169-
}
170-
171170
@Override
172171
public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
173172
throw new SQLFeatureNotSupportedException("We do not support java.util.logging");
@@ -387,7 +386,7 @@ private void trimIdleConnections() {
387386
* This is called by the HeartbeatRunnable which should be scheduled to
388387
* run periodically (every heartbeatFreqSecs seconds).
389388
*/
390-
private void heartBeat() {
389+
void heartbeat() {
391390
trimIdleConnections();
392391
if (validateOnHeartbeat) {
393392
testConnection();
@@ -727,11 +726,10 @@ private void startHeartBeatIfStopped() {
727726
heartbeatLock.lock();
728727
try {
729728
// only start if it is not already running
730-
if (heartBeatTimer == null) {
729+
if (heartbeat == null) {
731730
int freqMillis = heartbeatFreqSecs * 1000;
732731
if (freqMillis > 0) {
733-
heartBeatTimer = new Timer(name + ".heartBeat", true);
734-
heartBeatTimer.scheduleAtFixedRate(new HeartBeatRunnable(), freqMillis, freqMillis);
732+
heartbeat = ExecutorFactory.newHeartBeat(this, freqMillis);
735733
}
736734
}
737735
} finally {
@@ -743,9 +741,9 @@ private void stopHeartBeatIfRunning() {
743741
heartbeatLock.lock();
744742
try {
745743
// only stop if it was running
746-
if (heartBeatTimer != null) {
747-
heartBeatTimer.cancel();
748-
heartBeatTimer = null;
744+
if (heartbeat != null) {
745+
heartbeat.stop();
746+
heartbeat = null;
749747
}
750748
} finally {
751749
heartbeatLock.unlock();

ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPoolFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
import io.ebean.datasource.DataSourceFactory;
55
import io.ebean.datasource.DataSourcePool;
66

7+
import static java.util.Objects.requireNonNullElse;
8+
79
/**
810
* DataSourceFactory implementation that is service loaded.
911
*/
1012
public final class ConnectionPoolFactory implements DataSourceFactory {
1113

1214
@Override
1315
public DataSourcePool createPool(String name, DataSourceConfig config) {
14-
return new ConnectionPool(name, config);
16+
return new ConnectionPool(requireNonNullElse(name, ""), config);
1517
}
1618
}
Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,65 @@
11
package io.ebean.datasource.pool;
22

3+
import io.ebean.datasource.pool.ConnectionPool.Heartbeat;
4+
5+
import java.util.Timer;
6+
import java.util.TimerTask;
37
import java.util.concurrent.ExecutorService;
48
import java.util.concurrent.Executors;
9+
import java.util.concurrent.ThreadFactory;
510

611
final class ExecutorFactory {
712

813
static ExecutorService newExecutor() {
9-
return Executors.newSingleThreadExecutor();
14+
return Executors.newSingleThreadExecutor(factory());
15+
}
16+
17+
private static ThreadFactory factory() {
18+
return runnable -> {
19+
Thread thread = new Thread(runnable);
20+
thread.setName("datasource.reaper");
21+
return thread;
22+
};
23+
}
24+
25+
/**
26+
* Return a new Heartbeat for the pool.
27+
*/
28+
static Heartbeat newHeartBeat(ConnectionPool pool, int freqMillis) {
29+
final Timer timer = new Timer(nm(pool.name()), true);
30+
timer.scheduleAtFixedRate(new HeartbeatTask(pool), freqMillis, freqMillis);
31+
return new TimerHeartbeat(timer);
32+
}
33+
34+
private static String nm(String poolName) {
35+
return poolName.isEmpty() ? "datasource.heartbeat" : "datasource." + poolName + ".heartbeat";
36+
}
37+
38+
private static final class TimerHeartbeat implements Heartbeat {
39+
40+
private final Timer timer;
41+
42+
private TimerHeartbeat(Timer timer) {
43+
this.timer = timer;
44+
}
45+
46+
@Override
47+
public void stop() {
48+
timer.cancel();
49+
}
50+
}
51+
52+
private static final class HeartbeatTask extends TimerTask {
53+
54+
private final ConnectionPool pool;
55+
56+
private HeartbeatTask(ConnectionPool pool) {
57+
this.pool = pool;
58+
}
59+
60+
@Override
61+
public void run() {
62+
pool.heartbeat();
63+
}
1064
}
1165
}
Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,67 @@
11
package io.ebean.datasource.pool;
22

3+
import io.ebean.datasource.pool.ConnectionPool.Heartbeat;
4+
35
import java.util.concurrent.ExecutorService;
46
import java.util.concurrent.Executors;
7+
import java.util.concurrent.ThreadFactory;
8+
import java.util.concurrent.atomic.AtomicBoolean;
59

610
final class ExecutorFactory {
711

812
static ExecutorService newExecutor() {
9-
return Executors.newVirtualThreadPerTaskExecutor();
13+
ThreadFactory factory = Thread.ofVirtual().name("datasource.reaper").factory();
14+
return Executors.newThreadPerTaskExecutor(factory);
15+
}
16+
17+
static Heartbeat newHeartBeat(ConnectionPool pool, int freqMillis) {
18+
return new VTHeartbeat(pool, freqMillis).start();
19+
}
20+
21+
private static final class VTHeartbeat implements Heartbeat {
22+
23+
private final AtomicBoolean running = new AtomicBoolean(false);
24+
private final ConnectionPool pool;
25+
private final int freqMillis;
26+
private final Thread thread;
27+
28+
private VTHeartbeat(ConnectionPool pool, int freqMillis) {
29+
this.pool = pool;
30+
this.freqMillis = freqMillis;
31+
this.thread = Thread.ofVirtual()
32+
.name(nm(pool.name()))
33+
.unstarted(this::run);
34+
}
35+
36+
private static String nm(String poolName) {
37+
return poolName.isEmpty() ? "datasource.heartbeat" : "datasource." + poolName + ".heartbeat";
38+
}
39+
40+
private void run() {
41+
while (running.get()) {
42+
try {
43+
Thread.sleep(freqMillis);
44+
pool.heartbeat();
45+
} catch (InterruptedException e) {
46+
Thread.currentThread().interrupt();
47+
break;
48+
} catch (Exception e) {
49+
// continue heartbeat
50+
Log.warn("Error during heartbeat", e);
51+
}
52+
}
53+
}
54+
55+
private Heartbeat start() {
56+
running.set(true);
57+
thread.start();
58+
return this;
59+
}
60+
61+
@Override
62+
public void stop() {
63+
running.set(false);
64+
thread.interrupt();
65+
}
1066
}
1167
}

ebean-datasource/src/test/java/io/ebean/datasource/test/NetworkOutageTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*
1919
* @author Roland Praml, Foconis Analytics GmbH
2020
*/
21+
@Disabled
2122
public class NetworkOutageTest {
2223

2324
static void openPort(int port) throws Exception {

0 commit comments

Comments
 (0)