Skip to content

Commit ffe6865

Browse files
committed
wip
1 parent e297c05 commit ffe6865

File tree

15 files changed

+588
-92
lines changed

15 files changed

+588
-92
lines changed

ruby/lib/ci/queue.rb

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,6 @@ def requeueable?(test_result)
4242
requeueable.nil? || requeueable.call(test_result)
4343
end
4444

45-
def shuffle(tests, random, config: nil)
46-
if shuffler
47-
shuffler.call(tests, random)
48-
else
49-
strategy = get_strategy(config&.strategy)
50-
strategy.order_tests(tests, random: random, config: config)
51-
end
52-
end
53-
5445
def from_uri(url, config)
5546
uri = URI(url)
5647
implementation = case uri.scheme
@@ -66,18 +57,5 @@ def from_uri(url, config)
6657
end
6758
implementation.from_uri(uri, config)
6859
end
69-
70-
private
71-
72-
def get_strategy(strategy_name)
73-
case strategy_name&.to_sym
74-
when :timing_based
75-
Strategy::TimingBased.new
76-
when :suite_bin_packing
77-
Strategy::SuiteBinPacking.new
78-
else
79-
Strategy::Random.new
80-
end
81-
end
8260
end
8361
end

ruby/lib/ci/queue/common.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,28 @@ def rescue_connection_errors(handler = ->(err) { nil })
3636
rescue *self::class::CONNECTION_ERRORS => err
3737
handler.call(err)
3838
end
39+
40+
def ordering_strategy
41+
case config.strategy.to_sym
42+
when :timing_based
43+
Strategy::TimingBased.new(config)
44+
when :suite_bin_packing
45+
# pass redis if available
46+
# need to think about a better way to structure queue/strategy interaction
47+
redis_instance = if self.respond_to?(:redis, true) # include private methods
48+
self.send(:redis)
49+
else
50+
nil
51+
end
52+
Strategy::SuiteBinPacking.new(config, redis: redis_instance)
53+
else
54+
Strategy::Random.new(config)
55+
end
56+
end
57+
58+
def reorder_tests(tests, random: Random.new)
59+
ordering_strategy.order_tests(tests, random: random)
60+
end
3961
end
4062
end
4163
end

ruby/lib/ci/queue/redis.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
require 'ci/queue/redis/supervisor'
1111
require 'ci/queue/redis/grind_supervisor'
1212
require 'ci/queue/redis/test_time_record'
13+
require 'ci/queue/redis/moving_average'
1314

1415
module CI
1516
module Queue
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# frozen_string_literal: true
2+
3+
module CI
4+
module Queue
5+
module Redis
6+
# Represents a redis hash of moving averages for test durations
7+
#
8+
# Moving average is calculated using exponential moving average formula
9+
class MovingAverage
10+
LUA_SCRIPT= <<~LUA
11+
local hash_key = KEYS[1]
12+
local test_id = ARGV[1]
13+
local new_duration = tonumber(ARGV[2])
14+
local smoothing = tonumber(ARGV[3])
15+
local current_avg = redis.call('HGET', hash_key, test_id)
16+
if current_avg then
17+
current_avg = tonumber(current_avg)
18+
local new_avg = smoothing * new_duration + (1 - smoothing) * current_avg
19+
redis.call('HSET', hash_key, test_id, new_avg)
20+
return tostring(new_avg)
21+
else
22+
redis.call('HSET', hash_key, test_id, new_duration)
23+
return tostring(new_duration)
24+
end
25+
LUA
26+
27+
def initialize(redis, key: "test_duration_moving_averages", smoothing_factor: 0.2)
28+
@redis = redis
29+
@key = key
30+
@smoothing_factor = smoothing_factor
31+
@values = {}
32+
end
33+
34+
def [](test_id)
35+
load_all if @values.empty?
36+
@values[test_id]
37+
end
38+
39+
def update(test_id, duration)
40+
new_avg = @redis.eval(LUA_SCRIPT, keys: [@key], argv: [test_id, duration, @smoothing_factor])
41+
@values[test_id] = new_avg.to_f
42+
new_avg.to_f
43+
end
44+
45+
def load_all
46+
batch_size = 1000
47+
cursor = '0'
48+
@values = {}
49+
50+
loop do
51+
cursor, batch = @redis.hscan(@key, cursor, count: batch_size)
52+
batch.each do |test_id, value|
53+
@values[test_id] = value.to_f
54+
end
55+
break if cursor == '0'
56+
end
57+
end
58+
59+
def size
60+
@redis.hlen(@key)
61+
end
62+
end
63+
end
64+
end
65+
end

