Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/apis/streaming/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1
import (
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"
)

// Phase represents the current phase of the stream class
Expand Down Expand Up @@ -43,6 +44,9 @@ type StreamClassStatus struct {

// Conditions represent the latest available observations
Conditions []metav1.Condition `json:"conditions,omitempty"`

// ReconcileAfter is the time after which the stream class should be reconciled again
ReconcileAfter *time.Time `json:"reconcileAfter,omitempty"`
}

// StreamClass is the Schema for the stream class API
Expand Down
56 changes: 48 additions & 8 deletions services/controllers/stream_class/stream_class.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
"github.com/SneaksAndData/arcane-operator/services/controllers"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
runtime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sync"
"time"
)

var _ reconcile.Reconciler = (*StreamClassReconciler)(nil)
Expand Down Expand Up @@ -69,31 +71,44 @@
return s.tryStopStreamController(ctx, name, nil)

case sc.Status.Phase == "":
return s.updatePhase(ctx, sc, name, v1.PhasePending, func() {
return s.updatePhase(ctx, unsetRetry(sc), name, v1.PhasePending, func() {
s.eventRecorder.Event(sc,
corev1.EventTypeNormal,
"StreamClassCreated",
"New StreamClass has been created and is pending processing")
})
case sc.Status.Phase == v1.PhasePending:
return s.tryStartStreamController(ctx, sc, name, v1.PhaseReady, func() {
return s.tryStartStreamController(ctx, unsetRetry(sc), name, v1.PhaseReady, func() {
s.eventRecorder.Event(sc,
corev1.EventTypeNormal,
"StreamClassReady",
"StreamClass is ready and stream controller has been started")
})
case sc.Status.Phase == v1.PhaseFailed:
case sc.Status.Phase == v1.PhaseFailed && sc.Status.ReconcileAfter == nil:
logger.V(0).Info("StreamClass is in Failed state with no scheduled retry, not attempting to recover")
return reconcile.Result{}, nil

case sc.Status.Phase == v1.PhaseFailed && sc.Status.ReconcileAfter != nil:
logger.V(0).Info("Found StreamClass in Failed state, attempting to recover")
return s.tryStartStreamController(ctx, sc, name, v1.PhaseReady, func() {
t := time.Until(*sc.Status.ReconcileAfter)
if t >= 0 {
logger.V(0).Info("StreamClass is in Failed state, waiting until ReconcileAfter time to attempt recovery",
"reconcileAfter",
sc.Status.ReconcileAfter)
return reconcile.Result{RequeueAfter: t}, nil
}

logger.V(0).Info("ReconcileAfter time has passed, attempting to restart stream controller")
return s.tryStartStreamController(ctx, unsetRetry(sc), name, v1.PhaseReady, func() {
s.eventRecorder.Event(sc,
corev1.EventTypeWarning,
corev1.EventTypeNormal,
"StreamClassRecovered",
"StreamClass was found in Failed state and recovery was attempted")
"StreamClass has been recovered from Failed state and stream controller has been restarted")
})

case sc.Status.Phase == v1.PhaseReady:
return s.tryStartStreamController(ctx, sc, name, v1.PhaseReady, func() {
s.eventRecorder.Event(sc,
s.eventRecorder.Event(unsetRetry(sc),
corev1.EventTypeNormal,
"StreamClassReconciled",
"StreamClass is reconciled and stream controller is running")
Expand Down Expand Up @@ -129,13 +144,18 @@
go func() {
err := controller.Start(controllerContext)
if errors.Is(err, context.Canceled) {
logger.V(1).Info("stream controller is stopped")
logger.V(0).Info("stream controller is stopped")
return
}
if err != nil {
logger := s.getLogger(ctx, name)
logger.V(0).Error(err, "stream controller exited with error")

if apierrors.IsForbidden(err) {
err = s.setForRetry(ctx, sc, name, logger)

Check failure on line 155 in services/controllers/stream_class/stream_class.go

View workflow job for this annotation

GitHub Actions / Validate commit

ineffectual assignment to err (ineffassign)
return
}

_, err = s.updatePhase(ctx, sc, name, v1.PhaseFailed, func() {
s.eventRecorder.Event(sc,
corev1.EventTypeWarning,
Expand All @@ -159,6 +179,26 @@
return s.updatePhase(ctx, sc, name, nextPhase, eventFunc)
}

func unsetRetry(sc *v1.StreamClass) *v1.StreamClass {
sc.Status.ReconcileAfter = nil
return sc
}

func (s *StreamClassReconciler) setForRetry(ctx context.Context, sc *v1.StreamClass, name types.NamespacedName, logger klog.Logger) error {
now := metav1.Now().Add(time.Second * 10) // TODO: make the retry delay configurable and implement exponential backoff
sc.Status.ReconcileAfter = &now
_, err := s.updatePhase(ctx, sc, name, v1.PhaseFailed, func() {
s.eventRecorder.Eventf(sc,
corev1.EventTypeWarning,
"StreamControllerError",
"Stream controller exited with error, next reconcile attempt at %s, StreamClass moved to Failed state", sc.Status.ReconcileAfter.Format(time.RFC3339))
})
if err != nil {
logger.V(0).Error(err, "unable to update StreamClass phase to Failed after stream controller exited with error")
}
return err
}

func (s *StreamClassReconciler) tryStopStreamController(ctx context.Context, name types.NamespacedName, eventFunc controllers.EventFunc) (reconcile.Result, error) {
s.rwLock.Lock()
defer s.rwLock.Unlock()
Expand Down
140 changes: 118 additions & 22 deletions services/controllers/stream_class/stream_class_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"github.com/SneaksAndData/arcane-operator/tests/mocks"
"github.com/google/uuid"
"go.uber.org/mock/gomock"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"testing"
"time"

v1 "github.com/SneaksAndData/arcane-operator/pkg/apis/streaming/v1"
"github.com/stretchr/testify/require"
Expand All @@ -35,7 +37,7 @@ func Test_UpdatePhase_ToPending(t *testing.T) {
require.Equal(t, result, reconcile.Result{})

// Assert
expectPhase(t, k8sClient, name, v1.PhasePending)
expectPhase(t, k8sClient, name, v1.PhasePending, nil)
}

func Test_UpdatePhase_ToRunning(t *testing.T) {
Expand Down Expand Up @@ -74,7 +76,7 @@ func Test_UpdatePhase_ToRunning(t *testing.T) {
<-started

// Assert
expectPhase(t, k8sClient, name, v1.PhaseReady)
expectPhase(t, k8sClient, name, v1.PhaseReady, nil)
}

func Test_UpdatePhase_ToRunning_Idempotence(t *testing.T) {
Expand Down Expand Up @@ -115,7 +117,7 @@ func Test_UpdatePhase_ToRunning_Idempotence(t *testing.T) {
<-started

// Assert
expectPhase(t, k8sClient, name, v1.PhaseReady)
expectPhase(t, k8sClient, name, v1.PhaseReady, nil)
}

func Test_UpdatePhase_Ready_ToStopped(t *testing.T) {
Expand Down Expand Up @@ -227,11 +229,10 @@ func Test_UpdatePhase_Pending_ToFailed(t *testing.T) {
require.Equal(t, result, reconcile.Result{})

// Assert
expectPhase(t, k8sClient, name, v1.PhaseFailed)
expectPhase(t, k8sClient, name, v1.PhaseFailed, nil)
}

func Test_UpdatePhase_Ready_ToFailed(t *testing.T) {
t.Skip("Flaky")

// Arrange
mockCtrl := gomock.NewController(t)
Expand All @@ -244,18 +245,50 @@ func Test_UpdatePhase_Ready_ToFailed(t *testing.T) {
},
})

completed := make(chan struct{})
defer close(completed)
streamController := mocks.NewMockController[reconcile.Request](mockCtrl)
streamController.EXPECT().Start(gomock.Any()).Do(func(arg any) {
completed <- struct{}{}
}).Return(fmt.Errorf("some error"))
streamController.EXPECT().Start(gomock.Any()).Return(fmt.Errorf("some error"))

streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
streamReconcilerFactory.EXPECT().CreateStreamController(gomock.Any(), gomock.Any(), gomock.Any()).Return(streamController, nil)

cacheProvider := mocks.NewMockCacheProvider(mockCtrl)
cacheProvider.EXPECT().GetCache().Return(nil).Times(1)
metricsMock := mocks.NewMockStreamClassMetricsReporter(mockCtrl)
metricsMock.EXPECT().AddStreamClass(gomock.Any(), gomock.Any(), gomock.Any())

recorder := record.NewFakeRecorder(10)
reconciler := NewStreamClassReconciler(k8sClient, streamReconcilerFactory, metricsMock, recorder)

// Start the stream controller first
result, err := reconciler.Reconcile(t.Context(), reconcile.Request{NamespacedName: types.NamespacedName{Name: name}})
require.NoError(t, err)
require.Equal(t, result, reconcile.Result{})

// Wait for the stream controller to start
waitForStatus(t, k8sClient, name, v1.PhaseFailed)

// Assert
expectPhase(t, k8sClient, name, v1.PhaseFailed, func(t *testing.T, sc *v1.StreamClass) {
require.Nil(t, sc.Status.ReconcileAfter)
})
}

func Test_UpdatePhase_Ready_ToFailed_withRetry(t *testing.T) {

// Arrange
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

k8sClient, name := setupFakeClient(t, &v1.StreamClass{
ObjectMeta: metav1.ObjectMeta{},
Status: v1.StreamClassStatus{
Phase: v1.PhaseReady,
},
})

streamController := mocks.NewMockController[reconcile.Request](mockCtrl)
streamController.EXPECT().Start(gomock.Any()).Return(apierrors.NewForbidden(v1.Resource("streams"), "", nil))

streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
streamReconcilerFactory.EXPECT().CreateStreamController(gomock.Any(), gomock.Any(), gomock.Any()).Return(streamController, nil)

metricsMock := mocks.NewMockStreamClassMetricsReporter(mockCtrl)
metricsMock.EXPECT().AddStreamClass(gomock.Any(), gomock.Any(), gomock.Any())
Expand All @@ -269,30 +302,32 @@ func Test_UpdatePhase_Ready_ToFailed(t *testing.T) {
require.Equal(t, result, reconcile.Result{})

// Wait for the stream controller to start
<-completed
waitForStatus(t, k8sClient, name, v1.PhaseFailed)

// Assert
expectPhase(t, k8sClient, name, v1.PhaseFailed)
expectPhase(t, k8sClient, name, v1.PhaseFailed, func(t *testing.T, sc *v1.StreamClass) {
require.NotNil(t, sc.Status.ReconcileAfter)
})
}

func Test_UpdatePhase_Failed_ToReady(t *testing.T) {
// Arrange
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

tm := metav1.Now().Add(-3 * time.Second)
k8sClient, name := setupFakeClient(t, &v1.StreamClass{
ObjectMeta: metav1.ObjectMeta{},
Status: v1.StreamClassStatus{
Phase: v1.PhaseFailed,
Phase: v1.PhaseFailed,
ReconcileAfter: &tm,
},
})

completed := make(chan struct{})
defer close(completed)
streamController := mocks.NewMockController[reconcile.Request](mockCtrl)
streamController.EXPECT().Start(gomock.Any()).Do(func(arg any) {
completed <- struct{}{}
}).Return(nil)
streamController.EXPECT().Start(gomock.Any()).Return(nil)

streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
streamReconcilerFactory.EXPECT().CreateStreamController(gomock.Any(), gomock.Any(), gomock.Any()).Return(streamController, nil)
Expand All @@ -308,18 +343,61 @@ func Test_UpdatePhase_Failed_ToReady(t *testing.T) {
require.NoError(t, err)
require.Equal(t, result, reconcile.Result{})

// Wait for the stream controller to start
<-completed
waitForStatus(t, k8sClient, name, v1.PhaseReady)
// Assert
expectPhase(t, k8sClient, name, v1.PhaseReady, func(t *testing.T, sc *v1.StreamClass) {
require.Nil(t, sc.Status.ReconcileAfter)
})
}

func Test_UpdatePhase_Failed_ToFailed(t *testing.T) {
// Arrange
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

tm := metav1.Now().Add(3 * time.Second)
k8sClient, name := setupFakeClient(t, &v1.StreamClass{
ObjectMeta: metav1.ObjectMeta{},
Status: v1.StreamClassStatus{
Phase: v1.PhaseFailed,
ReconcileAfter: &tm,
},
})

completed := make(chan struct{})
defer close(completed)
streamController := mocks.NewMockController[reconcile.Request](mockCtrl)
streamController.EXPECT().Start(gomock.Any()).Return(nil)

streamReconcilerFactory := mocks.NewMockUnmanagedControllerFactory(mockCtrl)
streamReconcilerFactory.EXPECT().CreateStreamController(gomock.Any(), gomock.Any(), gomock.Any()).Return(streamController, nil)

metricsMock := mocks.NewMockStreamClassMetricsReporter(mockCtrl)
metricsMock.EXPECT().AddStreamClass(gomock.Any(), gomock.Any(), gomock.Any())

recorder := record.NewFakeRecorder(10)
reconciler := NewStreamClassReconciler(k8sClient, streamReconcilerFactory, metricsMock, recorder)

// Start the stream controller first
result, err := reconciler.Reconcile(t.Context(), reconcile.Request{NamespacedName: types.NamespacedName{Name: name}})
require.NoError(t, err)
require.LessOrEqual(t, result.RequeueAfter, 3*time.Second)

waitForStatus(t, k8sClient, name, v1.PhaseReady)
// Assert
expectPhase(t, k8sClient, name, v1.PhaseReady)
expectPhase(t, k8sClient, name, v1.PhaseReady, func(t *testing.T, sc *v1.StreamClass) {
require.NotNil(t, sc.Status.ReconcileAfter)
})
}

func expectPhase(t *testing.T, k8sClient client.WithWatch, name string, phase v1.Phase) {
func expectPhase(t *testing.T, k8sClient client.WithWatch, name string, phase v1.Phase, additionalCheck func(*testing.T, *v1.StreamClass)) {
sc2 := &v1.StreamClass{}
err := k8sClient.Get(t.Context(), types.NamespacedName{Name: name}, sc2)
require.NoError(t, err)
require.Equal(t, phase, sc2.Status.Phase)
if additionalCheck != nil {
additionalCheck(t, sc2)
}
}

func setupFakeClient(t *testing.T, sc *v1.StreamClass) (client.WithWatch, string) {
Expand All @@ -333,3 +411,21 @@ func setupFakeClient(t *testing.T, sc *v1.StreamClass) (client.WithWatch, string
k8sClient := crfake.NewClientBuilder().WithStatusSubresource(&v1.StreamClass{}).WithScheme(scheme).WithObjects(sc).Build()
return k8sClient, name.String()
}

func waitForStatus(t *testing.T, k8sClient client.WithWatch, name string, expectedPhase v1.Phase) {
tick := time.Tick(1 * time.Second)

for {
select {
case <-t.Context().Done():
t.Fatal("timout waiting for status update")
case <-tick:
sc2 := &v1.StreamClass{}
err := k8sClient.Get(t.Context(), types.NamespacedName{Name: name}, sc2)
require.NoError(t, err)
if sc2.Status.Phase == expectedPhase {
return
}
}
}
}
Loading