Skip to content

Commit 8a201ea

Browse files
committed
[CI] heart beating ongoing chunk
1 parent dbe8b01 commit 8a201ea

File tree

11 files changed

+332
-94
lines changed

11 files changed

+332
-94
lines changed

redis/heartbeat.lua

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
-- Heartbeat for a reserved test.
--
-- Records the caller's liveness in the owners hash and, when the test's
-- deadline in the running zset is close to expiring, pushes the deadline
-- forward (bounded by a cap derived from the initial reservation time).
--
-- KEYS[1] running zset (member = test, score = deadline)
-- KEYS[2] processed set
-- KEYS[3] owners hash (field = test,
--         value = "worker_queue_key|initial_reservation_time|last_heartbeat_time")
-- KEYS[4] the calling worker's queue key
-- KEYS[5] per-test timeout hash
-- ARGV[1] current time (number)
-- ARGV[2] test id
-- ARGV[3] default timeout, used when no per-test timeout is stored
--
-- Returns:
--   false                         test already processed, not owned by the
--                                 caller, or owner value has no "|" separator
--   {old_deadline, new_deadline}  the deadline was extended
--   0                             heartbeat recorded, no extension needed
local zset_key = KEYS[1]
local processed_key = KEYS[2]
local owners_key = KEYS[3]
local worker_queue_key = KEYS[4]
local test_group_timeout_key = KEYS[5]

local current_time = tonumber(ARGV[1])
local test = ARGV[2]
local default_timeout = tonumber(ARGV[3]) or 0

-- Only extend when the deadline is within this many seconds of expiring.
local EXTEND_THRESHOLD = 20
-- Each extension moves the deadline to at most this many seconds from now.
local EXTENSION = 60
-- The deadline never exceeds initial reservation + factor * estimated timeout.
local MAX_DEADLINE_FACTOR = 3

-- already processed, we do not need to bump the timestamp
if redis.call('sismember', processed_key, test) == 1 then
  return false
end

local owner_value = redis.call('hget', owners_key, test)
if not owner_value then
  return false
end

-- Plain-text search (4th argument) so the separator is never read as a pattern.
local first_pipe = string.find(owner_value, "|", 1, true)
if not first_pipe then
  return false
end

-- Someone else owns the test now: leave their lease untouched.
if string.sub(owner_value, 1, first_pipe - 1) ~= worker_queue_key then
  return false
end

-- Parse the initial reservation time. A two-field value
-- ("worker_queue_key|timestamp") is read as having only that one timestamp.
local rest = string.sub(owner_value, first_pipe + 1)
local second_pipe = string.find(rest, "|", 1, true)
local initial_reservation_time
if second_pipe then
  initial_reservation_time = tonumber(string.sub(rest, 1, second_pipe - 1))
else
  initial_reservation_time = tonumber(rest)
end
initial_reservation_time = initial_reservation_time or current_time

-- Record the heartbeat, keeping the initial reservation time.
local new_owner_value = worker_queue_key .. "|" .. initial_reservation_time .. "|" .. current_time
redis.call('hset', owners_key, test, new_owner_value)

local deadline = redis.call('zscore', zset_key, test)
if deadline then
  deadline = tonumber(deadline)

  -- Per-test timeout, falling back to the default when the field is missing,
  -- empty, or not numeric (tonumber returns nil for those, which would
  -- otherwise make the multiplication below raise).
  local stored_timeout = redis.call('hget', test_group_timeout_key, test)
  local estimated_timeout = (stored_timeout and tonumber(stored_timeout)) or default_timeout

  local max_deadline = initial_reservation_time + (estimated_timeout * MAX_DEADLINE_FACTOR)

  if deadline - EXTEND_THRESHOLD < current_time then
    local new_deadline = math.min(current_time + EXTENSION, max_deadline)

    -- Only write when the deadline actually moves forward.
    if new_deadline > deadline then
      redis.call('zadd', zset_key, new_deadline, test)
      return {deadline, new_deadline}
    end
  end
end

-- No extension needed, but heartbeat was recorded
return 0

redis/release.lua

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,20 @@ local zset_key = KEYS[1]
22
local worker_queue_key = KEYS[2]
33
local owners_key = KEYS[3]
44

