Skip to content

Commit 5ec3baa

Browse files
committed
Basic enterprise search support
1 parent 318eda4 commit 5ec3baa

18 files changed

+676
-98
lines changed

.evergreen-tasks.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1290,3 +1290,8 @@ tasks:
12901290
tags: ["patch-run"]
12911291
commands:
12921292
- func: "e2e_test"
1293+
1294+
- name: e2e_search_enterprise_basic
1295+
tags: ["patch-run"]
1296+
commands:
1297+
- func: "e2e_test"

.evergreen.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,8 @@ task_groups:
811811
- e2e_replica_set_oidc_workforce
812812
- e2e_sharded_cluster_oidc_m2m_group
813813
- e2e_sharded_cluster_oidc_m2m_user
814+
# MongoDBSearch test group
815+
- e2e_search_enterprise_basic
814816
<<: *teardown_group
815817

816818
# this task group contains just a one task, which is smoke testing whether the operator

controllers/operator/mongodbreplicaset_controller.go

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/blang/semver"
78
"go.uber.org/zap"
89
"golang.org/x/xerrors"
910
"k8s.io/apimachinery/pkg/api/errors"
11+
"k8s.io/apimachinery/pkg/fields"
1012
"k8s.io/apimachinery/pkg/runtime"
13+
"k8s.io/apimachinery/pkg/types"
1114
"sigs.k8s.io/controller-runtime/pkg/client"
1215
"sigs.k8s.io/controller-runtime/pkg/controller"
16+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1317
"sigs.k8s.io/controller-runtime/pkg/event"
1418
"sigs.k8s.io/controller-runtime/pkg/handler"
1519
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -22,6 +26,7 @@ import (
2226

2327
mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb"
2428
rolev1 "github.com/mongodb/mongodb-kubernetes/api/v1/role"
29+
searchv1 "github.com/mongodb/mongodb-kubernetes/api/v1/search"
2530
mdbstatus "github.com/mongodb/mongodb-kubernetes/api/v1/status"
2631
"github.com/mongodb/mongodb-kubernetes/controllers/om"
2732
"github.com/mongodb/mongodb-kubernetes/controllers/om/backup"
@@ -39,6 +44,7 @@ import (
3944
"github.com/mongodb/mongodb-kubernetes/controllers/operator/recovery"
4045
"github.com/mongodb/mongodb-kubernetes/controllers/operator/watch"
4146
"github.com/mongodb/mongodb-kubernetes/controllers/operator/workflow"
47+
"github.com/mongodb/mongodb-kubernetes/controllers/search_controller"
4248
mcoConstruct "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/controllers/construct"
4349
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/annotations"
4450
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/configmap"
@@ -52,6 +58,7 @@ import (
5258
"github.com/mongodb/mongodb-kubernetes/pkg/util/architectures"
5359
"github.com/mongodb/mongodb-kubernetes/pkg/util/env"
5460
util_int "github.com/mongodb/mongodb-kubernetes/pkg/util/int"
61+
"github.com/mongodb/mongodb-kubernetes/pkg/util/maputil"
5562
"github.com/mongodb/mongodb-kubernetes/pkg/vault"
5663
"github.com/mongodb/mongodb-kubernetes/pkg/vault/vaultwatcher"
5764
)
@@ -219,6 +226,8 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
219226
return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err)), log)
220227
}
221228

