Skip to content

Commit a8f35cc

Browse files
committed
feat: Implement ResourceIndexPolicy finalizer to clean up Meilisearch indexes and update policy cache eviction logic.
1 parent 93b9def commit a8f35cc

File tree

4 files changed

+157
-6
lines changed

4 files changed

+157
-6
lines changed

config/overlays/controller-manager/core-control-plane/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,15 @@ rules:
1919
verbs:
2020
- get
2121
- list
22+
- patch
23+
- update
2224
- watch
25+
- apiGroups:
26+
- search.miloapis.com
27+
resources:
28+
- resourceindexpolicies/finalizers
29+
verbs:
30+
- update
2331
- apiGroups:
2432
- search.miloapis.com
2533
resources:

internal/controllers/policy/policy_controller.go

Lines changed: 89 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"k8s.io/client-go/dynamic"
2020
ctrl "sigs.k8s.io/controller-runtime"
2121
"sigs.k8s.io/controller-runtime/pkg/client"
22+
"sigs.k8s.io/controller-runtime/pkg/finalizer"
2223
logf "sigs.k8s.io/controller-runtime/pkg/log"
2324

2425
"go.miloapis.net/search/internal/cel"
@@ -57,12 +58,17 @@ type ResourceIndexPolicyReconciler struct {
5758

5859
RetryBaseDelay time.Duration
5960
RetryMaxDelay time.Duration
61+
62+
Finalizers finalizer.Finalizers
6063
}
6164