5-
-- owned_tests = {"SomeTest", "worker:1", "SomeOtherTest", "worker:2", ...}
5+
-- owned_tests = {"SomeTest", "worker:1|1234567890|1234567899", "SomeOtherTest", "worker:2|1234567891|1234567895", ...}
66
local owned_tests = redis.call('hgetall', owners_key)
77
for index, owner_or_test in ipairs(owned_tests) do
8-
if owner_or_test == worker_queue_key then -- If we owned a test
9-
local test = owned_tests[index - 1]
10-
redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately
11-
return nil
8+
-- Parse owner value: format is "worker_queue_key|initial_reservation_time|last_heartbeat_time"
9+
local pipe_pos = string.find(owner_or_test, "|")
10+
if pipe_pos then
11+
local stored_worker_key = string.sub(owner_or_test, 1, pipe_pos - 1)
12+
13+
if stored_worker_key == worker_queue_key then -- If we owned a test
14+
local test = owned_tests[index - 1]
15+
redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately
16+
redis.call('hdel', owners_key, test) -- Remove from owners hash to clear heartbeat
17+
return nil
18+
end
1219
end
1320
end
1421

redis/requeue.lua

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,17 @@ local global_max_requeues = tonumber(ARGV[2])
1010
local test = ARGV[3]
1111
local offset = ARGV[4]
1212

13-
if redis.call('hget', owners_key, test) == worker_queue_key then
14-
redis.call('hdel', owners_key, test)
13+
local owner_value = redis.call('hget', owners_key, test)
14+
if owner_value then
15+
-- Parse owner value: format is "worker_queue_key|initial_reservation_time|last_heartbeat_time"
16+
local pipe_pos = string.find(owner_value, "|")
17+
if pipe_pos then
18+
local stored_worker_key = string.sub(owner_value, 1, pipe_pos - 1)
19+
20+
if stored_worker_key == worker_queue_key then
21+
redis.call('hdel', owners_key, test)
22+
end
23+
end
1524
end
1625

1726
if redis.call('sismember', processed_key, test) == 1 then

redis/reserve.lua

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ if test then
2323
redis.call('zadd', zset_key, current_time, test)
2424
end
2525
redis.call('lpush', worker_queue_key, test)
26-
redis.call('hset', owners_key, test, worker_queue_key)
26+
-- Store owner with initial reservation time and last heartbeat time
27+
-- Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time"
28+
local owner_value = worker_queue_key .. "|" .. current_time .. "|" .. current_time
29+
redis.call('hset', owners_key, test, owner_value)
2730
return test
2831
else
2932
return nil

redis/reserve_lost.lua

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,49 @@ local timeout = tonumber(ARGV[2])
99
local use_dynamic_deadline = ARGV[3] == "true"
1010
local default_timeout = tonumber(ARGV[4]) or 0
1111

12-
local lost_tests
13-
if use_dynamic_deadline then
14-
lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time)
15-
else
16-
lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout)
12+
-- Helper: decides whether a test may be taken over by the calling worker.
--
-- Owner value format: "worker_queue_key|initial_reservation_time|last_heartbeat_time".
-- A two-field value ("worker_queue_key|timestamp") is accepted too; its single
-- timestamp is treated as the last heartbeat.
--
-- @param test               test id to look up in the owners hash
-- @param max_heartbeat_age  optional; age in seconds at or beyond which the
--                           heartbeat counts as stale (defaults to 120)
-- @return true when the test has no owner entry, the owner value has no "|"
--         separator, the heartbeat timestamp is not a number, or the last
--         heartbeat is at least max_heartbeat_age seconds old; false otherwise
local function can_steal_test(test, max_heartbeat_age)
  max_heartbeat_age = max_heartbeat_age or 120

  local owner_value = redis.call('hget', owners_key, test)
  if not owner_value then return true end -- No owner, can steal

  -- Plain-text search (4th argument) so the separator is never read as a pattern.
  local first_pipe = string.find(owner_value, "|", 1, true)
  if not first_pipe then return true end

  local rest = string.sub(owner_value, first_pipe + 1)
  local second_pipe = string.find(rest, "|", 1, true)

  local last_heartbeat
  if second_pipe then
    -- Three fields: the heartbeat is everything after the second separator
    last_heartbeat = tonumber(string.sub(rest, second_pipe + 1))
  else
    -- Two fields: the only timestamp doubles as the last heartbeat
    last_heartbeat = tonumber(rest)
  end

  if not last_heartbeat then return true end

  return current_time - last_heartbeat >= max_heartbeat_age
