Skip to content

Commit 113b441

Browse files
committed
[CI] heart beating ongoing chunk
1 parent dbe8b01 commit 113b441

File tree

8 files changed

+139
-14
lines changed

8 files changed

+139
-14
lines changed

redis/heartbeat.lua

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,44 @@ local processed_key = KEYS[2]
33
local owners_key = KEYS[3]
44
local worker_queue_key = KEYS[4]
55

6-
local current_time = ARGV[1]
6+
local current_time = tonumber(ARGV[1])
77
local test = ARGV[2]
88

99
-- already processed, we do not need to bump the timestamp
1010
if redis.call('sismember', processed_key, test) == 1 then
1111
return false
1212
end
1313

14-
-- we're still the owner of the test, we can bump the timestamp
15-
if redis.call('hget', owners_key, test) == worker_queue_key then
16-
return redis.call('zadd', zset_key, current_time, test)
14+
-- we're still the owner of the test, check if we need to extend the deadline
15+
local owner_value = redis.call('hget', owners_key, test)
16+
if owner_value then
17+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
18+
local pipe_pos = string.find(owner_value, "|")
19+
if not pipe_pos then
20+
return false
21+
end
22+
local stored_worker_key = string.sub(owner_value, 1, pipe_pos - 1)
23+
24+
if stored_worker_key == worker_queue_key then
25+
-- Always update last heartbeat timestamp in owners hash
26+
local new_owner_value = worker_queue_key .. "|" .. current_time
27+
redis.call('hset', owners_key, test, new_owner_value)
28+
29+
local deadline = redis.call('zscore', zset_key, test)
30+
if deadline then
31+
deadline = tonumber(deadline)
32+
-- Only extend if deadline is within 1 minute of expiring
33+
if deadline - 60 < current_time then
34+
-- Extend by 2 minutes
35+
local new_deadline = current_time + 120
36+
redis.call('zadd', zset_key, new_deadline, test)
37+
-- Return old deadline and new deadline
38+
return {deadline, new_deadline}
39+
end
40+
end
41+
-- No extension needed, but heartbeat was recorded
42+
return 0
43+
end
1744
end
45+
46+
return false

redis/release.lua

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@ local zset_key = KEYS[1]
22
local worker_queue_key = KEYS[2]
33
local owners_key = KEYS[3]
44

5-
-- owned_tests = {"SomeTest", "worker:1", "SomeOtherTest", "worker:2", ...}
5+
-- owned_tests = {"SomeTest", "worker:1|1234567890", "SomeOtherTest", "worker:2|1234567891", ...}
66
local owned_tests = redis.call('hgetall', owners_key)
77
for index, owner_or_test in ipairs(owned_tests) do
8-
if owner_or_test == worker_queue_key then -- If we owned a test
9-
local test = owned_tests[index - 1]
10-
redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately
11-
return nil
8+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
9+
local pipe_pos = string.find(owner_or_test, "|")
10+
if pipe_pos then
11+
local stored_worker_key = string.sub(owner_or_test, 1, pipe_pos - 1)
12+
13+
if stored_worker_key == worker_queue_key then -- If we owned a test
14+
local test = owned_tests[index - 1]
15+
redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately
16+
return nil
17+
end
1218
end
1319
end
1420

redis/requeue.lua

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,17 @@ local global_max_requeues = tonumber(ARGV[2])
1010
local test = ARGV[3]
1111
local offset = ARGV[4]
1212

13-
if redis.call('hget', owners_key, test) == worker_queue_key then
14-
redis.call('hdel', owners_key, test)
13+
local owner_value = redis.call('hget', owners_key, test)
14+
if owner_value then
15+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
16+
local pipe_pos = string.find(owner_value, "|")
17+
if pipe_pos then
18+
local stored_worker_key = string.sub(owner_value, 1, pipe_pos - 1)
19+
20+
if stored_worker_key == worker_queue_key then
21+
redis.call('hdel', owners_key, test)
22+
end
23+
end
1524
end
1625

1726
if redis.call('sismember', processed_key, test) == 1 then

redis/reserve.lua

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ if test then
2323
redis.call('zadd', zset_key, current_time, test)
2424
end
2525
redis.call('lpush', worker_queue_key, test)
26-
redis.call('hset', owners_key, test, worker_queue_key)
26+
-- Store owner with initial heartbeat timestamp (current_time)
27+
local owner_value = worker_queue_key .. "|" .. current_time
28+
redis.call('hset', owners_key, test, owner_value)
2729
return test
2830
else
2931
return nil

redis/reserve_lost.lua

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,24 @@ end
1818

