Skip to content

Commit 685cc43

Browse files
authored
Implement stream controller factory (#163)
* Implement stream controller factory * Review fixes * Build fix * Add some more tests * Add the mocks * Fix * Improve test coverage * Fix
1 parent f16b1cf commit 685cc43

File tree

13 files changed

+511
-57
lines changed

13 files changed

+511
-57
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package conf
2+
3+
// StreamOperatorConfiguration holds configuration for StreamClassOperatorService.
4+
type StreamOperatorConfiguration struct {
5+
6+
// RateLimitConfiguration holds the rate limiting configuration
7+
RateLimiting RateLimitConfiguration
8+
}

generate.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package main
2+
3+
// run controller-gen to generate CRDs into Helm chart templates
4+
//go:generate mockgen -destination=./tests/mocks/stream_class_mocks.go -package=mocks github.com/SneaksAndData/arcane-operator/services/controllers/stream_class StreamControllerFactory,StreamControllerHandle

go.mod

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ go 1.24.5
55
require (
66
github.com/DataDog/datadog-api-client-go/v2 v2.50.0
77
github.com/DataDog/datadog-go/v5 v5.8.1
8+
github.com/google/uuid v1.6.0
89
github.com/samber/slog-datadog/v2 v2.10.2
910
github.com/samber/slog-multi v1.6.0
1011
github.com/spf13/viper v1.21.0
1112
github.com/stretchr/testify v1.11.1
13+
go.uber.org/mock v0.6.0
1214
golang.org/x/time v0.9.0
13-
gopkg.in/yaml.v3 v3.0.1
1415
k8s.io/api v0.34.3
1516
k8s.io/apimachinery v0.34.3
1617
k8s.io/client-go v0.34.3
@@ -34,7 +35,6 @@ require (
3435
github.com/gogo/protobuf v1.3.2 // indirect
3536
github.com/google/gnostic-models v0.7.0 // indirect
3637
github.com/google/go-cmp v0.7.0 // indirect
37-
github.com/google/uuid v1.6.0 // indirect
3838
github.com/josharian/intern v1.0.0 // indirect
3939
github.com/json-iterator/go v1.1.12 // indirect
4040
github.com/mailru/easyjson v0.7.7 // indirect
@@ -57,18 +57,19 @@ require (
5757
github.com/x448/float16 v0.8.4 // indirect
5858
go.yaml.in/yaml/v2 v2.4.2 // indirect
5959
go.yaml.in/yaml/v3 v3.0.4 // indirect
60-
golang.org/x/mod v0.26.0 // indirect
61-
golang.org/x/net v0.42.0 // indirect
60+
golang.org/x/mod v0.27.0 // indirect
61+
golang.org/x/net v0.43.0 // indirect
6262
golang.org/x/oauth2 v0.27.0 // indirect
6363
golang.org/x/sync v0.16.0 // indirect
64-
golang.org/x/sys v0.34.0 // indirect
65-
golang.org/x/term v0.33.0 // indirect
64+
golang.org/x/sys v0.35.0 // indirect
65+
golang.org/x/term v0.34.0 // indirect
6666
golang.org/x/text v0.28.0 // indirect
67-
golang.org/x/tools v0.35.0 // indirect
67+
golang.org/x/tools v0.36.0 // indirect
6868
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated // indirect
6969
google.golang.org/protobuf v1.36.5 // indirect
7070
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
7171
gopkg.in/inf.v0 v0.9.1 // indirect
72+
gopkg.in/yaml.v3 v3.0.1 // indirect
7273
k8s.io/code-generator v0.34.3 // indirect
7374
k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f // indirect
7475
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect

go.sum

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
123123
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
124124
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
125125
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
126+
go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y=
127+
go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU=
126128
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
127129
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
128130
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
@@ -133,15 +135,15 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
133135
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
134136
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
135137
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
136-
golang.org/x/mod v0.26.0 h1:EGMPT//Ezu+ylkCijjPc+f4Aih7sZvaAr+O3EHBxvZg=
137-
golang.org/x/mod v0.26.0/go.mod h1:/j6NAhSk8iQ723BGAUyoAcn7SlD7s15Dp9Nd/SfeaFQ=
138+
golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
139+
golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
138140
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
139141
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
140142
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
141143
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
142144
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
143-
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
144-
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
145+
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
146+
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
145147
golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M=
146148
golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
147149
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -158,11 +160,11 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
158160
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
159161
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
160162
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
161-
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
162-
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
163+
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
164+
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
163165
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
164-
golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg=
165-
golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0=
166+
golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4=
167+
golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw=
166168
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
167169
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
168170
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
@@ -174,8 +176,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
174176
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
175177
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
176178
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
177-
golang.org/x/tools v0.35.0 h1:mBffYraMEf7aa0sB+NuKnuCy8qI/9Bughn8dC2Gu5r0=
178-
golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw=
179+
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
180+
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
179181
golang.org/x/tools/go/expect v0.1.0-deprecated h1:jY2C5HGYR5lqex3gEniOQL0r7Dq5+VGVgY1nudX5lXY=
180182
golang.org/x/tools/go/expect v0.1.0-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY=
181183
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated h1:1h2MnaIAIXISqTFKdENegdpAgUXz6NrPEsbIeWaBRvM=

pkg/apis/streaming/v1/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ type StreamClassSpec struct {
3333

3434
// SecretRefs is a list of fields to be extracted from the secret
3535
SecretRefs []string `json:"secretRefs,omitempty"`
36+
37+
// TargetNamespace is the namespace where streaming jobs will be created
38+
TargetNamespace string `json:"namespace,omitempty"`
3639
}
3740

3841
// StreamClassStatus defines the observed state of a stream class
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package stream
2+
3+
import (
4+
"fmt"
5+
"github.com/SneaksAndData/arcane-operator/configuration/conf"
6+
"github.com/SneaksAndData/arcane-operator/pkg/apis/streaming/v1"
7+
"github.com/SneaksAndData/arcane-operator/services/controllers/stream_class"
8+
"golang.org/x/time/rate"
9+
"k8s.io/client-go/informers"
10+
"k8s.io/client-go/kubernetes"
11+
"k8s.io/client-go/util/workqueue"
12+
"k8s.io/klog/v2"
13+
"time"
14+
)
15+
16+
var _ stream_class.StreamControllerFactory = (*ControllerFactory)(nil)
17+
18+
type ControllerFactory struct {
19+
logger klog.Logger
20+
queue workqueue.TypedRateLimitingInterface[any]
21+
client kubernetes.Interface
22+
}
23+
24+
//lint:ignore U1000 Ignore unused function temporarily
25+
func NewStreamControllerFactory(logger klog.Logger, configuration conf.StreamOperatorConfiguration) *ControllerFactory {
26+
rlc := configuration.RateLimiting
27+
rateLimiter := workqueue.NewTypedMaxOfRateLimiter(
28+
workqueue.NewTypedItemExponentialFailureRateLimiter[any](rlc.FailureRateBaseDelay, rlc.FailureRateMaxDelay),
29+
&workqueue.TypedBucketRateLimiter[any]{
30+
Limiter: rate.NewLimiter(rlc.RateLimitElementsPerSecond, rlc.RateLimitElementsBurst),
31+
},
32+
)
33+
34+
queue := workqueue.NewTypedRateLimitingQueue[any](rateLimiter)
35+
return &ControllerFactory{
36+
logger: logger,
37+
queue: queue,
38+
}
39+
40+
}
41+
42+
func (s *ControllerFactory) CreateStreamOperator(class *v1.StreamClass) (stream_class.StreamControllerHandle, error) {
43+
factory := informers.NewSharedInformerFactoryWithOptions(s.client, time.Second*30, informers.WithNamespace(class.Spec.TargetNamespace))
44+
handle := NewControllerHandle(factory, class, s.queue)
45+
err := handle.Start()
46+
if err != nil {
47+
return nil, fmt.Errorf("failed to start controller for class %s: %w", class.Name, err)
48+
}
49+
return handle, nil
50+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package stream
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/SneaksAndData/arcane-operator/pkg/apis/streaming/v1"
7+
"github.com/SneaksAndData/arcane-operator/services/controllers/common"
8+
"github.com/SneaksAndData/arcane-operator/services/controllers/stream_class"
9+
"k8s.io/apimachinery/pkg/runtime/schema"
10+
"k8s.io/client-go/informers"
11+
"k8s.io/client-go/tools/cache"
12+
"k8s.io/client-go/util/workqueue"
13+
"reflect"
14+
)
15+
16+
var _ stream_class.StreamControllerHandle = (*ControllerHandle)(nil)
17+
18+
type ControllerHandle struct {
19+
factory informers.SharedInformerFactory
20+
class *v1.StreamClass
21+
context context.Context
22+
cancelFunc context.CancelFunc
23+
sharedQueue workqueue.TypedRateLimitingInterface[any]
24+
informer informers.GenericInformer
25+
registration cache.ResourceEventHandlerRegistration
26+
}
27+
28+
func NewControllerHandle(factory informers.SharedInformerFactory, class *v1.StreamClass, sharedQueue workqueue.TypedRateLimitingInterface[any]) *ControllerHandle {
29+
ctx, cancelFunc := context.WithCancel(context.Background())
30+
return &ControllerHandle{
31+
factory: factory,
32+
class: class,
33+
context: ctx,
34+
cancelFunc: cancelFunc,
35+
sharedQueue: sharedQueue,
36+
}
37+
}
38+
39+
func (s ControllerHandle) Start() error {
40+
if s.informer != nil {
41+
return nil // Already started
42+
}
43+
informer, err := s.factory.ForResource(schema.GroupVersionResource{
44+
Group: s.class.Spec.APIGroupRef,
45+
Version: s.class.Spec.APIVersion,
46+
Resource: s.class.Spec.PluralName,
47+
})
48+
if err != nil {
49+
return fmt.Errorf("failed to create informer: %w", err)
50+
}
51+
52+
registration, err := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
53+
AddFunc: func(obj any) {
54+
s.sharedQueue.Add(obj)
55+
},
56+
UpdateFunc: func(oldObj, newObj any) {
57+
s.sharedQueue.Add(newObj)
58+
},
59+
DeleteFunc: func(obj any) {
60+
s.sharedQueue.Add(obj)
61+
},
62+
})
63+
if err != nil {
64+
return fmt.Errorf("error adding StreamClass controller: %w", err)
65+
}
66+
67+
s.informer = informer
68+
s.registration = registration
69+
s.factory.Start(s.context.Done())
70+
return nil
71+
}
72+
73+
func (s ControllerHandle) IsUpdateNeeded(obj *v1.StreamClass) bool {
74+
return reflect.DeepEqual(s.class.Spec, obj.Spec)
75+
}
76+
77+
func (s ControllerHandle) Id() common.StreamClassWorkerIdentity {
78+
return s.class
79+
}
80+
81+
func (s ControllerHandle) Stop() error {
82+
s.cancelFunc()
83+
// Here we will wait for the informer and the operator to fully stop
84+
85+
return nil
86+
}

services/controllers/stream_class/controller.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package stream_class
22

33
import (
4+
"context"
45
"fmt"
5-
"github.com/SneaksAndData/arcane-operator/pkg/generated/informers/externalversions/streaming/v1"
6+
v1 "github.com/SneaksAndData/arcane-operator/pkg/generated/informers/externalversions/streaming/v1"
67
"k8s.io/client-go/tools/cache"
78
)
89

910
type StreamClassHandler interface {
1011
HandleStreamClassAdded(obj any)
1112
HandleStreamClassUpdated(oldObj any, newObj any)
1213
HandleStreamClassDeleted(obj any)
14+
Start(ctx context.Context)
1315
}
1416

1517
// StreamClassController reconciles a StreamClass object
@@ -27,15 +29,20 @@ func NewStreamClassController(informer v1.StreamClassInformer, handler StreamCla
2729
handler: handler,
2830
}
2931

30-
_, err := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
32+
inf := informer.Informer()
33+
_, err := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
3134
AddFunc: handler.HandleStreamClassAdded,
3235
UpdateFunc: handler.HandleStreamClassUpdated,
3336
DeleteFunc: handler.HandleStreamClassDeleted,
3437
})
3538

