Skip to content

Commit 0042f1b

Browse files
committed
feat: update policycache to use informer
1 parent d78dea2 commit 0042f1b

File tree

9 files changed

+296
-193
lines changed

9 files changed

+296
-193
lines changed

cmd/search/indexer/command.go

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
"go.miloapis.net/search/pkg/meilisearch"
1616
"k8s.io/apimachinery/pkg/runtime"
1717
"k8s.io/klog/v2"
18-
"sigs.k8s.io/controller-runtime/pkg/client"
18+
runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
1919
"sigs.k8s.io/controller-runtime/pkg/client/config"
2020
)
2121

@@ -30,11 +30,6 @@ type ResourceIndexerOptions struct {
3030
NatsAckWait time.Duration
3131
NatsMaxInFlight int
3232

33-
// ResourceIndexPolicySyncInterval controls how often the indexer polls the Kubernetes API
34-
// to refresh the cache of ResourceIndexPolicies. This allows the indexer to pick up
35-
// changes to indexing rules without a restart.
36-
ResourceIndexPolicySyncInterval time.Duration
37-
3833
// Meilisearch connection and timeout settings
3934
MeilisearchTaskWaitTimeout time.Duration // Timeout for waiting for Meilisearch tasks to complete.
4035
MeilisearchHTTPTimeout time.Duration // Timeout for HTTP requests to Meilisearch.
@@ -52,23 +47,22 @@ type ResourceIndexerOptions struct {
5247
// NewResourceIndexerOptions creates a new ResourceIndexerOptions with default values.
5348
func NewResourceIndexerOptions() *ResourceIndexerOptions {
5449
return &ResourceIndexerOptions{
55-
NatsURL: "nats://nats.nats-system.svc.cluster.local:4222",
56-
NatsSubject: "audit.>",
57-
NatsQueueGroup: "search-indexer",
58-
NatsDurableName: "search-indexer",
59-
NatsStreamName: "AUDIT_EVENTS",
60-
NatsAckWait: 120 * time.Second,
61-
NatsMaxInFlight: 10000,
62-
ResourceIndexPolicySyncInterval: 2 * time.Minute,
63-
MeilisearchTaskWaitTimeout: 4 * time.Second,
64-
MeilisearchHTTPTimeout: 60 * time.Second,
65-
MeilisearchDomain: "http://meilisearch.meilisearch-system.svc.cluster.local:7700",
66-
MeilisearchChunkSize: 1000,
67-
BatchSize: 1000,
68-
FlushInterval: 5 * time.Second,
69-
MeilisearchMaxRetries: 3,
70-
MeilisearchRetryDelay: 500 * time.Millisecond,
71-
BatchMaxConcurrentUploads: 100,
50+
NatsURL: "nats://nats.nats-system.svc.cluster.local:4222",
51+
NatsSubject: "audit.>",
52+
NatsQueueGroup: "search-indexer",
53+
NatsDurableName: "search-indexer",
54+
NatsStreamName: "AUDIT_EVENTS",
55+
NatsAckWait: 120 * time.Second,
56+
NatsMaxInFlight: 10000,
57+
MeilisearchTaskWaitTimeout: 4 * time.Second,
58+
MeilisearchHTTPTimeout: 60 * time.Second,
59+
MeilisearchDomain: "http://meilisearch.meilisearch-system.svc.cluster.local:7700",
60+
MeilisearchChunkSize: 1000,
61+
BatchSize: 1000,
62+
FlushInterval: 5 * time.Second,
63+
MeilisearchMaxRetries: 3,
64+
MeilisearchRetryDelay: 500 * time.Millisecond,
65+
BatchMaxConcurrentUploads: 100,
7266
}
7367
}
7468

@@ -81,7 +75,6 @@ func (o *ResourceIndexerOptions) AddFlags(fs *pflag.FlagSet) {
8175
fs.StringVar(&o.NatsStreamName, "nats-stream-name", o.NatsStreamName, "The name of the JetStream stream.")
8276
fs.DurationVar(&o.NatsAckWait, "nats-ack-wait", o.NatsAckWait, "The time to wait for an acknowledgement.")
8377
fs.IntVar(&o.NatsMaxInFlight, "nats-max-in-flight", o.NatsMaxInFlight, "The maximum number of in-flight messages.")
84-
fs.DurationVar(&o.ResourceIndexPolicySyncInterval, "resource-index-policy-sync-interval", o.ResourceIndexPolicySyncInterval, "How often to re-sync ResourceIndexPolicies.")
8578
fs.StringVar(&o.MeilisearchDomain, "meilisearch-domain", o.MeilisearchDomain, "Domain of the Meilisearch instance.")
8679
fs.DurationVar(&o.MeilisearchTaskWaitTimeout, "meilisearch-task-wait-timeout", o.MeilisearchTaskWaitTimeout, "Timeout for waiting for Meilisearch tasks to complete.")
8780
fs.DurationVar(&o.MeilisearchHTTPTimeout, "meilisearch-http-timeout", o.MeilisearchHTTPTimeout, "Timeout for HTTP requests to Meilisearch.")
@@ -116,9 +109,6 @@ func (o *ResourceIndexerOptions) Validate() error {
116109
if o.NatsMaxInFlight < 1 {
117110
return fmt.Errorf("nats-max-in-flight must be greater than 0")
118111
}
119-
if o.ResourceIndexPolicySyncInterval < 10*time.Second {
120-
return fmt.Errorf("resource-index-policy-sync-interval must be at least 10s")
121-
}
122112
if o.MeilisearchDomain == "" {
123113
return fmt.Errorf("meilisearch-domain must be set")
124114
}
@@ -177,7 +167,7 @@ func NewIndexerCommand() *cobra.Command {
177167

178168
// Run starts the indexer consumer
179169
func Run(o *ResourceIndexerOptions, ctx context.Context) error {
180-
// Build a Kubernetes client for listing policies
170+
// Build a scheme and REST config for the controller-runtime cache.
181171
scheme := runtime.NewScheme()
182172
if err := searchv1alpha1.AddToScheme(scheme); err != nil {
183173
return fmt.Errorf("failed to add v1alpha1 scheme: %w", err)
@@ -188,13 +178,15 @@ func Run(o *ResourceIndexerOptions, ctx context.Context) error {
188178
return fmt.Errorf("failed to get kubeconfig: %w", err)
189179
}
190180

191-
k8sClient, err := client.New(cfg, client.Options{Scheme: scheme})
181+
// Create a controller-runtime cache that uses a watch stream (informer)
182+
// to keep ResourceIndexPolicies in-sync.
183+
k8sCache, err := runtimecache.New(cfg, runtimecache.Options{Scheme: scheme})
192184
if err != nil {
193-
return fmt.Errorf("failed to create kubernetes client: %w", err)
185+
return fmt.Errorf("failed to create controller-runtime cache: %w", err)
194186
}
195187

196188
// Create and start the policy cache
197-
policyCache, err := indexer.NewPolicyCache(k8sClient, o.ResourceIndexPolicySyncInterval)
189+
policyCache, err := indexer.NewPolicyCache(k8sCache)
198190
if err != nil {
199191
return fmt.Errorf("failed to create policy cache: %w", err)
200192
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
---
2+
apiVersion: rbac.authorization.k8s.io/v1
3+
kind: ClusterRole
4+
metadata:
5+
name: milo-controller-manager
6+
rules:
7+
- apiGroups:
8+
- search.miloapis.com
9+
resources:
10+
- resourceindexpolicies
11+
verbs:
12+
- get
13+
- list
14+
- watch
15+
- apiGroups:
16+
- search.miloapis.com
17+
resources:
18+
- resourceindexpolicies/status
19+
verbs:
20+
- get
21+
- patch
22+
- update

config/samples/policy_v1alpha1_configmap_policy.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
apiVersion: policy.search.miloapis.com/v1alpha1
1+
apiVersion: search.miloapis.com/v1alpha1
22
kind: ResourceIndexPolicy
33
metadata:
44
name: configmap-index-policy-1

config/samples/policy_v1alpha1_service_policy.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
apiVersion: policy.search.miloapis.com/v1alpha1
1+
apiVersion: search.miloapis.com/v1alpha1
22
kind: ResourceIndexPolicy
33
metadata:
44
name: service-index-policy-1

internal/indexer/batcher.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,8 @@ func (b *Batcher) QueueUpsert(indexUID string, doc any, msg *jetstream.Msg) {
110110
}
111111
}
112112

113-
var a int = 0
114-
115113
// QueueDelete adds a document ID to the pending map and triggers an asynchronous flush if the batch size is reached.
116114
func (b *Batcher) QueueDelete(indexUID string, docID string, msg *jetstream.Msg) {
117-
a++
118-
klog.Infof("QueueDelete %d, %d", len(b.pendingDeletes), len(b.deleteMsgs))
119115
b.mu.Lock()
120116
defer b.mu.Unlock()
121117

internal/indexer/consumer_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import (
99

1010
"github.com/nats-io/nats.go/jetstream"
1111
"github.com/stretchr/testify/mock"
12+
internalcel "go.miloapis.net/search/internal/cel"
13+
policyevaluation "go.miloapis.net/search/internal/policy/evaluation"
1214
"go.miloapis.net/search/pkg/apis/search/v1alpha1"
1315
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14-
"k8s.io/apimachinery/pkg/runtime"
15-
"sigs.k8s.io/controller-runtime/pkg/client/fake"
1616
)
1717

1818
// MockConsumer is a mock for jetstream.Consumer
@@ -74,9 +74,6 @@ func (m *MockConsumeContext) Closed() <-chan struct{} { return nil }
7474

7575
func TestIndexer_Start_ConsumeFlow(t *testing.T) {
7676
// 1. Setup PolicyCache with a policy
77-
scheme := runtime.NewScheme()
78-
v1alpha1.AddToScheme(scheme)
79-
8077
policy := &v1alpha1.ResourceIndexPolicy{
8178
ObjectMeta: metav1.ObjectMeta{Name: "pod-policy"},
8279
Spec: v1alpha1.ResourceIndexPolicySpec{
@@ -91,9 +88,12 @@ func TestIndexer_Start_ConsumeFlow(t *testing.T) {
9188
},
9289
}
9390

94-
k8sClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(policy).Build()
95-
policyCache, _ := NewPolicyCache(k8sClient, 1*time.Minute)
96-
policyCache.refresh(context.Background())
91+
env, _ := internalcel.NewEnv()
92+
policyCache := &PolicyCache{
93+
policies: make(map[string]*policyevaluation.CachedPolicy),
94+
celEnv: env,
95+
}
96+
policyCache.upsertPolicy(policy)
9797

9898
// 2. Setup Batcher with Mock Search Client
9999
mockSearch := new(MockSearchClient)
@@ -159,9 +159,6 @@ func TestIndexer_Start_ConsumeFlow(t *testing.T) {
159159

160160
func TestIndexer_Consume_Delete(t *testing.T) {
161161
// Setup similar to above but for DELETE event
162-
scheme := runtime.NewScheme()
163-
v1alpha1.AddToScheme(scheme)
164-
165162
policy := &v1alpha1.ResourceIndexPolicy{
166163
ObjectMeta: metav1.ObjectMeta{Name: "pod-policy"},
167164
Spec: v1alpha1.ResourceIndexPolicySpec{
@@ -173,9 +170,12 @@ func TestIndexer_Consume_Delete(t *testing.T) {
173170
},
174171
}
175172

176-
k8sClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(policy).Build()
177-
policyCache, _ := NewPolicyCache(k8sClient, 1*time.Minute)
178-
policyCache.refresh(context.Background())
173+
env2, _ := internalcel.NewEnv()
174+
policyCache := &PolicyCache{
175+
policies: make(map[string]*policyevaluation.CachedPolicy),
176+
celEnv: env2,
177+
}
178+
policyCache.upsertPolicy(policy)
179179

180180
mockSearch := new(MockSearchClient)
181181
batcher := NewBatcher(mockSearch, BatchConfig{BatchSize: 1, FlushInterval: 1 * time.Minute})

0 commit comments

Comments
 (0)