Skip to content

Commit 3587a0b

Browse files
committed
[CI] heart beating ongoing chunk
1 parent dbe8b01 commit 3587a0b

File tree

9 files changed

+198
-27
lines changed

9 files changed

+198
-27
lines changed

redis/heartbeat.lua

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,44 @@ local processed_key = KEYS[2]
33
local owners_key = KEYS[3]
44
local worker_queue_key = KEYS[4]
55

6-
local current_time = ARGV[1]
6+
local current_time = tonumber(ARGV[1])
77
local test = ARGV[2]
88

99
-- already processed, we do not need to bump the timestamp
1010
if redis.call('sismember', processed_key, test) == 1 then
1111
return false
1212
end
1313

14-
-- we're still the owner of the test, we can bump the timestamp
15-
if redis.call('hget', owners_key, test) == worker_queue_key then
16-
return redis.call('zadd', zset_key, current_time, test)
14+
-- we're still the owner of the test, check if we need to extend the deadline
15+
local owner_value = redis.call('hget', owners_key, test)
16+
if owner_value then
17+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
18+
local pipe_pos = string.find(owner_value, "|")
19+
if not pipe_pos then
20+
return false
21+
end
22+
local stored_worker_key = string.sub(owner_value, 1, pipe_pos - 1)
23+
24+
if stored_worker_key == worker_queue_key then
25+
-- Always update last heartbeat timestamp in owners hash
26+
local new_owner_value = worker_queue_key .. "|" .. current_time
27+
redis.call('hset', owners_key, test, new_owner_value)
28+
29+
local deadline = redis.call('zscore', zset_key, test)
30+
if deadline then
31+
deadline = tonumber(deadline)
32+
-- Only extend if the deadline is less than 1 minute away (or has already passed)
33+
if deadline - 60 < current_time then
34+
-- Move the deadline to 2 minutes from now (not 2 minutes past the old deadline)
35+
local new_deadline = current_time + 120
36+
redis.call('zadd', zset_key, new_deadline, test)
37+
-- Return old deadline and new deadline
38+
return {deadline, new_deadline}
39+
end
40+
end
41+
-- No extension needed, but heartbeat was recorded
42+
return 0
43+
end
1744
end
45+
46+
return false

redis/release.lua

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@ local zset_key = KEYS[1]
22
local worker_queue_key = KEYS[2]
33
local owners_key = KEYS[3]
44

5-
-- owned_tests = {"SomeTest", "worker:1", "SomeOtherTest", "worker:2", ...}
5+
-- owned_tests = {"SomeTest", "worker:1|1234567890", "SomeOtherTest", "worker:2|1234567891", ...}
66
local owned_tests = redis.call('hgetall', owners_key)
77
for index, owner_or_test in ipairs(owned_tests) do
8-
if owner_or_test == worker_queue_key then -- If we owned a test
9-
local test = owned_tests[index - 1]
10-
redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately
11-
return nil
8+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
9+
local pipe_pos = string.find(owner_or_test, "|")
10+
if pipe_pos then
11+
local stored_worker_key = string.sub(owner_or_test, 1, pipe_pos - 1)
12+
13+
if stored_worker_key == worker_queue_key then -- If we owned a test
14+
local test = owned_tests[index - 1]
15+
redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately
16+
return nil
17+
end
1218
end
1319
end
1420

redis/requeue.lua

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,17 @@ local global_max_requeues = tonumber(ARGV[2])
1010
local test = ARGV[3]
1111
local offset = ARGV[4]
1212

13-
if redis.call('hget', owners_key, test) == worker_queue_key then
14-
redis.call('hdel', owners_key, test)
13+
local owner_value = redis.call('hget', owners_key, test)
14+
if owner_value then
15+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
16+
local pipe_pos = string.find(owner_value, "|")
17+
if pipe_pos then
18+
local stored_worker_key = string.sub(owner_value, 1, pipe_pos - 1)
19+
20+
if stored_worker_key == worker_queue_key then
21+
redis.call('hdel', owners_key, test)
22+
end
23+
end
1524
end
1625

1726
if redis.call('sismember', processed_key, test) == 1 then

redis/reserve.lua

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ if test then
2323
redis.call('zadd', zset_key, current_time, test)
2424
end
2525
redis.call('lpush', worker_queue_key, test)
26-
redis.call('hset', owners_key, test, worker_queue_key)
26+
-- Store owner with initial heartbeat timestamp (current_time)
27+
local owner_value = worker_queue_key .. "|" .. current_time
28+
redis.call('hset', owners_key, test, owner_value)
2729
return test
2830
else
2931
return nil

