Skip to content

Commit c63fee9

Browse files
committed
feat: introduce SearchQuery agains meilisearch
1 parent 480dde6 commit c63fee9

File tree

16 files changed

+585
-18
lines changed

16 files changed

+585
-18
lines changed

Taskfile.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,7 @@ tasks:
497497
- |
498498
# Use KUBECONFIG if set, otherwise fallback to default
499499
KCFG=${KUBECONFIG:-$HOME/.kube/config}
500+
export MEILISEARCH_API_KEY=search-master-key
500501
go run ./cmd/search serve \
501502
--etcd-servers http://127.0.0.1:2379 \
502503
--secure-port 9443 \
@@ -506,7 +507,9 @@ tasks:
506507
--authorization-kubeconfig="$KCFG" \
507508
--kubeconfig="$KCFG" \
508509
--client-ca-file="{{.CERTS_DIR}}/kind-ca.crt" \
509-
--authorization-always-allow-paths=/healthz,/readyz,/livez,/openapi,/openapi/v2,/openapi/v3,/apis,/api
510+
--authorization-always-allow-paths=/healthz,/readyz,/livez,/openapi,/openapi/v2,/openapi/v3,/apis,/api \
511+
--meilisearch-domain="http://127.0.0.1:7700" \
512+
510513
511514
dev:undeploy:
512515
desc: Undeploy Search server from test-infra cluster

cmd/search/indexer/command.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"k8s.io/klog/v2"
1818
runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
1919
"sigs.k8s.io/controller-runtime/pkg/client/config"
20+
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
2021
)
2122

