Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ tasks:
- "\"{{.TOOL_DIR}}/controller-gen\" object paths=\"./pkg/apis/...\""
# Generate RBAC rules for the controllers.
- echo "Generating RBAC rules for the controllers..."
- "\"{{.TOOL_DIR}}/controller-gen\" rbac:roleName=milo-controller-manager paths=\"./internal/controllers/...\" output:dir=\"./config/controller-manager/overlays/core-control-plane/rbac\""
- "\"{{.TOOL_DIR}}/controller-gen\" rbac:roleName=milo-controller-manager paths=\"./internal/controllers/...\" output:dir=\"./config/overlays/controller-manager/core-control-plane/rbac\""
- task: generate:openapi
silent: true

Expand Down
151 changes: 100 additions & 51 deletions cmd/search/indexer/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,35 @@ import (
type ResourceIndexerOptions struct {
// NATS connection and consumer settings
NatsURL string
NatsSubject string
NatsQueueGroup string
NatsDurableName string
NatsStreamName string
NatsAckWait time.Duration
NatsMaxInFlight int

// NATS re-index consumer settings (separate REINDEX_EVENTS stream)
NatsReindexStream string
NatsReindexDurableName string

// Meilisearch connection and timeout settings
MeilisearchTaskWaitTimeout time.Duration // Timeout for waiting for Meilisearch tasks to complete.
MeilisearchHTTPTimeout time.Duration // Timeout for HTTP requests to Meilisearch.
MeilisearchDomain string // Domain of the Meilisearch instance.
MeilisearchChunkSize int // The number of documents to process in a single chunk.
MeilisearchMaxRetries int // The maximum number of retries for transient Meilisearch errors.
MeilisearchRetryDelay time.Duration // The base delay between Meilisearch retries.

// Audit events batching and throughput tuning
BatchSize int // The maximum number of documents to process in a single batch.
FlushInterval time.Duration // The time to wait before flushing a batch.
BatchMaxConcurrentUploads int // The maximum number of concurrent uploads to Meilisearch.
MeilisearchTaskWaitTimeout time.Duration
MeilisearchHTTPTimeout time.Duration
MeilisearchDomain string
MeilisearchChunkSize int
MeilisearchMaxRetries int
MeilisearchRetryDelay time.Duration

// Batching and throughput tuning
BatchSize int
FlushInterval time.Duration
BatchMaxConcurrentUploads int
}

// NewResourceIndexerOptions creates a new ResourceIndexerOptions with default values.
func NewResourceIndexerOptions() *ResourceIndexerOptions {
return &ResourceIndexerOptions{
NatsURL: "nats://nats.nats-system.svc.cluster.local:4222",
NatsSubject: "audit.>",
NatsQueueGroup: "search-indexer",
NatsDurableName: "search-indexer",
NatsStreamName: "AUDIT_EVENTS",
NatsAckWait: 120 * time.Second,
NatsMaxInFlight: 10000,
NatsReindexStream: "REINDEX_EVENTS",
NatsReindexDurableName: "search-reindexer",
MeilisearchTaskWaitTimeout: 4 * time.Second,
MeilisearchHTTPTimeout: 60 * time.Second,
MeilisearchDomain: "http://meilisearch.meilisearch-system.svc.cluster.local:7700",
Expand All @@ -69,12 +67,12 @@ func NewResourceIndexerOptions() *ResourceIndexerOptions {
// AddFlags adds the flags for the resource indexer to the command.
func (o *ResourceIndexerOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.NatsURL, "nats-url", o.NatsURL, "The URL of the NATS server.")
fs.StringVar(&o.NatsSubject, "nats-subject", o.NatsSubject, "The NATS subject to subscribe to.")
fs.StringVar(&o.NatsQueueGroup, "nats-queue-group", o.NatsQueueGroup, "The NATS queue group for load balancing.")
fs.StringVar(&o.NatsDurableName, "nats-durable-name", o.NatsDurableName, "The durable name for the JetStream consumer.")
fs.StringVar(&o.NatsStreamName, "nats-stream-name", o.NatsStreamName, "The name of the JetStream stream.")
fs.DurationVar(&o.NatsAckWait, "nats-ack-wait", o.NatsAckWait, "The time to wait for an acknowledgement.")
fs.IntVar(&o.NatsMaxInFlight, "nats-max-in-flight", o.NatsMaxInFlight, "The maximum number of in-flight messages.")
fs.StringVar(&o.NatsDurableName, "nats-durable-name", o.NatsDurableName, "The durable name of the audit-events JetStream consumer (must match the manifest).")
fs.StringVar(&o.NatsStreamName, "nats-stream-name", o.NatsStreamName, "The name of the audit-events JetStream stream.")

fs.StringVar(&o.NatsReindexStream, "nats-reindex-stream", o.NatsReindexStream, "The JetStream stream name for re-index messages.")
fs.StringVar(&o.NatsReindexDurableName, "nats-reindex-durable-name", o.NatsReindexDurableName, "The durable name of the re-index JetStream consumer (must match the manifest).")

fs.StringVar(&o.MeilisearchDomain, "meilisearch-domain", o.MeilisearchDomain, "Domain of the Meilisearch instance.")
fs.DurationVar(&o.MeilisearchTaskWaitTimeout, "meilisearch-task-wait-timeout", o.MeilisearchTaskWaitTimeout, "Timeout for waiting for Meilisearch tasks to complete.")
fs.DurationVar(&o.MeilisearchHTTPTimeout, "meilisearch-http-timeout", o.MeilisearchHTTPTimeout, "Timeout for HTTP requests to Meilisearch.")
Expand All @@ -91,23 +89,17 @@ func (o *ResourceIndexerOptions) Validate() error {
if o.NatsURL == "" {
return fmt.Errorf("nats-url must be set")
}
if o.NatsSubject == "" {
return fmt.Errorf("nats-subject must be set")
}
if o.NatsQueueGroup == "" {
return fmt.Errorf("nats-queue-group must be set")
}
if o.NatsDurableName == "" {
return fmt.Errorf("nats-durable-name must be set")
}
if o.NatsStreamName == "" {
return fmt.Errorf("nats-stream-name must be set")
}
if o.NatsAckWait == 0 {
return fmt.Errorf("nats-ack-wait must be set")
if o.NatsReindexStream == "" {
return fmt.Errorf("nats-reindex-stream must be set")
}
if o.NatsMaxInFlight < 1 {
return fmt.Errorf("nats-max-in-flight must be greater than 0")
if o.NatsReindexDurableName == "" {
return fmt.Errorf("nats-reindex-durable-name must be set")
}
if o.MeilisearchDomain == "" {
return fmt.Errorf("meilisearch-domain must be set")
Expand Down Expand Up @@ -186,14 +178,25 @@ func Run(o *ResourceIndexerOptions, ctx context.Context) error {
}

// Create and start the policy cache
policyCache, err := indexer.NewPolicyCache(k8sCache)
indexPolicyCache, err := indexer.NewPolicyCache(k8sCache, true)
if err != nil {
return fmt.Errorf("failed to create policy cache: %w", err)
}

reindexPolicyCache, err := indexer.NewPolicyCache(k8sCache, false)
if err != nil {
return fmt.Errorf("failed to create policy cache: %w", err)
}

go func() {
if err := policyCache.Start(ctx); err != nil {
klog.Errorf("Policy cache stopped: %v", err)
if err := indexPolicyCache.Start(ctx); err != nil {
klog.Errorf("Index Policy cache stopped: %v", err)
}
}()

go func() {
if err := reindexPolicyCache.Start(ctx); err != nil {
klog.Errorf("Reindex Policy cache stopped: %v", err)
}
}()

Expand All @@ -210,22 +213,31 @@ func Run(o *ResourceIndexerOptions, ctx context.Context) error {
return fmt.Errorf("failed to create JetStream context: %w", err)
}

stream, err := js.Stream(ctx, o.NatsStreamName)
auditStream, err := js.Stream(ctx, o.NatsStreamName)
if err != nil {
return fmt.Errorf("failed to get stream %s: %w", o.NatsStreamName, err)
}

consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: o.NatsDurableName,
FilterSubject: o.NatsSubject,
AckPolicy: jetstream.AckExplicitPolicy,
MaxAckPending: o.NatsMaxInFlight,
AckWait: o.NatsAckWait,
})
// Consumer is declared in config/components/nats-config/nats-consumer.yaml
auditConsumer, err := auditStream.Consumer(ctx, o.NatsDurableName)
if err != nil {
return fmt.Errorf("failed to get consumer %s: %w", o.NatsDurableName, err)
}

// ── Re-index consumer (separate REINDEX_EVENTS stream) ──────────────────
// The stream is declared in config/components/nats-streams/reindex-stream.yaml
reindexStream, err := js.Stream(ctx, o.NatsReindexStream)
if err != nil {
return fmt.Errorf("failed to get/create consumer %s: %w", o.NatsDurableName, err)
return fmt.Errorf("failed to get re-index stream %s: %w", o.NatsReindexStream, err)
}

// Consumer is declared in config/components/nats-config/nats-consumer.yaml
reindexJSConsumer, err := reindexStream.Consumer(ctx, o.NatsReindexDurableName)
if err != nil {
return fmt.Errorf("failed to get re-index consumer %s: %w", o.NatsReindexDurableName, err)
}

// ── Meilisearch client ──────────────────────────────────────────────────
searchClient, err := meilisearch.NewSDKClient(meilisearch.SDKConfig{
Domain: o.MeilisearchDomain,
APIKey: os.Getenv("MEILISEARCH_API_KEY"),
Expand All @@ -245,9 +257,46 @@ func Run(o *ResourceIndexerOptions, ctx context.Context) error {
MaxConcurrentUploads: o.BatchMaxConcurrentUploads,
}

batcher := indexer.NewBatcher(searchClient, batchConfig)
idx := indexer.NewIndexer(consumer, policyCache, batcher)
// Create separate batchers for audit events and re-indexing events
// so they don't block each other and can be tuned independently if needed.
auditBatcher := indexer.NewBatcher(searchClient, batchConfig)
reindexBatcher := indexer.NewBatcher(searchClient, batchConfig)

// Start both batchers
auditBatcher.Start(ctx)
reindexBatcher.Start(ctx)

auditIdx := indexer.NewIndexer(auditConsumer, indexPolicyCache, auditBatcher)
reindexIdx := indexer.NewReindexConsumer(reindexJSConsumer, reindexPolicyCache, reindexBatcher)

klog.Info("Starting indexer...")
return idx.Start(ctx)
klog.Info("Starting audit indexer and re-index consumer...")

consumerCtx, cancelConsumers := context.WithCancel(ctx)
defer cancelConsumers()

errCh := make(chan error, 2)

go func() {
if err := auditIdx.Start(consumerCtx); err != nil {
errCh <- fmt.Errorf("audit indexer: %w", err)
} else {
errCh <- nil
}
}()

go func() {
if err := reindexIdx.Start(consumerCtx); err != nil {
errCh <- fmt.Errorf("reindex consumer: %w", err)
} else {
errCh <- nil
}
}()

select {
case err := <-errCh:
cancelConsumers()
return err
case <-ctx.Done():
return nil
}
}
58 changes: 51 additions & 7 deletions cmd/search/manager/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"os"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.miloapis.net/search/internal/indexer"
"go.miloapis.net/search/pkg/apis/search/install"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -47,6 +51,10 @@ type ControllerManagerOptions struct {
MeilisearchChunkSize int
MeilisearchTaskWaitTimeout time.Duration
MeilisearchHTTPTimeout time.Duration

// NATS settings for publishing per-resource re-index messages.
NatsURL string
NatsReindexSubject string
}

// NewControllerManagerOptions creates a new ControllerManagerOptions with default values
Expand All @@ -62,6 +70,8 @@ func NewControllerManagerOptions() *ControllerManagerOptions {
MeilisearchTaskWaitTimeout: 30 * time.Second,
MeilisearchHTTPTimeout: 60 * time.Second,
MeilisearchDomain: "http://meilisearch.meilisearch-system.svc.cluster.local:7700",
NatsURL: "nats://nats.nats-system.svc.cluster.local:4222",
NatsReindexSubject: "reindex.all",
}
}

Expand All @@ -84,6 +94,9 @@ func (o *ControllerManagerOptions) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.MeilisearchHTTPTimeout, "meilisearch-http-timeout", o.MeilisearchHTTPTimeout, "Timeout for HTTP requests to Meilisearch.")
fs.IntVar(&o.MeilisearchChunkSize, "meilisearch-chunk-size", o.MeilisearchChunkSize, "The number of documents to process in a single chunk.")

// NATS
fs.StringVar(&o.NatsURL, "nats-url", o.NatsURL, "The URL of the NATS server used to publish re-index messages.")
fs.StringVar(&o.NatsReindexSubject, "nats-reindex-subject", o.NatsReindexSubject, "The NATS subject to publish per-resource re-index messages to.")
}

// Validate validates the options
Expand All @@ -104,6 +117,10 @@ func (o *ControllerManagerOptions) Validate() error {
return fmt.Errorf("meilisearch-api-key must be set")
}

if o.NatsURL == "" {
return fmt.Errorf("nats-url must be set")
}

return nil
}

Expand Down Expand Up @@ -147,7 +164,9 @@ func Run(o *ControllerManagerOptions, ctx context.Context) error {
})
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
cfg := ctrl.GetConfigOrDie()

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{BindAddress: o.MetricsAddr, SecureServing: o.SecureMetrics, TLSOpts: tlsOpts},
HealthProbeBindAddress: o.ProbeAddr,
Expand All @@ -159,14 +178,12 @@ func Run(o *ControllerManagerOptions, ctx context.Context) error {
os.Exit(1)
}

// Register Webhook
celValidator, err := cel.NewValidator(o.MaxCELDepth)
if err != nil {
setupLog.Error(err, "unable to create CEL validator")
os.Exit(1)
}

// Initialize Meilisearch SDK
searchSDK, err := meilisearch.NewSDKClient(meilisearch.SDKConfig{
Domain: o.MeilisearchDomain,
APIKey: os.Getenv("MEILISEARCH_API_KEY"),
Expand All @@ -179,11 +196,38 @@ func Run(o *ControllerManagerOptions, ctx context.Context) error {
os.Exit(1)
}

// Dynamic client — used by the policy controller to list target resources.
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
setupLog.Error(err, "unable to create dynamic client")
os.Exit(1)
}

// Connect to NATS and set up the re-index stream + publisher.
setupLog.Info("Connecting to NATS for re-index publishing", "url", o.NatsURL)
nc, err := nats.Connect(o.NatsURL)
if err != nil {
setupLog.Error(err, "unable to connect to NATS")
os.Exit(1)
}
defer nc.Close()

js, err := jetstream.New(nc)
if err != nil {
setupLog.Error(err, "unable to create JetStream context")
os.Exit(1)
}

reindexPub := indexer.NewReindexPublisher(js, o.NatsReindexSubject)

if err = (&policycontroller.ResourceIndexPolicyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
CelValidator: celValidator,
SearchSDK: searchSDK,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
CelValidator: celValidator,
SearchSDK: searchSDK,
DynamicClient: dynamicClient,
RESTMapper: mgr.GetRESTMapper(),
ReindexPublisher: reindexPub,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ResourceIndexPolicy")
os.Exit(1)
Expand Down
20 changes: 19 additions & 1 deletion config/components/nats-config/nats-consumer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,22 @@ spec:
maxAckPending: 1000000
# ReplayPolicy indicates that the consumer will replay messages from the stream
replayPolicy: instant
ackWait: 30s
ackWait: 120s
---
apiVersion: jetstream.nats.io/v1beta2
kind: Consumer
metadata:
name: search-reindexer
namespace: search-system
spec:
streamName: REINDEX_EVENTS
durableName: search-reindexer
deliverPolicy: all
ackPolicy: explicit
# The filter subject ensures we only get relevant events
filterSubject: reindex.>
# MaxAckPending controls flow control
maxAckPending: 1000000
# ReplayPolicy indicates that the consumer will replay messages from the stream
replayPolicy: instant
ackWait: 120s
4 changes: 2 additions & 2 deletions config/components/nats-streams/audit-stream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ spec:
# Maximum age: 7 days retention as per architecture
maxAge: 168h # 7 days = 168 hours

# Maximum bytes: 100GB as per architecture
maxBytes: 107374182400 # 100 * 1024 * 1024 * 1024
# Maximum bytes: 10GB as per architecture
maxBytes: 10737418240 # 10 * 1024 * 1024 * 1024

# Number of replicas for high availability
replicas: 1 # Can be increased to 3 for production HA
Expand Down
Loading
Loading