redis/reserve_lost.lua

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,43 @@ end
1818

1919
for _, test in ipairs(lost_tests) do
2020
if redis.call('sismember', processed_key, test) == 0 then
21-
if use_dynamic_deadline then
22-
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
23-
if not dynamic_timeout or dynamic_timeout == "" then
24-
dynamic_timeout = default_timeout
21+
-- Check whether the test has received a heartbeat recently (within the last 2 minutes)
22+
local should_skip = false
23+
local owner_value = redis.call('hget', owners_key, test)
24+
if owner_value then
25+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
26+
local pipe_pos = string.find(owner_value, "|")
27+
if pipe_pos then
28+
local last_heartbeat_str = string.sub(owner_value, pipe_pos + 1)
29+
local last_heartbeat = tonumber(last_heartbeat_str)
30+
if last_heartbeat then
31+
local heartbeat_age = current_time - last_heartbeat
32+
-- If heartbeat is less than 2 minutes old, skip this test
33+
if heartbeat_age < 120 then
34+
should_skip = true
35+
end
36+
end
37+
end
38+
end
39+
40+
if not should_skip then
41+
if use_dynamic_deadline then
42+
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
43+
if not dynamic_timeout or dynamic_timeout == "" then
44+
dynamic_timeout = default_timeout
45+
else
46+
dynamic_timeout = tonumber(dynamic_timeout)
47+
end
48+
redis.call('zadd', zset_key, current_time + dynamic_timeout, test)
2549
else
26-
dynamic_timeout = tonumber(dynamic_timeout)
50+
redis.call('zadd', zset_key, current_time + timeout, test)
2751
end
28-
redis.call('zadd', zset_key, current_time + dynamic_timeout, test)
29-
else
30-
redis.call('zadd', zset_key, current_time + timeout, test)
52+
redis.call('lpush', worker_queue_key, test)
53+
-- Store owner with initial heartbeat timestamp (current_time)
54+
local new_owner_value = worker_queue_key .. "|" .. current_time
55+
redis.call('hset', owners_key, test, new_owner_value) -- Take ownership
56+
return test
3157
end
32-
redis.call('lpush', worker_queue_key, test)
33-
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
34-
return test
3558
end
3659
end
3760

ruby/lib/ci/queue/redis/base.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ def load_script(script)
122122
@scripts_cache[script] ||= redis.script(:load, read_script(script))
123123
end
124124

125+
def ensure_connection_and_script(script)
126+
# Pre-initialize Redis connection and script in current thread context
127+
# This ensures background threads use the same initialized connection
128+
load_script(script)
129+
# Ping Redis to ensure connection is established
130+
redis.ping
131+
end
132+
125133
def read_script(name)
126134
::File.read(::File.join(CI::Queue::DEV_SCRIPTS_ROOT, "#{name}.lua"))
127135
rescue SystemCallError

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,32 @@ def poll
8787
executable = resolve_executable(id)
8888

8989
if executable
90-
yield executable
90+
# Pre-initialize Redis connection and heartbeat script in main thread
91+
# to ensure background thread uses the same initialized context
92+
ensure_connection_and_script(:heartbeat)
93+
94+
# Start heartbeat thread to extend lease while executing
95+
heartbeat_thread = nil
96+
begin
97+
heartbeat_thread = Thread.new do
98+
heartbeat_interval = 10 # Send heartbeat every 10 seconds
99+
loop do
100+
sleep heartbeat_interval
101+
break if Thread.current[:stop]
102+
103+
heartbeat(id)
104+
end
105+
end
106+
heartbeat_thread[:stop] = false
107+
108+
yield executable
109+
ensure
110+
# Stop heartbeat thread when execution completes
111+
if heartbeat_thread
112+
heartbeat_thread[:stop] = true
113+
heartbeat_thread.join(1) # Wait up to 1 second for thread to finish
114+
end
115+
end
91116
else
92117
warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.")
93118
acknowledge(id)
@@ -209,6 +234,31 @@ def release!
209234
nil
210235
end
211236