2223
// ResourceIndexerOptions holds the configuration for the resource indexer.
@@ -159,6 +160,7 @@ func NewIndexerCommand() *cobra.Command {
159160

160161
// Run starts the indexer consumer
161162
func Run(o *ResourceIndexerOptions, ctx context.Context) error {
163+
ctrllog.SetLogger(klog.NewKlogr())
162164
// Build a scheme and REST config for the controller-runtime cache.
163165
scheme := runtime.NewScheme()
164166
if err := searchv1alpha1.AddToScheme(scheme); err != nil {

cmd/search/main.go

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ import (
55
"fmt"
66
"os"
77
"strings"
8+
"time"
89

10+
"github.com/google/uuid"
911
"github.com/spf13/cobra"
1012
"github.com/spf13/pflag"
1113
searchapiserver "go.miloapis.net/search/internal/apiserver"
1214
"go.miloapis.net/search/internal/version"
1315
searchv1alpha1 "go.miloapis.net/search/pkg/apis/search/v1alpha1"
16+
"k8s.io/apimachinery/pkg/runtime"
1417
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1518
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
1619
genericapiserver "k8s.io/apiserver/pkg/server"
@@ -22,9 +25,13 @@ import (
2225
logsapi "k8s.io/component-base/logs/api/v1"
2326
"k8s.io/klog/v2"
2427
"k8s.io/kube-openapi/pkg/common"
28+
runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
29+
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
2530

2631
"go.miloapis.net/search/cmd/search/indexer"
2732
"go.miloapis.net/search/cmd/search/manager"
33+
internalindexer "go.miloapis.net/search/internal/indexer"
34+
"go.miloapis.net/search/pkg/meilisearch"
2835

2936
// Register JSON logging format
3037
_ "k8s.io/component-base/logs/json/register"
@@ -140,8 +147,14 @@ func NewVersionCommand() *cobra.Command {
140147

141148
// SearchServerOptions contains configuration for the search server.
142149
type SearchServerOptions struct {
143-
RecommendedOptions *options.RecommendedOptions
144-
Logs *logsapi.LoggingConfiguration
150+
RecommendedOptions *options.RecommendedOptions
151+
Logs *logsapi.LoggingConfiguration
152+
MeilisearchDomain string
153+
MeilisearchHTTPTimeout time.Duration
154+
MeilisearchTaskWaitTimeout time.Duration
155+
MaxSearchLimit int
156+
DefaultSearchLimit int
157+
PagingTimeout time.Duration
145158
}
146159

147160
// NewSearchServerOptions creates options with default values.
@@ -151,14 +164,23 @@ func NewSearchServerOptions() *SearchServerOptions {
151164
"/registry/search.miloapis.com",
152165
searchapiserver.Codecs.LegacyCodec(searchapiserver.Scheme.PrioritizedVersionsAllGroups()...),
153166
),
154-
Logs: logsapi.NewLoggingConfiguration(),
167+
Logs: logsapi.NewLoggingConfiguration(),
168+
MeilisearchDomain: "http://meilisearch.meilisearch-system.svc.cluster.local:7700",
169+
MeilisearchHTTPTimeout: 60 * time.Second,
170+
MaxSearchLimit: 100,
171+
DefaultSearchLimit: 10,
172+
PagingTimeout: 24 * time.Hour,
155173
}
156174

157175
return o
158176
}
159177

160178
func (o *SearchServerOptions) AddFlags(fs *pflag.FlagSet) {
161179
o.RecommendedOptions.AddFlags(fs)
180+
fs.StringVar(&o.MeilisearchDomain, "meilisearch-domain", o.MeilisearchDomain, "Domain of the Meilisearch instance.")
181+
fs.IntVar(&o.MaxSearchLimit, "max-search-limit", o.MaxSearchLimit, "The maximum number of results a SearchQuery can return in a single request.")
182+
fs.IntVar(&o.DefaultSearchLimit, "default-search-limit", o.DefaultSearchLimit, "The default number of results a SearchQuery returns when no limit is specified.")
183+
fs.DurationVar(&o.PagingTimeout, "paging-timeout", o.PagingTimeout, "The duration for which a paging (continue) token is valid.")
162184
}
163185

164186
func (o *SearchServerOptions) Complete() error {
@@ -197,9 +219,53 @@ func (o *SearchServerOptions) Config() (*searchapiserver.Config, error) {
197219
return nil, fmt.Errorf("failed to apply recommended options: %w", err)
198220
}
199221

222+
meiliClient, err := meilisearch.NewSDKClient(meilisearch.SDKConfig{
223+
Domain: o.MeilisearchDomain,
224+
APIKey: os.Getenv("MEILISEARCH_API_KEY"),
225+
WaitTimeout: o.MeilisearchTaskWaitTimeout,
226+
HTTPTimeout: o.MeilisearchHTTPTimeout,
227+
})
228+
if err != nil {
229+
return nil, fmt.Errorf("failed to initialize meilisearch client: %w", err)
230+
}
231+
232+
// Create a dedicated scheme for the policy cache that only contains versioned types.
233+
// This avoids ambiguity when the main scheme registers the same type for both
234+
// v1alpha1 and __internal versions.
235+
policyScheme := runtime.NewScheme()
236+
if err := searchv1alpha1.AddToScheme(policyScheme); err != nil {
237+
return nil, fmt.Errorf("failed to add v1alpha1 to policy scheme: %w", err)
238+
}
239+
240+
// Create a controller-runtime cache that uses a watch stream (informer)
241+
// to keep ResourceIndexPolicies in-sync.
242+
k8sCache, err := runtimecache.New(genericConfig.LoopbackClientConfig, runtimecache.Options{Scheme: policyScheme})
243+
if err != nil {
244+
return nil, fmt.Errorf("failed to create controller-runtime cache: %w", err)
245+
}
246+
247+
// Create the policy cache (strict ready checking enabled)
248+
indexPolicyCache, err := internalindexer.NewPolicyCache(k8sCache, true)
249+
if err != nil {
250+
return nil, fmt.Errorf("failed to create policy cache: %w", err)
251+
}
252+
253+
pagingSecret := []byte(os.Getenv("SEARCH_PAGING_SECRET"))
254+
if len(pagingSecret) == 0 {
255+
klog.Info("SEARCH_PAGING_SECRET not set, generating a random one")
256+
pagingSecret = []byte(uuid.New().String())
257+
}
258+
200259
serverConfig := &searchapiserver.Config{
201260
GenericConfig: genericConfig,
202-
ExtraConfig: searchapiserver.ExtraConfig{},
261+
ExtraConfig: searchapiserver.ExtraConfig{
262+
MeiliClient: meiliClient,
263+
PolicyCache: indexPolicyCache,
264+
MaxSearchLimit: o.MaxSearchLimit,
265+
DefaultSearchLimit: o.DefaultSearchLimit,
266+
PagingSecret: pagingSecret,
267+
PagingTimeout: o.PagingTimeout,
268+
},
203269
}
204270

205271
return serverConfig, nil
@@ -211,6 +277,8 @@ func Run(options *SearchServerOptions, ctx context.Context) error {
211277
return fmt.Errorf("failed to apply logging configuration: %w", err)
212278
}
213279

280+
ctrllog.SetLogger(klog.NewKlogr())
281+
214282
config, err := options.Config()
215283
if err != nil {
216284
return err

config/base/apiserver/deployment.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ spec:
9797
value: "4"
9898
- name: TRACING_CONFIG_FILE
9999
value: ""
100+
envFrom:
101+
- secretRef:
102+
name: search-apiserver
100103
# TEMPLATE NOTE: Add your service-specific environment variables here
101104
# Example: database credentials, API keys, configuration values
102105
resources:

config/overlays/ci/kustomization.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,6 @@ secretGenerator:
6262
- name: search-indexer
6363
literals:
6464
- MEILISEARCH_API_KEY=search-master-key
65+
- name: search-apiserver
66+
literals:
67+
- MEILISEARCH_API_KEY=search-master-key

config/overlays/dev/kustomization.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,6 @@ secretGenerator:
5252
- name: search-indexer
5353
literals:
5454
- MEILISEARCH_API_KEY=search-master-key
55-
55+
- name: search-apiserver
56+
literals:
57+
- MEILISEARCH_API_KEY=search-master-key
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
apiVersion: search.miloapis.com/v1alpha1
2+
kind: SearchQuery
3+
metadata:
4+
name: configmap-search-sample
5+
spec:
6+
7+
query: "backend-api" # Empty query returns everything, or put a specific string
8+
limit: 100
9+
offset: 0

internal/apiserver/apiserver.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package apiserver
22

33
import (
44
"context"
5+
"fmt"
6+
"time"
57

68
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
79
"k8s.io/apimachinery/pkg/runtime"
@@ -11,11 +13,13 @@ import (
1113
genericapiserver "k8s.io/apiserver/pkg/server"
1214
"k8s.io/klog/v2"
1315

16+
"go.miloapis.net/search/internal/indexer"
1417
_ "go.miloapis.net/search/internal/metrics"
1518
"go.miloapis.net/search/internal/registry/policy/resourceindexpolicy"
19+
"go.miloapis.net/search/internal/registry/searchquery"
1620
"go.miloapis.net/search/pkg/apis/search/install"
17-
searchinstall "go.miloapis.net/search/pkg/apis/search/install"
1821
searchv1alpha1 "go.miloapis.net/search/pkg/apis/search/v1alpha1"
22+
"go.miloapis.net/search/pkg/meilisearch"
1923
)
2024

2125
var (
@@ -27,7 +31,6 @@ var (
2731

2832
func init() {
2933
install.Install(Scheme)
30-
searchinstall.Install(Scheme)
3134

3235
metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})
3336

@@ -36,6 +39,8 @@ func init() {
3639
Scheme.AddKnownTypes(schema.GroupVersion{Group: searchv1alpha1.GroupName, Version: runtime.APIVersionInternal},
3740
&searchv1alpha1.ResourceIndexPolicy{},
3841
&searchv1alpha1.ResourceIndexPolicyList{},
42+
&searchv1alpha1.SearchQuery{},
43+
&searchv1alpha1.SearchQueryList{},
3944
)
4045

4146
// Register unversioned meta types required by the API machinery.
@@ -51,7 +56,12 @@ func init() {
5156

5257
// ExtraConfig extends the generic apiserver configuration with search-specific settings.
5358
type ExtraConfig struct {
54-
// Add custom configuration here as needed
59+
MeiliClient *meilisearch.SDKClient
60+
PolicyCache *indexer.PolicyCache
61+
MaxSearchLimit int
62+
DefaultSearchLimit int
63+
PagingSecret []byte
64+
PagingTimeout time.Duration
5565
}
5666

5767
// Config combines generic and search-specific configuration.
@@ -110,12 +120,42 @@ func (c completedConfig) New() (*SearchServer, error) {
110120
searchV1alpha1Storage["resourceindexpolicies"] = policyStorage.Store
111121
searchV1alpha1Storage["resourceindexpolicies/status"] = policyStorage.Status
112122

123+
// Add searchquery resources
124+
searchqueryStorage := searchquery.NewREST(
125+
c.ExtraConfig.MeiliClient,
126+
c.ExtraConfig.PolicyCache,
127+
c.ExtraConfig.MaxSearchLimit,
128+
c.ExtraConfig.DefaultSearchLimit,
129+
c.ExtraConfig.PagingSecret,
130+
c.ExtraConfig.PagingTimeout,
131+
)
132+
searchV1alpha1Storage["searchqueries"] = searchqueryStorage
133+
113134
searchAPIGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = searchV1alpha1Storage
114135

115136
if err := s.GenericAPIServer.InstallAPIGroup(&searchAPIGroupInfo); err != nil {
116137
return nil, err
117138
}
118139

140+
// Add PostStartHook to start policy cache
141+
s.GenericAPIServer.AddPostStartHookOrDie("search-policy-cache", func(ctx genericapiserver.PostStartHookContext) error {
142+
if err := c.ExtraConfig.PolicyCache.RegisterHandlers(ctx.Context); err != nil {
143+
return fmt.Errorf("failed to register policy cache handlers: %w", err)
144+
}
145+
146+
go func() {
147+
klog.Info("Starting Search policy cache...")
148+
if err := c.ExtraConfig.PolicyCache.Start(ctx.Context); err != nil {
149+
klog.Errorf("Policy cache stopped with error: %v", err)
150+
}
151+
}()
152+
153+
if !c.ExtraConfig.PolicyCache.WaitForCacheSync(ctx.Context) {
154+
return fmt.Errorf("failed to wait for policy cache sync")
155+
}
156+
return nil
157+
})
158+
119159
klog.Info("Search server initialized successfully")
120160

121161
return s, nil

internal/indexer/policy_cache.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,16 @@ func (c *PolicyCache) deletePolicy(name string) {
155155
klog.Infof("Policy %s removed from cache", name)
156156
}
157157

158+
// Start starts the underlying cache.
159+
func (c *PolicyCache) Start(ctx context.Context) error {
160+
return c.cache.Start(ctx)
161+
}
162+
163+
// WaitForCacheSync waits for the underlying cache to sync.
164+
func (c *PolicyCache) WaitForCacheSync(ctx context.Context) bool {
165+
return c.cache.WaitForCacheSync(ctx)
166+
}
167+
158168
// GetPolicies returns a snapshot of all cached policies.
159169
func (c *PolicyCache) GetPolicies() []*policyevaluation.CachedPolicy {
160170
c.mu.RLock()

0 commit comments

Comments
 (0)