Skip to content

Commit e484f2f

Browse files
author
Jonathan Knight
committed
Bug 38298007 make RefreshableAddressProvider use an executor for the refresh thread
(merge 14.1.2-0 -> ce/14.1.2-0 118486) [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v14.1.2.0/": change = 118487]
1 parent 71c48d0 commit e484f2f

File tree

4 files changed

+118
-32
lines changed

4 files changed

+118
-32
lines changed

prj/coherence-core-21/src/main/java/com/tangosol/internal/util/VirtualThreads.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313

1414
import com.tangosol.net.security.SecurityHelper;
1515

16+
import java.util.concurrent.Executor;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.ThreadFactory;
1619
import java.util.function.Function;
1720

1821
/**
@@ -94,6 +97,21 @@ public static boolean isEnabled(String sServiceName)
9497
: Config.getBoolean(PROPERTY_SERVICE_ENABLED.apply(sServiceName), isEnabled());
9598
}
9699

100+
101+
/**
102+
* Returns either a new virtual thread per-task executor on Java 21 or higher
103+
* or a single threaded executor if lower than Java 21.
104+
*
105+
* @param factory the {@link ThreadFactory} to use if not on Java 21
106+
*
107+
* @return either a new virtual thread per-task executor on Java 21 or higher
108+
* or a single threaded executor if lower than Java 21.
109+
*/
110+
public static Executor newMaybeVirtualThreadExecutor(ThreadFactory factory)
111+
{
112+
return Executors.newVirtualThreadPerTaskExecutor();
113+
}
114+
97115
// ---- constants -------------------------------------------------------
98116

99117
/**

prj/coherence-core/src/main/java/com/tangosol/internal/util/VirtualThreads.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
@@ -9,6 +9,10 @@
99

1010
import com.tangosol.util.Base;
1111

12+
import java.util.concurrent.Executor;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.ThreadFactory;
15+
1216
/**
1317
* Helper class for virtual threads functionality.
1418
* <p>
@@ -68,4 +72,18 @@ public static boolean isEnabled(String serviceName)
6872
{
6973
return false;
7074
}
75+
76+
/**
77+
* Returns either a new virtual thread per-task executor on Java 21 or higher
78+
* or a single threaded executor if lower than Java 21.
79+
*
80+
* @param factory the {@link ThreadFactory} to use if not on Java 21
81+
*
82+
* @return either a new virtual thread per-task executor on Java 21 or higher
83+
* or a single threaded executor if lower than Java 21.
84+
*/
85+
public static Executor newMaybeVirtualThreadExecutor(ThreadFactory factory)
86+
{
87+
return Executors.newSingleThreadExecutor(factory);
88+
}
7189
}

prj/coherence-core/src/main/java/com/tangosol/net/RefreshableAddressProvider.java

Lines changed: 74 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,29 @@
11
/*
2-
* Copyright (c) 2000, 2020, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
5-
* http://oss.oracle.com/licenses/upl.
5+
* https://oss.oracle.com/licenses/upl.
66
*/
77

88
package com.tangosol.net;
99

1010

1111
import com.tangosol.coherence.config.Config;
1212

13+
import com.tangosol.internal.util.VirtualThreads;
14+
1315
import com.tangosol.util.Base;
14-
import com.tangosol.util.Daemon;
1516

1617
import java.net.InetSocketAddress;
1718

1819
import java.util.ArrayList;
1920
import java.util.Iterator;
2021
import java.util.List;
2122

23+
import java.util.concurrent.Executor;
24+
import java.util.concurrent.locks.Lock;
25+
import java.util.concurrent.locks.ReentrantLock;
26+
2227

2328
/**
2429
* A RefreshableAddressProvider is an AddressProvider implementation
@@ -57,7 +62,9 @@ public RefreshableAddressProvider(AddressProvider ap, long lRefresh)
5762
// populate the initial address list cache
5863
refreshAddressList();
5964
// initialize the refresh thread
60-
f_daemonRefresh = new RefreshThread(lRefresh);
65+
String sName = RefreshableAddressProvider.this.getClass().getName() + ": RefreshThread";
66+
f_refreshTask = new RefreshTask(lRefresh);
67+
f_daemonRefresh = VirtualThreads.newMaybeVirtualThreadExecutor(r -> new Thread(r, sName));
6168
}
6269

6370
/**
@@ -178,54 +185,74 @@ protected void refreshAddressList()
178185
*/
179186
protected void ensureRefreshThread()
180187
{
181-
if (!f_daemonRefresh.isRunning())
188+
if (!f_refreshTask.isRunning())
182189
{
183-
f_daemonRefresh.start();
190+
f_daemonRefresh.execute(f_refreshTask);
184191
}
185192
}
186193