ruby/lib/ci/queue/redis/test_time_record.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ module Redis
55
class TestTimeRecord < Worker
66
def record(test_name, duration)
77
record_test_time(test_name, duration)
8+
record_test_duration_moving_average(test_name, duration)
89
record_test_name(test_name)
10+
911
end
1012

1113
def fetch
@@ -29,6 +31,10 @@ def record_test_time(test_name, duration)
2931
nil
3032
end
3133

34+
def record_test_duration_moving_average(test_name, duration)
35+
MovingAverage.new(redis).update(test_name, duration)
36+
end
37+
3238
def record_test_name(test_name)
3339
redis.pipelined do |pipeline|
3440
pipeline.lpush(

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def distributed?
3030

3131
def populate(tests, random: Random.new)
3232
@index = tests.map { |t| [t.id, t] }.to_h
33-
executables = Queue.shuffle(tests, random, config: config)
33+
executables = reorder_tests(tests, random:)
3434

3535
# Separate chunks from individual tests
3636
chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }

ruby/lib/ci/queue/strategy/base.rb

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@ module CI
44
module Queue
55
module Strategy
66
class Base
7-
def order_tests(tests, random: Random.new, config: nil)
7+
def initialize(config)
8+
@config = config
9+
end
10+
11+
attr_reader :config
12+
13+
def order_tests(tests)
814
raise NotImplementedError, "#{self.class} must implement #order_tests"
915
end
1016
end
1117
end
1218
end
13-
end
19+
end

ruby/lib/ci/queue/strategy/random.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ module CI
55
module Queue
66
module Strategy
77
class Random < Base
8-
def order_tests(tests, random: Random.new, config: nil)
8+
def order_tests(tests, random: Random.new)
99
tests.sort.shuffle(random: random)
1010
end
1111
end
1212
end
1313
end
14-
end
14+
end

ruby/lib/ci/queue/strategy/suite_bin_packing.rb

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,36 @@ module CI
66
module Queue
77
module Strategy
88
class SuiteBinPacking < Base
9-
def order_tests(tests, random: Random.new, config: nil)
10-
timing_data = load_timing_data(config&.timing_file)
11-
max_duration = config&.suite_max_duration || 120_000
12-
fallback_duration = config&.timing_fallback_duration || 100.0
13-
buffer_percent = config&.suite_buffer_percent || 10
9+
class << self
10+
def load_timing_data(file_path)
11+
return {} unless file_path && ::File.exist?(file_path)
12+
13+
JSON.parse(::File.read(file_path))
14+
rescue JSON::ParserError => e
15+
warn "Warning: Could not parse timing file #{file_path}: #{e.message}"
16+
{}
17+
end
18+
end
19+
20+
def initialize(config, redis: nil)
21+
super(config)
22+
23+
if redis
24+
@moving_average = CI::Queue::Redis::MovingAverage.new(redis)
25+
end
26+
27+
if config&.timing_file
28+
@timing_data = self.class.load_timing_data(config.timing_file)
29+
else
30+
@timing_data = {}
31+
end
1432

33+
@max_duration = config&.suite_max_duration || 120_000
34+
@fallback_duration = config&.timing_fallback_duration || 100.0
35+
@buffer_percent = config&.suite_buffer_percent || 10
36+
end
37+
38+
def order_tests(tests, random: ::Random.new, redis: nil)
1539
# Group tests by suite name
1640
suites = tests.group_by { |test| extract_suite_name(test.id) }
1741

@@ -22,10 +46,6 @@ def order_tests(tests, random: Random.new, config: nil)
2246
create_chunks_for_suite(
2347
suite_name,
2448
suite_tests,
25-
max_duration,
26-
buffer_percent,
27-
timing_data,
28-
fallback_duration
2949
)
3050
)
3151
end
@@ -40,27 +60,27 @@ def extract_suite_name(test_id)
4060
test_id.split('#').first
4161
end
4262

