Skip to content

Commit 41bce7a

Browse files
authored
Implement SetupUnmanaged method for the stream reconciler (#179)
* Implement SetupUnmanaged method for the stream reconciler * Remove cache provider from stream class operator * Fix mock file names * Linter fix
1 parent 7bb71fb commit 41bce7a

13 files changed

+219
-193
lines changed

generate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package main
22

3-
//go:generate mockgen -destination=./tests/mocks/stream_reconciler_factory.go -package=mocks github.com/SneaksAndData/arcane-operator/services/controllers/stream_class StreamReconcilerFactory
3+
//go:generate mockgen -destination=./tests/mocks/unmanaged_controller_factory.go -package=mocks github.com/SneaksAndData/arcane-operator/services/controllers/stream_class UnmanagedControllerFactory
44
//go:generate mockgen -destination=./tests/mocks/cache_provider.go -package=mocks github.com/SneaksAndData/arcane-operator/services/controllers/stream_class CacheProvider
5-
//go:generate mockgen -destination=./tests/mocks/stream_reconciler.go -package=mocks github.com/SneaksAndData/arcane-operator/services/controllers StreamReconciler
5+
//go:generate mockgen -destination=./tests/mocks/unmanaged_reconciler.go -package=mocks github.com/SneaksAndData/arcane-operator/services/controllers UnmanagedReconciler
66
//go:generate mockgen -destination=./tests/mocks/controller.go -package=mocks sigs.k8s.io/controller-runtime/pkg/controller Controller
77
//go:generate mockgen -destination=./tests/mocks/job_builder.go -package=mocks github.com/SneaksAndData/arcane-operator/services/controllers/stream JobBuilder

services/controllers/stream/stream_reconciler.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,28 @@ import (
55
"fmt"
66
v1 "github.com/SneaksAndData/arcane-operator/pkg/apis/streaming/v1"
77
"github.com/SneaksAndData/arcane-operator/services"
8+
"github.com/SneaksAndData/arcane-operator/services/controllers"
89
batchv1 "k8s.io/api/batch/v1"
910
"k8s.io/apimachinery/pkg/api/errors"
11+
"k8s.io/apimachinery/pkg/api/meta"
1012
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1113
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
14+
"k8s.io/apimachinery/pkg/runtime"
1215
"k8s.io/apimachinery/pkg/runtime/schema"
1316
"k8s.io/apimachinery/pkg/types"
1417
"k8s.io/klog/v2"
18+
"sigs.k8s.io/controller-runtime/pkg/cache"
1519
"sigs.k8s.io/controller-runtime/pkg/client"
20+
"sigs.k8s.io/controller-runtime/pkg/controller"
21+
"sigs.k8s.io/controller-runtime/pkg/handler"
1622
"sigs.k8s.io/controller-runtime/pkg/reconcile"
23+
"sigs.k8s.io/controller-runtime/pkg/source"
1724
)
1825

19-
var _ reconcile.Reconciler = (*streamReconciler)(nil)
26+
var (
27+
_ reconcile.Reconciler = (*streamReconciler)(nil)
28+
_ controllers.UnmanagedReconciler = (*streamReconciler)(nil)
29+
)
2030

2131
type streamReconciler struct {
2232
gvk schema.GroupVersionKind
@@ -25,6 +35,43 @@ type streamReconciler struct {
2535
className string
2636
}
2737

38+
func (s *streamReconciler) SetupUnmanaged(cache cache.Cache, scheme *runtime.Scheme, mapper meta.RESTMapper) (controller.Controller, error) {
39+
resource := &unstructured.Unstructured{}
40+
resource.SetGroupVersionKind(s.gvk)
41+
42+
newController, err := controller.NewUnmanaged("stream-controller", controller.Options{
43+
Reconciler: s,
44+
})
45+
46+
if err != nil {
47+
return nil, fmt.Errorf("failed to start unmanaged stream controller: %w", err)
48+
}
49+
50+
// Watch for changes to primary resource Stream
51+
newSource := source.Kind(cache, resource, &handler.TypedEnqueueRequestForObject[*unstructured.Unstructured]{})
52+
53+
err = newController.Watch(newSource)
54+
if err != nil {
55+
return nil, fmt.Errorf("failed to watch stream resource: %w", err)
56+
}
57+
58+
// Watch for changes to secondary resource Jobs and requeue the owner Stream
59+
h := handler.TypedEnqueueRequestForOwner[*batchv1.Job](
60+
scheme,
61+
mapper,
62+
&batchv1.Job{},
63+
handler.OnlyControllerOwner(),
64+
)
65+
66+
jobSource := source.Kind(cache, &batchv1.Job{}, h, nil)
67+
err = newController.Watch(jobSource)
68+
if err != nil {
69+
return nil, fmt.Errorf("failed to watch stream resource: %w", err)
70+
}
71+
72+
return newController, nil
73+
}
74+
2875
// NewStreamReconciler creates a new StreamReconciler instance.
2976
func NewStreamReconciler(client client.Client, gvk schema.GroupVersionKind, jobBuilder JobBuilder, className string) reconcile.Reconciler {
3077
return &streamReconciler{
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package stream_class
2+
3+
import "k8s.io/apimachinery/pkg/runtime"
4+
5+
type SchemeProvider interface {
6+
GetScheme() *runtime.Scheme
7+
}

services/controllers/stream_class/stream_class.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@ var _ reconcile.Reconciler = (*streamClassReconciler)(nil)
1717
type streamClassReconciler struct {
1818
client client.Client
1919
streamControllers map[types.NamespacedName]*StreamControllerHandle
20-
cacheProvider CacheProvider
21-
streamControllerFactory StreamReconcilerFactory
20+
streamControllerFactory UnmanagedControllerFactory
2221
}
2322

24-
func NewStreamClassReconciler(client client.Client, cacheProvider CacheProvider, streamControllerFactory StreamReconcilerFactory) reconcile.Reconciler {
23+
func NewStreamClassReconciler(client client.Client, streamControllerFactory UnmanagedControllerFactory) reconcile.Reconciler {
2524
return &streamClassReconciler{
2625
client: client,
2726
streamControllers: make(map[types.NamespacedName]*StreamControllerHandle),
28-
cacheProvider: cacheProvider,
2927
streamControllerFactory: streamControllerFactory,
3028
}
3129
}
@@ -79,19 +77,13 @@ func (s *streamClassReconciler) tryStartStreamController(ctx context.Context, sc
7977
return s.updatePhase(ctx, sc, name, nextPhase)
8078
}
8179

82-
reconciler, err := s.streamControllerFactory.CreateStreamReconciler(ctx, sc.TargetResourceGvk())
80+
controller, err := s.streamControllerFactory.CreateStreamController(ctx, sc.TargetResourceGvk())
8381

8482
if err != nil {
8583
logger.V(0).Error(err, "unable to create stream reconciler")
8684
return s.updatePhase(ctx, sc, name, v1.PhaseFailed)
8785
}
8886

89-
controller, err := reconciler.SetupUnmanaged(s.cacheProvider.GetCache())
90-
if err != nil {
91-
logger.V(0).Error(err, "unable to setup stream reconciler")
92-
return s.updatePhase(ctx, sc, name, v1.PhaseFailed)
93-
}
94-
9587
controllerContext, cancelFunc := context.WithCancel(context.Background())
9688
go func() {
9789
err := controller.Start(controllerContext)

services/controllers/stream_class/stream_class_test.go

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ func Test_UpdatePhase_ToPending(t *testing.T) {
2222
mockCtrl := gomock.NewController(t)
2323
defer mockCtrl.Finish()
2424
k8sClient := setupFakeClient(&v1.StreamClass{ObjectMeta: metav1.ObjectMeta{Name: "sc1"}})
25-
streamReconcilerFactory := mocks.NewMockStreamReconcilerFactory(mockCtrl)
26-
reconciler := NewStreamClassReconciler(k8sClient, nil, streamReconcilerFactory)
25+
streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
26+
reconciler := NewStreamClassReconciler(k8sClient, streamReconcilerFactory)
2727

2828
// Act
2929
result, err := reconciler.Reconcile(t.Context(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "sc1"}})
@@ -49,16 +49,10 @@ func Test_UpdatePhase_ToRunning(t *testing.T) {
4949
streamController := mocks.NewMockController[reconcile.Request](mockCtrl)
5050
streamController.EXPECT().Start(gomock.Any())
5151

52-
streamReconciler := mocks.NewMockStreamReconciler(mockCtrl)
53-
streamReconciler.EXPECT().SetupUnmanaged(gomock.Any()).Return(streamController, nil)
52+
streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
53+
streamReconcilerFactory.EXPECT().CreateStreamController(gomock.Any(), gomock.Any()).Return(streamController, nil)
5454

55-
streamReconcilerFactory := mocks.NewMockStreamReconcilerFactory(mockCtrl)
56-
streamReconcilerFactory.EXPECT().CreateStreamReconciler(gomock.Any(), gomock.Any()).Return(streamReconciler, nil)
57-
58-
cacheProvider := mocks.NewMockCacheProvider(mockCtrl)
59-
cacheProvider.EXPECT().GetCache().Return(nil).Times(1)
60-
61-
reconciler := NewStreamClassReconciler(k8sClient, cacheProvider, streamReconcilerFactory)
55+
reconciler := NewStreamClassReconciler(k8sClient, streamReconcilerFactory)
6256

6357
// Act
6458
result, err := reconciler.Reconcile(t.Context(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "sc1"}})
@@ -84,16 +78,10 @@ func Test_UpdatePhase_ToRunning_Idempotence(t *testing.T) {
8478
streamController := mocks.NewMockController[reconcile.Request](mockCtrl)
8579
streamController.EXPECT().Start(gomock.Any())
8680

87-
streamReconciler := mocks.NewMockStreamReconciler(mockCtrl)
88-
streamReconciler.EXPECT().SetupUnmanaged(gomock.Any()).Return(streamController, nil)
89-
90-
streamReconcilerFactory := mocks.NewMockStreamReconcilerFactory(mockCtrl)
91-
streamReconcilerFactory.EXPECT().CreateStreamReconciler(gomock.Any(), gomock.Any()).Return(streamReconciler, nil)
92-
93-
cacheProvider := mocks.NewMockCacheProvider(mockCtrl)
94-
cacheProvider.EXPECT().GetCache().Return(nil).Times(1)
81+
streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
82+
streamReconcilerFactory.EXPECT().CreateStreamController(gomock.Any(), gomock.Any()).Return(streamController, nil)
9583

96-
reconciler := NewStreamClassReconciler(k8sClient, cacheProvider, streamReconcilerFactory)
84+
reconciler := NewStreamClassReconciler(k8sClient, streamReconcilerFactory)
9785

9886
// Act
9987
for i := 0; i < 5; i++ {
@@ -121,16 +109,10 @@ func Test_UpdatePhase_Ready_ToStopped(t *testing.T) {
121109
streamController := mocks.NewMockController[reconcile.Request](mockCtrl)
122110
streamController.EXPECT().Start(gomock.Any()).AnyTimes()
123111

124-
streamReconciler := mocks.NewMockStreamReconciler(mockCtrl)
125-
streamReconciler.EXPECT().SetupUnmanaged(gomock.Any()).Return(streamController, nil)
112+
streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
113+
streamReconcilerFactory.EXPECT().CreateStreamController(gomock.Any(), gomock.Any()).Return(streamController, nil)
126114

127-
streamReconcilerFactory := mocks.NewMockStreamReconcilerFactory(mockCtrl)
128-
streamReconcilerFactory.EXPECT().CreateStreamReconciler(gomock.Any(), gomock.Any()).Return(streamReconciler, nil)
129-
130-
cacheProvider := mocks.NewMockCacheProvider(mockCtrl)
131-
cacheProvider.EXPECT().GetCache().Return(nil).Times(1)
132-
133-
reconciler := NewStreamClassReconciler(k8sClient, cacheProvider, streamReconcilerFactory)
115+
reconciler := NewStreamClassReconciler(k8sClient, streamReconcilerFactory)
134116

135117
// Start the stream controller first
136118
result, err := reconciler.Reconcile(t.Context(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "sc1"}})
@@ -163,11 +145,9 @@ func Test_UpdatePhase_Pending_ToStopped(t *testing.T) {
163145
ObjectMeta: metav1.ObjectMeta{Name: "sc1"},
164146
})
165147

166-
streamReconcilerFactory := mocks.NewMockStreamReconcilerFactory(mockCtrl)
148+
streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
167149

168-
cacheProvider := mocks.NewMockCacheProvider(mockCtrl)
169-
170-
reconciler := NewStreamClassReconciler(k8sClient, cacheProvider, streamReconcilerFactory)
150+
reconciler := NewStreamClassReconciler(k8sClient, streamReconcilerFactory)
171151

172152
// Transit the stream class to Pending state first
173153
result, err := reconciler.Reconcile(t.Context(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "sc1"}})
@@ -203,11 +183,10 @@ func Test_UpdatePhase_Pending_ToFailed(t *testing.T) {
203183
},
204184
})
205185

206-
streamReconcilerFactory := mocks.NewMockStreamReconcilerFactory(mockCtrl)
207-
streamReconcilerFactory.EXPECT().CreateStreamReconciler(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("some error"))
186+
streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
187+
streamReconcilerFactory.EXPECT().CreateStreamController(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("some error"))
208188

209-
cacheProvider := mocks.NewMockCacheProvider(mockCtrl)
210-
reconciler := NewStreamClassReconciler(k8sClient, cacheProvider, streamReconcilerFactory)
189+
reconciler := NewStreamClassReconciler(k8sClient, streamReconcilerFactory)
211190

212191
// Act
213192
result, err := reconciler.Reconcile(t.Context(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "sc1"}})
@@ -239,16 +218,13 @@ func Test_UpdatePhase_Ready_ToFailed(t *testing.T) {
239218
completed <- struct{}{}
240219
}).Return(fmt.Errorf("some error"))
241220

242-
streamReconciler := mocks.NewMockStreamReconciler(mockCtrl)
243-
streamReconciler.EXPECT().SetupUnmanaged(gomock.Any()).Return(streamController, nil)
244-
245-
streamReconcilerFactory := mocks.NewMockStreamReconcilerFactory(mockCtrl)
246-
streamReconcilerFactory.EXPECT().CreateStreamReconciler(gomock.Any(), gomock.Any()).Return(streamReconciler, nil)
221+
streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
222+
streamReconcilerFactory.EXPECT().CreateStreamController(gomock.Any(), gomock.Any()).Return(streamController, nil)
247223

248224
cacheProvider := mocks.NewMockCacheProvider(mockCtrl)
249225
cacheProvider.EXPECT().GetCache().Return(nil).Times(1)
250226

251-
reconciler := NewStreamClassReconciler(k8sClient, cacheProvider, streamReconcilerFactory)
227+
reconciler := NewStreamClassReconciler(k8sClient, streamReconcilerFactory)
252228

253229
// Start the stream controller first
254230
result, err := reconciler.Reconcile(t.Context(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "sc1"}})

services/controllers/stream_class/stream_reconciler_factory.go

Lines changed: 0 additions & 11 deletions
This file was deleted.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package stream_class
2+
3+
import (
4+
"context"
5+
"k8s.io/apimachinery/pkg/runtime/schema"
6+
"sigs.k8s.io/controller-runtime/pkg/controller"
7+
)
8+
9+
type UnmanagedControllerFactory interface {
10+
CreateStreamController(ctx context.Context, gvk schema.GroupVersionKind) (controller.Controller, error)
11+
}

services/controllers/stream_reconciler.go

Lines changed: 0 additions & 10 deletions
This file was deleted.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package controllers
2+
3+
import (
4+
"k8s.io/apimachinery/pkg/api/meta"
5+
"k8s.io/apimachinery/pkg/runtime"
6+
"sigs.k8s.io/controller-runtime/pkg/cache"
7+
"sigs.k8s.io/controller-runtime/pkg/controller"
8+
)
9+
10+
type UnmanagedReconciler interface {
11+
SetupUnmanaged(cache cache.Cache, scheme *runtime.Scheme, mapper meta.RESTMapper) (controller.Controller, error)
12+
}

tests/mocks/stream_reconciler.go

Lines changed: 0 additions & 57 deletions
This file was deleted.

0 commit comments

Comments
 (0)