Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions controllers/api_key_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"k8s.io/klog/v2"

v1 "github.com/neutree-ai/neutree/api/v1"
"github.com/neutree-ai/neutree/controllers/reconcile"
"github.com/neutree-ai/neutree/internal/gateway"
"github.com/neutree-ai/neutree/pkg/storage"
)
Expand Down Expand Up @@ -32,15 +33,15 @@ func NewApiKeyController(option *ApiKeyControllerOption) (*ApiKeyController, err
return c, nil
}

func (c *ApiKeyController) Reconcile(obj interface{}) error {
func (c *ApiKeyController) Reconcile(obj interface{}) (reconcile.Result, error) {
apiKey, ok := obj.(*v1.ApiKey)
if !ok {
return errors.New("failed to assert obj to *v1.ApiKey")
return reconcile.Result{}, errors.New("failed to assert obj to *v1.ApiKey")
}

klog.V(4).Info("Reconcile api_key " + apiKey.Metadata.Name)

return c.syncHandler(apiKey)
return reconcile.Result{}, c.syncHandler(apiKey)
}

func (c *ApiKeyController) sync(obj *v1.ApiKey) error {
Expand Down
2 changes: 1 addition & 1 deletion controllers/api_key_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestApiKeyController_Reconcile(t *testing.T) {
}

// Directly call the Reconcile method.
err := c.Reconcile(tt.inputKey)
_, err := c.Reconcile(tt.inputKey)

// Assertions.
if tt.wantErr {
Expand Down
20 changes: 18 additions & 2 deletions controllers/base_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/neutree-ai/neutree/controllers/reconcile"
"github.com/neutree-ai/neutree/pkg/storage"
)

Expand All @@ -19,7 +20,7 @@ const (
type HookFunc func(obj interface{}) error

type Reconciler interface {
Reconcile(obj interface{}) error
Reconcile(obj interface{}) (reconcile.Result, error)
}

type BaseController struct {
Expand Down Expand Up @@ -89,8 +90,23 @@ func (bc *BaseController) processNextWorkItem(r Reconciler) bool {
}
}

if err := r.Reconcile(obj); err != nil {
result, err := r.Reconcile(obj)
if err != nil {
klog.Error(err)

if result.RequeueAfter > 0 {
bc.queue.AddAfter(key, result.RequeueAfter)
} else if result.Requeue {
bc.queue.AddRateLimited(key)
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When reconciliation returns an error, the item is not automatically requeued unless a Result flag is set. This means errors without explicit requeue flags will cause the item to be dropped from the queue entirely. The standard pattern should be to always requeue on error (using rate limiting) regardless of the Result flags. Consider adding a fallback to ensure items are requeued with rate limiting when an error occurs and no requeue flags are set.

Suggested change
bc.queue.AddRateLimited(key)
bc.queue.AddRateLimited(key)
} else {
// Ensure items are always retried on error, even if Result does not request requeue.
bc.queue.AddRateLimited(key)

Copilot uses AI. Check for mistakes.
}
} else {
bc.queue.Forget(key)

if result.RequeueAfter > 0 {
bc.queue.AddAfter(key, result.RequeueAfter)
} else if result.Requeue {
bc.queue.Add(key)
}
}

for _, hook := range bc.afterReconcileHooks {
Expand Down
7 changes: 4 additions & 3 deletions controllers/base_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

v1 "github.com/neutree-ai/neutree/api/v1"
"github.com/neutree-ai/neutree/controllers/mocks"
"github.com/neutree-ai/neutree/controllers/reconcile"
"github.com/neutree-ai/neutree/pkg/storage"
)

Expand All @@ -30,7 +31,7 @@ func TestBaseController_processNextWorkItem(t *testing.T) {
m.On("Get", "1").Return(&v1.Cluster{ID: 1}, nil)
},
mockReconciler: func(m *mocks.MockReconciler) {
m.On("Reconcile", &v1.Cluster{ID: 1}).Return(nil)
m.On("Reconcile", &v1.Cluster{ID: 1}).Return(reconcile.Result{}, nil)
},
expected: true,
},
Expand All @@ -43,7 +44,7 @@ func TestBaseController_processNextWorkItem(t *testing.T) {
m.On("Get", "1").Return(&v1.Cluster{ID: 1}, nil)
},
mockReconciler: func(m *mocks.MockReconciler) {
m.On("Reconcile", &v1.Cluster{ID: 1}).Return(errors.New("test error"))
m.On("Reconcile", &v1.Cluster{ID: 1}).Return(reconcile.Result{}, errors.New("test error"))
},
expected: true,
},
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestBaseController_processNextWorkItem(t *testing.T) {
m.On("Get", "1").Return(&v1.Cluster{ID: 1}, nil)
},
mockReconciler: func(m *mocks.MockReconciler) {
m.On("Reconcile", &v1.Cluster{ID: 1}).Return(nil)
m.On("Reconcile", &v1.Cluster{ID: 1}).Return(reconcile.Result{}, nil)
},
expected: true,
},
Expand Down
7 changes: 4 additions & 3 deletions controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

v1 "github.com/neutree-ai/neutree/api/v1"

"github.com/neutree-ai/neutree/controllers/reconcile"
"github.com/neutree-ai/neutree/internal/accelerator"
"github.com/neutree-ai/neutree/internal/cluster"
"github.com/neutree-ai/neutree/internal/gateway"
Expand Down Expand Up @@ -59,15 +60,15 @@ func NewClusterController(opt *ClusterControllerOption) (*ClusterController, err
return c, nil
}

func (c *ClusterController) Reconcile(obj interface{}) error {
func (c *ClusterController) Reconcile(obj interface{}) (reconcile.Result, error) {
cluster, ok := obj.(*v1.Cluster)
if !ok {
return errors.New("failed to assert obj to *v1.Cluster")
return reconcile.Result{}, errors.New("failed to assert obj to *v1.Cluster")
}

klog.V(4).Info("Reconciling cluster " + cluster.Metadata.WorkspaceName())

return c.syncHandler(cluster)
return reconcile.Result{}, c.syncHandler(cluster)
}

func (controller *ClusterController) sync(obj *v1.Cluster) error {
Expand Down
2 changes: 1 addition & 1 deletion controllers/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func TestClusterController_Reconcile(t *testing.T) {
}

c := &ClusterController{storage: mockStorage, syncHandler: func(*v1.Cluster) error { return nil }}
err := c.Reconcile(tt.input)
_, err := c.Reconcile(tt.input)

if tt.wantErr {
assert.Error(t, err)
Expand Down
7 changes: 4 additions & 3 deletions controllers/endpoint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"k8s.io/klog/v2"

v1 "github.com/neutree-ai/neutree/api/v1"
"github.com/neutree-ai/neutree/controllers/reconcile"
"github.com/neutree-ai/neutree/internal/accelerator"
"github.com/neutree-ai/neutree/internal/gateway"
"github.com/neutree-ai/neutree/internal/orchestrator"
Expand Down Expand Up @@ -40,15 +41,15 @@ func NewEndpointController(option *EndpointControllerOption) (*EndpointControlle
return c, nil
}

func (c *EndpointController) Reconcile(obj interface{}) error {
func (c *EndpointController) Reconcile(obj interface{}) (reconcile.Result, error) {
endpoint, ok := obj.(*v1.Endpoint)
if !ok {
return errors.New("failed to assert obj to *v1.Endpoint")
return reconcile.Result{}, errors.New("failed to assert obj to *v1.Endpoint")
}

klog.V(4).Info("Reconcile endpoint " + endpoint.Metadata.WorkspaceName())

return c.syncHandler(endpoint)
return reconcile.Result{}, c.syncHandler(endpoint)
}

func (c *EndpointController) sync(obj *v1.Endpoint) error {
Expand Down
2 changes: 1 addition & 1 deletion controllers/endpoint_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestEndpointController_Reconcile(t *testing.T) {
tt.setup(ms)
}
c := &EndpointController{storage: ms, syncHandler: func(*v1.Endpoint) error { return nil }}
err := c.Reconcile(tt.key)
_, err := c.Reconcile(tt.key)
if tt.wantErr {
assert.Error(t, err)
} else {
Expand Down
7 changes: 4 additions & 3 deletions controllers/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"k8s.io/klog/v2"

v1 "github.com/neutree-ai/neutree/api/v1"
"github.com/neutree-ai/neutree/controllers/reconcile"
"github.com/neutree-ai/neutree/pkg/storage"
)

Expand All @@ -29,15 +30,15 @@ func NewEngineController(option *EngineControllerOption) (*EngineController, err
return c, nil
}

func (c *EngineController) Reconcile(obj interface{}) error {
func (c *EngineController) Reconcile(obj interface{}) (reconcile.Result, error) {
engine, ok := obj.(*v1.Engine)
if !ok {
return errors.New("failed to assert obj to *v1.Engine")
return reconcile.Result{}, errors.New("failed to assert obj to *v1.Engine")
}

klog.V(4).Info("Reconcile engine " + engine.Metadata.Name)

return c.syncHandler(engine)
return reconcile.Result{}, c.syncHandler(engine)
}

func (c *EngineController) sync(obj *v1.Engine) error {
Expand Down
2 changes: 1 addition & 1 deletion controllers/engine_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestEngineController_Reconcile(t *testing.T) {
}

// Directly call the Reconcile method.
err := c.Reconcile(tt.inputKey)
_, err := c.Reconcile(tt.inputKey)

// Assertions.
if tt.wantErr {
Expand Down
7 changes: 4 additions & 3 deletions controllers/image_registry_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/klog/v2"

v1 "github.com/neutree-ai/neutree/api/v1"
"github.com/neutree-ai/neutree/controllers/reconcile"
"github.com/neutree-ai/neutree/internal/registry"
"github.com/neutree-ai/neutree/internal/util"
"github.com/neutree-ai/neutree/pkg/storage"
Expand Down Expand Up @@ -37,15 +38,15 @@ func NewImageRegistryController(option *ImageRegistryControllerOption) (*ImageRe
return c, nil
}

func (c *ImageRegistryController) Reconcile(obj interface{}) error {
func (c *ImageRegistryController) Reconcile(obj interface{}) (reconcile.Result, error) {
imageRegistry, ok := obj.(*v1.ImageRegistry)
if !ok {
return errors.New("failed to assert obj to *v1.ImageRegistry")
return reconcile.Result{}, errors.New("failed to assert obj to *v1.ImageRegistry")
}

klog.V(4).Info("Reconcile image registry " + imageRegistry.Metadata.Name)

return c.syncHandler(imageRegistry)
return reconcile.Result{}, c.syncHandler(imageRegistry)
}

func (c *ImageRegistryController) sync(obj *v1.ImageRegistry) error {
Expand Down
2 changes: 1 addition & 1 deletion controllers/image_registry_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func TestImageRegistryController_Reconcile(t *testing.T) {
}

c := &ImageRegistryController{storage: mockStorage, syncHandler: func(obj *v1.ImageRegistry) error { return nil }}
err := c.Reconcile(tt.input)
_, err := c.Reconcile(tt.input)

if tt.wantErr {
assert.Error(t, err)
Expand Down
31 changes: 22 additions & 9 deletions controllers/mocks/mock_reconciler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions controllers/model_catalog_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"k8s.io/klog/v2"

v1 "github.com/neutree-ai/neutree/api/v1"
"github.com/neutree-ai/neutree/controllers/reconcile"
"github.com/neutree-ai/neutree/pkg/storage"
)

Expand All @@ -31,15 +32,15 @@ func NewModelCatalogController(opt *ModelCatalogControllerOption) (*ModelCatalog
return c, nil
}

func (c *ModelCatalogController) Reconcile(obj interface{}) error {
func (c *ModelCatalogController) Reconcile(obj interface{}) (reconcile.Result, error) {
modelCatalog, ok := obj.(*v1.ModelCatalog)
if !ok {
return errors.New("failed to assert obj to *v1.ModelCatalog")
return reconcile.Result{}, errors.New("failed to assert obj to *v1.ModelCatalog")
}

klog.V(4).Info("Reconciling model catalog " + modelCatalog.Metadata.Name)

return c.syncHandler(modelCatalog)
return reconcile.Result{}, c.syncHandler(modelCatalog)
}

func (c *ModelCatalogController) sync(modelCatalog *v1.ModelCatalog) error {
Expand Down
2 changes: 1 addition & 1 deletion controllers/model_catalog_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestModelCatalogController_Reconcile(t *testing.T) {

mockStorage.On("UpdateModelCatalog", "1", mock.AnythingOfType("*v1.ModelCatalog")).Return(nil)

err := controller.Reconcile(modelCatalog)
_, err := controller.Reconcile(modelCatalog)

assert.NoError(t, err)
mockStorage.AssertExpectations(t)
Expand Down
7 changes: 4 additions & 3 deletions controllers/model_registry_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"k8s.io/klog/v2"

v1 "github.com/neutree-ai/neutree/api/v1"
"github.com/neutree-ai/neutree/controllers/reconcile"
"github.com/neutree-ai/neutree/pkg/model_registry"
"github.com/neutree-ai/neutree/pkg/storage"
)
Expand All @@ -31,15 +32,15 @@ func NewModelRegistryController(option *ModelRegistryControllerOption) (*ModelRe
return c, nil
}

func (c *ModelRegistryController) Reconcile(obj interface{}) error {
func (c *ModelRegistryController) Reconcile(obj interface{}) (reconcile.Result, error) {
modelRegistry, ok := obj.(*v1.ModelRegistry)
if !ok {
return errors.New("failed to assert obj to *v1.ModelRegistry")
return reconcile.Result{}, errors.New("failed to assert obj to *v1.ModelRegistry")
}

klog.V(4).Info("Reconcile model registry " + modelRegistry.Metadata.Name)

return c.syncHandler(modelRegistry)
return reconcile.Result{}, c.syncHandler(modelRegistry)
}

func (c *ModelRegistryController) sync(obj *v1.ModelRegistry) (err error) {
Expand Down
2 changes: 1 addition & 1 deletion controllers/model_registry_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func TestModelRegistryController_Reconcile(t *testing.T) {
}

c := &ModelRegistryController{storage: mockStorage, syncHandler: func(obj *v1.ModelRegistry) error { return nil }}
err := c.Reconcile(tt.input)
_, err := c.Reconcile(tt.input)

if tt.wantErr {
assert.Error(t, err)
Expand Down
Loading