Skip to content

Commit 480dde6

Browse files
committed
refactor: centralize Kubernetes cache startup and synchronization for indexer policies and update deployment restart logic.
1 parent 1d732f3 commit 480dde6

File tree

4 files changed

+29
-17
lines changed

4 files changed

+29
-17
lines changed

Taskfile.yaml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ tasks:
377377
echo " Etcd: task test-infra:kubectl -- logs -l app.kubernetes.io/name=etcd -n etcd-system -f"
378378
echo " Search API Server: task test-infra:kubectl -- logs -l app.kubernetes.io/name=search-apiserver -n search-system -f"
379379
echo " Search Controller: task test-infra:kubectl -- logs -l app.kubernetes.io/name=search-controller-manager -n search-system -f"
380+
echo " Search Indexer: task test-infra:kubectl -- logs -l app.kubernetes.io/name=resource-indexer -n search-system -f"
380381
381382
dev:generate-webhook-certs:
382383
desc: Generate all certificates for webhook server
@@ -661,7 +662,6 @@ tasks:
661662
deps:
662663
- dev:build
663664
- dev:load
664-
- dev:deploy
665665
cmds:
666666
- |
667667
set -e
@@ -672,13 +672,16 @@ tasks:
672672
# Restart the deployment to pick up new image
673673
task test-infra:kubectl -- rollout restart deployment/search-apiserver -n search-system
674674
task test-infra:kubectl -- rollout restart deployment/search-controller-manager -n search-system
675+
task test-infra:kubectl -- rollout restart deployment/resource-indexer -n search-system
675676
676677
# Wait for rollout to complete
677678
echo "Waiting for rollout to complete..."
679+
task test-infra:kubectl -- rollout status deployment/search-apiserver -n search-system --timeout=1000s
678680
task test-infra:kubectl -- rollout status deployment/search-controller-manager -n search-system --timeout=1000s
681+
task test-infra:kubectl -- rollout status deployment/resource-indexer -n search-system --timeout=1000s
679682
680683
echo "✅ Redeployment complete!"
681-
echo "Check logs with: task test-infra:kubectl -- logs -n search-system -l app.kubernetes.io/name=search-controller-manager"
684+
echo "Check pods with: task test-infra:kubectl -- get pods -n search-system"
682685
683686
dev:nats-queue:
684687
desc: View the NATS queue for the indexer

cmd/search/indexer/command.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,18 +188,28 @@ func Run(o *ResourceIndexerOptions, ctx context.Context) error {
188188
return fmt.Errorf("failed to create policy cache: %w", err)
189189
}
190190

191-
go func() {
192-
if err := indexPolicyCache.Start(ctx); err != nil {
193-
klog.Errorf("Index Policy cache stopped: %v", err)
194-
}
195-
}()
191+
// Register handlers for both caches. They share the same underlying informer.
192+
if err := indexPolicyCache.RegisterHandlers(ctx); err != nil {
193+
return fmt.Errorf("failed to register index policy handlers: %w", err)
194+
}
195+
if err := reindexPolicyCache.RegisterHandlers(ctx); err != nil {
196+
return fmt.Errorf("failed to register reindex policy handlers: %w", err)
197+
}
196198

199+
// Start the shared cache and wait for it to be synced.
197200
go func() {
198-
if err := reindexPolicyCache.Start(ctx); err != nil {
199-
klog.Errorf("Reindex Policy cache stopped: %v", err)
201+
klog.Info("Starting shared Kubernetes cache...")
202+
if err := k8sCache.Start(ctx); err != nil {
203+
klog.Fatalf("Kubernetes cache stopped with error: %v", err)
200204
}
201205
}()
202206

207+
klog.Info("Waiting for cache to sync...")
208+
if !k8sCache.WaitForCacheSync(ctx) {
209+
return fmt.Errorf("failed to sync Kubernetes cache")
210+
}
211+
klog.Info("Cache synced successfully")
212+
203213
// Connect to NATS
204214
klog.Infof("Connecting to NATS at %s...", o.NatsURL)
205215
nc, err := nats.Connect(o.NatsURL)

internal/indexer/policy_cache.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ func NewPolicyCache(c runtimecache.Cache, requireReadyCondition bool) (*PolicyCa
5353
}, nil
5454
}
5555

56-
// Start registers informer event handlers for ResourceIndexPolicy objects
57-
func (c *PolicyCache) Start(ctx context.Context) error {
58-
klog.Info("Starting policy cache informer")
56+
// RegisterHandlers registers informer event handlers for ResourceIndexPolicy objects.
57+
func (c *PolicyCache) RegisterHandlers(ctx context.Context) error {
58+
klog.Info("Registering policy cache informer handlers")
5959

6060
informer, err := c.cache.GetInformer(ctx, &v1alpha1.ResourceIndexPolicy{})
6161
if err != nil {
@@ -95,10 +95,6 @@ func (c *PolicyCache) Start(ctx context.Context) error {
9595
return fmt.Errorf("failed to add event handler to ResourceIndexPolicy informer: %w", err)
9696
}
9797

98-
// Start runs all informers and blocks until ctx is cancelled.
99-
if err := c.cache.Start(ctx); err != nil {
100-
return fmt.Errorf("policy cache informer stopped with error: %w", err)
101-
}
10298
return nil
10399
}
104100

@@ -112,7 +108,7 @@ func (c *PolicyCache) upsertPolicy(p *v1alpha1.ResourceIndexPolicy) {
112108
// still being initialized (e.g. index creation or initial re-indexing).
113109
if c.requireReadyCondition {
114110
if !meta.IsStatusConditionTrue(p.Status.Conditions, "Ready") {
115-
klog.Infof("Policy %s is not yet Ready; skipping cache", key)
111+
klog.Infof("Policy %s is not yet Ready (Ready=True condition missing); skipping cache", key)
116112
c.deletePolicy(key)
117113
return
118114
}

internal/indexer/reindex_consumer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (r *ReindexConsumer) Start(ctx context.Context) error {
6464
for _, cp := range policies {
6565
// Skip if index name is not set yet
6666
if cp.Policy.Status.IndexName == "" {
67+
klog.V(2).Infof("ReindexConsumer: policy %s has no IndexName in status, skipping", cp.Policy.Name)
6768
continue
6869
}
6970

@@ -86,6 +87,8 @@ func (r *ReindexConsumer) Start(ctx context.Context) error {
8687
r.batcher.QueueUpsert(cp.Policy.Status.IndexName, doc, &msg)
8788
queued = true
8889
} else {
90+
klog.V(4).Infof("ReindexConsumer: policy %s did not match resource %s/%s (id=%s), ensuring deletion", cp.Policy.Name, obj.GetNamespace(), obj.GetName(), event.ID)
91+
8992
// If it doesn't match this policy, we should ensure it's removed from the index
9093
// in case it was previously indexed there.
9194
r.batcher.QueueDelete(cp.Policy.Status.IndexName, resourceUID, &msg)

0 commit comments

Comments
 (0)