1919
for _, test in ipairs(lost_tests) do
2020
if redis.call('sismember', processed_key, test) == 0 then
21+
-- Check if test has been heartbeated recently (within 2 minutes)
22+
local owner_value = redis.call('hget', owners_key, test)
23+
if owner_value then
24+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
25+
local pipe_pos = string.find(owner_value, "|")
26+
if pipe_pos then
27+
local last_heartbeat_str = string.sub(owner_value, pipe_pos + 1)
28+
local last_heartbeat = tonumber(last_heartbeat_str)
29+
if last_heartbeat then
30+
local heartbeat_age = current_time - last_heartbeat
31+
-- If heartbeat is less than 2 minutes old, skip this test
32+
if heartbeat_age < 120 then
33+
goto continue
34+
end
35+
end
36+
end
37+
end
38+
2139
if use_dynamic_deadline then
2240
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
2341
if not dynamic_timeout or dynamic_timeout == "" then
@@ -30,8 +48,11 @@ for _, test in ipairs(lost_tests) do
3048
redis.call('zadd', zset_key, current_time + timeout, test)
3149
end
3250
redis.call('lpush', worker_queue_key, test)
33-
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
51+
-- Store owner with initial heartbeat timestamp (current_time)
52+
local new_owner_value = worker_queue_key .. "|" .. current_time
53+
redis.call('hset', owners_key, test, new_owner_value) -- Take ownership
3454
return test
55+
::continue::
3556
end
3657
end
3758

ruby/lib/ci/queue/redis/base.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ def load_script(script)
122122
@scripts_cache[script] ||= redis.script(:load, read_script(script))
123123
end
124124

125+
def ensure_connection_and_script(script)
126+
# Pre-initialize Redis connection and script in current thread context
127+
# This ensures background threads use the same initialized connection
128+
load_script(script)
129+
# Ping Redis to ensure connection is established
130+
redis.ping
131+
end
132+
125133
def read_script(name)
126134
::File.read(::File.join(CI::Queue::DEV_SCRIPTS_ROOT, "#{name}.lua"))
127135
rescue SystemCallError

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,32 @@ def poll
8787
executable = resolve_executable(id)
8888

8989
if executable
90-
yield executable
90+
# Pre-initialize Redis connection and heartbeat script in main thread
91+
# to ensure background thread uses the same initialized context
92+
ensure_connection_and_script(:heartbeat)
93+
94+
# Start heartbeat thread to extend lease while executing
95+
heartbeat_thread = nil
96+
begin
97+
heartbeat_thread = Thread.new do
98+
heartbeat_interval = 10 # Send heartbeat every 10 seconds
99+
loop do
100+
sleep heartbeat_interval
101+
break if Thread.current[:stop]
102+
103+
heartbeat(id)
104+
end
105+
end
106+
heartbeat_thread[:stop] = false
107+
108+
yield executable
109+
ensure
110+
# Stop heartbeat thread when execution completes
111+
if heartbeat_thread
112+
heartbeat_thread[:stop] = true
113+
heartbeat_thread.join(1) # Wait up to 1 second for thread to finish
114+
end
115+
end
91116
else
92117
warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.")
93118
acknowledge(id)
@@ -209,6 +234,31 @@ def release!
209234
nil
210235
end
211236

237+
def heartbeat(test_id)
238+
current_time = CI::Queue.time_now.to_f
239+
result = eval_script(
240+
:heartbeat,
241+
keys: [
242+
key('running'),
243+
key('processed'),
244+
key('owners'),
245+
key('worker', worker_id, 'queue')
246+
],
247+
argv: [current_time, test_id]
248+
)
249+
if result.is_a?(Array) && result.size == 2
250+
old_deadline = result[0]
251+
new_deadline = result[1]
252+
old_deadline_readable = Time.at(old_deadline).strftime('%Y-%m-%d %H:%M:%S')
253+
new_deadline_readable = Time.at(new_deadline).strftime('%Y-%m-%d %H:%M:%S')
254+
warn("[heartbeat] Extended deadline for #{test_id.inspect} from #{old_deadline_readable} (#{old_deadline}) to #{new_deadline_readable} (#{new_deadline})")
255+
end
256+
result
257+
rescue *CONNECTION_ERRORS => e
258+
warn("Failed to send heartbeat for #{test_id.inspect}: #{e.class} - #{e.message}")
259+
false
260+
end
261+
212262
private
213263

214264
attr_reader :index

ruby/test/ci/queue/.DS_Store

8 KB
Binary file not shown.

0 commit comments

Comments
 (0)