229+
shouldMirrorKeyfile := r.applySearchOverrides(ctx, rs, log)
230+
222231
sts := construct.DatabaseStatefulSet(*rs, rsConfig, log)
223232
if status := r.ensureRoles(ctx, rs.Spec.DbCommonSpec, r.enableClusterMongoDBRoles, conn, kube.ObjectKeyFromApiObject(rs), log); !status.IsOK() {
224233
return r.updateStatus(ctx, rs, status, log)
@@ -238,7 +247,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
238247
// See CLOUDP-189433 and CLOUDP-229222 for more details.
239248
if recovery.ShouldTriggerRecovery(rs.Status.Phase != mdbstatus.PhaseRunning, rs.Status.LastTransition) {
240249
log.Warnf("Triggering Automatic Recovery. The MongoDB resource %s/%s is in %s state since %s", rs.Namespace, rs.Name, rs.Status.Phase, rs.Status.LastTransition)
241-
automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
250+
automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, true, shouldMirrorKeyfile).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
242251
deploymentError := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, rsConfig, log)
243252
if deploymentError != nil {
244253
log.Errorf("Recovery failed because of deployment errors, %w", deploymentError)
@@ -254,7 +263,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
254263
}
255264
status = workflow.RunInGivenOrder(publishAutomationConfigFirst(ctx, r.client, *rs, lastSpec, rsConfig, log),
256265
func() workflow.Status {
257-
return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
266+
return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, false, shouldMirrorKeyfile).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
258267
},
259268
func() workflow.Status {
260269
workflowStatus := create.HandlePVCResize(ctx, r.client, &sts, log)
@@ -408,14 +417,24 @@ func AddReplicaSetController(ctx context.Context, mgr manager.Manager, imageUrls
408417
zap.S().Errorf("Failed to watch for vault secret changes: %w", err)
409418
}
410419
}
420+
421+
err = c.Watch(source.Kind(mgr.GetCache(), &searchv1.MongoDBSearch{},
422+
handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, search *searchv1.MongoDBSearch) []reconcile.Request {
423+
source := search.GetMongoDBResourceRef()
424+
return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: source.Namespace, Name: source.Name}}}
425+
})))
426+
if err != nil {
427+
return err
428+
}
429+
411430
zap.S().Infof("Registered controller %s", util.MongoDbReplicaSetController)
412431

413432
return nil
414433
}
415434

