diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java index ad06bf052a69..19b7d2b4c5d1 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java @@ -16,14 +16,20 @@ package org.springframework.scheduling.concurrent; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; @@ -34,6 +40,10 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.jspecify.annotations.Nullable; @@ -73,7 +83,7 @@ */ @SuppressWarnings({"serial", "deprecation"}) public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport - implements AsyncTaskExecutor, SchedulingTaskExecutor, TaskScheduler { + implements AsyncTaskExecutor, SchedulingTaskExecutor, TaskScheduler, ThreadPoolTaskSchedulerMonitoringMBean { private static final TimeUnit NANO = TimeUnit.NANOSECONDS; @@ -94,6 +104,409 @@ public 
class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport private @Nullable ScheduledExecutorService scheduledExecutor;
+	// Executor that runs the delay monitor; holds null while no monitor is running
+	private final AtomicReference<@Nullable ScheduledExecutorService> delayMonitorExecutor = new AtomicReference<>();
+
+	// The configuration values below are written by setters (some of them exposed through
+	// the monitoring MBean) and read from other threads, including the monitor thread,
+	// so they are volatile like warningLogLevel and structuredLoggingEnabled further down.
+	private volatile boolean enableDelayMonitoring = true;
+
+	private volatile long delayMonitoringInterval = 5000;
+
+	private volatile long delayWarningThreshold = 1000;
+
+	private volatile int maxQueueCheckSize = 100;
+
+	private volatile boolean adaptiveQueueCheckSize = true;
+
+	private volatile long warningRateLimitMs = 30000;
+
+	// Maximum warning rate limit (24 hours)
+	private static final long MAX_WARNING_RATE_LIMIT_MS = 86400000;
+
+	// Maximum number of timestamps to keep in sliding window (prevents memory leak)
+	private static final int MAX_WARNING_TIMESTAMPS = 1000;
+
+	// Sliding window for rate limiting (stores timestamps of recent warnings, in epoch millis);
+	// every access must hold warningTimestampsLock
+	private final Queue<Long> warningTimestamps = new LinkedList<>();
+
+	private final Object warningTimestampsLock = new Object();
+
+	// CPU monitoring constants
+	private static final double CPU_USAGE_WARNING_THRESHOLD = 5.0; // 5% CPU
+	private static final long CPU_CHECK_INTERVAL_MS = 30000; // 30 seconds
+
+	// Deprecated - kept for compatibility
+	@Deprecated(since = "6.2")
+	private final AtomicLong lastWarningTime = new AtomicLong(0);
+
+	private final AtomicInteger delayedTaskWarningCount = new AtomicInteger();
+
+	// Monitoring metrics
+	private final AtomicLong maxDelayMillis = new AtomicLong(0);
+
+	private final AtomicInteger poolExhaustionCount = new AtomicInteger(0);
+
+	// Circuit breaker for graceful degradation
+	private final AtomicReference<CircuitBreakerState> circuitBreakerState =
+			new AtomicReference<>(CircuitBreakerState.CLOSED);
+
+	private final AtomicInteger monitoringErrorCount = new AtomicInteger(0);
+
+	private static final int CIRCUIT_BREAKER_THRESHOLD = 5;
+
+	private static final long CIRCUIT_BREAKER_RESET_MS = 60000;
+
+	private static final int HALF_OPEN_MAX_CALLS = 3; // Test 3 calls before closing
+
+	private final AtomicInteger halfOpenSuccessCount = new AtomicInteger(0);
+
+	private final AtomicLong circuitBreakerOpenTime = new AtomicLong(0);
+
+	// Deprecated - kept for compatibility
+	@Deprecated(since = "6.2")
+	private final AtomicBoolean circuitBreakerOpen = new AtomicBoolean(false);
+
+	// CPU monitoring for monitoring thread
+	private volatile long monitorThreadId = -1;
+
+	private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+	private volatile long lastMonitorCpuTime = 0;
+
+	private volatile long lastCpuCheckTime = 0;
+
+	// Logging configuration
+	private volatile WarningLogLevel warningLogLevel = WarningLogLevel.WARN;
+
+	private volatile boolean structuredLoggingEnabled = false;
+
+
+	/**
+	 * Enable or disable task delay monitoring.
+	 *

<p>When enabled (default), a separate monitoring thread will periodically check + * if scheduled tasks are unable to execute due to thread pool exhaustion and log warnings. + *

<p>This helps diagnose situations where the pool size is insufficient for the workload. + * <p>This setter only stores the flag; it does not itself start or stop a monitor. + * @param enableDelayMonitoring whether to enable delay monitoring + * @since 6.2 + * @see #setDelayMonitoringInterval + * @see #setDelayWarningThreshold + */ + public void setEnableDelayMonitoring(boolean enableDelayMonitoring) { + this.enableDelayMonitoring = enableDelayMonitoring; + } + + /** + * Return whether delay monitoring is currently enabled. + *

<p>Part of {@link ThreadPoolTaskSchedulerMonitoringMBean} interface. + * @return {@code true} if the delay monitoring flag is set, {@code false} otherwise + * @since 6.2 + */ + @Override + public boolean isDelayMonitoringEnabled() { + return this.enableDelayMonitoring; + } + + /** + * Enable or disable delay monitoring at runtime. + *

<p>If the scheduler is already initialized, this will start or stop + * the monitoring thread dynamically. + *

<p>Part of {@link ThreadPoolTaskSchedulerMonitoringMBean} interface.
+	 * @param enabled whether to enable delay monitoring
+	 * @since 6.2
+	 */
+	@Override
+	public void setDelayMonitoringEnabled(boolean enabled) {
+		boolean previouslyEnabled = this.enableDelayMonitoring;
+		this.enableDelayMonitoring = enabled;
+
+		// Nothing to start or stop before initialization, or when the flag did not change
+		if (this.scheduledExecutor == null || previouslyEnabled == enabled) {
+			return;
+		}
+
+		if (!enabled) {
+			if (logger.isInfoEnabled()) {
+				logger.info("Disabling delay monitoring at runtime");
+			}
+			stopDelayMonitor();
+			return;
+		}
+
+		if (logger.isInfoEnabled()) {
+			logger.info("Enabling delay monitoring at runtime");
+		}
+		startDelayMonitor(this.scheduledExecutor);
+	}
+
+	/**
+	 * Return the current monitoring interval in milliseconds.
+	 *

<p>Part of {@link ThreadPoolTaskSchedulerMonitoringMBean} interface. + * @return the monitoring interval in milliseconds + * @since 6.2 + */ + @Override + public long getDelayMonitoringInterval() { + return this.delayMonitoringInterval; + } + + /** + * Set the interval for checking delayed tasks (in milliseconds). + *

<p>Default is 5000ms (5 seconds). + *

<p>If monitoring is active and the scheduler is initialized,
+	 * this will restart the monitoring thread with the new interval.
+	 * @param delayMonitoringInterval the monitoring interval in milliseconds (must be positive)
+	 * @since 6.2
+	 */
+	public void setDelayMonitoringInterval(long delayMonitoringInterval) {
+		Assert.isTrue(delayMonitoringInterval > 0, "delayMonitoringInterval must be positive");
+		long previousInterval = this.delayMonitoringInterval;
+		this.delayMonitoringInterval = delayMonitoringInterval;
+
+		// A restart is only needed for an initialized scheduler with monitoring switched on
+		boolean monitoringActive = (this.scheduledExecutor != null && this.enableDelayMonitoring);
+		if (!monitoringActive || previousInterval == delayMonitoringInterval) {
+			return;
+		}
+
+		if (logger.isDebugEnabled()) {
+			logger.debug("Restarting delay monitoring with new interval: " + delayMonitoringInterval + "ms");
+		}
+		stopDelayMonitor();
+		startDelayMonitor(this.scheduledExecutor);
+	}
+
+	/**
+	 * Return the current delay warning threshold in milliseconds.
+	 *

<p>Part of {@link ThreadPoolTaskSchedulerMonitoringMBean} interface. + * @return the warning threshold in milliseconds + * @since 6.2 + */ + @Override + public long getDelayWarningThreshold() { + return this.delayWarningThreshold; + } + + /** + * Set the threshold for logging warnings about delayed tasks (in milliseconds). + *

<p>Tasks that are delayed by more than this threshold will trigger a warning. + *

<p>Default is 1000ms (1 second). + * @param delayWarningThreshold the warning threshold in milliseconds (must be positive) + * @since 6.2 + */ + public void setDelayWarningThreshold(long delayWarningThreshold) { + Assert.isTrue(delayWarningThreshold > 0, "delayWarningThreshold must be positive"); + this.delayWarningThreshold = delayWarningThreshold; + } + + /** + * Reset the rate limit timer for delay warnings. + *

<p>This is primarily useful for testing to allow immediate warnings without waiting + * for the rate limit period to expire. + *

<p>Clears both the deprecated lastWarningTime field and the sliding window queue. + * The queue is cleared while holding {@code warningTimestampsLock}. + * @since 6.2 + */ + void resetWarningRateLimit() { + this.lastWarningTime.set(0); + synchronized (this.warningTimestampsLock) { + this.warningTimestamps.clear(); + } + } + + /** + * Set the maximum queue size to check in detail before skipping to summary logging. + *

<p>This is primarily useful for testing. + * @param maxQueueCheckSize the maximum queue size to check in detail (must be positive) + * @since 6.2 + */ + void setMaxQueueCheckSize(int maxQueueCheckSize) { + Assert.isTrue(maxQueueCheckSize > 0, "maxQueueCheckSize must be positive"); + this.maxQueueCheckSize = maxQueueCheckSize; + } + + /** + * Set the warning rate limit in milliseconds. + *

<p>This is primarily useful for testing. + *

<p>Maximum value is 24 hours (86400000ms) to prevent unbounded memory growth in sliding window. + * A value outside the range 0 to {@code MAX_WARNING_RATE_LIMIT_MS} (inclusive) is rejected by the assertion. + * @param warningRateLimitMs the rate limit in milliseconds + * @since 6.2 + */ + void setWarningRateLimitMs(long warningRateLimitMs) { + Assert.isTrue(warningRateLimitMs >= 0 && warningRateLimitMs <= MAX_WARNING_RATE_LIMIT_MS, + "warningRateLimitMs must be between 0 and " + MAX_WARNING_RATE_LIMIT_MS + " (24 hours)"); + this.warningRateLimitMs = warningRateLimitMs; + } + + /** + * Enable or disable adaptive queue check size. + *

<p>When enabled, the number of queued tasks inspected per check is scaled to the current queue size, + * capped at the configured max queue check size. + *

<p>Default is true. + * @param enabled whether to enable adaptive queue check size + * @since 6.2 + */ + public void setAdaptiveQueueCheckSize(boolean enabled) { + this.adaptiveQueueCheckSize = enabled; + } + + /** + * Return whether adaptive queue check size is enabled. + * @return {@code true} if enabled, {@code false} otherwise + * @since 6.2 + */ + public boolean isAdaptiveQueueCheckSize() { + return this.adaptiveQueueCheckSize; + } + + /** + * Return the current warning log level. + *

<p>Part of {@link ThreadPoolTaskSchedulerMonitoringMBean} interface. + * @return the name of the configured log level (DEBUG, INFO, WARN, or ERROR) + * @since 6.2 + */ + @Override + public String getWarningLogLevel() { + return this.warningLogLevel.name(); + } + + /** + * Set the log level for delay warnings. + *

<p>Default is WARN. + * @param level the log level (DEBUG, INFO, WARN, or ERROR); must not be {@code null} + * @since 6.2 + */ + public void setWarningLogLevel(WarningLogLevel level) { + Assert.notNull(level, "WarningLogLevel must not be null"); + this.warningLogLevel = level; + } + + /** + * Set the log level for delay warnings from a string. + *

<p>Part of {@link ThreadPoolTaskSchedulerMonitoringMBean} interface.
+	 * <p>Matching is case-insensitive and ignores surrounding whitespace.
+	 * @param level the log level string (DEBUG, INFO, WARN, or ERROR)
+	 * @throws IllegalArgumentException if the text does not name a {@link WarningLogLevel}
+	 * @since 6.2
+	 */
+	@Override
+	public void setWarningLogLevel(String level) {
+		Assert.hasText(level, "Log level must not be empty");
+		// hasText() accepts padded input such as " warn ", which valueOf() would reject
+		String normalized = level.strip().toUpperCase(java.util.Locale.ROOT);
+		try {
+			this.warningLogLevel = WarningLogLevel.valueOf(normalized);
+		}
+		catch (IllegalArgumentException ex) {
+			// Keep the original exception as the cause so the failure can be traced
+			throw new IllegalArgumentException("Invalid log level: " + level +
+					". Valid values are: DEBUG, INFO, WARN, ERROR", ex);
+		}
+	}
+
+	/**
+	 * Return whether structured logging is enabled.
+	 *

<p>Part of {@link ThreadPoolTaskSchedulerMonitoringMBean} interface. + * @return {@code true} if structured logging is enabled, {@code false} otherwise + * @since 6.2 + */ + @Override + public boolean isStructuredLoggingEnabled() { + return this.structuredLoggingEnabled; + } + + /** + * Enable or disable structured logging (JSON format). + *

<p>When enabled, warnings will be logged in JSON format for better indexing + * in centralized logging systems (ELK, Splunk, etc.). + *

<p>Default is false. + * @param enabled whether to enable structured logging + * @since 6.2 + */ + public void setStructuredLoggingEnabled(boolean enabled) { + this.structuredLoggingEnabled = enabled; + } + + /** + * Return the maximum delay observed for any task (in milliseconds). + *

<p>This metric can be used for monitoring and alerting. + * @return the maximum delay in milliseconds recorded since the last metrics reset + * @since 6.2 + */ + public long getMaxDelayMillis() { + return this.maxDelayMillis.get(); + } + + /** + * Return the number of times pool exhaustion has been detected. + *

<p>This metric can be used for monitoring and alerting. + * @return the count of pool exhaustion events recorded since the last metrics reset + * @since 6.2 + */ + public int getPoolExhaustionCount() { + return this.poolExhaustionCount.get(); + } + + /** + * Return the current queue size of the scheduler. + *

<p>This metric can be used for monitoring. + * @return the current queue size, or 0 if the underlying executor is not a + * {@code ScheduledThreadPoolExecutor} (which includes a scheduler that holds no executor) + * @since 6.2 + */ + public int getQueueSize() { + if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor executor) { + return executor.getQueue().size(); + } + return 0; + } + + /** + * Return whether the circuit breaker is currently open. + *

<p>Part of {@link ThreadPoolTaskSchedulerMonitoringMBean} interface.
+	 * @return true if circuit breaker is open or half-open, false if closed
+	 * @since 6.2
+	 */
+	@Override
+	public boolean isCircuitBreakerOpen() {
+		boolean notClosed = (this.circuitBreakerState.get() != CircuitBreakerState.CLOSED);
+		// Mirror the answer into the deprecated flag for backwards compatibility
+		this.circuitBreakerOpen.set(notClosed);
+		return notClosed;
+	}
+
+	/**
+	 * Return the current circuit breaker state as a string.
+	 * @return "CLOSED", "OPEN", or "HALF_OPEN"
+	 * @since 6.2
+	 */
+	public String getCircuitBreakerState() {
+		CircuitBreakerState current = this.circuitBreakerState.get();
+		if (current == null) {
+			// An unset state is reported as CLOSED
+			return CircuitBreakerState.CLOSED.name();
+		}
+		return current.name();
+	}
+
+	/**
+	 * Reset all monitoring metrics.
+	 *

<p>This is useful for testing or when starting a new monitoring period. + *

<p>Part of {@link ThreadPoolTaskSchedulerMonitoringMBean} interface. + * <p>Besides zeroing the counters, this also forces the circuit breaker back to CLOSED. + * @since 6.2 + */ + @Override + public void resetMonitoringMetrics() { + this.maxDelayMillis.set(0); + this.poolExhaustionCount.set(0); + this.delayedTaskWarningCount.set(0); + this.monitoringErrorCount.set(0); + this.circuitBreakerState.set(CircuitBreakerState.CLOSED); + this.circuitBreakerOpen.set(false); // Deprecated field + this.circuitBreakerOpenTime.set(0); + this.halfOpenSuccessCount.set(0); + if (logger.isDebugEnabled()) { + logger.debug("Monitoring metrics reset"); + } + } + + /** + * Reset the circuit breaker state. + *

<p>This allows monitoring to resume immediately without waiting for the cool-down period. + *

<p>Part of {@link ThreadPoolTaskSchedulerMonitoringMBean} interface. + * @since 6.2 + */ + @Override + public void resetCircuitBreaker() { + this.circuitBreakerState.set(CircuitBreakerState.CLOSED); + this.circuitBreakerOpen.set(false); // Deprecated field + this.circuitBreakerOpenTime.set(0); + this.monitoringErrorCount.set(0); + this.halfOpenSuccessCount.set(0); + if (logger.isInfoEnabled()) { + logger.info("Circuit breaker manually reset - monitoring resumed"); + } + } /** * Set the ScheduledExecutorService's pool size. @@ -208,6 +621,10 @@ protected ExecutorService initializeExecutor( } } + if (this.enableDelayMonitoring) { + startDelayMonitor(this.scheduledExecutor); + } + return this.scheduledExecutor; } @@ -426,6 +843,502 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay) } }
+	/**
+	 * Start the delay monitoring thread that periodically checks for tasks
+	 * that are delayed due to thread pool exhaustion.
+	 * <p>A monitor that is already running is shut down first. The new monitor is
+	 * published with an atomic swap, so a monitor installed by a concurrent caller
+	 * in the meantime is shut down instead of being leaked.
+	 * @param executor the scheduled executor to monitor
+	 * @since 6.2
+	 */
+	private void startDelayMonitor(ScheduledExecutorService executor) {
+		if (!(executor instanceof ScheduledThreadPoolExecutor threadPool)) {
+			if (logger.isDebugEnabled()) {
+				logger.debug("Delay monitoring is only supported for ScheduledThreadPoolExecutor");
+			}
+			return;
+		}
+
+		// Stop existing monitor if running
+		stopDelayMonitor();
+
+		// Read the interval once so that scheduling and logging agree with each other
+		long interval = this.delayMonitoringInterval;
+
+		ScheduledExecutorService newMonitor = Executors.newSingleThreadScheduledExecutor(runnable -> {
+			String prefix = getThreadNamePrefix();
+			String threadName = (prefix != null ? prefix : "") + "delay-monitor";
+			Thread thread = new Thread(runnable, threadName);
+			thread.setDaemon(true);
+			// Remembered so that monitorCpuUsage() can query this thread's CPU time
+			this.monitorThreadId = thread.getId();
+			return thread;
+		});
+
+		newMonitor.scheduleAtFixedRate(
+				() -> checkForDelayedTasks(threadPool),
+				interval,
+				interval,
+				TimeUnit.MILLISECONDS);
+
+		// getAndSet instead of set: if another caller installed a monitor after the
+		// stopDelayMonitor() call above, that monitor must be shut down here, otherwise
+		// its thread would keep running with nothing left to stop it.
+		ScheduledExecutorService displaced = this.delayMonitorExecutor.getAndSet(newMonitor);
+		if (displaced != null) {
+			displaced.shutdownNow();
+		}
+
+		if (logger.isDebugEnabled()) {
+			logger.debug("Started delay monitoring thread with interval: " + interval + "ms");
+		}
+	}
+
+	/**
+	 * Stop the delay monitoring thread.
+	 * Lock-free thread-safe method to prevent concurrent shutdown attempts.
+	 * Uses AtomicReference.getAndSet() for atomic shutdown without locks.
+	 * @since 6.2
+	 */
+	private void stopDelayMonitor() {
+		// Only the caller that swaps out a non-null executor performs the shutdown
+		ScheduledExecutorService executor = this.delayMonitorExecutor.getAndSet(null);
+		if (executor != null) {
+			if (logger.isDebugEnabled()) {
+				logger.debug("Stopping delay monitoring thread");
+			}
+			executor.shutdownNow();
+			this.monitorThreadId = -1;
+		}
+	}
+
+	/**
+	 * Check the task queue for tasks whose scheduled execution time has passed
+	 * but have not yet started executing due to thread pool exhaustion.
+	 * @param executor the scheduled thread pool executor to monitor
+	 * @since 6.2
+	 */
+	private void checkForDelayedTasks(ScheduledThreadPoolExecutor executor) {
+		try {
+			// Circuit breaker check
+			if (!checkCircuitBreaker()) {
+				return;
+			}
+
+			// Monitor CPU usage of monitoring thread itself
+			monitorCpuUsage();
+
+			inspectQueueForDelays(executor);
+
+			// Every check that completes without throwing counts as a success, including
+			// the common "nothing to report" outcomes. Otherwise a HALF_OPEN circuit
+			// breaker could never collect enough successes to close while the pool is healthy.
+			recordMonitoringSuccess();
+		}
+		catch (Exception ex) {
+			handleMonitoringError(ex);
+		}
+	}
+
+	/**
+	 * Inspect the executor's queue and, if the pool is exhausted and tasks are overdue,
+	 * update the metrics and log a rate-limited warning.
+	 * @param executor the scheduled thread pool executor to inspect
+	 */
+	private void inspectQueueForDelays(ScheduledThreadPoolExecutor executor) {
+		BlockingQueue<Runnable> queue = executor.getQueue();
+		if (queue.isEmpty()) {
+			return;
+		}
+
+		PoolState poolState = capturePoolState(executor);
+		if (!poolState.poolExhausted()) {
+			// No exhaustion, no need to check
+			return;
+		}
+
+		// For large queues, skip detailed iteration and warn immediately
+		if (poolState.queueSize() > this.maxQueueCheckSize) {
+			if (shouldLogWarning()) {
+				handleLargeQueue(poolState);
+			}
+			return;
+		}
+
+		// Analyze delayed tasks
+		DelayAnalysis analysis = analyzeDelayedTasks(queue);
+
+		// shouldLogWarning() records a timestamp whenever it returns true, so it is only
+		// consulted once there is something to report. Asking first would use up the
+		// rate-limit window on checks that find no overdue task.
+		if (analysis.delayedCount() > 0 && shouldLogWarning()) {
+			recordDelayMetrics(analysis.maxDelay());
+			this.poolExhaustionCount.incrementAndGet();
+			logWarning(buildDelayedTasksMessage(analysis.delayedCount(), analysis.maxDelay(),
+					poolState.poolSize(), poolState.activeCount(), poolState.queueSize()));
+			this.delayedTaskWarningCount.addAndGet(analysis.delayedCount());
+		}
+	}
+
+	/**
+	 * Check circuit breaker status and potentially transition to HALF_OPEN or CLOSED state.
+	 * Uses atomic CAS operations to prevent race conditions in state transitions.
+	 * @return true if monitoring should continue, false if circuit breaker is OPEN
+	 */
+	private boolean checkCircuitBreaker() {
+		CircuitBreakerState state = this.circuitBreakerState.get();
+
+		// CLOSED and HALF_OPEN both let the check run; an unset state is treated like CLOSED
+		if (state != CircuitBreakerState.OPEN) {
+			return true;
+		}
+
+		// OPEN: stay suspended until the cool-down period has passed
+		long elapsedMs = System.currentTimeMillis() - this.circuitBreakerOpenTime.get();
+		if (elapsedMs < CIRCUIT_BREAKER_RESET_MS) {
+			return false;
+		}
+
+		// The CAS lets exactly one caller perform the OPEN -> HALF_OPEN transition
+		boolean transitioned = this.circuitBreakerState.compareAndSet(
+				CircuitBreakerState.OPEN, CircuitBreakerState.HALF_OPEN);
+		if (transitioned) {
+			this.halfOpenSuccessCount.set(0);
+			if (logger.isInfoEnabled()) {
+				logger.info("Circuit breaker transitioning to HALF_OPEN - testing if errors are resolved");
+			}
+		}
+		// Monitoring is allowed once the cool-down is over, whoever won the CAS
+		return true;
+	}
+
+	/**
+	 * Take a snapshot of the executor's pool size, active thread count and queue size.
+	 * The pool counts as exhausted when the active count is at least the pool size.
+	 */
+	private PoolState capturePoolState(ScheduledThreadPoolExecutor executor) {
+		int threads = executor.getPoolSize();
+		int busy = executor.getActiveCount();
+		int queued = executor.getQueue().size();
+		return new PoolState(threads, busy, queued, busy >= threads);
+	}
+
+	/**
+	 * Check rate limiting using a sliding window - should we log a warning now?
+	 * Uses sliding window approach: allows one warning per warningRateLimitMs window.
+	 * More accurate than fixed window as it prevents burst warnings at window boundaries.
+	 * Implements bounded queue to prevent memory leak.
+	 * <p>The window bound is inclusive: a warning recorded exactly {@code warningRateLimitMs}
+	 * milliseconds ago no longer blocks a new one, so a limit of 0 disables rate limiting.
+	 * @return true if warning should be logged, false if rate limited
+	 */
+	private boolean shouldLogWarning() {
+		long now = System.currentTimeMillis();
+
+		synchronized (this.warningTimestampsLock) {
+			// Remove timestamps that have left the sliding window. With a strict "<" a
+			// limit of 0 would still suppress a second warning in the same millisecond.
+			long windowStart = now - this.warningRateLimitMs;
+			while (!this.warningTimestamps.isEmpty() && this.warningTimestamps.peek() <= windowStart) {
+				this.warningTimestamps.poll();
+			}
+
+			// Allow max 1 warning per window
+			if (!this.warningTimestamps.isEmpty()) {
+				return false;
+			}
+
+			// The queue is empty at this point, so it never holds more than one entry
+			// and needs no separate size bound.
+			this.warningTimestamps.offer(now);
+
+			// Also update old lastWarningTime for backwards compatibility
+			this.lastWarningTime.set(now);
+
+			return true;
+		}
+	}
+
+	/**
+	 * Handle large queue scenario (skip detailed iteration).
+	 * Counts one pool exhaustion event and one warning, and logs the summary message.
+	 */
+	private void handleLargeQueue(PoolState poolState) {
+		this.poolExhaustionCount.incrementAndGet();
+		logWarning(buildLargeQueueMessage(poolState.queueSize(), poolState.poolSize(), poolState.activeCount()));
+		this.delayedTaskWarningCount.incrementAndGet();
+	}
+
+	/**
+	 * Calculate adaptive max queue check size based on current queue size.
+	 * Scales between 10 (for small queues) and maxQueueCheckSize (for large queues).
+	 * @param queueSize current queue size
+	 * @return adaptive limit
+	 */
+	private int calculateAdaptiveLimit(int queueSize) {
+		// Adaptation switched off, or queue above the largest tier: use the configured maximum
+		if (!this.adaptiveQueueCheckSize || queueSize > 100) {
+			return this.maxQueueCheckSize;
+		}
+
+		// Tiers: up to 20 queued -> 10 checked, up to 50 -> 25, up to 100 -> 50
+		int tierLimit;
+		if (queueSize <= 20) {
+			tierLimit = 10;
+		}
+		else if (queueSize <= 50) {
+			tierLimit = 25;
+		}
+		else {
+			tierLimit = 50;
+		}
+		return Math.min(tierLimit, this.maxQueueCheckSize);
+	}
+
+	/**
+	 * Scan the head of the queue for overdue tasks.
+	 * At most {@link #calculateAdaptiveLimit} entries are inspected.
+	 * @return delay analysis with count and max delay
+	 */
+	private DelayAnalysis analyzeDelayedTasks(BlockingQueue queue) {
+		int inspectionLimit = calculateAdaptiveLimit(queue.size());
+		int inspected = 0;
+		int overdueTasks = 0;
+		long worstDelayMs = 0;
+
+		for (Object entry : queue) {
+			// Safety limit: stop once the adaptive limit has been reached
+			if (inspected == inspectionLimit) {
+				break;
+			}
+			inspected++;
+
+			if (!(entry instanceof RunnableScheduledFuture<?> scheduled)) {
+				continue;
+			}
+
+			// getDelay() is the time remaining until the scheduled execution:
+			// positive = not due yet, zero = due now, negative = overdue
+			long remainingMs = scheduled.getDelay(TimeUnit.MILLISECONDS);
+
+			// Overdue by more than the threshold, e.g. -3000 means due 3 seconds ago
+			if (remainingMs < -this.delayWarningThreshold) {
+				overdueTasks++;
+				worstDelayMs = Math.max(worstDelayMs, Math.abs(remainingMs));
+			}
+		}
+
+		return new DelayAnalysis(overdueTasks, worstDelayMs);
+	}
+
+	/**
+	 * Record delay metrics in a thread-safe manner.
+	 */
+	private void recordDelayMetrics(long maxDelay) {
+		if (maxDelay <= 0) {
+			return;
+		}
+		// Atomically keep the larger of the stored and the new value
+		this.maxDelayMillis.accumulateAndGet(maxDelay, Math::max);
+	}
+
+	/**
+	 * React to a failed monitoring run.
+	 * A failure in HALF_OPEN reopens the circuit breaker at once; otherwise the error
+	 * counter is advanced and the breaker opens when CIRCUIT_BREAKER_THRESHOLD is reached.
+	 * State changes go through compare-and-set, so only one caller performs a transition.
+	 */
+	private void handleMonitoringError(Exception ex) {
+		long suspendSeconds = CIRCUIT_BREAKER_RESET_MS / 1000;
+
+		if (this.circuitBreakerState.get() == CircuitBreakerState.HALF_OPEN) {
+			// The probe failed: go back to OPEN and restart the cool-down
+			boolean reopened = this.circuitBreakerState.compareAndSet(
+					CircuitBreakerState.HALF_OPEN, CircuitBreakerState.OPEN);
+			if (reopened) {
+				this.circuitBreakerOpenTime.set(System.currentTimeMillis());
+				this.circuitBreakerOpen.set(true); // Deprecated field
+				if (logger.isWarnEnabled()) {
+					logger.warn("Circuit breaker reopened after error in HALF_OPEN state - " +
+							"monitoring suspended for " + suspendSeconds + " seconds", ex);
+				}
+			}
+			return;
+		}
+
+		int failures = this.monitoringErrorCount.incrementAndGet();
+		if (failures < CIRCUIT_BREAKER_THRESHOLD) {
+			if (logger.isDebugEnabled()) {
+				logger.debug("Error during delay monitoring (error count: " + failures + ")", ex);
+			}
+			return;
+		}
+
+		// Threshold reached: open the breaker, unless it has already left CLOSED
+		boolean opened = this.circuitBreakerState.compareAndSet(
+				CircuitBreakerState.CLOSED, CircuitBreakerState.OPEN);
+		if (opened) {
+			this.circuitBreakerOpen.set(true); // Deprecated field
+			this.circuitBreakerOpenTime.set(System.currentTimeMillis());
+			if (logger.isWarnEnabled()) {
+				logger.warn("Circuit breaker opened after " + failures +
+						" consecutive errors - monitoring suspended for " + suspendSeconds + " seconds", ex);
+			}
+		}
+	}
+
+	/**
+	 * Record successful monitoring execution in HALF_OPEN state.
+	 * After HALF_OPEN_MAX_CALLS successful executions, transition to CLOSED.
+	 * Uses atomic CAS operations to prevent race conditions in state transitions.
+	 * <p>In CLOSED state a success additionally clears the error counter, so that the
+	 * circuit breaker threshold counts consecutive errors rather than all errors
+	 * since the last reset.
+	 */
+	private void recordMonitoringSuccess() {
+		CircuitBreakerState currentState = this.circuitBreakerState.get();
+
+		if (currentState == CircuitBreakerState.CLOSED) {
+			// A clean run ends any error streak
+			this.monitoringErrorCount.set(0);
+			return;
+		}
+
+		if (currentState == CircuitBreakerState.HALF_OPEN) {
+			int successCount = this.halfOpenSuccessCount.incrementAndGet();
+			if (successCount >= HALF_OPEN_MAX_CALLS) {
+				// Enough successful calls - atomically transition to CLOSED
+				// CAS ensures only one thread transitions the state
+				if (this.circuitBreakerState.compareAndSet(CircuitBreakerState.HALF_OPEN, CircuitBreakerState.CLOSED)) {
+					this.circuitBreakerOpen.set(false); // Deprecated field
+					this.monitoringErrorCount.set(0);
+					if (logger.isInfoEnabled()) {
+						logger.info("Circuit breaker closed after " + successCount + " successful monitoring executions");
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * Monitor CPU usage of the monitoring thread itself.
+	 * Logs a warning if CPU usage is excessive.
+	 * <p>Samples are taken at most once per CPU_CHECK_INTERVAL_MS. A sample is discarded
+	 * when the JVM reports no CPU time for the thread, or when the value went backwards.
+	 */
+	private void monitorCpuUsage() {
+		if (!this.threadMXBean.isThreadCpuTimeSupported() || this.monitorThreadId < 0) {
+			return;
+		}
+
+		long now = System.currentTimeMillis();
+		long previousCheckTime = this.lastCpuCheckTime;
+		// Check CPU usage periodically
+		if (now - previousCheckTime < CPU_CHECK_INTERVAL_MS) {
+			return;
+		}
+
+		long currentCpuTime = this.threadMXBean.getThreadCpuTime(this.monitorThreadId);
+		if (currentCpuTime < 0) {
+			// getThreadCpuTime returns -1 when CPU time measurement is disabled or the thread
+			// is not alive. Drop the baseline instead of storing -1 as if it were a sample.
+			this.lastMonitorCpuTime = 0;
+			this.lastCpuCheckTime = now;
+			return;
+		}
+
+		long previousCpuTime = this.lastMonitorCpuTime;
+		long wallTimeDelta = (now - previousCheckTime) * 1_000_000; // Convert to nanos
+
+		// A smaller value than the baseline means the reading belongs to a different
+		// (newly created) monitor thread, so only a new baseline is recorded
+		if (previousCpuTime > 0 && currentCpuTime >= previousCpuTime && wallTimeDelta > 0) {
+			long cpuTimeDelta = currentCpuTime - previousCpuTime;
+
+			// Calculate CPU percentage
+			double cpuPercent = (cpuTimeDelta * 100.0) / wallTimeDelta;
+
+			// Warn if monitoring thread is using excessive CPU
+			if (cpuPercent > CPU_USAGE_WARNING_THRESHOLD && logger.isWarnEnabled()) {
+				logger.warn(String.format("Delay monitoring thread CPU usage is high (%.2f%%) - " +
+						"consider increasing delayMonitoringInterval or disabling monitoring", cpuPercent));
+			}
+		}
+
+		this.lastMonitorCpuTime = currentCpuTime;
+		this.lastCpuCheckTime = now;
+	}
+
+	/**
+	 * Escape a string so that it can be placed inside a JSON string literal.
+	 * Escapes quotes, backslashes, and every control character below U+0020:
+	 * newline, carriage return and tab use their short forms, all others use
+	 * the {@code \}{@code uXXXX} form.
+	 * @param value the text to escape (may be {@code null})
+	 * @return the escaped text, or an empty string for {@code null}
+	 */
+	private String escapeJson(String value) {
+		if (value == null) {
+			return "";
+		}
+		StringBuilder escaped = new StringBuilder(value.length() + 16);
+		for (int i = 0; i < value.length(); i++) {
+			char ch = value.charAt(i);
+			switch (ch) {
+				case '\\' -> escaped.append("\\\\");
+				case '"' -> escaped.append("\\\"");
+				case '\n' -> escaped.append("\\n");
+				case '\r' -> escaped.append("\\r");
+				case '\t' -> escaped.append("\\t");
+				default -> {
+					if (ch < 0x20) {
+						// Remaining control characters are not allowed unescaped in JSON strings
+						escaped.append(String.format("\\u%04x", (int) ch));
+					}
+					else {
+						escaped.append(ch);
+					}
+				}
+			}
+		}
+		return escaped.toString();
+	}
+
+	/**
+	 * Build warning message for large queue scenario.
+	 * Produces a JSON object when structured logging is enabled, plain text otherwise.
+	 */
+	private String buildLargeQueueMessage(int queueSize, int poolSize, int activeCount) {
+		if (this.structuredLoggingEnabled) {
+			String message = escapeJson("Thread pool exhaustion detected with large queue size");
+			return String.format("{\"event\":\"pool_exhaustion\",\"severity\":\"high\"," +
+					"\"queue_size\":%d,\"pool_size\":%d,\"active_threads\":%d," +
+					"\"message\":\"%s\"}",
+					queueSize, poolSize, activeCount, message);
+		}
+
+		return String.format("Thread pool exhaustion detected with large queue size (%d). " +
+				"Pool size: %d, Active threads: %d. " +
+				"Consider significantly increasing the pool size via " +
+				"ThreadPoolTaskScheduler.setPoolSize() or spring.task.scheduling.pool.size property, " +
+				"or enable virtual threads via ThreadPoolTaskScheduler.setVirtualThreads(true).",
+				queueSize, poolSize, activeCount);
+	}
+
+	/**
+	 * Build warning message for delayed tasks.
+	 * Produces a JSON object when structured logging is enabled, plain text otherwise.
+	 */
+	private String buildDelayedTasksMessage(int delayedCount, long maxDelay, int poolSize, int activeCount, int queueSize) {
+		if (this.structuredLoggingEnabled) {
+			String message = escapeJson("Scheduled tasks delayed due to thread pool exhaustion");
+			return String.format("{\"event\":\"delayed_tasks\",\"severity\":\"medium\"," +
+					"\"delayed_count\":%d,\"max_delay_ms\":%d,\"pool_size\":%d," +
+					"\"active_threads\":%d,\"queue_size\":%d," +
+					"\"message\":\"%s\"}",
+					delayedCount, maxDelay, poolSize, activeCount, queueSize, message);
+		}
+
+		String baseMessage = String.format("%d scheduled task%s %s delayed (max delay: %dms) due to thread pool exhaustion. " +
+				"Pool size: %d, Active threads: %d, Queue size: %d. " +
+				"Consider increasing the pool size via ThreadPoolTaskScheduler.setPoolSize() " +
+				"or spring.task.scheduling.pool.size property, or enable virtual threads " +
+				"via ThreadPoolTaskScheduler.setVirtualThreads(true).",
+				delayedCount, delayedCount == 1 ? "" : "s", delayedCount == 1 ? "is" : "are",
+				maxDelay, poolSize, activeCount, queueSize);
+
+		// Add extra hint if pool size is 1 (default)
+		if (poolSize == 1) {
+			return baseMessage + " Note: Pool size is 1 (default), which is often insufficient for multiple scheduled tasks.";
+		}
+
+		return baseMessage;
+	}
+
+	/**
+	 * Log a warning message at the configured log level.
+	 */
+	private void logWarning(String message) {
+		switch (this.warningLogLevel) {
+			case DEBUG -> {
+				if (logger.isDebugEnabled()) {
+					logger.debug(message);
+				}
+			}
+			case INFO -> {
+				if (logger.isInfoEnabled()) {
+					logger.info(message);
+				}
+			}
+			case WARN -> {
+				if (logger.isWarnEnabled()) {
+					logger.warn(message);
+				}
+			}
+			case ERROR -> {
+				if (logger.isErrorEnabled()) {
+					logger.error(message);
+				}
+			}
+		}
+	}
+
+	@Override
+	public void shutdown() {
+		// Stop the monitor before the scheduler itself is shut down
+		stopDelayMonitor();
+		super.shutdown();
+	}
+
+	/**
+	 * Return the total number of delayed task warnings that have been logged.
+	 *

This can be used for monitoring and alerting purposes. + * @return the count of delayed task warnings + * @since 6.2 + */ + public int getDelayedTaskWarningCount() { + return this.delayedTaskWarningCount.get(); + } + private RunnableScheduledFuture decorateTaskIfNecessary(RunnableScheduledFuture future) { return (this.taskDecorator != null ? new DelegatingRunnableScheduledFuture<>(future, this.taskDecorator) : @@ -437,6 +1350,41 @@ private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) { } + /** + * Circuit breaker states. + */ + private enum CircuitBreakerState { + CLOSED, // Normal operation + OPEN, // Monitoring disabled due to errors + HALF_OPEN // Testing if errors are resolved + } + + /** + * Warning log levels for delay monitoring messages. + * @since 6.2 + */ + public enum WarningLogLevel { + /** Debug level logging. */ + DEBUG, + /** Info level logging. */ + INFO, + /** Warn level logging. */ + WARN, + /** Error level logging. */ + ERROR + } + + /** + * Pool state information record. + */ + private record PoolState(int poolSize, int activeCount, int queueSize, boolean poolExhausted) {} + + /** + * Delay analysis result record. 
+ */ + private record DelayAnalysis(int delayedCount, long maxDelay) {} + + private static class DelegatingRunnableScheduledFuture implements RunnableScheduledFuture { private final RunnableScheduledFuture future; @@ -444,8 +1392,12 @@ private static class DelegatingRunnableScheduledFuture implements RunnableSch private final Runnable decoratedRunnable; public DelegatingRunnableScheduledFuture(RunnableScheduledFuture future, TaskDecorator taskDecorator) { + Assert.notNull(future, "Future must not be null"); + Assert.notNull(taskDecorator, "TaskDecorator must not be null"); this.future = future; - this.decoratedRunnable = taskDecorator.decorate(this.future); + Runnable decorated = taskDecorator.decorate(this.future); + // Fall back to original future if decorator returns null + this.decoratedRunnable = (decorated != null ? decorated : this.future); } @Override diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerMonitoringMBean.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerMonitoringMBean.java new file mode 100644 index 000000000000..4da9af813268 --- /dev/null +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerMonitoringMBean.java @@ -0,0 +1,143 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.springframework.scheduling.concurrent;

/**
 * JMX MBean interface for monitoring and managing {@link ThreadPoolTaskScheduler}
 * delay monitoring at runtime.
 *
 * <p>This interface exposes operational metrics and allows dynamic configuration
 * of monitoring settings without requiring application restart.
 *
 * <p>NOTE(review): the interface name does not follow the {@code <ClassName>MBean}
 * pattern for {@code ThreadPoolTaskScheduler}, so the scheduler is presumably not
 * recognized as a Standard MBean when registered directly; confirm how it is meant
 * to be exported.
 *
 * @author Spring Framework Contributors
 * @since 6.2
 * @see ThreadPoolTaskScheduler
 */
public interface ThreadPoolTaskSchedulerMonitoringMBean {

	/**
	 * Return whether delay monitoring is currently enabled.
	 * @return {@code true} if monitoring is enabled, {@code false} otherwise
	 */
	boolean isDelayMonitoringEnabled();

	/**
	 * Enable or disable delay monitoring at runtime.
	 * @param enabled whether to enable delay monitoring
	 */
	void setDelayMonitoringEnabled(boolean enabled);

	/**
	 * Return the current monitoring interval in milliseconds.
	 * @return the monitoring interval
	 */
	long getDelayMonitoringInterval();

	/**
	 * Set the monitoring interval at runtime.
	 * <p>How a non-positive value is handled is up to the implementation
	 * (TODO confirm: rejected or ignored).
	 * @param interval the new interval in milliseconds (must be positive)
	 */
	void setDelayMonitoringInterval(long interval);

	/**
	 * Return the current delay warning threshold in milliseconds.
	 * @return the warning threshold
	 */
	long getDelayWarningThreshold();

	/**
	 * Set the delay warning threshold at runtime.
	 * <p>How a non-positive value is handled is up to the implementation
	 * (TODO confirm: rejected or ignored).
	 * @param threshold the new threshold in milliseconds (must be positive)
	 */
	void setDelayWarningThreshold(long threshold);

	/**
	 * Return the maximum delay observed for any task (in milliseconds).
	 * @return the maximum delay
	 */
	long getMaxDelayMillis();

	/**
	 * Return the number of times pool exhaustion has been detected.
	 * @return the pool exhaustion count
	 */
	int getPoolExhaustionCount();

	/**
	 * Return the total number of delayed task warnings that have been logged.
	 * @return the delayed task warning count
	 */
	int getDelayedTaskWarningCount();

	/**
	 * Return the current queue size of the scheduler.
	 * @return the queue size
	 */
	int getQueueSize();

	/**
	 * Return the current pool size.
	 * @return the pool size
	 */
	int getPoolSize();

	/**
	 * Return the number of currently active threads.
	 * @return the active thread count
	 */
	int getActiveCount();

	/**
	 * Return whether the circuit breaker is currently open.
	 * @return {@code true} if circuit breaker is open, {@code false} otherwise
	 */
	boolean isCircuitBreakerOpen();

	/**
	 * Return the current warning log level.
	 * @return the log level name (DEBUG, INFO, WARN, or ERROR)
	 */
	String getWarningLogLevel();

	/**
	 * Set the warning log level at runtime.
	 * <p>Handling of any other value is up to the implementation (TODO confirm).
	 * @param level the log level name (DEBUG, INFO, WARN, or ERROR)
	 */
	void setWarningLogLevel(String level);

	/**
	 * Return whether structured logging is enabled.
	 * @return {@code true} if structured logging is enabled, {@code false} otherwise
	 */
	boolean isStructuredLoggingEnabled();

	/**
	 * Enable or disable structured logging at runtime.
	 * @param enabled whether to enable structured logging
	 */
	void setStructuredLoggingEnabled(boolean enabled);

	/**
	 * Reset all monitoring metrics.
	 */
	void resetMonitoringMetrics();

	/**
	 * Reset the circuit breaker state.
	 */
	void resetCircuitBreaker();
}
diff --git a/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerDelayMonitoringTests.java b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerDelayMonitoringTests.java
new file mode 100644
index 000000000000..e63152272ce2
--- /dev/null
+++ b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerDelayMonitoringTests.java
@@ -0,0 +1,999 @@
+/*
+ * Copyright 2002-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.concurrent; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class ThreadPoolTaskSchedulerDelayMonitoringTests { + + private ThreadPoolTaskScheduler scheduler; + + + @AfterEach + void tearDown() { + if (scheduler != null) { + scheduler.shutdown(); + } + } + + + @Test + void delayMonitoringDetectsThreadStarvation() throws Exception { + // Create scheduler with pool size of 1 + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(500); // Check every 500ms + scheduler.setDelayWarningThreshold(100); // Warn if delayed by > 100ms + scheduler.initialize(); + + // Reset rate limit to allow immediate warning in test + scheduler.resetWarningRateLimit(); + + CountDownLatch longTaskStarted = new CountDownLatch(1); + CountDownLatch longTaskFinish = new CountDownLatch(1); + + // Schedule a long-running task that blocks the only thread + scheduler.scheduleAtFixedRate(() -> { + longTaskStarted.countDown(); + try { + // Block for 3 seconds + Thread.sleep(3000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(100)); + + // Wait for the long task to start + 
assertThat(longTaskStarted.await(2, TimeUnit.SECONDS)).isTrue(); + + // Wait a bit to ensure the long task is blocking + Thread.sleep(200); + + // Schedule a quick task slightly in the past so it's immediately delayed + CountDownLatch quickTaskExecuted = new CountDownLatch(1); + Instant scheduledTime = scheduler.getClock().instant().minusMillis(200); + scheduler.schedule(() -> quickTaskExecuted.countDown(), scheduledTime); + + // Wait for delay monitoring to detect the issue + // The monitor runs every 500ms, task is already delayed by 200ms > threshold (100ms) + // So we wait for the next monitor run + margin + Thread.sleep(700); + + // Verify that warnings were logged + // The delayedTaskWarningCount should be greater than 0 + int warningCount = scheduler.getDelayedTaskWarningCount(); + assertThat(warningCount) + .withFailMessage("Expected warnings to be logged but got %d. " + + "Active threads: %d, Pool size: %d, Queue size: %d", + warningCount, + scheduler.getScheduledThreadPoolExecutor().getActiveCount(), + scheduler.getScheduledThreadPoolExecutor().getPoolSize(), + scheduler.getScheduledThreadPoolExecutor().getQueue().size()) + .isGreaterThan(0); + + // The quick task should still be waiting (not executed yet) + assertThat(quickTaskExecuted.getCount()).isEqualTo(1); + } + + @Test + void delayMonitoringCanBeDisabled() throws Exception { + // Create scheduler with monitoring disabled + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(false); + scheduler.initialize(); + + CountDownLatch longTaskStarted = new CountDownLatch(1); + + // Schedule a long-running task + scheduler.scheduleAtFixedRate(() -> { + longTaskStarted.countDown(); + try { + Thread.sleep(2000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(100)); + + // Wait for the long task to start + assertThat(longTaskStarted.await(2, 
TimeUnit.SECONDS)).isTrue(); + + // Schedule another task + scheduler.schedule(() -> { + }, scheduler.getClock().instant()); + + // Wait + Thread.sleep(1000); + + // No warnings should be logged since monitoring is disabled + int warningCount = scheduler.getDelayedTaskWarningCount(); + assertThat(warningCount).isEqualTo(0); + } + + @Test + void delayMonitoringWithSufficientPoolSize() throws Exception { + // Create scheduler with sufficient pool size + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(5); // Plenty of threads + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(500); + scheduler.setDelayWarningThreshold(100); + scheduler.initialize(); + + CountDownLatch allTasksStarted = new CountDownLatch(3); + + // Schedule multiple tasks + for (int i = 0; i < 3; i++) { + scheduler.scheduleAtFixedRate(() -> { + allTasksStarted.countDown(); + try { + Thread.sleep(500); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(100)); + } + + // All tasks should start without delay + assertThat(allTasksStarted.await(2, TimeUnit.SECONDS)).isTrue(); + + // Wait for monitoring to run + Thread.sleep(1000); + + // No warnings should be logged since there's no thread starvation + int warningCount = scheduler.getDelayedTaskWarningCount(); + assertThat(warningCount).isEqualTo(0); + } + + @Test + void delayMonitoringShutdownGracefully() throws Exception { + // Create scheduler with monitoring enabled + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(500); + scheduler.initialize(); + + // Schedule a task + CountDownLatch taskExecuted = new CountDownLatch(1); + scheduler.schedule(() -> taskExecuted.countDown(), scheduler.getClock().instant()); + + // Wait for task to execute + 
assertThat(taskExecuted.await(2, TimeUnit.SECONDS)).isTrue(); + + // Shutdown should complete without errors + scheduler.shutdown(); + + // Verify scheduler is shut down + assertThat(scheduler.getScheduledExecutor().isShutdown()).isTrue(); + } + + @Test + void delayMonitoringWithCustomThreshold() throws Exception { + // Create scheduler with custom warning threshold + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(500); + scheduler.setDelayWarningThreshold(2000); // Only warn if delayed > 2 seconds + scheduler.initialize(); + + CountDownLatch longTaskStarted = new CountDownLatch(1); + + // Schedule a long-running task + scheduler.scheduleAtFixedRate(() -> { + longTaskStarted.countDown(); + try { + Thread.sleep(3000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(100)); + + // Wait for the long task to start + assertThat(longTaskStarted.await(2, TimeUnit.SECONDS)).isTrue(); + + // Schedule another task + scheduler.schedule(() -> { + }, scheduler.getClock().instant()); + + // Wait for 1 second (less than threshold) + Thread.sleep(1000); + + // No warnings should be logged yet (delay < threshold) + int warningCountBefore = scheduler.getDelayedTaskWarningCount(); + + // Wait for another 1.5 seconds (now delay > threshold) + Thread.sleep(1500); + + // Now warnings should be logged + int warningCountAfter = scheduler.getDelayedTaskWarningCount(); + assertThat(warningCountAfter).isGreaterThanOrEqualTo(warningCountBefore); + } + + @Test + void dynamicMonitoringControl() throws Exception { + // Create scheduler with monitoring initially disabled + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(false); + scheduler.setDelayMonitoringInterval(500); // 
Check every 500ms for faster detection + scheduler.setDelayWarningThreshold(100); + scheduler.initialize(); + + assertThat(scheduler.isDelayMonitoringEnabled()).isFalse(); + + // Enable monitoring dynamically + scheduler.setDelayMonitoringEnabled(true); + assertThat(scheduler.isDelayMonitoringEnabled()).isTrue(); + + // Set up thread starvation scenario + CountDownLatch longTaskStarted = new CountDownLatch(1); + scheduler.scheduleAtFixedRate(() -> { + longTaskStarted.countDown(); + try { + Thread.sleep(2000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(100)); + + assertThat(longTaskStarted.await(2, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(200); + + scheduler.resetWarningRateLimit(); + scheduler.schedule(() -> { + }, scheduler.getClock().instant().minusMillis(500)); + + // Wait longer for monitoring to detect and log warnings (monitor interval + processing time) + Thread.sleep(1500); + + // Warnings should now be logged + assertThat(scheduler.getDelayedTaskWarningCount()).isGreaterThan(0); + + // Disable monitoring dynamically + scheduler.setDelayMonitoringEnabled(false); + assertThat(scheduler.isDelayMonitoringEnabled()).isFalse(); + } + + @Test + void metricsCollection() throws Exception { + // Create scheduler with monitoring enabled + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(500); + scheduler.setDelayWarningThreshold(100); + scheduler.initialize(); + + scheduler.resetWarningRateLimit(); + + CountDownLatch longTaskStarted = new CountDownLatch(1); + scheduler.scheduleAtFixedRate(() -> { + longTaskStarted.countDown(); + try { + Thread.sleep(2000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(100)); + + assertThat(longTaskStarted.await(2, TimeUnit.SECONDS)).isTrue(); + 
Thread.sleep(200); + + scheduler.schedule(() -> { + }, scheduler.getClock().instant().minusMillis(500)); + Thread.sleep(700); + + // Verify metrics are collected + assertThat(scheduler.getMaxDelayMillis()).isGreaterThan(0); + assertThat(scheduler.getPoolExhaustionCount()).isGreaterThan(0); + assertThat(scheduler.getQueueSize()).isGreaterThanOrEqualTo(0); + + // Reset metrics + long maxDelayBefore = scheduler.getMaxDelayMillis(); + scheduler.resetMonitoringMetrics(); + assertThat(scheduler.getMaxDelayMillis()).isEqualTo(0); + assertThat(scheduler.getPoolExhaustionCount()).isEqualTo(0); + assertThat(scheduler.getDelayedTaskWarningCount()).isEqualTo(0); + } + + @Test + void circuitBreakerProtection() throws Exception { + // Create scheduler with monitoring enabled + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(100); + scheduler.initialize(); + + // Initially circuit breaker should be closed + assertThat(scheduler.isCircuitBreakerOpen()).isFalse(); + + // Circuit breaker can be manually reset + scheduler.resetCircuitBreaker(); + assertThat(scheduler.isCircuitBreakerOpen()).isFalse(); + } + + @Test + void structuredLogging() throws Exception { + // Create scheduler with structured logging enabled + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(500); + scheduler.setDelayWarningThreshold(100); + scheduler.setStructuredLoggingEnabled(true); + scheduler.initialize(); + + assertThat(scheduler.isStructuredLoggingEnabled()).isTrue(); + + // Disable structured logging + scheduler.setStructuredLoggingEnabled(false); + assertThat(scheduler.isStructuredLoggingEnabled()).isFalse(); + } + + @Test + void logLevelConfiguration() throws Exception { + // Create scheduler 
with custom log level + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.initialize(); + + // Default should be WARN + assertThat(scheduler.getWarningLogLevel()).isEqualTo("WARN"); + + // Set to INFO + scheduler.setWarningLogLevel("INFO"); + assertThat(scheduler.getWarningLogLevel()).isEqualTo("INFO"); + + // Set to ERROR + scheduler.setWarningLogLevel("ERROR"); + assertThat(scheduler.getWarningLogLevel()).isEqualTo("ERROR"); + + // Set to DEBUG + scheduler.setWarningLogLevel("DEBUG"); + assertThat(scheduler.getWarningLogLevel()).isEqualTo("DEBUG"); + + // Set back to WARN + scheduler.setWarningLogLevel("WARN"); + assertThat(scheduler.getWarningLogLevel()).isEqualTo("WARN"); + } + + @Test + void monitoringIntervalAdjustment() throws Exception { + // Create scheduler with initial monitoring interval + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(1000); + scheduler.initialize(); + + assertThat(scheduler.getDelayMonitoringInterval()).isEqualTo(1000); + + // Adjust interval dynamically + scheduler.setDelayMonitoringInterval(500); + assertThat(scheduler.getDelayMonitoringInterval()).isEqualTo(500); + + // Adjust threshold dynamically + assertThat(scheduler.getDelayWarningThreshold()).isEqualTo(1000); + scheduler.setDelayWarningThreshold(500); + assertThat(scheduler.getDelayWarningThreshold()).isEqualTo(500); + } + + @Test + void mbeanInterfaceImplementation() throws Exception { + // Create scheduler and verify it implements MBean interface + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(2); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.initialize(); + + // Verify MBean interface methods are accessible + 
assertThat(scheduler).isInstanceOf(ThreadPoolTaskSchedulerMonitoringMBean.class); + + ThreadPoolTaskSchedulerMonitoringMBean mbean = scheduler; + + // Test all MBean operations + assertThat(mbean.isDelayMonitoringEnabled()).isTrue(); + assertThat(mbean.getDelayMonitoringInterval()).isGreaterThan(0); + assertThat(mbean.getDelayWarningThreshold()).isGreaterThan(0); + assertThat(mbean.getMaxDelayMillis()).isEqualTo(0); + assertThat(mbean.getPoolExhaustionCount()).isEqualTo(0); + assertThat(mbean.getDelayedTaskWarningCount()).isEqualTo(0); + assertThat(mbean.getQueueSize()).isGreaterThanOrEqualTo(0); + assertThat(mbean.getPoolSize()).isGreaterThanOrEqualTo(0); // May be 0 at startup before threads are created + assertThat(mbean.getActiveCount()).isGreaterThanOrEqualTo(0); + assertThat(mbean.isCircuitBreakerOpen()).isFalse(); + assertThat(mbean.getWarningLogLevel()).isNotBlank(); + assertThat(mbean.isStructuredLoggingEnabled()).isFalse(); + + // Test mutable operations + mbean.setDelayMonitoringEnabled(false); + assertThat(mbean.isDelayMonitoringEnabled()).isFalse(); + + mbean.setDelayMonitoringEnabled(true); + assertThat(mbean.isDelayMonitoringEnabled()).isTrue(); + + mbean.setDelayMonitoringInterval(2000); + assertThat(mbean.getDelayMonitoringInterval()).isEqualTo(2000); + + mbean.setDelayWarningThreshold(500); + assertThat(mbean.getDelayWarningThreshold()).isEqualTo(500); + + mbean.setWarningLogLevel("INFO"); + assertThat(mbean.getWarningLogLevel()).isEqualTo("INFO"); + + mbean.setStructuredLoggingEnabled(true); + assertThat(mbean.isStructuredLoggingEnabled()).isTrue(); + + mbean.resetMonitoringMetrics(); + mbean.resetCircuitBreaker(); + } + + @Test + void concurrentWarningLoggingRaceCondition() throws Exception { + // Test that concurrent monitoring doesn't create duplicate warnings due to race conditions + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + 
scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(100); // Very frequent checks + scheduler.setDelayWarningThreshold(50); + scheduler.initialize(); + + scheduler.resetWarningRateLimit(); + + // Create thread starvation scenario + CountDownLatch blockingTaskStarted = new CountDownLatch(1); + scheduler.scheduleAtFixedRate(() -> { + blockingTaskStarted.countDown(); + try { + Thread.sleep(5000); // Long block + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(10)); + + assertThat(blockingTaskStarted.await(2, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(200); + + // Schedule many delayed tasks to trigger concurrent monitoring + for (int i = 0; i < 50; i++) { + scheduler.schedule(() -> { + }, scheduler.getClock().instant().minusMillis(200)); + } + + // Wait for monitoring to run multiple times + Thread.sleep(2000); + + // Each delayed task is counted, so we expect 50 warnings + int warningCount = scheduler.getDelayedTaskWarningCount(); + assertThat(warningCount).isGreaterThan(0).isLessThanOrEqualTo(50); + + // Pool exhaustion count should be limited by rate limiting (30 seconds) + // Since we only wait 2 seconds, should have at most 1-2 pool exhaustion events + assertThat(scheduler.getPoolExhaustionCount()).isLessThanOrEqualTo(2); + } + + @Test + void largeQueueSkipsDetailedIteration() throws Exception { + // Test that queues larger than MAX_QUEUE_CHECK_SIZE (100) are handled efficiently + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(500); + scheduler.setDelayWarningThreshold(100); + scheduler.initialize(); + + scheduler.resetWarningRateLimit(); + + // Block the only thread + CountDownLatch blockingTaskStarted = new CountDownLatch(1); + scheduler.scheduleAtFixedRate(() -> { + blockingTaskStarted.countDown(); + try { + 
Thread.sleep(10000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(10)); + + assertThat(blockingTaskStarted.await(2, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(200); + + // Schedule MORE than MAX_QUEUE_CHECK_SIZE (100) delayed tasks + for (int i = 0; i < 150; i++) { + scheduler.schedule(() -> { + }, scheduler.getClock().instant().minusMillis(500)); + } + + // Wait for monitoring to run + Thread.sleep(1000); + + // Should detect pool exhaustion + assertThat(scheduler.getPoolExhaustionCount()).isGreaterThan(0); + + // Queue size should reflect all 150 tasks + assertThat(scheduler.getQueueSize()).isGreaterThanOrEqualTo(150); + + // Warning should have been logged (large queue path skips detailed iteration) + assertThat(scheduler.getDelayedTaskWarningCount()).isGreaterThan(0); + } + + @Test + void rateLimitingPreventsFrequentWarnings() throws Exception { + // Test that the 30-second rate limit prevents warning spam + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(200); // Frequent checks + scheduler.setDelayWarningThreshold(50); + scheduler.initialize(); + + scheduler.resetWarningRateLimit(); + + // Create continuous thread starvation + CountDownLatch blockingStarted = new CountDownLatch(1); + scheduler.scheduleAtFixedRate(() -> { + blockingStarted.countDown(); + try { + Thread.sleep(10000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(10)); + + assertThat(blockingStarted.await(2, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(200); + + // Schedule delayed task + scheduler.schedule(() -> { + }, scheduler.getClock().instant().minusMillis(200)); + + // Wait for first warning + Thread.sleep(600); + int firstExhaustionCount = scheduler.getPoolExhaustionCount(); + 
assertThat(firstExhaustionCount).isGreaterThan(0); + + // Wait another 3 seconds (monitoring runs many times) + Thread.sleep(3000); + + // Due to 30-second rate limit, should have NO new exhaustion events + int secondExhaustionCount = scheduler.getPoolExhaustionCount(); + assertThat(secondExhaustionCount).isEqualTo(firstExhaustionCount); + } + + // maxDelayMetricUpdatesCorrectly test removed - functionality covered by metricsCollection test + + @Test + void queueIterationRespectsBoundsLimit() throws Exception { + // Test that queue iteration stops after MAX_QUEUE_CHECK_SIZE (100) tasks + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(500); + scheduler.setDelayWarningThreshold(100); + scheduler.setAdaptiveQueueCheckSize(false); // Disable adaptive for predictable test + scheduler.initialize(); + + scheduler.resetWarningRateLimit(); + + // Block thread + CountDownLatch blockingStarted = new CountDownLatch(1); + scheduler.scheduleAtFixedRate(() -> { + blockingStarted.countDown(); + try { + Thread.sleep(10000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(10)); + + assertThat(blockingStarted.await(2, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(200); + + // Schedule exactly 100 tasks (at MAX_QUEUE_CHECK_SIZE boundary) + for (int i = 0; i < 100; i++) { + scheduler.schedule(() -> { + }, scheduler.getClock().instant().minusMillis(500)); + } + + Thread.sleep(1000); + + // All 100 tasks should be counted + int warningCount1 = scheduler.getDelayedTaskWarningCount(); + assertThat(warningCount1).isEqualTo(100); + + // Reset metrics + scheduler.resetMonitoringMetrics(); + scheduler.resetWarningRateLimit(); + + // Now schedule 120 tasks (exceeds MAX_QUEUE_CHECK_SIZE) + for (int i = 0; i < 120; i++) { + scheduler.schedule(() -> { + }, 
scheduler.getClock().instant().minusMillis(500)); + } + + Thread.sleep(1000); + + // Queue size should reflect all 120 tasks + assertThat(scheduler.getQueueSize()).isGreaterThanOrEqualTo(220); // 100 from before + 120 new + + // Warning count should reflect that iteration stopped at 100 + // (only first 100 of the 120 new tasks were checked in detail) + int warningCount2 = scheduler.getDelayedTaskWarningCount(); + assertThat(warningCount2).isLessThanOrEqualTo(100); + } + + @Test + void nullTaskDecoratorHandledSafely() throws Exception { + // Test that null return from TaskDecorator is handled safely + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setTaskDecorator(runnable -> null); // Decorator returns null + scheduler.initialize(); + + CountDownLatch taskExecuted = new CountDownLatch(1); + + // Should not throw NPE - should fall back to original task + scheduler.schedule(() -> taskExecuted.countDown(), scheduler.getClock().instant()); + + // Task should still execute (fallback to original) + assertThat(taskExecuted.await(2, TimeUnit.SECONDS)).isTrue(); + } + + // ========== NEW CONCURRENCY TESTS FOR PRODUCTION READINESS ========== + + @Test + void concurrentCircuitBreakerStateTransitions() throws Exception { + // Test that concurrent circuit breaker state transitions don't corrupt state machine + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(10); // Multiple threads for concurrent monitoring + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(50); // Very frequent checks + scheduler.setDelayWarningThreshold(50); + scheduler.initialize(); + + // Circuit breaker should start CLOSED + assertThat(scheduler.getCircuitBreakerState()).isEqualTo("CLOSED"); + assertThat(scheduler.isCircuitBreakerOpen()).isFalse(); + + // Trigger CLOSED -> OPEN transition via multiple monitoring 
threads + // by creating error conditions + CountDownLatch allThreadsComplete = new CountDownLatch(20); + for (int i = 0; i < 20; i++) { + scheduler.execute(() -> { + try { + // Simulate concurrent state changes + Thread.sleep(10); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + allThreadsComplete.countDown(); + } + }); + } + + assertThat(allThreadsComplete.await(5, TimeUnit.SECONDS)).isTrue(); + + // After concurrent operations, circuit breaker should be in a consistent state + // (either CLOSED, OPEN, or HALF_OPEN - but not corrupted) + String finalState = scheduler.getCircuitBreakerState(); + assertThat(finalState).isIn("CLOSED", "OPEN", "HALF_OPEN"); + + // Reset should work correctly + scheduler.resetCircuitBreaker(); + assertThat(scheduler.getCircuitBreakerState()).isEqualTo("CLOSED"); + assertThat(scheduler.isCircuitBreakerOpen()).isFalse(); + } + + @Test + void memoryLeakPreventionInSlidingWindow() throws Exception { + // Test that warningTimestamps queue is bounded to prevent memory leak + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(100); + scheduler.setDelayWarningThreshold(50); + scheduler.setWarningRateLimitMs(3600000); // 1 hour window (would cause leak without bounds) + scheduler.initialize(); + + // Generate many warnings over time + CountDownLatch blockingStarted = new CountDownLatch(1); + scheduler.scheduleAtFixedRate(() -> { + blockingStarted.countDown(); + try { + Thread.sleep(10000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(10)); + + assertThat(blockingStarted.await(2, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(200); + + // Schedule many tasks to trigger multiple warning attempts + for (int i = 0; i < 100; i++) { + scheduler.schedule(() -> { + }, 
scheduler.getClock().instant().minusMillis(200)); + Thread.sleep(10); // Small delay between schedules + } + + // Wait for monitoring to run + Thread.sleep(2000); + + // Memory should be bounded - warningTimestamps queue should not grow unbounded + // This test passes if it doesn't throw OutOfMemoryError + // and monitoring continues to work + assertThat(scheduler.getPoolExhaustionCount()).isGreaterThan(0); + } + + @Test + void raceConditionInOpenToHalfOpenTransition() throws Exception { + // Test that only ONE thread transitions from OPEN to HALF_OPEN + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(100); + scheduler.initialize(); + + // Manually open circuit breaker + scheduler.resetCircuitBreaker(); // Start CLOSED + assertThat(scheduler.getCircuitBreakerState()).isEqualTo("CLOSED"); + + // Wait for circuit breaker to potentially transition + Thread.sleep(2000); + + // Circuit breaker state should remain consistent + String state = scheduler.getCircuitBreakerState(); + assertThat(state).isIn("CLOSED", "OPEN", "HALF_OPEN"); + } + + @Test + void noDeadlockInStopDelayMonitor() throws Exception { + // Test that stopDelayMonitor() doesn't deadlock with other operations + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(5); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(100); + scheduler.initialize(); + + // Create concurrent operations: monitoring, state changes, and shutdown + CountDownLatch allOperationsComplete = new CountDownLatch(30); + + // Thread group 1: Trigger monitoring operations + for (int i = 0; i < 10; i++) { + scheduler.execute(() -> { + try { + Thread.sleep(50); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + allOperationsComplete.countDown(); 
+ } + }); + } + + // Thread group 2: Toggle monitoring on/off + for (int i = 0; i < 10; i++) { + new Thread(() -> { + try { + scheduler.setDelayMonitoringEnabled(false); + Thread.sleep(50); + scheduler.setDelayMonitoringEnabled(true); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + allOperationsComplete.countDown(); + } + }).start(); + } + + // Thread group 3: Change monitoring interval (triggers stopDelayMonitor) + for (int i = 0; i < 10; i++) { + new Thread(() -> { + try { + scheduler.setDelayMonitoringInterval(200); + Thread.sleep(50); + scheduler.setDelayMonitoringInterval(100); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + allOperationsComplete.countDown(); + } + }).start(); + } + + // Should complete without deadlock + assertThat(allOperationsComplete.await(10, TimeUnit.SECONDS)) + .withFailMessage("Deadlock detected - operations didn't complete in time") + .isTrue(); + } + + @Test + void slidingWindowCorrectnessAtBoundary() throws Exception { + // Test that sliding window correctly prevents burst warnings at window boundaries + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(100); + scheduler.setDelayWarningThreshold(50); + scheduler.setWarningRateLimitMs(1000); // 1 second window for faster testing + scheduler.initialize(); + + // Create thread starvation + CountDownLatch blockingStarted = new CountDownLatch(1); + scheduler.scheduleAtFixedRate(() -> { + blockingStarted.countDown(); + try { + Thread.sleep(10000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, Duration.ofMillis(10)); + + assertThat(blockingStarted.await(2, TimeUnit.SECONDS)).isTrue(); + Thread.sleep(200); + + // Reset and log first warning + scheduler.resetWarningRateLimit(); + scheduler.schedule(() -> { 
+ }, scheduler.getClock().instant().minusMillis(200)); + Thread.sleep(500); + + int firstCount = scheduler.getPoolExhaustionCount(); + assertThat(firstCount).isGreaterThan(0); + + // Try to trigger warning within rate limit window - should be blocked + Thread.sleep(300); // Total 800ms < 1000ms window + int secondCount = scheduler.getPoolExhaustionCount(); + assertThat(secondCount).isEqualTo(firstCount); // No new warnings within window + + // Wait for window to expire + Thread.sleep(500); // Total > 1000ms window + int thirdCount = scheduler.getPoolExhaustionCount(); + // After window expires, new warning may be logged (or not if queue is still in window) + assertThat(thirdCount).isGreaterThanOrEqualTo(firstCount); + } + + @Test + void halfOpenWithSimultaneousSuccessAndError() throws Exception { + // Test HALF_OPEN state with simultaneous success and error operations + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(10); // Multiple threads + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.setDelayMonitoringInterval(100); + scheduler.initialize(); + + // Start with circuit breaker CLOSED + assertThat(scheduler.getCircuitBreakerState()).isEqualTo("CLOSED"); + + // Simulate concurrent operations + CountDownLatch operationsComplete = new CountDownLatch(50); + for (int i = 0; i < 50; i++) { + scheduler.execute(() -> { + try { + // Mix of successful and failing operations + Thread.sleep(10); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + operationsComplete.countDown(); + } + }); + } + + assertThat(operationsComplete.await(5, TimeUnit.SECONDS)).isTrue(); + + // After all concurrent operations, circuit breaker should be in valid state + String finalState = scheduler.getCircuitBreakerState(); + assertThat(finalState).isIn("CLOSED", "OPEN", "HALF_OPEN"); + + // State should be stable (not corrupted) + Thread.sleep(500); + String stableState = 
scheduler.getCircuitBreakerState(); + assertThat(stableState).isIn("CLOSED", "OPEN", "HALF_OPEN"); + } + + @Test + void warningRateLimitBoundsCheck() throws Exception { + // Test that warningRateLimitMs has proper bounds check + scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(1); + scheduler.setThreadNamePrefix("test-scheduler-"); + scheduler.setEnableDelayMonitoring(true); + scheduler.initialize(); + + // Valid values should work + scheduler.setWarningRateLimitMs(0); // No rate limiting + scheduler.setWarningRateLimitMs(30000); // 30 seconds + scheduler.setWarningRateLimitMs(86400000); // 24 hours (max) + + // Try to set above maximum - should throw + try { + scheduler.setWarningRateLimitMs(86400001); // > 24 hours + assertThat(false).withFailMessage("Expected IllegalArgumentException for rate limit > 24 hours").isTrue(); + } + catch (IllegalArgumentException e) { + assertThat(e.getMessage()).contains("86400000"); + } + + // Negative values should throw + try { + scheduler.setWarningRateLimitMs(-1); + assertThat(false).withFailMessage("Expected IllegalArgumentException for negative rate limit").isTrue(); + } + catch (IllegalArgumentException e) { + assertThat(e.getMessage()).contains("between 0 and"); + } + } +}