66 "fmt"
77
88 "github.com/nats-io/nats.go/jetstream"
9+ "go.miloapis.net/search/internal/utils"
910 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1011 "k8s.io/klog/v2"
1112)
@@ -58,49 +59,7 @@ func (r *ReindexConsumer) Start(ctx context.Context) error {
5859 return
5960 }
6061
61- queued := false
62- policies := r .policyCache .GetPolicies ()
63-
64- for _ , cp := range policies {
65- // Skip if index name is not set yet
66- if cp .Policy .Status .IndexName == "" {
67- klog .V (2 ).Infof ("ReindexConsumer: policy %s has no IndexName in status, skipping" , cp .Policy .Name )
68- continue
69- }
70-
71- evalResult , err := cp .Evaluate (obj )
72- if err != nil {
73- klog .Errorf ("ReindexConsumer: policy %s evaluation error: %v" , cp .Policy .Name , err )
74- continue
75- }
76-
77- if evalResult .Matched {
78- klog .V (4 ).Infof ("ReindexConsumer: match policy=%s resource=%s/%s (id=%s)" ,
79- cp .Policy .Name , obj .GetNamespace (), obj .GetName (), event .ID )
80-
81- // Transform into indexable document
82- doc := evalResult .Transform ()
83-
84- // Ensure UID is set as primary key
85- ensureUID (doc , resourceUID )
86-
87- r .batcher .QueueUpsert (cp .Policy .Status .IndexName , doc , & msg )
88- queued = true
89- } else {
90- klog .V (4 ).Infof ("ReindexConsumer: policy %s did not match resource %s/%s (id=%s), ensuring deletion" , cp .Policy .Name , obj .GetNamespace (), obj .GetName (), event .ID )
91-
92- // If it doesn't match this policy, we should ensure it's removed from the index
93- // in case it was previously indexed there.
94- r .batcher .QueueDelete (cp .Policy .Status .IndexName , resourceUID , & msg )
95- queued = true
96- }
97- }
98-
99- // If the message wasn't queued for any operation (e.g. no policies), acknowledge it
100- if ! queued {
101- klog .Warningf ("ReindexConsumer: event (id=%s) matched no active policies in cache, skipping" , event .ID )
102- msg .Ack ()
103- }
62+ r .processTargetedEvent (msg , event , obj , resourceUID )
10463 })
10564
10665 if err != nil {
@@ -113,3 +72,58 @@ func (r *ReindexConsumer) Start(ctx context.Context) error {
11372 klog .Info ("Shutting down ReindexConsumer..." )
11473 return nil
11574}
75+
76+ // processTargetedEvent evaluates a resource against the specific policy identified
77+ // in the event. It verifies the cached policy's spec hash matches the event's
78+ // spec hash to ensure evaluation uses the correct (updated) conditions.
79+ func (r * ReindexConsumer ) processTargetedEvent (msg jetstream.Msg , event ReindexEvent , obj * unstructured.Unstructured , resourceUID string ) {
80+ if event .PolicyName == "" || event .IndexName == "" {
81+ klog .Warningf ("ReindexConsumer: event (id=%s) missing policyName or indexName, dropping" , event .ID )
82+ msg .Ack ()
83+ return
84+ }
85+
86+ cp := r .policyCache .GetPolicy (event .PolicyName )
87+ if cp == nil {
88+ // Policy not in cache yet — NAK so the message is redelivered
89+ // after the informer propagates the policy to the cache.
90+ klog .V (2 ).Infof ("ReindexConsumer: policy %s not in cache yet, NAK for redelivery (id=%s)" ,
91+ event .PolicyName , event .ID )
92+ msg .Nak ()
93+ return
94+ }
95+
96+ // Verify the cached policy spec matches the version that triggered re-indexing.
97+ if event .SpecHash != "" {
98+ cachedHash := utils .ComputeSpecHash (& cp .Policy .Spec )
99+ if cachedHash != event .SpecHash {
100+ // Cache is stale — NAK so the message is redelivered after
101+ // the informer propagates the updated policy spec.
102+ klog .V (2 ).Infof ("ReindexConsumer: policy %s cache stale (cached=%s, event=%s), NAK for redelivery (id=%s)" ,
103+ event .PolicyName , cachedHash [:8 ], event .SpecHash [:8 ], event .ID )
104+ msg .Nak ()
105+ return
106+ }
107+ }
108+
109+ evalResult , err := cp .Evaluate (obj )
110+ if err != nil {
111+ klog .Errorf ("ReindexConsumer: policy %s evaluation error: %v" , event .PolicyName , err )
112+ msg .Ack ()
113+ return
114+ }
115+
116+ if evalResult .Matched {
117+ klog .V (4 ).Infof ("ReindexConsumer: match policy=%s resource=%s/%s (id=%s)" ,
118+ event .PolicyName , obj .GetNamespace (), obj .GetName (), event .ID )
119+
120+ doc := evalResult .Transform ()
121+ ensureUID (doc , resourceUID )
122+ r .batcher .QueueUpsert (event .IndexName , doc , & msg )
123+ } else {
124+ klog .V (4 ).Infof ("ReindexConsumer: policy %s did not match resource %s/%s (id=%s), deleting from index" ,
125+ event .PolicyName , obj .GetNamespace (), obj .GetName (), event .ID )
126+
127+ r .batcher .QueueDelete (event .IndexName , resourceUID , & msg )
128+ }
129+ }
0 commit comments