end
41+
42+
-- Collect tests that can be stolen
43+
local stealable_tests = {}
44+
45+
local all_running_tests = redis.call('zrange', zset_key, 0, -1)
46+
for _, test in ipairs(all_running_tests) do
47+
if redis.call('sismember', processed_key, test) == 0 then
48+
if can_steal_test(test) then
49+
table.insert(stealable_tests, test)
50+
end
51+
end
1752
end
1853

19-
for _, test in ipairs(lost_tests) do
54+
for _, test in ipairs(stealable_tests) do
2055
if redis.call('sismember', processed_key, test) == 0 then
2156
if use_dynamic_deadline then
2257
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
@@ -30,7 +65,9 @@ for _, test in ipairs(lost_tests) do
3065
redis.call('zadd', zset_key, current_time + timeout, test)
3166
end
3267
redis.call('lpush', worker_queue_key, test)
33-
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
68+
-- Store owner with initial reservation time and last heartbeat time
69+
local new_owner_value = worker_queue_key .. "|" .. current_time .. "|" .. current_time
70+
redis.call('hset', owners_key, test, new_owner_value) -- Take ownership
3471
return test
3572
end
3673
end

ruby/lib/ci/queue/redis/base.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ def load_script(script)
122122
@scripts_cache[script] ||= redis.script(:load, read_script(script))
123123
end
124124

125+
# Loads +script+ through load_script and then pings Redis, so both the script
# lookup and a round-trip to the server have happened before the caller goes on.
# NOTE(review): callers use this before starting a background thread; whether
# that thread then shares this connection depends on how `redis` is built —
# confirm.
#
# Returns whatever `redis.ping` returns.
def ensure_connection_and_script(script)
  load_script(script)
  redis.ping
end
132+
125133
def read_script(name)
126134
::File.read(::File.join(CI::Queue::DEV_SCRIPTS_ROOT, "#{name}.lua"))
127135
rescue SystemCallError

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,37 @@ def poll
8787
executable = resolve_executable(id)
8888

8989
if executable
90-
yield executable
90+
# Pre-initialize Redis connection and heartbeat script in main thread
91+
# to ensure background thread uses the same initialized context
92+
ensure_connection_and_script(:heartbeat)
93+
94+
# Start heartbeat thread to extend lease while executing
95+
heartbeat_thread = nil
96+
begin
97+
heartbeat_thread = Thread.new do
98+
heartbeat_interval = 10 # Send heartbeat every 10 seconds
99+
loop do
100+
break if Thread.current[:stop]
101+
102+
sleep heartbeat_interval
103+
104+
break if Thread.current[:stop]
105+
106+
heartbeat(id)
107+
end
108+
end
109+
heartbeat_thread[:stop] = false
110+
111+
yield executable
112+
ensure
113+
# Stop heartbeat thread when execution completes
114+
# This ensures it's stopped after acknowledge has completed
115+
if heartbeat_thread
116+
heartbeat_thread[:stop] = true
117+
heartbeat_thread.wakeup # Interrupt sleep if thread is sleeping
118+
heartbeat_thread.join(2) # Wait up to 2 seconds for thread to finish
119+
end
120+
end
91121
else
92122
warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.")
93123
acknowledge(id)
@@ -209,6 +239,32 @@ def release!
209239
nil
210240
end
211241

242+
# Runs the :heartbeat script for +test_id+ on behalf of this worker.
#
# The script receives the running, processed, owners, worker-queue and
# test-group-timeout keys, plus the current time, the test id and the
# configured timeout. When it replies with a two-element array, the pair is
# logged as the old and new deadline.
#
# Returns the script's reply, or false when a connection error was rescued.
def heartbeat(test_id)
  now = CI::Queue.time_now.to_f
  script_keys = [
    key('running'),
    key('processed'),
    key('owners'),
    key('worker', worker_id, 'queue'),
    key('test-group-timeout'),
  ]
  reply = eval_script(:heartbeat, keys: script_keys, argv: [now, test_id, config.timeout])

  if reply.is_a?(Array) && reply.size == 2
    previous, extended = reply
    previous_readable = Time.at(previous).strftime('%Y-%m-%d %H:%M:%S')
    extended_readable = Time.at(extended).strftime('%Y-%m-%d %H:%M:%S')
    warn("[heartbeat] Extended deadline for #{test_id.inspect} from #{previous_readable} (#{previous}) to #{extended_readable} (#{extended})")
  end

  reply
