Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ tasks:
- "\"{{.TOOL_DIR}}/controller-gen\" object paths=\"./pkg/apis/...\""
# Generate RBAC rules for the controllers.
- echo "Generating RBAC rules for the controllers..."
- "\"{{.TOOL_DIR}}/controller-gen\" rbac:roleName=milo-controller-manager paths=\"./internal/controllers/...\" output:dir=\"./config/controller-manager/overlays/core-control-plane/rbac\""
- "\"{{.TOOL_DIR}}/controller-gen\" rbac:roleName=milo-controller-manager paths=\"./internal/controllers/...\" output:dir=\"./config/overlays/controller-manager/core-control-plane/rbac\""
- task: generate:openapi
silent: true

Expand Down Expand Up @@ -437,7 +437,8 @@ tasks:
--nats-url="nats://127.0.0.1:4222" \
--nats-subject="audit.>" \
--nats-queue-group="search-indexer" \
--nats-durable-name="search-indexer" \
--nats-audit-consumer-name="search-indexer" \
--nats-reindex-consumer-name="search-reindexer" \
--nats-stream-name="AUDIT_EVENTS" \
--meilisearch-domain="http://127.0.0.1:7700"
silent: true
Expand Down
163 changes: 106 additions & 57 deletions cmd/search/indexer/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,36 @@ import (
// ResourceIndexerOptions holds the configuration for the resource indexer.
type ResourceIndexerOptions struct {
// NATS connection and consumer settings
NatsURL string
NatsSubject string
NatsQueueGroup string
NatsDurableName string
NatsStreamName string
NatsAckWait time.Duration
NatsMaxInFlight int
NatsURL string
NatsAuditConsumerName string
NatsStreamName string

// NATS re-index consumer settings (separate REINDEX_EVENTS stream)
NatsReindexStream string
NatsReindexConsumerName string

// Meilisearch connection and timeout settings
MeilisearchTaskWaitTimeout time.Duration // Timeout for waiting for Meilisearch tasks to complete.
MeilisearchHTTPTimeout time.Duration // Timeout for HTTP requests to Meilisearch.
MeilisearchDomain string // Domain of the Meilisearch instance.
MeilisearchChunkSize int // The number of documents to process in a single chunk.
MeilisearchMaxRetries int // The maximum number of retries for transient Meilisearch errors.
MeilisearchRetryDelay time.Duration // The base delay between Meilisearch retries.

// Audit events batching and throughput tuning
BatchSize int // The maximum number of documents to process in a single batch.
FlushInterval time.Duration // The time to wait before flushing a batch.
BatchMaxConcurrentUploads int // The maximum number of concurrent uploads to Meilisearch.
MeilisearchTaskWaitTimeout time.Duration
MeilisearchHTTPTimeout time.Duration
MeilisearchDomain string
MeilisearchChunkSize int
MeilisearchMaxRetries int
MeilisearchRetryDelay time.Duration

// Batching and throughput tuning
BatchSize int
FlushInterval time.Duration
BatchMaxConcurrentUploads int
}

// NewResourceIndexerOptions creates a new ResourceIndexerOptions with default values.
func NewResourceIndexerOptions() *ResourceIndexerOptions {
return &ResourceIndexerOptions{
NatsURL: "nats://nats.nats-system.svc.cluster.local:4222",
NatsSubject: "audit.>",
NatsQueueGroup: "search-indexer",
NatsDurableName: "search-indexer",
NatsAuditConsumerName: "search-indexer",
NatsStreamName: "AUDIT_EVENTS",
NatsAckWait: 120 * time.Second,
NatsMaxInFlight: 10000,
NatsReindexStream: "REINDEX_EVENTS",
NatsReindexConsumerName: "search-reindexer",
MeilisearchTaskWaitTimeout: 4 * time.Second,
MeilisearchHTTPTimeout: 60 * time.Second,
MeilisearchDomain: "http://meilisearch.meilisearch-system.svc.cluster.local:7700",
Expand All @@ -69,12 +67,12 @@ func NewResourceIndexerOptions() *ResourceIndexerOptions {
// AddFlags adds the flags for the resource indexer to the command.
func (o *ResourceIndexerOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.NatsURL, "nats-url", o.NatsURL, "The URL of the NATS server.")
fs.StringVar(&o.NatsSubject, "nats-subject", o.NatsSubject, "The NATS subject to subscribe to.")
fs.StringVar(&o.NatsQueueGroup, "nats-queue-group", o.NatsQueueGroup, "The NATS queue group for load balancing.")
fs.StringVar(&o.NatsDurableName, "nats-durable-name", o.NatsDurableName, "The durable name for the JetStream consumer.")
fs.StringVar(&o.NatsStreamName, "nats-stream-name", o.NatsStreamName, "The name of the JetStream stream.")
fs.DurationVar(&o.NatsAckWait, "nats-ack-wait", o.NatsAckWait, "The time to wait for an acknowledgement.")
fs.IntVar(&o.NatsMaxInFlight, "nats-max-in-flight", o.NatsMaxInFlight, "The maximum number of in-flight messages.")
fs.StringVar(&o.NatsAuditConsumerName, "nats-audit-consumer-name", o.NatsAuditConsumerName, "The name of the audit-events JetStream consumer (must match the manifest).")
fs.StringVar(&o.NatsStreamName, "nats-stream-name", o.NatsStreamName, "The name of the audit-events JetStream stream.")

fs.StringVar(&o.NatsReindexStream, "nats-reindex-stream", o.NatsReindexStream, "The JetStream stream name for re-index messages.")
fs.StringVar(&o.NatsReindexConsumerName, "nats-reindex-consumer-name", o.NatsReindexConsumerName, "The name of the re-index JetStream consumer (must match the manifest).")

fs.StringVar(&o.MeilisearchDomain, "meilisearch-domain", o.MeilisearchDomain, "Domain of the Meilisearch instance.")
fs.DurationVar(&o.MeilisearchTaskWaitTimeout, "meilisearch-task-wait-timeout", o.MeilisearchTaskWaitTimeout, "Timeout for waiting for Meilisearch tasks to complete.")
fs.DurationVar(&o.MeilisearchHTTPTimeout, "meilisearch-http-timeout", o.MeilisearchHTTPTimeout, "Timeout for HTTP requests to Meilisearch.")
Expand All @@ -91,23 +89,17 @@ func (o *ResourceIndexerOptions) Validate() error {
if o.NatsURL == "" {
return fmt.Errorf("nats-url must be set")
}
if o.NatsSubject == "" {
return fmt.Errorf("nats-subject must be set")
}
if o.NatsQueueGroup == "" {
return fmt.Errorf("nats-queue-group must be set")
}
if o.NatsDurableName == "" {
return fmt.Errorf("nats-durable-name must be set")
if o.NatsAuditConsumerName == "" {
return fmt.Errorf("nats-consummer-name must be set")
}
if o.NatsStreamName == "" {
return fmt.Errorf("nats-stream-name must be set")
}
if o.NatsAckWait == 0 {
return fmt.Errorf("nats-ack-wait must be set")
if o.NatsReindexStream == "" {
return fmt.Errorf("nats-reindex-stream must be set")
}
if o.NatsMaxInFlight < 1 {
return fmt.Errorf("nats-max-in-flight must be greater than 0")
if o.NatsReindexConsumerName == "" {
return fmt.Errorf("nats-reindex-consumer-name must be set")
}
if o.MeilisearchDomain == "" {
return fmt.Errorf("meilisearch-domain must be set")
Expand Down Expand Up @@ -186,14 +178,25 @@ func Run(o *ResourceIndexerOptions, ctx context.Context) error {
}

// Create and start the policy cache
policyCache, err := indexer.NewPolicyCache(k8sCache)
indexPolicyCache, err := indexer.NewPolicyCache(k8sCache, true)
if err != nil {
return fmt.Errorf("failed to create policy cache: %w", err)
}

reindexPolicyCache, err := indexer.NewPolicyCache(k8sCache, false)
if err != nil {
return fmt.Errorf("failed to create policy cache: %w", err)
}

go func() {
if err := policyCache.Start(ctx); err != nil {
klog.Errorf("Policy cache stopped: %v", err)
if err := indexPolicyCache.Start(ctx); err != nil {
klog.Errorf("Index Policy cache stopped: %v", err)
}
}()

go func() {
if err := reindexPolicyCache.Start(ctx); err != nil {
klog.Errorf("Reindex Policy cache stopped: %v", err)
}
}()

Expand All @@ -210,22 +213,31 @@ func Run(o *ResourceIndexerOptions, ctx context.Context) error {
return fmt.Errorf("failed to create JetStream context: %w", err)
}

stream, err := js.Stream(ctx, o.NatsStreamName)
auditStream, err := js.Stream(ctx, o.NatsStreamName)
if err != nil {
return fmt.Errorf("failed to get stream %s: %w", o.NatsStreamName, err)
}

consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: o.NatsDurableName,
FilterSubject: o.NatsSubject,
AckPolicy: jetstream.AckExplicitPolicy,
MaxAckPending: o.NatsMaxInFlight,
AckWait: o.NatsAckWait,
})
// Consumer is declared in config/components/nats-config/nats-consumer.yaml
auditConsumer, err := auditStream.Consumer(ctx, o.NatsAuditConsumerName)
if err != nil {
return fmt.Errorf("failed to get consumer %s: %w", o.NatsAuditConsumerName, err)
}

// ── Re-index consumer (separate REINDEX_EVENTS stream) ──────────────────
// The stream is declared in config/components/nats-streams/reindex-stream.yaml
reindexStream, err := js.Stream(ctx, o.NatsReindexStream)
if err != nil {
return fmt.Errorf("failed to get/create consumer %s: %w", o.NatsDurableName, err)
return fmt.Errorf("failed to get re-index stream %s: %w", o.NatsReindexStream, err)
}

// Consumer is declared in config/components/nats-config/nats-consumer.yaml
reindexJSConsumer, err := reindexStream.Consumer(ctx, o.NatsReindexConsumerName)
if err != nil {
return fmt.Errorf("failed to get re-index consumer %s: %w", o.NatsReindexConsumerName, err)
}

// ── Meilisearch client ──────────────────────────────────────────────────
searchClient, err := meilisearch.NewSDKClient(meilisearch.SDKConfig{
Domain: o.MeilisearchDomain,
APIKey: os.Getenv("MEILISEARCH_API_KEY"),
Expand All @@ -245,9 +257,46 @@ func Run(o *ResourceIndexerOptions, ctx context.Context) error {
MaxConcurrentUploads: o.BatchMaxConcurrentUploads,
}

batcher := indexer.NewBatcher(searchClient, batchConfig)
idx := indexer.NewIndexer(consumer, policyCache, batcher)
// Create separate batchers for audit events and re-indexing events
// so they don't block each other and can be tuned independently if needed.
auditBatcher := indexer.NewBatcher(searchClient, batchConfig)
reindexBatcher := indexer.NewBatcher(searchClient, batchConfig)

// Start both batchers
auditBatcher.Start(ctx)
reindexBatcher.Start(ctx)

auditIdx := indexer.NewIndexer(auditConsumer, indexPolicyCache, auditBatcher)
reindexIdx := indexer.NewReindexConsumer(reindexJSConsumer, reindexPolicyCache, reindexBatcher)

klog.Info("Starting indexer...")
return idx.Start(ctx)
klog.Info("Starting audit indexer and re-index consumer...")

consumerCtx, cancelConsumers := context.WithCancel(ctx)
defer cancelConsumers()

errCh := make(chan error, 2)

go func() {
if err := auditIdx.Start(consumerCtx); err != nil {
errCh <- fmt.Errorf("audit indexer: %w", err)
} else {
errCh <- nil
}
}()

go func() {
if err := reindexIdx.Start(consumerCtx); err != nil {
errCh <- fmt.Errorf("reindex consumer: %w", err)
} else {
errCh <- nil
}
}()

select {
case err := <-errCh:
cancelConsumers()
return err
case <-ctx.Done():
return nil
}
}
58 changes: 51 additions & 7 deletions cmd/search/manager/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"os"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.miloapis.net/search/internal/indexer"
"go.miloapis.net/search/pkg/apis/search/install"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -47,6 +51,10 @@ type ControllerManagerOptions struct {
MeilisearchChunkSize int
MeilisearchTaskWaitTimeout time.Duration
MeilisearchHTTPTimeout time.Duration

// NATS settings for publishing per-resource re-index messages.
NatsURL string
NatsReindexSubject string
}

// NewControllerManagerOptions creates a new ControllerManagerOptions with default values
Expand All @@ -62,6 +70,8 @@ func NewControllerManagerOptions() *ControllerManagerOptions {
MeilisearchTaskWaitTimeout: 30 * time.Second,
MeilisearchHTTPTimeout: 60 * time.Second,
MeilisearchDomain: "http://meilisearch.meilisearch-system.svc.cluster.local:7700",
NatsURL: "nats://nats.nats-system.svc.cluster.local:4222",
NatsReindexSubject: "reindex.all",
}
}

Expand All @@ -84,6 +94,9 @@ func (o *ControllerManagerOptions) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.MeilisearchHTTPTimeout, "meilisearch-http-timeout", o.MeilisearchHTTPTimeout, "Timeout for HTTP requests to Meilisearch.")
fs.IntVar(&o.MeilisearchChunkSize, "meilisearch-chunk-size", o.MeilisearchChunkSize, "The number of documents to process in a single chunk.")

// NATS
fs.StringVar(&o.NatsURL, "nats-url", o.NatsURL, "The URL of the NATS server used to publish re-index messages.")
fs.StringVar(&o.NatsReindexSubject, "nats-reindex-subject", o.NatsReindexSubject, "The NATS subject to publish per-resource re-index messages to.")
}

// Validate validates the options
Expand All @@ -104,6 +117,10 @@ func (o *ControllerManagerOptions) Validate() error {
return fmt.Errorf("meilisearch-api-key must be set")
}

if o.NatsURL == "" {
return fmt.Errorf("nats-url must be set")
}

return nil
}

Expand Down Expand Up @@ -147,7 +164,9 @@ func Run(o *ControllerManagerOptions, ctx context.Context) error {
})
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
cfg := ctrl.GetConfigOrDie()

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{BindAddress: o.MetricsAddr, SecureServing: o.SecureMetrics, TLSOpts: tlsOpts},
HealthProbeBindAddress: o.ProbeAddr,
Expand All @@ -159,14 +178,12 @@ func Run(o *ControllerManagerOptions, ctx context.Context) error {
os.Exit(1)
}

// Register Webhook
celValidator, err := cel.NewValidator(o.MaxCELDepth)
if err != nil {
setupLog.Error(err, "unable to create CEL validator")
os.Exit(1)
}

// Initialize Meilisearch SDK
searchSDK, err := meilisearch.NewSDKClient(meilisearch.SDKConfig{
Domain: o.MeilisearchDomain,
APIKey: os.Getenv("MEILISEARCH_API_KEY"),
Expand All @@ -179,11 +196,38 @@ func Run(o *ControllerManagerOptions, ctx context.Context) error {
os.Exit(1)
}

// Dynamic client — used by the policy controller to list target resources.
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
setupLog.Error(err, "unable to create dynamic client")
os.Exit(1)
}

// Connect to NATS and set up the re-index stream + publisher.
setupLog.Info("Connecting to NATS for re-index publishing", "url", o.NatsURL)
nc, err := nats.Connect(o.NatsURL)
if err != nil {
setupLog.Error(err, "unable to connect to NATS")
os.Exit(1)
}
defer nc.Close()

js, err := jetstream.New(nc)
if err != nil {
setupLog.Error(err, "unable to create JetStream context")
os.Exit(1)
}

reindexPub := indexer.NewReindexPublisher(js, o.NatsReindexSubject)

if err = (&policycontroller.ResourceIndexPolicyReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
CelValidator: celValidator,
SearchSDK: searchSDK,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
CelValidator: celValidator,
SearchSDK: searchSDK,
DynamicClient: dynamicClient,
RESTMapper: mgr.GetRESTMapper(),
ReindexPublisher: reindexPub,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ResourceIndexPolicy")
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion config/base/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ kind: Kustomization
resources:
- apiserver
- resource-indexer
- controller-manager/base
- controller-manager

images:
- name: ghcr.io/datum-cloud/search
Expand Down
Loading