6265
const (
63-
ReadyConditionType = "Ready"
64-
ReadyConditionReason = "PolicyReady"
65-
NotReadyConditionReason = "PolicyNotReady"
66+
FinalizerName = "search.miloapis.com/cleanup"
67+
68+
ReadyConditionType = "Ready"
69+
ReadyConditionReason = "PolicyReady"
70+
NotReadyConditionReason = "PolicyNotReady"
71+
TerminatingConditionReason = "Terminating"
6672

6773
SearchIndexReadyConditionType = "SearchIndexReady"
6874
IndexCreatedReason = "IndexCreated"
@@ -83,8 +89,9 @@ const (
8389
ReindexingInProgressReason = "ReindexingInProgress"
8490
)
8591

86-
// +kubebuilder:rbac:groups=search.miloapis.com,resources=resourceindexpolicies,verbs=get;list;watch
92+
// +kubebuilder:rbac:groups=search.miloapis.com,resources=resourceindexpolicies,verbs=get;list;watch;update;patch
8793
// +kubebuilder:rbac:groups=search.miloapis.com,resources=resourceindexpolicies/status,verbs=get;update;patch
94+
// +kubebuilder:rbac:groups=search.miloapis.com,resources=resourceindexpolicies/finalizers,verbs=update
8895
// +kubebuilder:rbac:groups=*,resources=*,verbs=get;list;watch
8996

9097
// Reconcile matches the state of the cluster with the desired state of a ResourceIndexPolicy.
@@ -104,9 +111,26 @@ func (r *ResourceIndexPolicyReconciler) Reconcile(ctx context.Context, req ctrl.
104111
return ctrl.Result{}, err
105112
}
106113

107-
// Check if the policy is being deleted
114+
// Run finalizers:
115+
finalizeResult, err := r.Finalizers.Finalize(ctx, policy)
116+
if err != nil {
117+
return ctrl.Result{}, fmt.Errorf("failed to run finalizers: %w", err)
118+
}
119+
if finalizeResult.Updated {
120+
logger.Info("Finalizer updated the policy object, persisting to API server")
121+
if updateErr := r.Client.Update(ctx, policy); updateErr != nil {
122+
if errors.IsConflict(updateErr) {
123+
logger.Info("Conflict updating policy after finalizer update; requeuing")
124+
return ctrl.Result{Requeue: true}, nil
125+
}
126+
logger.Error(updateErr, "Failed to update ResourceIndexPolicy after finalizer update")
127+
return ctrl.Result{}, updateErr
128+
}
129+
return ctrl.Result{}, nil
130+
}
131+
108132
if policy.GetDeletionTimestamp() != nil {
109-
logger.Info("ResourceIndexPolicy is being deleted")
133+
logger.Info("ResourceIndexPolicy is being deleted, skipping reconciliation")
110134
return ctrl.Result{}, nil
111135
}
112136

@@ -493,8 +517,67 @@ func (r *ResourceIndexPolicyReconciler) publishReindexMessages(
493517
return nil
494518
}
495519

520+
// resourceIndexPolicyFinalizer handles Meilisearch cleanup when a
521+
// ResourceIndexPolicy is deleted.
522+
type resourceIndexPolicyFinalizer struct {
523+
Client client.Client
524+
SearchSDK *meilisearch.SDKClient
525+
}
526+
527+
func (f *resourceIndexPolicyFinalizer) Finalize(ctx context.Context, obj client.Object) (finalizer.Result, error) {
528+
log := logf.FromContext(ctx).WithName("resourceindexpolicy-finalizer")
529+
log.Info("Finalizing ResourceIndexPolicy")
530+
531+
policy, ok := obj.(*searchv1alpha1.ResourceIndexPolicy)
532+
if !ok {
533+
return finalizer.Result{}, fmt.Errorf("object is not a ResourceIndexPolicy")
534+
}
535+
536+
// Set Ready=False immediately so consumers can observe the terminating state.
537+
oldStatus := policy.Status.DeepCopy()
538+
meta.SetStatusCondition(&policy.Status.Conditions, metav1.Condition{
539+
Type: ReadyConditionType,
540+
Status: metav1.ConditionFalse,
541+
Reason: TerminatingConditionReason,
542+
Message: "ResourceIndexPolicy is being deleted",
543+
})
544+
if err := utils.UpdateStatusIfChanged(ctx, f.Client, log, policy, oldStatus, &policy.Status); err != nil {
545+
log.Error(err, "Failed to update ResourceIndexPolicy status")
546+
return finalizer.Result{}, err
547+
}
548+
549+
// Remove all documents then delete the index itself.
550+
searchIndex := policy.Status.IndexName
551+
if searchIndex == "" {
552+
log.Info("No index name found, skipping cleanup")
553+
return finalizer.Result{}, nil
554+
}
555+
log.Info("Deleting all documents from search index", "index", searchIndex)
556+
if err := f.SearchSDK.DeleteAllDocuments(searchIndex); err != nil {
557+
log.Error(err, "Failed to delete all documents from search index")
558+
return finalizer.Result{}, err
559+
}
560+
561+
log.Info("Deleting search index", "index", searchIndex)
562+
if err := f.SearchSDK.DeleteIndex(searchIndex); err != nil {
563+
log.Error(err, "Failed to delete search index")
564+
return finalizer.Result{}, err
565+
}
566+
567+
log.Info("ResourceIndexPolicy cleanup complete")
568+
return finalizer.Result{}, nil
569+
}
570+
496571
// SetupWithManager sets up the controller with the Manager.
497572
func (r *ResourceIndexPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {
573+
r.Finalizers = finalizer.NewFinalizers()
574+
if err := r.Finalizers.Register(FinalizerName, &resourceIndexPolicyFinalizer{
575+
Client: r.Client,
576+
SearchSDK: r.SearchSDK,
577+
}); err != nil {
578+
return fmt.Errorf("failed to register finalizer: %w", err)
579+
}
580+
498581
return ctrl.NewControllerManagedBy(mgr).
499582
For(&searchv1alpha1.ResourceIndexPolicy{}).
500583
Complete(r)

internal/indexer/policy_cache.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,16 @@ func (c *PolicyCache) RegisterHandlers(ctx context.Context) error {
103103
func (c *PolicyCache) upsertPolicy(p *v1alpha1.ResourceIndexPolicy) {
104104
key := p.Name
105105

106+
// Evict policies that are being deleted so the indexer stops processing new
107+
// resources against an index that is about to be torn down. DeletionTimestamp
108+
// is set by Kubernetes as soon as a delete request is received — before the
109+
// finalizer runs — giving us the earliest possible eviction signal.
110+
if p.DeletionTimestamp != nil {
111+
klog.Infof("Policy %s is being deleted; evicting from cache", key)
112+
c.deletePolicy(key)
113+
return
114+
}
115+
106116
// If strict ready checking is enabled, we only cache policies that are fully Ready.
107117
// This prevents the primary indexer from processing events for policies that are
108118
// still being initialized (e.g. index creation or initial re-indexing).

pkg/meilisearch/sdk_client.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,56 @@ func (s *SDKClient) UpdateSearchableAttributes(indexUID string, attributes []str
337337
return &meilisearch.Task{TaskUID: resp.TaskUID, IndexUID: indexUID}, nil
338338
}
339339

340+
// DeleteAllDocuments deletes all documents from the given index and waits for the
341+
// task to complete before returning. If the index does not exist, it is a no-op.
342+
func (s *SDKClient) DeleteAllDocuments(indexUID string) error {
343+
resp, err := s.client.Index(indexUID).DeleteAllDocuments(nil)
344+
if err != nil {
345+
if strings.Contains(err.Error(), "index_not_found") {
346+
klog.Infof("Index %s not found, skipping document deletion", indexUID)
347+
return nil
348+
}
349+
return fmt.Errorf("failed to delete all documents from index %s: %w", indexUID, err)
350+
}
351+
352+
task, err := s.waitForTask(resp.TaskUID)
353+
if err != nil {
354+
return fmt.Errorf("failed to wait for document deletion task: %w", err)
355+
}
356+
357+
if task.Status != meilisearch.TaskStatusSucceeded {
358+
return fmt.Errorf("document deletion task failed with status %s: %s", task.Status, task.Error.Message)
359+
}
360+
361+
klog.Infof("All documents deleted from index %s", indexUID)
362+
return nil
363+
}
364+
365+
// DeleteIndex deletes the index with the given UID and waits for the task to
366+
// complete. If the index does not exist, it is a no-op.
367+
func (s *SDKClient) DeleteIndex(indexUID string) error {
368+
resp, err := s.client.DeleteIndex(indexUID)
369+
if err != nil {
370+
if strings.Contains(err.Error(), "index_not_found") {
371+
klog.Infof("Index %s not found, skipping deletion", indexUID)
372+
return nil
373+
}
374+
return fmt.Errorf("failed to delete index %s: %w", indexUID, err)
375+
}
376+
377+
task, err := s.waitForTask(resp.TaskUID)
378+
if err != nil {
379+
return fmt.Errorf("failed to wait for index deletion task: %w", err)
380+
}
381+
382+
if task.Status != meilisearch.TaskStatusSucceeded {
383+
return fmt.Errorf("index deletion task failed with status %s: %s", task.Status, task.Error.Message)
384+
}
385+
386+
klog.Infof("Index %s deleted", indexUID)
387+
return nil
388+
}
389+
340390
// GetSettingsUpdateTask returns the most recent settings update task for the given index.
341391
func (s *SDKClient) GetSettingsUpdateTask(indexUID string) (*meilisearch.Task, error) {
342392
resp, err := s.client.GetTasks(&meilisearch.TasksQuery{

0 commit comments

Comments
 (0)