416435
// updateOmDeploymentRs performs OM registration operation for the replicaset. So the changes will be finally propagated
417436
// to automation agents in containers
418-
func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, rs *mdbv1.MongoDB, set appsv1.StatefulSet, log *zap.SugaredLogger, caFilePath string, agentCertSecretName string, prometheusCertHash string, isRecovering bool) workflow.Status {
437+
func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, rs *mdbv1.MongoDB, set appsv1.StatefulSet, log *zap.SugaredLogger, caFilePath string, agentCertSecretName string, prometheusCertHash string, isRecovering bool, shouldMirrorKeyfileForMongot bool) workflow.Status {
419438
log.Debug("Entering UpdateOMDeployments")
420439
// Only "concrete" RS members should be observed
421440
// - if scaling down, let's observe only members that will remain after scale-down operation
@@ -469,6 +488,11 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c
469488

470489
err = conn.ReadUpdateDeployment(
471490
func(d om.Deployment) error {
491+
if shouldMirrorKeyfileForMongot {
492+
if err := r.mirrorKeyfileIntoSecretForMongot(ctx, d, rs, log); err != nil {
493+
return err
494+
}
495+
}
472496
return ReconcileReplicaSetAC(ctx, d, rs.Spec.DbCommonSpec, lastRsConfig.ToMap(), rs.Name, replicaSet, caFilePath, internalClusterPath, &p, log)
473497
},
474498
log,
@@ -609,3 +633,70 @@ func getAllHostsRs(set appsv1.StatefulSet, clusterName string, membersCount int,
609633
hostnames, _ := dns.GetDnsForStatefulSetReplicasSpecified(set, clusterName, membersCount, externalDomain)
610634
return hostnames
611635
}
636+
637+
func (r *ReconcileMongoDbReplicaSet) applySearchOverrides(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger) bool {
638+
search := r.lookupCorrespondingSearchResource(ctx, rs, log)
639+
if search == nil {
640+
log.Debugf("No MongoDBSearch resource found, skipping search overrides")
641+
return false
642+
}
643+
644+
log.Infof("Applying search overrides from MongoDBSearch %s", search.NamespacedName())
645+
646+
if rs.Spec.AdditionalMongodConfig == nil {
647+
rs.Spec.AdditionalMongodConfig = mdbv1.NewEmptyAdditionalMongodConfig()
648+
}
649+
searchMongodConfig := search_controller.GetMongodConfigParameters(search)
650+
rs.Spec.AdditionalMongodConfig.AddOption("setParameter", searchMongodConfig["setParameter"])
651+
652+
mdbVersion, err := semver.ParseTolerant(rs.Spec.Version)
653+
if err != nil {
654+
log.Warnf("Failed to parse MongoDB version %q: %w. Proceeding without the automatic creation of the searchCoordinator role that's necessary for MongoDB <8.2", rs.Spec.Version, err)
655+
} else if semver.MustParse("8.2.0").GT(mdbVersion) {
656+
log.Infof("Polyfilling the searchCoordinator role for MongoDB %s", rs.Spec.Version)
657+
658+
if rs.Spec.Security == nil {
659+
rs.Spec.Security = &mdbv1.Security{}
660+
}
661+
rs.Spec.Security.Roles = append(rs.Spec.Security.Roles, search_controller.SearchCoordinatorRole())
662+
}
663+
664+
return true
665+
}
666+
667+
func (r *ReconcileMongoDbReplicaSet) mirrorKeyfileIntoSecretForMongot(ctx context.Context, d om.Deployment, rs *mdbv1.MongoDB, log *zap.SugaredLogger) error {
668+
keyfileContents := maputil.ReadMapValueAsString(d, "auth", "key")
669+
keyfileSecret := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%s-keyfile", rs.Name), Namespace: rs.Namespace}}
670+
671+
log.Infof("Mirroring the replicaset %s's keyfile into the secret %s", rs.ObjectKey(), kube.ObjectKeyFromApiObject(keyfileSecret))
672+
673+
_, err := controllerutil.CreateOrUpdate(ctx, r.client, keyfileSecret, func() error {
674+
keyfileSecret.StringData = map[string]string{"keyfile": keyfileContents}
675+
return controllerutil.SetOwnerReference(rs, keyfileSecret, r.client.Scheme())
676+
})
677+
if err != nil {
678+
return xerrors.Errorf("Failed to mirror the replicaset's keyfile into a secret: %w", err)
679+
} else {
680+
return nil
681+
}
682+
}
683+
684+
func (r *ReconcileMongoDbReplicaSet) lookupCorrespondingSearchResource(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger) *searchv1.MongoDBSearch {
685+
var search *searchv1.MongoDBSearch
686+
searchList := &searchv1.MongoDBSearchList{}
687+
if err := r.client.List(ctx, searchList, &client.ListOptions{
688+
FieldSelector: fields.OneTermEqualSelector(search_controller.MongoDBSearchIndexFieldName, rs.GetNamespace()+"/"+rs.GetName()),
689+
}); err != nil {
690+
log.Debugf("Failed to list MongoDBSearch resources: %v", err)
691+
}
692+
// this validates that there is exactly one MongoDBSearch pointing to this resource,
693+
// and that this resource passes search validations. If either fails, proceed without a search target
694+
// for the mongod automation config.
695+
if len(searchList.Items) == 1 {
696+
searchSource := search_controller.NewEnterpriseResourceSearchSource(rs)
697+
if searchSource.Validate() == nil {
698+
search = &searchList.Items[0]
699+
}
700+
}
701+
return search
702+
}

controllers/operator/mongodbsearch_controller.go

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,35 @@ import (
1414

1515
appsv1 "k8s.io/api/apps/v1"
1616
corev1 "k8s.io/api/core/v1"
17+
apierrors "k8s.io/apimachinery/pkg/api/errors"
1718
ctrl "sigs.k8s.io/controller-runtime"
1819

20+
mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb"
1921
searchv1 "github.com/mongodb/mongodb-kubernetes/api/v1/search"
2022
"github.com/mongodb/mongodb-kubernetes/controllers/search_controller"
2123
mdbcv1 "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/api/v1"
2224
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/controllers/watch"
2325
kubernetesClient "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/client"
26+
"github.com/mongodb/mongodb-kubernetes/pkg/kube"
2427
"github.com/mongodb/mongodb-kubernetes/pkg/kube/commoncontroller"
2528
"github.com/mongodb/mongodb-kubernetes/pkg/util"
2629
"github.com/mongodb/mongodb-kubernetes/pkg/util/env"
2730
)
2831

2932
type MongoDBSearchReconciler struct {
3033
kubeClient kubernetesClient.Client
31-
mdbcWatcher *watch.ResourceWatcher
34+
mdbcWatcher watch.ResourceWatcher
35+
mdbWatcher watch.ResourceWatcher
36+
secretWatcher watch.ResourceWatcher
3237
operatorSearchConfig search_controller.OperatorSearchConfig
3338
}
3439

3540
func newMongoDBSearchReconciler(client client.Client, operatorSearchConfig search_controller.OperatorSearchConfig) *MongoDBSearchReconciler {
36-
mdbcWatcher := watch.New()
3741
return &MongoDBSearchReconciler{
3842
kubeClient: kubernetesClient.NewClient(client),
39-
mdbcWatcher: &mdbcWatcher,
43+
mdbcWatcher: watch.New(),
44+
mdbWatcher: watch.New(),
45+
secretWatcher: watch.New(),
4046
operatorSearchConfig: operatorSearchConfig,
4147
}
4248
}
@@ -51,26 +57,43 @@ func (r *MongoDBSearchReconciler) Reconcile(ctx context.Context, request reconci
5157
return result, err
5258
}
5359

54-
sourceResource, err := getSourceMongoDBForSearch(ctx, r.kubeClient, mdbSearch)
60+
sourceResource, err := r.getSourceMongoDBForSearch(ctx, r.kubeClient, mdbSearch, log)
5561
if err != nil {
5662
return reconcile.Result{RequeueAfter: time.Second * util.RetryTimeSec}, err
5763
}
58-
59-
r.mdbcWatcher.Watch(ctx, sourceResource.NamespacedName(), request.NamespacedName)
64+
r.secretWatcher.Watch(ctx, kube.ObjectKey(sourceResource.GetNamespace(), sourceResource.KeyfileSecretName()), mdbSearch.NamespacedName())
6065

6166
reconcileHelper := search_controller.NewMongoDBSearchReconcileHelper(kubernetesClient.NewClient(r.kubeClient), mdbSearch, sourceResource, r.operatorSearchConfig)
6267

6368
return reconcileHelper.Reconcile(ctx, log).ReconcileResult()
6469
}
6570

66-
func getSourceMongoDBForSearch(ctx context.Context, kubeClient client.Client, search *searchv1.MongoDBSearch) (search_controller.SearchSourceDBResource, error) {
71+
func (r *MongoDBSearchReconciler) getSourceMongoDBForSearch(ctx context.Context, kubeClient client.Client, search *searchv1.MongoDBSearch, log *zap.SugaredLogger) (search_controller.SearchSourceDBResource, error) {
6772
sourceMongoDBResourceRef := search.GetMongoDBResourceRef()
68-
mdbcName := types.NamespacedName{Namespace: search.GetNamespace(), Name: sourceMongoDBResourceRef.Name}
73+
sourceName := types.NamespacedName{Namespace: search.GetNamespace(), Name: sourceMongoDBResourceRef.Name}
74+
log.Infof("Looking up Search source %s", sourceName)
75+
76+
mdb := &mdbv1.MongoDB{}
77+
if err := kubeClient.Get(ctx, sourceName, mdb); err != nil && !apierrors.IsNotFound(err) {
78+
if !apierrors.IsNotFound(err) {
79+
return nil, xerrors.Errorf("error getting MongoDB %s: %w", sourceName, err)
80+
}
81+
} else {
82+
r.mdbWatcher.Watch(ctx, sourceName, search.NamespacedName())
83+
return search_controller.NewEnterpriseResourceSearchSource(mdb), nil
84+
}
85+
6986
mdbc := &mdbcv1.MongoDBCommunity{}
70-
if err := kubeClient.Get(ctx, mdbcName, mdbc); err != nil {
71-
return nil, xerrors.Errorf("error getting MongoDBCommunity %s: %w", mdbcName, err)
87+
if err := kubeClient.Get(ctx, sourceName, mdbc); err != nil {
88+
if !apierrors.IsNotFound(err) {
89+
return nil, xerrors.Errorf("error getting MongoDBCommunity %s: %w", sourceName, err)
90+
}
91+
} else {
92+
r.mdbcWatcher.Watch(ctx, sourceName, search.NamespacedName())
93+
return search_controller.NewCommunityResourceSearchSource(mdbc), nil
7294
}
73-
return search_controller.NewSearchSourceDBResourceFromMongoDBCommunity(mdbc), nil
95+
96+
return nil, xerrors.Errorf("No database resource named %s found", sourceName)
7497
}
7598

7699
func mdbcSearchIndexBuilder(rawObj client.Object) []string {
@@ -88,7 +111,9 @@ func AddMongoDBSearchController(ctx context.Context, mgr manager.Manager, operat
88111
return ctrl.NewControllerManagedBy(mgr).
89112
WithOptions(controller.Options{MaxConcurrentReconciles: env.ReadIntOrDefault(util.MaxConcurrentReconcilesEnv, 1)}). // nolint:forbidigo
90113
For(&searchv1.MongoDBSearch{}).
114+
Watches(&mdbv1.MongoDB{}, r.mdbWatcher).
91115
Watches(&mdbcv1.MongoDBCommunity{}, r.mdbcWatcher).
116+
Watches(&corev1.Secret{}, r.secretWatcher).
92117
Owns(&appsv1.StatefulSet{}).
93118
Owns(&corev1.Secret{}).
94119
Complete(r)
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package search_controller
2+
3+
import (
4+
"strings"
5+
6+
"github.com/blang/semver"
7+
"golang.org/x/xerrors"
8+
"k8s.io/apimachinery/pkg/types"
9+
10+
mdbcv1 "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/api/v1"
11+
"github.com/mongodb/mongodb-kubernetes/pkg/util"
12+
)
13+
14+
func NewCommunityResourceSearchSource(mdbc *mdbcv1.MongoDBCommunity) SearchSourceDBResource {
15+
return &CommunitySearchSource{MongoDBCommunity: mdbc}
16+
}
17+
18+
type CommunitySearchSource struct {
19+
*mdbcv1.MongoDBCommunity
20+
}
21+
22+
func (r *CommunitySearchSource) Members() int {
23+
return r.Spec.Members
24+
}
25+
26+
func (r *CommunitySearchSource) GetName() string {
27+
return r.Name
28+
}
29+
30+
func (r *CommunitySearchSource) NamespacedName() types.NamespacedName {
31+
return r.MongoDBCommunity.NamespacedName()
32+
}
33+
34+
func (r *CommunitySearchSource) KeyfileSecretName() string {
35+
return r.MongoDBCommunity.GetAgentKeyfileSecretNamespacedName().Name
36+
}
37+
38+
func (r *CommunitySearchSource) GetNamespace() string {
39+
return r.Namespace
40+
}
41+
42+
func (r *CommunitySearchSource) DatabaseServiceName() string {
43+
return r.ServiceName()
44+
}
45+
46+
func (r *CommunitySearchSource) IsSecurityTLSConfigEnabled() bool {
47+
return r.Spec.Security.TLS.Enabled
48+
}
49+
50+
func (r *CommunitySearchSource) DatabasePort() int {
51+
return r.MongoDBCommunity.GetMongodConfiguration().GetDBPort()
52+
}
53+
54+
func (r *CommunitySearchSource) TLSOperatorCASecretNamespacedName() types.NamespacedName {
55+
return r.MongoDBCommunity.TLSOperatorCASecretNamespacedName()
56+
}
57+
58+
func (r *CommunitySearchSource) Validate() error {
59+
version, err := semver.ParseTolerant(r.GetMongoDBVersion())
60+
if err != nil {
61+
return xerrors.Errorf("error parsing MongoDB version '%s': %w", r.Spec.Version, err)
62+
} else if version.LT(semver.MustParse("8.0.10")) {
63+
return xerrors.New("MongoDB version must be 8.0.10 or higher")
64+
}
65+
66+
foundScram := false
67+
for _, authMode := range r.Spec.Security.Authentication.Modes {
68+
// Check for SCRAM, SCRAM-SHA-1, or SCRAM-SHA-256
69+
if strings.HasPrefix(string(authMode), util.SCRAM) {
70+
foundScram = true
71+
break
72+
}
73+
}
74+
75+
if !foundScram {
76+
return xerrors.New("MongoDBSearch requires SCRAM authentication to be enabled")
77+
}
78+
79+
return nil
80+
}

0 commit comments

Comments
 (0)