237+
def heartbeat(test_id)
238+
current_time = CI::Queue.time_now.to_f
239+
result = eval_script(
240+
:heartbeat,
241+
keys: [
242+
key('running'),
243+
key('processed'),
244+
key('owners'),
245+
key('worker', worker_id, 'queue')
246+
],
247+
argv: [current_time, test_id]
248+
)
249+
if result.is_a?(Array) && result.size == 2
250+
old_deadline = result[0]
251+
new_deadline = result[1]
252+
old_deadline_readable = Time.at(old_deadline).strftime('%Y-%m-%d %H:%M:%S')
253+
new_deadline_readable = Time.at(new_deadline).strftime('%Y-%m-%d %H:%M:%S')
254+
warn("[heartbeat] Extended deadline for #{test_id.inspect} from #{old_deadline_readable} (#{old_deadline}) to #{new_deadline_readable} (#{new_deadline})")
255+
end
256+
result
257+
rescue *CONNECTION_ERRORS => e
258+
warn("Failed to send heartbeat for #{test_id.inspect}: #{e.class} - #{e.message}")
259+
false
260+
end
261+
212262
private
213263

214264
attr_reader :index

ruby/test/ci/queue/.DS_Store

8 KB
Binary file not shown.

ruby/test/ci/queue/redis/dynamic_timeout_test.rb

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,16 +268,60 @@ def test_single_test_marked_lost_after_default_timeout
268268
reserved_id = worker1.send(:try_to_reserve_test)
269269
assert_equal 'TestA#test_1', reserved_id
270270

271-
# Wait longer than timeout (0.5s)
271+
# Wait longer than timeout (0.5s) but less than 2 minutes
272272
sleep 0.6
273273

274-
# Try to reserve with worker2 - should get the lost test
274+
# Try to reserve with worker2 - should NOT get the test because heartbeat is recent (< 2 minutes)
275275
worker2_config = config.dup
276276
worker2_config.instance_variable_set(:@worker_id, '2')
277277
worker2 = CI::Queue::Redis.new(@redis_url, worker2_config)
278278

279279
lost_test = worker2.send(:try_to_reserve_lost_test)
280-
assert_equal 'TestA#test_1', lost_test, 'Single test should be marked as lost after default timeout'
280+
assert_nil lost_test, 'Test should not be marked as lost if heartbeat is recent (< 2 minutes)'
281+
end
282+
283+
def test_single_test_marked_lost_after_heartbeat_expires
284+
# Create worker with short timeout
285+
config = CI::Queue::Configuration.new(
286+
build_id: 'heartbeat-timeout-test',
287+
worker_id: '1',
288+
timeout: 0.5 # 0.5 seconds
289+
)
290+
291+
worker1 = CI::Queue::Redis.new(@redis_url, config)
292+
293+
# Populate with single test (no chunk)
294+
tests = create_mock_tests(['TestA#test_1'])
295+
296+
worker1.stub(:reorder_tests, tests) do
297+
worker1.populate(tests)
298+
end
299+
300+
# Reserve the test with worker1
301+
reserved_id = worker1.send(:try_to_reserve_test)
302+
assert_equal 'TestA#test_1', reserved_id
303+
304+
# Manually set the heartbeat timestamp to be older than 2 minutes
305+
# by manipulating Redis directly
306+
current_time = CI::Queue.time_now.to_f
307+
old_heartbeat_time = current_time - 130 # 130 seconds ago (more than 2 minutes)
308+
worker_queue_key = "build:heartbeat-timeout-test:worker:1:queue"
309+
owner_value = "#{worker_queue_key}|#{old_heartbeat_time}"
310+
@redis.hset('build:heartbeat-timeout-test:owners', 'TestA#test_1', owner_value)
311+
312+
# Also set the deadline to be in the past so it's considered "lost"
313+
@redis.zadd('build:heartbeat-timeout-test:running', current_time - 10, 'TestA#test_1')
314+
315+
# Wait a bit to ensure time has passed
316+
sleep 0.1
317+
318+
# Try to reserve with worker2 - should get the lost test now
319+
worker2_config = config.dup
320+
worker2_config.instance_variable_set(:@worker_id, '2')
321+
worker2 = CI::Queue::Redis.new(@redis_url, worker2_config)
322+
323+
lost_test = worker2.send(:try_to_reserve_lost_test)
324+
assert_equal 'TestA#test_1', lost_test, 'Test should be marked as lost after heartbeat expires (> 2 minutes)'
281325
end
282326

283327
def test_batching_with_many_chunks

0 commit comments

Comments
 (0)