Skip to content

Commit 43829fe

Browse files
use a discrete time window in pid controller
1 parent 7605783 commit 43829fe

File tree

4 files changed

+169
-99
lines changed

4 files changed

+169
-99
lines changed

lib/semian.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ def create_adaptive_circuit_breaker(name, **options)
306306
kp: 1.0, # Standard proportional gain
307307
ki: 0.1, # Moderate integral gain
308308
kd: 0.01, # Small derivative gain (as per design doc)
309-
window_size: 10, # 10-second window for rate calculation
309+
window_size: 10, # 10-second window for rate calculation and update interval
310310
history_duration: 3600, # 1 hour of history for p90 calculation
311311
ping_interval: 1.0, # 1 second between health checks
312312
thread_safe: Semian.thread_safe?,

lib/semian/adaptive_circuit_breaker.rb

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
module Semian
66
# Adaptive Circuit Breaker that uses PID controller for dynamic rejection
77
class AdaptiveCircuitBreaker
8-
attr_reader :name, :pid_controller, :ping_thread
8+
attr_reader :name, :pid_controller, :ping_thread, :update_thread
99

1010
def initialize(name:, kp: 1.0, ki: 0.1, kd: 0.01,
1111
window_size: 10, history_duration: 3600,
1212
ping_interval: 1.0, thread_safe: true, enable_background_ping: true)
1313
@name = name
14+
@window_size = window_size
1415
@ping_interval = ping_interval
1516
@last_ping_time = 0
1617
@enable_background_ping = enable_background_ping
@@ -38,8 +39,9 @@ def initialize(name:, kp: 1.0, ki: 0.1, kd: 0.01,
3839
)
3940
end
4041

41-
# Start background ping thread if enabled
42+
# Start background threads
4243
start_ping_thread if @enable_background_ping
44+
start_update_thread
4345
end
4446

4547
# Main acquire method compatible with existing Semian interface
@@ -50,22 +52,16 @@ def acquire(resource = nil, &block)
5052
# Check if we should reject based on current rejection rate
5153
if @pid_controller.should_reject?
5254
@pid_controller.record_request(:rejected)
53-
# Update controller after rejection
54-
@pid_controller.update
5555
raise OpenCircuitError, "Rejected by adaptive circuit breaker (rejection_rate: #{@pid_controller.rejection_rate})"
5656
end
5757

5858
# Try to execute the block
5959
begin
6060
result = block.call
6161
@pid_controller.record_request(:success)
62-
# Update controller after success
63-
@pid_controller.update
6462
result
6563
rescue => error
6664
@pid_controller.record_request(:error)
67-
# Update controller after error
68-
@pid_controller.update
6965
raise error
7066
end
7167
end
@@ -76,11 +72,13 @@ def reset
7672
@resource = nil
7773
end
7874

79-
# Stop the background ping thread (called by destroy)
75+
# Stop the background threads (called by destroy)
8076
def stop
8177
@stopped = true
8278
@ping_thread&.kill
8379
@ping_thread = nil
80+
@update_thread&.kill
81+
@update_thread = nil
8482
end
8583

8684
# Destroy the adaptive circuit breaker (compatible with ProtectedResource interface)
@@ -138,5 +136,21 @@ def send_background_ping
138136
@pid_controller.record_ping(:failure)
139137
end
140138
end
139+
140+
def start_update_thread
141+
@update_thread = Thread.new do
142+
loop do
143+
break if @stopped
144+
145+
sleep(@window_size)
146+
147+
# Update PID controller at the end of each window
148+
@pid_controller.update
149+
end
150+
rescue => e
151+
# Log error if logger is available
152+
Semian.logger&.warn("[#{@name}] Background update thread error: #{e.message}")
153+
end
154+
end
141155
end
142156
end

lib/semian/pid_controller.rb

