@@ -38,20 +38,20 @@ type Config struct {
38
38
// The Worker's role is to process the messages it receives from the listener.
// It then initiates Kubernetes API requests to carry out the necessary actions.
type Worker struct {
	clientset *kubernetes.Clientset // Kubernetes API client used to patch the EphemeralRunnerSet
	config    Config                // worker configuration (min/max runners, target resource names)
	lastPatch int                   // replica count sent in the most recent patch; -1 until the first patch is issued
	patchSeq  int                   // monotonically increasing sequence number attached to each patch; -1 until the first patch
	logger    *logr.Logger
}
47
47
48
48
// Compile-time assertion that *Worker satisfies the listener.Handler interface.
var _ listener.Handler = (*Worker)(nil)
func New (config Config , options ... Option ) (* Worker , error ) {
51
51
w := & Worker {
52
- config : config ,
53
- lastPatch : - 1 ,
54
- lastPatchID : - 1 ,
52
+ config : config ,
53
+ lastPatch : - 1 ,
54
+ patchSeq : - 1 ,
55
55
}
56
56
57
57
conf , err := rest .InClusterConfig ()
@@ -163,27 +163,8 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
163
163
// The function then scales the ephemeral runner set by applying the merge patch.
164
164
// Finally, it logs the scaled ephemeral runner set details and returns nil if successful.
165
165
// If any error occurs during the process, it returns an error with a descriptive message.
166
- func (w * Worker ) HandleDesiredRunnerCount (ctx context.Context , count int , jobsCompleted int ) (int , error ) {
167
- // Max runners should always be set by the resource builder either to the configured value,
168
- // or the maximum int32 (resourcebuilder.newAutoScalingListener()).
169
- targetRunnerCount := min (w .config .MinRunners + count , w .config .MaxRunners )
170
-
171
- logValues := []any {
172
- "assigned job" , count ,
173
- "decision" , targetRunnerCount ,
174
- "min" , w .config .MinRunners ,
175
- "max" , w .config .MaxRunners ,
176
- "currentRunnerCount" , w .lastPatch ,
177
- "jobsCompleted" , jobsCompleted ,
178
- }
179
-
180
- if count == 0 && jobsCompleted == 0 {
181
- w .lastPatchID = 0
182
- } else {
183
- w .lastPatchID ++
184
- }
185
-
186
- w .lastPatch = targetRunnerCount
166
+ func (w * Worker ) HandleDesiredRunnerCount (ctx context.Context , count , jobsCompleted int ) (int , error ) {
167
+ patchID := w .setDesiredWorkerState (count , jobsCompleted )
187
168
188
169
original , err := json .Marshal (
189
170
& v1alpha1.EphemeralRunnerSet {
@@ -200,8 +181,8 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
200
181
patch , err := json .Marshal (
201
182
& v1alpha1.EphemeralRunnerSet {
202
183
Spec : v1alpha1.EphemeralRunnerSetSpec {
203
- Replicas : targetRunnerCount ,
204
- PatchID : w . lastPatchID ,
184
+ Replicas : w . lastPatch ,
185
+ PatchID : patchID ,
205
186
},
206
187
},
207
188
)
@@ -210,14 +191,13 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
210
191
return 0 , err
211
192
}
212
193
194
+ w .logger .Info ("Compare" , "original" , string (original ), "patch" , string (patch ))
213
195
mergePatch , err := jsonpatch .CreateMergePatch (original , patch )
214
196
if err != nil {
215
197
return 0 , fmt .Errorf ("failed to create merge patch json for ephemeral runner set: %w" , err )
216
198
}
217
199
218
- w .logger .Info ("Created merge patch json for EphemeralRunnerSet update" , "json" , string (mergePatch ))
219
-
220
- w .logger .Info ("Scaling ephemeral runner set" , logValues ... )
200
+ w .logger .Info ("Preparing EphemeralRunnerSet update" , "json" , string (mergePatch ))
221
201
222
202
patchedEphemeralRunnerSet := & v1alpha1.EphemeralRunnerSet {}
223
203
err = w .clientset .RESTClient ().
@@ -238,5 +218,40 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
238
218
"name" , w .config .EphemeralRunnerSetName ,
239
219
"replicas" , patchedEphemeralRunnerSet .Spec .Replicas ,
240
220
)
241
- return targetRunnerCount , nil
221
+ return w .lastPatch , nil
222
+ }
223
+
224
+ // calculateDesiredState calculates the desired state of the worker based on the desired count and the the number of jobs completed.
225
+ func (w * Worker ) setDesiredWorkerState (count , jobsCompleted int ) int {
226
+ // Max runners should always be set by the resource builder either to the configured value,
227
+ // or the maximum int32 (resourcebuilder.newAutoScalingListener()).
228
+ targetRunnerCount := min (w .config .MinRunners + count , w .config .MaxRunners )
229
+ w .patchSeq ++
230
+ desiredPatchID := w .patchSeq
231
+
232
+ if count == 0 && jobsCompleted == 0 { // empty batch
233
+ targetRunnerCount = max (w .lastPatch , targetRunnerCount )
234
+ if targetRunnerCount == w .config .MinRunners {
235
+ // We have an empty batch, and the last patch was the min runners.
236
+ // Since this is an empty batch, and we are at the min runners, they should all be idle.
237
+ // If controller created few more pods on accident (during scale down events),
238
+ // this situation allows the controller to scale down to the min runners.
239
+ // However, it is important to keep the patch sequence increasing so we don't ignore one batch.
240
+ desiredPatchID = 0
241
+ }
242
+ }
243
+
244
+ w .lastPatch = targetRunnerCount
245
+
246
+ w .logger .Info (
247
+ "Calculated target runner count" ,
248
+ "assigned job" , count ,
249
+ "decision" , targetRunnerCount ,
250
+ "min" , w .config .MinRunners ,
251
+ "max" , w .config .MaxRunners ,
252
+ "currentRunnerCount" , w .lastPatch ,
253
+ "jobsCompleted" , jobsCompleted ,
254
+ )
255
+
256
+ return desiredPatchID
242
257
}
0 commit comments