43-
def load_timing_data(file_path)
44-
return {} unless file_path && ::File.exist?(file_path)
45-
46-
JSON.parse(::File.read(file_path))
47-
rescue JSON::ParserError => e
48-
warn "Warning: Could not parse timing file #{file_path}: #{e.message}"
49-
{}
50-
end
63+
def get_test_duration(test_id)
64+
if @moving_average
65+
avg = @moving_average[test_id]
66+
return avg if avg
67+
end
5168

52-
def get_test_duration(test_id, timing_data, fallback_duration)
53-
timing_data[test_id]&.to_f || fallback_duration
69+
if @timing_data.key?(test_id)
70+
@timing_data[test_id]
71+
else
72+
@fallback_duration
73+
end
5474
end
5575

56-
def create_chunks_for_suite(suite_name, suite_tests, max_duration, buffer_percent, timing_data, fallback_duration)
76+
def create_chunks_for_suite(suite_name, suite_tests)
5777
# Calculate total suite duration
5878
total_duration = suite_tests.sum do |test|
59-
get_test_duration(test.id, timing_data, fallback_duration)
79+
get_test_duration(test.id)
6080
end
6181

6282
# If suite fits in max duration, create full_suite chunk
63-
if total_duration <= max_duration
83+
if total_duration <= @max_duration
6484
chunk_id = "#{suite_name}:full_suite"
6585
# Don't store test_ids in Redis - worker will resolve from index
6686
# But pass test_count for timeout calculation
@@ -71,20 +91,16 @@ def create_chunks_for_suite(suite_name, suite_tests, max_duration, buffer_percen
7191
split_suite_into_chunks(
7292
suite_name,
7393
suite_tests,
74-
max_duration,
75-
buffer_percent,
76-
timing_data,
77-
fallback_duration
7894
)
7995
end
8096

81-
def split_suite_into_chunks(suite_name, suite_tests, max_duration, buffer_percent, timing_data, fallback_duration)
97+
def split_suite_into_chunks(suite_name, suite_tests)
8298
# Apply buffer to max duration
83-
effective_max = max_duration * (1 - buffer_percent / 100.0)
99+
effective_max = @max_duration * (1 - @buffer_percent / 100.0)
84100

85101
# Sort tests by duration (longest first for better bin packing)
86102
sorted_tests = suite_tests.sort_by do |test|
87-
-get_test_duration(test.id, timing_data, fallback_duration)
103+
-get_test_duration(test.id)
88104
end
89105

90106
# First-fit decreasing bin packing
@@ -94,7 +110,7 @@ def split_suite_into_chunks(suite_name, suite_tests, max_duration, buffer_percen
94110
chunk_index = 0
95111

96112
sorted_tests.each do |test|
97-
test_duration = get_test_duration(test.id, timing_data, fallback_duration)
113+
test_duration = get_test_duration(test.id)
98114

99115
if current_chunk_duration + test_duration > effective_max && current_chunk_tests.any?
100116
# Finalize current chunk and start new one

ruby/lib/ci/queue/strategy/timing_based.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ module CI
66
module Queue
77
module Strategy
88
class TimingBased < Base
9-
def order_tests(tests, random: Random.new, config: nil)
9+
def order_tests(tests, random: Random.new)
1010
timing_data = load_timing_data(config&.timing_file)
1111
fallback_duration = config&.timing_fallback_duration || 100.0
1212

@@ -20,7 +20,7 @@ def order_tests(tests, random: Random.new, config: nil)
2020

2121
def load_timing_data(file_path)
2222
return {} unless file_path && ::File.exist?(file_path)
23-
23+
2424
JSON.parse(::File.read(file_path))
2525
rescue JSON::ParserError => e
2626
warn "Warning: Could not parse timing file #{file_path}: #{e.message}"
@@ -32,4 +32,4 @@ def load_timing_data(file_path)
3232
end
3333
end
3434
end
35-
end
35+
end

0 commit comments

Comments
 (0)