1616
1717import org .threadly .concurrent .CentralThreadlyPool ;
1818import org .threadly .concurrent .ReschedulingOperation ;
19- import org .threadly .concurrent .SubmitterScheduler ;
19+ import org .threadly .concurrent .SchedulerService ;
2020import org .threadly .db .aurora .DelegateAuroraDriver .IllegalDriverStateException ;
21+ import org .threadly .util .ArgumentVerifier ;
2122
2223/**
2324 * Class which monitors a "cluster" of aurora servers. It is expected that for each given cluster
3233public class AuroraClusterMonitor {
3334 protected static final Logger LOG = Logger .getLogger (AuroraClusterMonitor .class .getSimpleName ());
3435
35- protected static final int CHECK_FREQUENCY_MILLIS = 500 ; // TODO - make configurable
3636 protected static final int MAXIMUM_THREAD_POOL_SIZE = 64 ;
37- protected static final SubmitterScheduler MONITOR_SCHEDULER ;
37+ protected static final SchedulerService MONITOR_SCHEDULER ;
3838 protected static final ConcurrentMap <AuroraServersKey , AuroraClusterMonitor > MONITORS ;
39+ private static volatile long CHECK_FREQUENCY_MILLIS = 500 ;
3940
// One-time class initialization of the shared scheduler and the monitor registry.
static {
  // Single pool shared by all cluster monitors; sized from MAXIMUM_THREAD_POOL_SIZE.
  MONITOR_SCHEDULER = CentralThreadlyPool.threadPool(MAXIMUM_THREAD_POOL_SIZE, "auroraMonitor");

  // Registry of monitors, keyed by AuroraServersKey.
  MONITORS = new ConcurrentHashMap<>();
}
46+
47+ /**
48+ * Sets or updates the delay between individual server status checks. Reducing this from the
49+ * default of 500ms can make failover events be discovered faster. Since this is done on a
50+ * per-client basis, it is recommended not to make this too small or it can significantly impact
51+ * server load when there is a lot of clients. It is worth being aware that server checks will
52+ * be expedited if the driver discovers potential server stability issues anyways.
53+ *
54+ * @param millis The milliseconds between server checks
55+ */
56+ public static void setServerCheckDelayMillis (long millis ) {
57+ ArgumentVerifier .assertGreaterThanZero (millis , "millis" );
58+
59+ synchronized (AuroraClusterMonitor .class ) {
60+ if (CHECK_FREQUENCY_MILLIS != millis ) {
61+ CHECK_FREQUENCY_MILLIS = millis ;
62+
63+ for (AuroraClusterMonitor acm : MONITORS .values ()) {
64+ acm .clusterStateChecker .updateServerCheckDelayMillis (millis );
65+ }
66+ }
67+ }
68+ }
4569
4670 /**
4771 * Return a monitor instance for a given set of servers. This instance will be consistent as
@@ -71,7 +95,7 @@ protected static AuroraClusterMonitor getMonitor(DelegateAuroraDriver driver, Au
7195 protected final ClusterChecker clusterStateChecker ;
7296 private final AtomicLong replicaIndex ; // used to distribute replica reads
7397
74- protected AuroraClusterMonitor (SubmitterScheduler scheduler , long checkIntervalMillis ,
98+ protected AuroraClusterMonitor (SchedulerService scheduler , long checkIntervalMillis ,
7599 DelegateAuroraDriver driver , AuroraServer [] clusterServers ) {
76100 clusterStateChecker = new ClusterChecker (scheduler , checkIntervalMillis , driver , clusterServers );
77101 replicaIndex = new AtomicLong ();
@@ -216,14 +240,14 @@ public boolean equals(Object o) {
216240 * (and thus will witness the final cluster state).
217241 */
218242 protected static class ClusterChecker extends ReschedulingOperation {
219- protected final SubmitterScheduler scheduler ;
243+ protected final SchedulerService scheduler ;
220244 protected final Map <AuroraServer , ServerMonitor > allServers ;
221245 protected final List <AuroraServer > secondaryServers ;
222246 protected final AtomicReference <AuroraServer > masterServer ;
223247 protected final CopyOnWriteArrayList <AuroraServer > serversWaitingExpeditiedCheck ;
224248 private volatile boolean initialized = false ; // starts false to avoid updates while constructor is running
225249
226- protected ClusterChecker (SubmitterScheduler scheduler , long checkIntervalMillis ,
250+ protected ClusterChecker (SchedulerService scheduler , long checkIntervalMillis ,
227251 DelegateAuroraDriver driver , AuroraServer [] clusterServers ) {
228252 super (scheduler , 0 );
229253
@@ -250,12 +274,7 @@ protected ClusterChecker(SubmitterScheduler scheduler, long checkIntervalMillis,
250274 scheduler .execute (monitor );
251275 }
252276
253- scheduler .scheduleAtFixedRate (monitor ,
254- // hopefully well distributed hash code will distribute
255- // these tasks so that they are not all checked at once
256- // we convert to a long to avoid a possible overflow at Integer.MIN_VALUE
257- Math .abs ((long )System .identityHashCode (monitor )) % checkIntervalMillis ,
258- checkIntervalMillis );
277+ scheduleMonitor (monitor , checkIntervalMillis );
259278 }
260279 if (masterServer .get () == null ) {
261280 LOG .warning ("No master server found! Will use read only servers till one becomes master" );
@@ -266,7 +285,7 @@ protected ClusterChecker(SubmitterScheduler scheduler, long checkIntervalMillis,
266285 }
267286
268287 // used in testing
269- protected ClusterChecker (SubmitterScheduler scheduler , long checkIntervalMillis ,
288+ protected ClusterChecker (SchedulerService scheduler ,
270289 Map <AuroraServer , ServerMonitor > clusterServers ) {
271290 super (scheduler , 0 );
272291
@@ -279,6 +298,34 @@ protected ClusterChecker(SubmitterScheduler scheduler, long checkIntervalMillis,
279298 initialized = true ;
280299 }
281300
301+
302+ /**
303+ * Update how often monitors check the individual servers status.
304+ * <p>
305+ * This can NOT be called concurrently,
306+ * {@link AuroraClusterMonitor#setClusterCheckFrequencyMillis(long)} currently guards this.
307+ *
308+ * @param millis The delay between runs for monitoring server status
309+ */
310+ protected void updateServerCheckDelayMillis (long millis ) {
311+ for (ServerMonitor sm : allServers .values ()) {
312+ if (scheduler .remove (sm )) {
313+ scheduleMonitor (sm , millis );
314+ } else {
315+ throw new IllegalStateException ("Could not unschedule monitor: " + sm );
316+ }
317+ }
318+ }
319+
320+ protected void scheduleMonitor (ServerMonitor monitor , long checkIntervalMillis ) {
321+ scheduler .scheduleAtFixedRate (monitor ,
322+ // hopefully well distributed hash code will distribute
323+ // these tasks so that they are not all checked at once
324+ // we convert to a long to avoid a possible overflow at Integer.MIN_VALUE
325+ Math .abs ((long )System .identityHashCode (monitor )) % checkIntervalMillis ,
326+ checkIntervalMillis );
327+ }
328+
282329 protected void expediteServerCheck (ServerMonitor serverMonitor ) {
283330 if (serversWaitingExpeditiedCheck .addIfAbsent (serverMonitor .server )) {
284331 scheduler .execute (() -> {
@@ -374,6 +421,11 @@ protected ServerMonitor(DelegateAuroraDriver driver, AuroraServer server,
374421 masterServer = false ;
375422 }
376423
424+ @ Override
425+ public String toString () {
426+ return (masterServer ? "m:" : "r:" ) + server ;
427+ }
428+
377429 protected void reconnect () throws SQLException {
378430 Connection newConnection =
379431 driver .connect (server .hostAndPortString () +
0 commit comments