187194

188195
// ----- inner class: RefreshThread -------------------------------------
189196

190-
protected class RefreshThread
191-
extends Daemon
197+
protected class RefreshTask
198+
implements Runnable
192199
{
193200
// ----- constructors -----------------------------------------------
194201

195202
/**
196-
* Construct a new RefreshThread with the specified refresh interval.
203+
* Construct a new RefreshTask with the specified refresh interval.
197204
*
198205
* @param lRefresh the refresh interval
199206
*/
200-
protected RefreshThread(long lRefresh)
207+
protected RefreshTask(long lRefresh)
201208
{
202-
super(RefreshableAddressProvider.this.getClass().getName() +
203-
": RefreshThread");
204209
f_lRefresh = lRefresh;
205210
}
206211

207-
/**
208-
* {@inheritDoc}
209-
*/
212+
public boolean isRunning()
213+
{
214+
return m_fRunning;
215+
}
216+
217+
@Override
210218
public void run()
211219
{
212-
long lRefresh = f_lRefresh;
213-
while (!isStopping())
220+
if (m_fRunning)
214221
{
215-
try
216-
{
217-
refreshAddressList();
218-
stop();
219-
}
220-
catch (Throwable t)
222+
return;
223+
}
224+
225+
f_lock.lock();
226+
try
227+
{
228+
if (!m_fRunning)
221229
{
222-
err("An exception occurred while refreshing an address list: " +
223-
"\n" + getStackTrace(t) +
224-
"\nReducing the refresh rate.");
225-
Base.sleep(lRefresh);
226-
lRefresh = 2 * lRefresh;
230+
m_fRunning = true;
231+
long lRefresh = f_lRefresh;
232+
while (m_fRunning)
233+
{
234+
try
235+
{
236+
refreshAddressList();
237+
m_fRunning = false;
238+
}
239+
catch (Throwable t)
240+
{
241+
err("An exception occurred while refreshing an address list: " +
242+
"\n" + t.getMessage() +
243+
"\n" + getStackTrace(t) +
244+
"\nReducing the refresh rate.");
245+
Base.sleep(lRefresh);
246+
lRefresh = 2 * lRefresh;
247+
}
248+
}
227249
}
228250
}
251+
finally
252+
{
253+
m_fRunning = false;
254+
f_lock.unlock();
255+
}
229256
}
230257

231258
// ----- data members -----------------------------------------------
@@ -234,6 +261,16 @@ public void run()
234261
* The interval with which to attempt to refresh the address list.
235262
*/
236263
protected final long f_lRefresh;
264+
265+
/**
266+
* The flag to indicate whether the task is running.
267+
*/
268+
private volatile boolean m_fRunning = false;
269+
270+
/**
271+
* A lock to control running only a single task.
272+
*/
273+
private final Lock f_lock = new ReentrantLock();
237274
}
238275

239276

@@ -357,7 +394,15 @@ protected void refreshIterator()
357394
/**
358395
* The refresh daemon.
359396
*/
360-
protected final Daemon f_daemonRefresh;
397+
protected final RefreshTask f_refreshTask;
398+
399+
/**
400+
* The refresh executor.
401+
* <p>
402+
* This will be a single threaded executor on Java 17 or a
403+
* virtual thread per-task on Java 21 or higher.
404+
*/
405+
protected final Executor f_daemonRefresh;
361406

362407
/**
363408
* An Iterator over the cached set of addresses.

prj/coherence-core/src/main/java/com/tangosol/util/Daemon.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,10 @@ protected void configureWorker(DaemonWorker worker)
751751
ThreadGroup curThreadGroup = ensureThreadGroup();
752752
synchronized (curThreadGroup) // ensures that the thread group is not destroyed concurrently
753753
{
754-
ensureThreadGroup();
754+
if (curThreadGroup.isDestroyed())
755+
{
756+
ensureThreadGroup();
757+
}
755758
threadWorker = makeThread(m_threadGroup, worker, null);
756759
}
757760

@@ -782,9 +785,11 @@ protected void configureWorker(DaemonWorker worker)
782785
protected ThreadGroup ensureThreadGroup()
783786
{
784787
ThreadGroup threadGroup = m_threadGroup;
785-
if (threadGroup == null)
788+
if (threadGroup == null || threadGroup.isDestroyed())
786789
{
787790
threadGroup = m_threadGroup = new ThreadGroup(getConfiguredName());
791+
// Make it a daemon so that it is destroyed automatically.
792+
threadGroup.setDaemon(true);
788793
}
789794
return threadGroup;
790795
}

0 commit comments

Comments
 (0)