Skip to content

Commit 6a6e58d

Browse files
authored
Move job-related code to a single module (#180)
* Move job-related code to a single module * Add metadata and backfill configurators * Self-review fixes * Fix * Regenerate code * Linter fix
1 parent 41bce7a commit 6a6e58d

File tree

12 files changed

+174
-52
lines changed

12 files changed

+174
-52
lines changed

pkg/apis/streaming/v1/extensions.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package v1
22

33
import (
4-
"github.com/SneaksAndData/arcane-operator/services"
4+
"github.com/SneaksAndData/arcane-operator/services/job"
55
"k8s.io/apimachinery/pkg/runtime/schema"
66
)
77

@@ -23,12 +23,12 @@ func (in *StreamClass) TargetResourceGvk() schema.GroupVersionKind {
2323
}
2424
}
2525

26-
var _ services.JobConfiguratorProvider = (*BackfillRequest)(nil)
26+
var _ job.ConfiguratorProvider = (*BackfillRequest)(nil)
2727

2828
// JobConfigurator returns a JobConfigurator for the BackfillRequest
29-
func (in *BackfillRequest) JobConfigurator() services.JobConfigurator {
29+
func (in *BackfillRequest) JobConfigurator() job.Configurator {
3030
if in == nil {
3131
return nil
3232
}
33-
return services.NewEnvironmentConfigurator(in, "OVERRIDE")
33+
return job.NewEnvironmentConfigurator(in, "OVERRIDE").AddNext(job.NewBackfillConfigurator(true))
3434
}

services/controllers/stream/job_builder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package stream
22

33
import (
44
"context"
5-
"github.com/SneaksAndData/arcane-operator/services"
5+
"github.com/SneaksAndData/arcane-operator/services/job"
66
batchv1 "k8s.io/api/batch/v1"
77
)
88

99
type JobBuilder interface {
10-
BuildJob(ctx context.Context, jobType services.JobTemplateType, configurator services.JobConfigurator) (*batchv1.Job, error)
10+
BuildJob(ctx context.Context, jobType job.TemplateType, configurator job.Configurator) (*batchv1.Job, error)
1111
}

services/controllers/stream/stream_definition.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package stream
22

33
import (
44
v1 "github.com/SneaksAndData/arcane-operator/pkg/apis/streaming/v1"
5-
"github.com/SneaksAndData/arcane-operator/services"
5+
"github.com/SneaksAndData/arcane-operator/services/job"
66
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
77
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
88
"k8s.io/apimachinery/pkg/types"
@@ -59,7 +59,7 @@ type Definition interface {
5959
ToOwnerReference() metav1.OwnerReference
6060

6161
// ToConfiguratorProvider converts the stream definition to a JobConfiguratorProvider.
62-
ToConfiguratorProvider() services.JobConfiguratorProvider
62+
ToConfiguratorProvider() job.ConfiguratorProvider
6363
}
6464

6565
func fromUnstructured(obj *unstructured.Unstructured) (Definition, error) {

services/controllers/stream/stream_reconciler.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"context"
55
"fmt"
66
v1 "github.com/SneaksAndData/arcane-operator/pkg/apis/streaming/v1"
7-
"github.com/SneaksAndData/arcane-operator/services"
87
"github.com/SneaksAndData/arcane-operator/services/controllers"
8+
"github.com/SneaksAndData/arcane-operator/services/job"
99
batchv1 "k8s.io/api/batch/v1"
1010
"k8s.io/apimachinery/pkg/api/errors"
1111
"k8s.io/apimachinery/pkg/api/meta"
@@ -113,8 +113,8 @@ func (s *streamReconciler) Reconcile(ctx context.Context, request reconcile.Requ
113113
return reconcile.Result{}, err
114114
}
115115

116-
job := &batchv1.Job{}
117-
err = s.client.Get(ctx, request.NamespacedName, job)
116+
j := &batchv1.Job{}
117+
err = s.client.Get(ctx, request.NamespacedName, j)
118118

119119
if client.IgnoreNotFound(err) != nil { // coverage-ignore
120120
logger.V(1).Error(err, "unable to fetch Stream Job")
@@ -127,7 +127,7 @@ func (s *streamReconciler) Reconcile(ctx context.Context, request reconcile.Requ
127127
streamingJob = nil
128128
logger.V(2).Info("streaming does not exist")
129129
} else {
130-
streamingJob = (*StreamingJob)(job)
130+
streamingJob = (*StreamingJob)(j)
131131
logger.V(2).Info("streaming job found")
132132
}
133133

@@ -186,10 +186,10 @@ func (s *streamReconciler) moveFsm(ctx context.Context, definition Definition, j
186186
}
187187

188188
func (s *streamReconciler) stopStream(ctx context.Context, definition Definition, nextPhase Phase) (reconcile.Result, error) {
189-
job := &batchv1.Job{}
190-
job.SetName(definition.NamespacedName().Name)
191-
job.SetNamespace(definition.NamespacedName().Namespace)
192-
err := s.client.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground))
189+
j := &batchv1.Job{}
190+
j.SetName(definition.NamespacedName().Name)
191+
j.SetNamespace(definition.NamespacedName().Namespace)
192+
err := s.client.Delete(ctx, j, client.PropagationPolicy(metav1.DeletePropagationBackground))
193193
if client.IgnoreNotFound(err) != nil { // coverage-ignore
194194
return reconcile.Result{}, err
195195
}
@@ -231,9 +231,9 @@ func (s *streamReconciler) reconcileJob(ctx context.Context, definition Definiti
231231
return reconcile.Result{}, err
232232
}
233233

234-
templateType := services.StreamingJobTemplate
234+
templateType := job.StreamingJobTemplate
235235
if backfillRequest != nil {
236-
templateType = services.BackfillJobTemplate
236+
templateType = job.BackfillJobTemplate
237237
}
238238
configurator := definition.ToConfiguratorProvider().JobConfigurator().AddNext(backfillRequest.JobConfigurator())
239239

@@ -305,13 +305,13 @@ func (s *streamReconciler) completeBackfill(ctx context.Context, job *batchv1.Jo
305305
return s.updateStreamPhase(ctx, definition, nil, nextStatus)
306306
}
307307

308-
func (s *streamReconciler) startNewJob(ctx context.Context, templateType services.JobTemplateType, configurator services.JobConfigurator) error {
309-
job, err := s.jobBuilder.BuildJob(ctx, templateType, configurator)
308+
func (s *streamReconciler) startNewJob(ctx context.Context, templateType job.TemplateType, configurator job.Configurator) error {
309+
j, err := s.jobBuilder.BuildJob(ctx, templateType, configurator)
310310
if err != nil { // coverage-ignore
311311
return err
312312
}
313313

314-
err = s.client.Create(ctx, job)
314+
err = s.client.Create(ctx, j)
315315
if err != nil { // coverage-ignore
316316
return err
317317
}

services/controllers/stream/unstructured_wrapper.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"encoding/json"
77
"fmt"
88
v1 "github.com/SneaksAndData/arcane-operator/pkg/apis/streaming/v1"
9-
"github.com/SneaksAndData/arcane-operator/services"
9+
"github.com/SneaksAndData/arcane-operator/services/job"
1010
corev1 "k8s.io/api/core/v1"
1111
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1212
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -15,8 +15,8 @@ import (
1515
)
1616

1717
var (
18-
_ Definition = (*unstructuredWrapper)(nil)
19-
_ services.JobConfiguratorProvider = (*unstructuredWrapper)(nil)
18+
_ Definition = (*unstructuredWrapper)(nil)
19+
_ job.ConfiguratorProvider = (*unstructuredWrapper)(nil)
2020
)
2121

2222
type unstructuredWrapper struct {
@@ -28,7 +28,7 @@ type unstructuredWrapper struct {
2828
backfillJobRef corev1.ObjectReference
2929
}
3030

31-
func (u *unstructuredWrapper) ToConfiguratorProvider() services.JobConfiguratorProvider {
31+
func (u *unstructuredWrapper) ToConfiguratorProvider() job.ConfiguratorProvider {
3232
return u
3333
}
3434

@@ -135,8 +135,10 @@ func (u *unstructuredWrapper) ToOwnerReference() metav1.OwnerReference {
135135
}
136136
}
137137

138-
func (u *unstructuredWrapper) JobConfigurator() services.JobConfigurator {
139-
return services.NewEnvironmentConfigurator(u, "SPEC")
138+
func (u *unstructuredWrapper) JobConfigurator() job.Configurator {
139+
metadata := job.NewMetadataConfigurator(u.underlying.GetName(), u.underlying.GetKind())
140+
backfill := job.NewBackfillConfigurator(false).AddNext(metadata)
141+
return job.NewEnvironmentConfigurator(u, "SPEC").AddNext(backfill)
140142
}
141143

142144
func (u *unstructuredWrapper) Validate() error {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package job
2+
3+
import (
4+
batchv1 "k8s.io/api/batch/v1"
5+
corev1 "k8s.io/api/core/v1"
6+
"strconv"
7+
)
8+
9+
var _ Configurator = &backfillConfigurator{}
10+
11+
// backfillConfigurator sets the backfill status in the job's environment variables and labels.
12+
// It adds STREAMCONTEXT__BACKFILL environment variable and arcane/backfilling label.
13+
type backfillConfigurator struct {
14+
value bool
15+
next Configurator
16+
}
17+
18+
func (f backfillConfigurator) AddNext(configurator Configurator) Configurator {
19+
f.next = configurator
20+
return f
21+
}
22+
23+
func (f backfillConfigurator) ConfigureJob(job *batchv1.Job) error {
24+
found := false
25+
26+
for k := range job.Spec.Template.Spec.Containers {
27+
for v := range job.Spec.Template.Spec.Containers[k].Env {
28+
if job.Spec.Template.Spec.Containers[k].Env[v].Name == "STREAMCONTEXT__BACKFILL" {
29+
job.Spec.Template.Spec.Containers[k].Env[v].Value = strconv.FormatBool(f.value)
30+
found = true
31+
break
32+
}
33+
}
34+
35+
if !found {
36+
envVar := corev1.EnvVar{
37+
Name: "STREAMCONTEXT__BACKFILL",
38+
Value: strconv.FormatBool(f.value),
39+
}
40+
job.Spec.Template.Spec.Containers[k].Env = append(job.Spec.Template.Spec.Containers[k].Env, envVar)
41+
}
42+
found = false
43+
}
44+
45+
job.Labels["arcane/backfilling"] = strconv.FormatBool(f.value)
46+
47+
if f.next != nil {
48+
return f.next.ConfigureJob(job)
49+
}
50+
return nil
51+
}
52+
53+
func NewBackfillConfigurator(value bool) Configurator {
54+
return &backfillConfigurator{
55+
value: value,
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
1-
package services
1+
package job
22

33
import batchv1 "k8s.io/api/batch/v1"
44

5-
type JobTemplateType string
5+
type TemplateType string
66

77
const (
8-
BackfillJobTemplate JobTemplateType = "backfill"
9-
StreamingJobTemplate JobTemplateType = "streaming"
8+
BackfillJobTemplate TemplateType = "backfill"
9+
StreamingJobTemplate TemplateType = "streaming"
1010
)
1111

12-
// JobConfigurator defines an interface for configuring Kubernetes Jobs. Each implementer
12+
// Configurator defines an interface for configuring Kubernetes Jobs. Each implementer
1313
// can modify the Job object and chain to the next configurator in the sequence.
14-
type JobConfigurator interface {
14+
type Configurator interface {
1515

1616
// ConfigureJob modifies the provided Job object according to the configurator's logic.
1717
ConfigureJob(job *batchv1.Job) error
1818

1919
// AddNext sets the next JobConfigurator in the chain.
2020
// Returns self to allow for method chaining.
21-
AddNext(configurator JobConfigurator) JobConfigurator
21+
AddNext(configurator Configurator) Configurator
2222
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package job
2+
3+
// ConfiguratorProvider defines an interface for types that can provide a JobConfigurator.
4+
type ConfiguratorProvider interface {
5+
6+
// JobConfigurator returns a JobConfigurator for the current instance.
7+
JobConfigurator() Configurator
8+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package job
2+
3+
import (
4+
"fmt"
5+
batchv1 "k8s.io/api/batch/v1"
6+
corev1 "k8s.io/api/core/v1"
7+
)
8+
9+
var _ Configurator = &metadataConfigurator{}
10+
11+
// metadataConfigurator adds stream metadata as environment variables to the job's containers.
12+
// It adds STREAMCONTEXT__STREAM_ID and STREAMCONTEXT__STREAM_KIND environment variables.
13+
// It also adds corresponding labels to the job metadata.
14+
type metadataConfigurator struct {
15+
streamId string
16+
streamKind string
17+
next Configurator
18+
}
19+
20+
func (f metadataConfigurator) AddNext(configurator Configurator) Configurator {
21+
f.next = configurator
22+
return f
23+
}
24+
25+
func (f metadataConfigurator) ConfigureJob(job *batchv1.Job) error {
26+
err := f.addEnvironmentVariable(job, "STREAMCONTEXT__STREAM_ID", f.streamId)
27+
if err != nil {
28+
return err
29+
}
30+
31+
err = f.addEnvironmentVariable(job, "STREAMCONTEXT__STREAM_KIND", f.streamKind)
32+
if err != nil {
33+
return err
34+
}
35+
36+
job.Labels["arcane/stream-id"] = f.streamId
37+
job.Labels["arcane/stream-kind"] = f.streamKind
38+
39+
if f.next != nil {
40+
return f.next.ConfigureJob(job)
41+
}
42+
return nil
43+
}
44+
45+
func (f metadataConfigurator) addEnvironmentVariable(job *batchv1.Job, name string, value string) error {
46+
envVar := corev1.EnvVar{Name: name, Value: value}
47+
for k := range job.Spec.Template.Spec.Containers {
48+
for v := range job.Spec.Template.Spec.Containers[k].Env {
49+
if job.Spec.Template.Spec.Containers[k].Env[v].Name == name {
50+
return fmt.Errorf("environment variable %s already present", name)
51+
}
52+
job.Spec.Template.Spec.Containers[k].Env = append(job.Spec.Template.Spec.Containers[k].Env, envVar)
53+
}
54+
}
55+
return nil
56+
}
57+
58+
func NewMetadataConfigurator(streamId string, streamKind string) Configurator {
59+
return &metadataConfigurator{
60+
streamId: streamId,
61+
streamKind: streamKind,
62+
}
63+
}

services/environment_configurator.go renamed to services/job/stream_context_configurator.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package services
1+
package job
22

33
import (
44
"encoding/json"
@@ -8,20 +8,20 @@ import (
88
"strings"
99
)
1010

11-
var _ JobConfigurator = &EnvironmentConfigurator{}
11+
var _ Configurator = &StreamContextConfigurator{}
1212

13-
type EnvironmentConfigurator struct {
13+
type StreamContextConfigurator struct {
1414
environmentKey string
1515
definition string
16-
next JobConfigurator
16+
next Configurator
1717
}
1818

19-
func (f EnvironmentConfigurator) AddNext(configurator JobConfigurator) JobConfigurator {
19+
func (f StreamContextConfigurator) AddNext(configurator Configurator) Configurator {
2020
f.next = configurator
2121
return f
2222
}
2323

24-
func (f EnvironmentConfigurator) ConfigureJob(job *batchv1.Job) error {
24+
func (f StreamContextConfigurator) ConfigureJob(job *batchv1.Job) error {
2525
for k := range job.Spec.Template.Spec.Containers {
2626
envVar := corev1.EnvVar{
2727
Name: fmt.Sprintf("STREAMCONTEXT__%s", strings.ToUpper(f.environmentKey)),
@@ -36,13 +36,13 @@ func (f EnvironmentConfigurator) ConfigureJob(job *batchv1.Job) error {
3636
return nil
3737
}
3838

39-
func NewEnvironmentConfigurator(baseObject any, environmentKey string) *EnvironmentConfigurator {
39+
func NewEnvironmentConfigurator(baseObject any, environmentKey string) *StreamContextConfigurator {
4040
b, err := json.Marshal(baseObject)
4141
def := ""
4242
if err == nil {
4343
def = string(b)
4444
}
45-
return &EnvironmentConfigurator{
45+
return &StreamContextConfigurator{
4646
environmentKey: environmentKey,
4747
definition: def,
4848
}

0 commit comments

Comments
 (0)