4
4
5
5
class TestAgainstClusterDown < TestingWrapper
6
6
WAIT_SEC = 0.1
7
+ NUMBER_OF_JOBS = 5
7
8
8
9
def setup
9
10
@captured_commands = ::Middlewares ::CommandCapture ::CommandBuffer . new
10
11
@redirect_count = ::Middlewares ::RedirectCount ::Counter . new
11
- @clients = Array . new ( 5 ) { build_client }
12
+ @clients = Array . new ( NUMBER_OF_JOBS ) { build_client }
12
13
@threads = [ ]
13
14
@controller = nil
14
15
@cluster_down_counter = Counter . new
15
- @pubsub_recorder = Recorder . new
16
+ @recorders = Array . new ( NUMBER_OF_JOBS ) { Recorder . new }
16
17
@captured_commands . clear
17
18
@redirect_count . clear
18
19
end
@@ -27,11 +28,12 @@ def teardown
27
28
end
28
29
29
30
def test_recoverability_from_cluster_down
30
- @threads << spawn_single ( @clients [ 0 ] )
31
- @threads << spawn_pipeline ( @clients [ 1 ] )
32
- @threads << spawn_transaction ( @clients [ 2 ] )
33
- @threads << spawn_subscriber ( @clients [ 3 ] )
34
- @threads << spawn_publisher ( @clients [ 4 ] )
31
+ cases = %w[ Single Pipeline Transaction Subscriber Publisher ]
32
+ @threads << spawn_single ( @clients [ 0 ] , @recorders [ 0 ] )
33
+ @threads << spawn_pipeline ( @clients [ 1 ] , @recorders [ 1 ] )
34
+ @threads << spawn_transaction ( @clients [ 2 ] , @recorders [ 2 ] )
35
+ @threads << spawn_subscriber ( @clients [ 3 ] , @recorders [ 3 ] )
36
+ @threads << spawn_publisher ( @clients [ 4 ] , @recorders [ 4 ] )
35
37
wait_for_jobs_to_be_stable
36
38
37
39
system ( 'docker compose --progress quiet down' , exception : true )
@@ -44,25 +46,12 @@ def test_recoverability_from_cluster_down
44
46
refute ( @cluster_down_counter . get . zero? , 'Case: cluster down count' )
45
47
refute ( @captured_commands . count ( 'cluster' , 'nodes' ) . zero? , 'Case: cluster nodes calls' )
46
48
47
- client = build_client ( custom : nil , middlewares : nil )
48
- @clients << client
49
-
50
- single_value1 = client . call ( 'get' , 'single' , &:to_i )
51
- pipeline_value1 = client . call ( 'get' , 'pipeline' , &:to_i )
52
- transaction_value1 = client . call ( 'get' , 'transaction' , &:to_i )
53
- pubsub_message1 = @pubsub_recorder . get . to_i
54
-
49
+ @values_a = @recorders . map { |r | r . get . to_i }
55
50
wait_for_jobs_to_be_stable
56
-
57
- single_value2 = client . call ( 'get' , 'single' , &:to_i )
58
- pipeline_value2 = client . call ( 'get' , 'pipeline' , &:to_i )
59
- transaction_value2 = client . call ( 'get' , 'transaction' , &:to_i )
60
- pubsub_message2 = @pubsub_recorder . get . to_i
61
-
62
- assert ( single_value1 < single_value2 , "Single: #{ single_value1 } < #{ single_value2 } " )
63
- assert ( pipeline_value1 < pipeline_value2 , "Pipeline: #{ pipeline_value1 } < #{ pipeline_value2 } " )
64
- assert ( transaction_value1 < transaction_value2 , "Transaction: #{ transaction_value1 } < #{ transaction_value2 } " )
65
- assert ( pubsub_message1 < pubsub_message2 , "PubSub: #{ pubsub_message1 } < #{ pubsub_message2 } " )
51
+ @values_b = @recorders . map { |r | r . get . to_i }
52
+ @recorders . each_with_index do |_ , i |
53
+ assert ( @values_a [ i ] < @values_b [ i ] , "#{ cases [ i ] } : #{ @values_a [ i ] } < #{ @values_b [ i ] } " )
54
+ end
66
55
end
67
56
68
57
private
@@ -91,57 +80,63 @@ def build_controller
91
80
)
92
81
end
93
82
94
- def spawn_single ( cli )
95
- Thread . new ( cli ) do |r |
83
+ def spawn_single ( cli , rec )
84
+ Thread . new ( cli , rec ) do |c , r |
96
85
loop do
97
86
handle_errors do
98
- r . call ( 'incr' , 'single' )
99
- r . call ( 'incr' , 'single' )
87
+ c . call ( 'incr' , 'single' )
88
+ reply = c . call ( 'incr' , 'single' )
89
+ r . set ( reply )
100
90
end
101
91
ensure
102
92
sleep WAIT_SEC
103
93
end
104
94
end
105
95
end
106
96
107
- def spawn_pipeline ( cli )
108
- Thread . new ( cli ) do |r |
97
+ def spawn_pipeline ( cli , rec )
98
+ Thread . new ( cli , rec ) do |c , r |
109
99
loop do
110
100
handle_errors do
111
- r . pipelined do |pi |
101
+ reply = c . pipelined do |pi |
112
102
pi . call ( 'incr' , 'pipeline' )
113
103
pi . call ( 'incr' , 'pipeline' )
114
104
end
105
+
106
+ r . set ( reply [ 1 ] )
115
107
end
116
108
ensure
117
109
sleep WAIT_SEC
118
110
end
119
111
end
120
112
end
121
113
122
- def spawn_transaction ( cli )
123
- Thread . new ( cli ) do |r |
114
+ def spawn_transaction ( cli , rec )
115
+ Thread . new ( cli , rec ) do |c , r |
124
116
i = 0
125
117
loop do
126
118
handle_errors do
127
- r . multi ( watch : i . odd? ? %w[ transaction ] : nil ) do |tx |
119
+ reply = c . multi ( watch : i . odd? ? %w[ transaction ] : nil ) do |tx |
128
120
i += 1
129
121
tx . call ( 'incr' , 'transaction' )
130
122
tx . call ( 'incr' , 'transaction' )
131
123
end
124
+
125
+ r . set ( reply [ 1 ] )
132
126
end
133
127
ensure
134
128
sleep WAIT_SEC
135
129
end
136
130
end
137
131
end
138
132
139
- def spawn_publisher ( cli )
140
- Thread . new ( cli ) do |r |
133
+ def spawn_publisher ( cli , rec )
134
+ Thread . new ( cli , rec ) do |c , r |
141
135
i = 0
142
136
loop do
143
137
handle_errors do
144
- r . call ( 'spublish' , 'chan' , i )
138
+ c . call ( 'spublish' , 'chan' , i )
139
+ r . set ( i )
145
140
i += 1
146
141
end
147
142
ensure
@@ -150,12 +145,12 @@ def spawn_publisher(cli)
150
145
end
151
146
end
152
147
153
- def spawn_subscriber ( cli )
154
- Thread . new ( cli ) do |r |
148
+ def spawn_subscriber ( cli , rec )
149
+ Thread . new ( cli , rec ) do |c , r |
155
150
ps = nil
156
151
157
152
loop do
158
- ps = r . pubsub
153
+ ps = c . pubsub
159
154
ps . call ( 'ssubscribe' , 'chan' )
160
155
break
161
156
rescue StandardError
@@ -168,7 +163,7 @@ def spawn_subscriber(cli)
168
163
handle_errors do
169
164
event = ps . next_event ( 0.01 )
170
165
case event &.first
171
- when 'smessage' then @pubsub_recorder . set ( event [ 2 ] )
166
+ when 'smessage' then r . set ( event [ 2 ] )
172
167
end
173
168
end
174
169
ensure
@@ -197,16 +192,21 @@ def handle_errors
197
192
end
198
193
199
194
def wait_for_jobs_to_be_stable ( attempts : 100 )
200
- now = Process . clock_gettime ( Process ::CLOCK_MONOTONIC , :microsecond )
195
+ start = Process . clock_gettime ( Process ::CLOCK_MONOTONIC , :microsecond )
196
+ sleep_sec = WAIT_SEC * ( @threads . size * 2 )
201
197
202
- loop do
203
- raise MaxRetryExceeded if attempts <= 0
198
+ @recorders . each do |recorder |
199
+ loop do
200
+ raise MaxRetryExceeded if attempts <= 0
204
201
205
- attempts -= 1
206
- before = @cluster_down_counter . get
207
- sleep WAIT_SEC * ( @threads . size * 2 )
208
- after = @cluster_down_counter . get
209
- break if before == after && @pubsub_recorder . updated? ( now )
202
+ attempts -= 1
203
+ next sleep ( sleep_sec ) unless recorder . updated? ( start )
204
+
205
+ value_a = recorder . get . to_i
206
+ sleep sleep_sec
207
+ value_b = recorder . get . to_i
208
+ break if value_a < value_b
209
+ end
210
210
end
211
211
end
212
212
0 commit comments