Skip to content

Commit 5a677a9

Browse files
committed
[CI] heart beating ongoing chunk
1 parent dbe8b01 commit 5a677a9

File tree

4 files changed

+78
-4
lines changed

4 files changed

+78
-4
lines changed

redis/heartbeat.lua

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,30 @@ local processed_key = KEYS[2]
33
local owners_key = KEYS[3]
44
local worker_queue_key = KEYS[4]
55

6-
local current_time = ARGV[1]
6+
local current_time = tonumber(ARGV[1])
77
local test = ARGV[2]
88

99
-- already processed, we do not need to bump the timestamp
1010
if redis.call('sismember', processed_key, test) == 1 then
1111
return false
1212
end
1313

14-
-- we're still the owner of the test, we can bump the timestamp
14+
-- we're still the owner of the test, check if we need to extend the deadline
1515
if redis.call('hget', owners_key, test) == worker_queue_key then
16-
return redis.call('zadd', zset_key, current_time, test)
16+
local deadline = redis.call('zscore', zset_key, test)
17+
if deadline then
18+
deadline = tonumber(deadline)
19+
-- Only extend if deadline is within 1 minute of expiring
20+
if deadline - 60 < current_time then
21+
-- Extend by 2 minutes
22+
local new_deadline = current_time + 120
23+
redis.call('zadd', zset_key, new_deadline, test)
24+
-- Return old deadline and new deadline
25+
return {deadline, new_deadline}
26+
end
27+
end
28+
-- No extension needed
29+
return 0
1730
end
31+
32+
return false

ruby/lib/ci/queue/redis/base.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ def load_script(script)
122122
@scripts_cache[script] ||= redis.script(:load, read_script(script))
123123
end
124124

125+
def ensure_connection_and_script(script)
126+
# Pre-initialize Redis connection and script in current thread context
127+
# This ensures background threads use the same initialized connection
128+
load_script(script)
129+
# Ping Redis to ensure connection is established
130+
redis.ping
131+
end
132+
125133
def read_script(name)
126134
::File.read(::File.join(CI::Queue::DEV_SCRIPTS_ROOT, "#{name}.lua"))
127135
rescue SystemCallError

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,32 @@ def poll
8787
executable = resolve_executable(id)
8888

8989
if executable
90-
yield executable
90+
# Pre-initialize Redis connection and heartbeat script in main thread
91+
# to ensure background thread uses the same initialized context
92+
ensure_connection_and_script(:heartbeat)
93+
94+
# Start heartbeat thread to extend lease while executing
95+
heartbeat_thread = nil
96+
begin
97+
heartbeat_thread = Thread.new do
98+
heartbeat_interval = 10 # Send heartbeat every 10 seconds
99+
loop do
100+
sleep heartbeat_interval
101+
break if Thread.current[:stop]
102+
103+
heartbeat(id)
104+
end
105+
end
106+
heartbeat_thread[:stop] = false
107+
108+
yield executable
109+
ensure
110+
# Stop heartbeat thread when execution completes
111+
if heartbeat_thread
112+
heartbeat_thread[:stop] = true
113+
heartbeat_thread.join(1) # Wait up to 1 second for thread to finish
114+
end
115+
end
91116
else
92117
warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.")
93118
acknowledge(id)
@@ -209,6 +234,32 @@ def release!
209234
nil
210235
end
211236

237+
def heartbeat(test_id)
238+
current_time = CI::Queue.time_now.to_f
239+
result = eval_script(
240+
:heartbeat,
241+
keys: [
242+
key('running'),
243+
key('processed'),
244+
key('owners'),
245+
key('worker', worker_id, 'queue')
246+
],
247+
argv: [current_time, test_id]
248+
)
249+
if result.is_a?(Array) && result.size == 2
250+
old_deadline = result[0]
251+
new_deadline = result[1]
252+
old_deadline_readable = Time.at(old_deadline).strftime('%Y-%m-%d %H:%M:%S')
253+
new_deadline_readable = Time.at(new_deadline).strftime('%Y-%m-%d %H:%M:%S')
254+
warn("[heartbeat] Extended deadline for #{test_id.inspect} from #{old_deadline_readable} (#{old_deadline}) to #{new_deadline_readable} (#{new_deadline})")
255+
end
256+
result
257+
rescue StandardError => e
258+
warn("Failed to send heartbeat for #{test_id.inspect}: #{e.class} - #{e.message}")
259+
warn(e.backtrace.join("\n")) if e.backtrace
260+
false
261+
end
262+
212263
private
213264

214265
attr_reader :index

ruby/test/ci/queue/.DS_Store

8 KB
Binary file not shown.

0 commit comments

Comments
 (0)