36-
if err != nil {
39+
if err != nil { // coverage-ignore
3740
return nil, fmt.Errorf("error adding StreamClass controller: %w", err)
3841
}
3942

4043
return controller, nil
4144
}
45+
46+
func (s *StreamClassController) Start(ctx context.Context) {
47+
s.handler.Start(ctx)
48+
}

services/controllers/stream_class/handler.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package stream_class
22

33
import (
4+
"context"
45
"github.com/SneaksAndData/arcane-operator/configuration/conf"
56
"github.com/SneaksAndData/arcane-operator/pkg/apis/streaming/v1"
67
"golang.org/x/time/rate"
8+
"k8s.io/apimachinery/pkg/util/wait"
79
"k8s.io/client-go/util/workqueue"
810
"k8s.io/klog/v2"
911
)
1012

1113
type StreamClassWorker interface {
12-
HandleEvents(queue workqueue.TypedRateLimitingInterface[StreamClassEvent]) error
14+
HandleEvent(queue workqueue.TypedRateLimitingInterface[StreamClassEvent])
1315
}
1416

1517
var _ StreamClassHandler = (*StreamClassEventHandler)(nil)
@@ -44,13 +46,19 @@ func NewStreamClassEventHandler(
4446
}
4547
}
4648

49+
func (s *StreamClassEventHandler) Start(ctx context.Context) {
50+
go wait.UntilWithContext(ctx, func(_ context.Context) {
51+
s.worker.HandleEvent(s.workQueue)
52+
}, 0)
53+
}
54+
4755
func (s *StreamClassEventHandler) HandleStreamClassAdded(obj any) {
4856
if streamClass, ok := obj.(*v1.StreamClass); ok {
4957
s.workQueue.Add(StreamClassEvent{
5058
Type: StreamClassAdded,
5159
StreamClass: streamClass,
5260
})
53-
} else {
61+
} else { // coverage-ignore
5462
s.logger.Error(nil, "HandleStreamClassAdded: unable to cast object to StreamClass")
5563
}
5664
}
@@ -61,7 +69,7 @@ func (s *StreamClassEventHandler) HandleStreamClassUpdated(_ any, newObj any) {
6169
Type: StreamClassUpdated,
6270
StreamClass: streamClass,
6371
})
64-
} else {
72+
} else { // coverage-ignore
6573
s.logger.Error(nil, "HandleStreamClassAdded: unable to cast object to StreamClass")
6674
}
6775
}
@@ -72,7 +80,7 @@ func (s *StreamClassEventHandler) HandleStreamClassDeleted(obj any) {
7280
Type: StreamClassDeleted,
7381
StreamClass: streamClass,
7482
})
75-
} else {
83+
} else { // coverage-ignore
7684
s.logger.Error(nil, "HandleStreamClassAdded: unable to cast object to StreamClass")
7785
}
7886
}
Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
package stream_class
22

33
import (
4-
"context"
54
"github.com/SneaksAndData/arcane-operator/pkg/apis/streaming/v1"
65
"github.com/SneaksAndData/arcane-operator/services/controllers/common"
76
)
87

9-
// StreamingJobControllerHandle defines the interface for object that can be used to control a streaming job worker
10-
type StreamingJobControllerHandle interface {
8+
// StreamControllerHandle defines the interface for object that can be used to control a streaming job worker
9+
type StreamControllerHandle interface {
1110

1211
// IsUpdateNeeded Returns true if the streaming job needs to be updated
1312
IsUpdateNeeded(obj *v1.StreamClass) bool
1413

1514
// Id returns the identity of the streaming job worker
1615
Id() common.StreamClassWorkerIdentity
1716

17+
// Start starts the streaming job worker
18+
Start() error
19+
1820
// Stop stops the streaming job worker and returns a context that is done when the worker has stopped
19-
Stop(ctx context.Context) error
21+
Stop() error
2022
}

0 commit comments

Comments
 (0)