rescue *CONNECTION_ERRORS => e
  warn("Failed to send heartbeat for #{test_id.inspect}: #{e.class} - #{e.message}")
  false
end
267+
212268
private
213269

214270
attr_reader :index

ruby/test/ci/queue/.DS_Store

8 KB
Binary file not shown.

ruby/test/ci/queue/redis/dynamic_timeout_test.rb

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,16 +268,62 @@ def test_single_test_marked_lost_after_default_timeout
268268
reserved_id = worker1.send(:try_to_reserve_test)
269269
assert_equal 'TestA#test_1', reserved_id
270270

271-
# Wait longer than timeout (0.5s)
271+
# Wait longer than timeout (0.5s) but less than 2 minutes
272272
sleep 0.6
273273

274-
# Try to reserve with worker2 - should get the lost test
274+
# Try to reserve with worker2 - should NOT get the test because heartbeat is recent (< 2 minutes)
275275
worker2_config = config.dup
276276
worker2_config.instance_variable_set(:@worker_id, '2')
277277
worker2 = CI::Queue::Redis.new(@redis_url, worker2_config)
278278

279279
lost_test = worker2.send(:try_to_reserve_lost_test)
280-
assert_equal 'TestA#test_1', lost_test, 'Single test should be marked as lost after default timeout'
280+
assert_nil lost_test, 'Test should not be marked as lost if heartbeat is recent (< 2 minutes)'
281+
end
282+
283+
# A reserved test whose recorded heartbeat is older than two minutes must be
# handed to another worker asking for lost tests.
def test_single_test_marked_lost_after_heartbeat_expires
  config = CI::Queue::Configuration.new(
    build_id: 'heartbeat-timeout-test',
    worker_id: '1',
    timeout: 0.5 # seconds
  )
  first_worker = CI::Queue::Redis.new(@redis_url, config)

  # Queue holds a single, un-chunked test.
  tests = create_mock_tests(['TestA#test_1'])
  first_worker.stub(:reorder_tests, tests) { first_worker.populate(tests) }

  # Worker 1 takes the test.
  assert_equal 'TestA#test_1', first_worker.send(:try_to_reserve_test)

  # Rewrite the owner entry directly in Redis so the last heartbeat looks stale.
  # Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time"
  now = CI::Queue.time_now.to_f
  reserved_at = now - 140    # reservation predates the heartbeat
  last_beat_at = now - 130   # 130 seconds ago, i.e. more than 2 minutes
  worker_queue_key = "build:heartbeat-timeout-test:worker:1:queue"
  @redis.hset(
    'build:heartbeat-timeout-test:owners',
    'TestA#test_1',
    "#{worker_queue_key}|#{reserved_at}|#{last_beat_at}"
  )

  # Put the deadline in the past as well.
  @redis.zadd('build:heartbeat-timeout-test:running', now - 10, 'TestA#test_1')

  # Let a little time pass.
  sleep 0.1

  # Worker 2 should now be handed the lost test.
  second_config = config.dup
  second_config.instance_variable_set(:@worker_id, '2')
  second_worker = CI::Queue::Redis.new(@redis_url, second_config)

  stolen = second_worker.send(:try_to_reserve_lost_test)
  assert_equal 'TestA#test_1', stolen, 'Test should be marked as lost after heartbeat expires (> 2 minutes)'
end
282328

283329
def test_batching_with_many_chunks

ruby/test/ci/queue/redis/worker_chunk_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,11 @@ def test_resolved_chunk_detects_flaky_tests
161161

162162
def test_acknowledge_chunk
163163
# Set up a chunk as if it were reserved (in running zset)
164+
# Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time"
164165
chunk_id = 'TestA:chunk_0'
165-
@redis.zadd('build:42:running', Time.now.to_i, chunk_id)
166-
@redis.hset('build:42:owners', chunk_id, 'build:42:worker:1:queue')
166+
current_time = Time.now.to_f
167+
@redis.zadd('build:42:running', current_time.to_i, chunk_id)
168+
@redis.hset('build:42:owners', chunk_id, "build:42:worker:1:queue|#{current_time}|#{current_time}")
167169
@worker.instance_variable_set(:@reserved_test, chunk_id)
168170

169171
# Acknowledge the chunk

0 commit comments

Comments
 (0)