Lines changed: 62 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,19 @@ def initialize(name:, kp: 1.0, ki: 0.1, kd: 0.0, target_error_rate: nil,
3131

3232
# Metrics tracking
3333
@error_rate_history = []
34-
@max_history_size = history_duration # Duration in seconds to keep history
34+
@max_history_size = history_duration / window_size # Number of windows to keep
3535

36-
# Request tracking for rate calculation
37-
@window_size = window_size # Time window in seconds for rate calculation
38-
@request_outcomes = [] # Array of [timestamp, :success/:error/:rejected]
39-
@ping_outcomes = [] # Array of [timestamp, :success/:failure]
36+
# Discrete window tracking
37+
@window_size = window_size # Time window in seconds
38+
@window_start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
39+
40+
# Current window counters
41+
@current_window_requests = { success: 0, error: 0, rejected: 0 }
42+
@current_window_pings = { success: 0, failure: 0 }
43+
44+
# Last completed window metrics (used between updates)
45+
@last_error_rate = 0.0
46+
@last_ping_failure_rate = 0.0
4047
end
4148

4249
# Calculate the current health metric P
@@ -51,29 +58,48 @@ def calculate_health_metric(current_error_rate, ping_failure_rate)
5158

5259
# Record a request outcome
5360
def record_request(outcome)
54-
timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
55-
@request_outcomes << [timestamp, outcome]
56-
cleanup_old_data(timestamp)
61+
case outcome
62+
when :success
63+
@current_window_requests[:success] += 1
64+
when :error
65+
@current_window_requests[:error] += 1
66+
when :rejected
67+
@current_window_requests[:rejected] += 1
68+
end
5769
end
5870

5971
# Record a ping outcome (ungated health check)
6072
def record_ping(outcome)
61-
timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
62-
@ping_outcomes << [timestamp, outcome]
63-
cleanup_old_data(timestamp)
73+
case outcome
74+
when :success
75+
@current_window_pings[:success] += 1
76+
when :failure
77+
@current_window_pings[:failure] += 1
78+
end
6479
end
6580

66-
# Update the controller with new measurements
81+
# Update the controller at the end of each time window
6782
def update(current_error_rate = nil, ping_failure_rate = nil)
6883
current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
69-
dt = current_time - @last_update_time
7084

71-
# Use calculated rates if not provided
72-
current_error_rate ||= calculate_error_rate
73-
ping_failure_rate ||= calculate_ping_failure_rate
85+
# Calculate rates for the current window
86+
@last_error_rate = calculate_window_error_rate
87+
@last_ping_failure_rate = calculate_window_ping_failure_rate
7488

7589
# Store error rate for historical analysis
76-
store_error_rate(current_error_rate)
90+
store_error_rate(@last_error_rate)
91+
92+
# Reset window counters for next window
93+
@current_window_requests = { success: 0, error: 0, rejected: 0 }
94+
@current_window_pings = { success: 0, failure: 0 }
95+
@window_start_time = current_time
96+
97+
# Use provided rates or calculated rates
98+
current_error_rate ||= @last_error_rate
99+
ping_failure_rate ||= @last_ping_failure_rate
100+
101+
# dt is always window_size since we update once per window
102+
dt = @window_size
77103

78104
# Calculate the current error (health metric)
79105
error = calculate_health_metric(current_error_rate, ping_failure_rate)
@@ -82,8 +108,7 @@ def update(current_error_rate = nil, ping_failure_rate = nil)
82108
proportional = @kp * error
83109
@integral += error * dt
84110
integral = @ki * @integral
85-
derivative = @kd * (error - @previous_error) / dt if dt > 0
86-
derivative ||= 0.0
111+
derivative = @kd * (error - @previous_error) / dt
87112

88113
# Calculate the control signal (change in rejection rate)
89114
control_signal = proportional + integral + derivative
@@ -109,46 +134,43 @@ def reset
109134
@integral = 0.0
110135
@previous_error = 0.0
111136
@last_update_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
112-
@request_outcomes.clear
113-
@ping_outcomes.clear
137+
@window_start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
138+
@current_window_requests = { success: 0, error: 0, rejected: 0 }
139+
@current_window_pings = { success: 0, failure: 0 }
140+
@last_error_rate = 0.0
141+
@last_ping_failure_rate = 0.0
114142
@error_rate_history.clear
115143
end
116144

117145
# Get current metrics for monitoring/debugging
118146
def metrics
119147
{
120148
rejection_rate: @rejection_rate,
121-
error_rate: calculate_error_rate,
122-
ping_failure_rate: calculate_ping_failure_rate,
149+
error_rate: @last_error_rate,
150+
ping_failure_rate: @last_ping_failure_rate,
123151
ideal_error_rate: @target_error_rate || calculate_ideal_error_rate,
124-
health_metric: calculate_health_metric(calculate_error_rate, calculate_ping_failure_rate),
152+
health_metric: calculate_health_metric(@last_error_rate, @last_ping_failure_rate),
125153
integral: @integral,
126154
previous_error: @previous_error,
155+
current_window_requests: @current_window_requests.dup,
156+
current_window_pings: @current_window_pings.dup,
127157
}
128158
end
129159

130160
private
131161

132-
def calculate_error_rate
133-
current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
134-
cutoff_time = current_time - @window_size
135-
136-
recent_requests = @request_outcomes.select { |t, _| t >= cutoff_time }
137-
return 0.0 if recent_requests.empty?
162+
def calculate_window_error_rate
163+
total_requests = @current_window_requests[:success] + @current_window_requests[:error]
164+
return 0.0 if total_requests == 0
138165

139-
errors = recent_requests.count { |_, outcome| outcome == :error }
140-
errors.to_f / recent_requests.size
166+
@current_window_requests[:error].to_f / total_requests
141167
end
142168

143-
def calculate_ping_failure_rate
144-
current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
145-
cutoff_time = current_time - @window_size
146-
147-
recent_pings = @ping_outcomes.select { |t, _| t >= cutoff_time }
148-
return 0.0 if recent_pings.empty?
169+
def calculate_window_ping_failure_rate
170+
total_pings = @current_window_pings[:success] + @current_window_pings[:failure]
171+
return 0.0 if total_pings == 0
149172

150-
failures = recent_pings.count { |_, outcome| outcome == :failure }
151-
failures.to_f / recent_pings.size
173+
@current_window_pings[:failure].to_f / total_pings
152174
end
153175

154176
def store_error_rate(error_rate)
@@ -168,18 +190,6 @@ def calculate_ideal_error_rate
168190
# Cap at 10% to prevent bootstrapping issues
169191
[p90_value, 0.1].min
170192
end
171-
172-
def cleanup_old_data(current_time)
173-
cutoff_time = current_time - @window_size
174-
175-
# Clean up old request outcomes
176-
@request_outcomes.reject! { |timestamp, _| timestamp < cutoff_time }
177-
178-
# Clean up old ping outcomes
179-
@ping_outcomes.reject! { |timestamp, _| timestamp < cutoff_time }
180-
181-
# NOTE: error_rate_history is cleaned up in store_error_rate
182-
end
183193
end
184194

185195
# Thread-safe version of PIDController

0 commit comments

Comments
 (0)