Skip to content

Basic enterprise search support #309

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: search/public-preview
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .evergreen-tasks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1290,3 +1290,8 @@ tasks:
tags: ["patch-run"]
commands:
- func: "e2e_test"

- name: e2e_search_enterprise_basic
tags: ["patch-run"]
commands:
- func: "e2e_test"
2 changes: 2 additions & 0 deletions .evergreen.yml
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,8 @@ task_groups:
- e2e_replica_set_oidc_workforce
- e2e_sharded_cluster_oidc_m2m_group
- e2e_sharded_cluster_oidc_m2m_user
# MongoDBSearch test group
- e2e_search_enterprise_basic
<<: *teardown_group

# this task group contains just a one task, which is smoke testing whether the operator
Expand Down
97 changes: 94 additions & 3 deletions controllers/operator/mongodbreplicaset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import (
"context"
"fmt"

"github.com/blang/semver"
"go.uber.org/zap"
"golang.org/x/xerrors"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -22,6 +26,7 @@ import (

mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb"
rolev1 "github.com/mongodb/mongodb-kubernetes/api/v1/role"
searchv1 "github.com/mongodb/mongodb-kubernetes/api/v1/search"
mdbstatus "github.com/mongodb/mongodb-kubernetes/api/v1/status"
"github.com/mongodb/mongodb-kubernetes/controllers/om"
"github.com/mongodb/mongodb-kubernetes/controllers/om/backup"
Expand All @@ -39,6 +44,7 @@ import (
"github.com/mongodb/mongodb-kubernetes/controllers/operator/recovery"
"github.com/mongodb/mongodb-kubernetes/controllers/operator/watch"
"github.com/mongodb/mongodb-kubernetes/controllers/operator/workflow"
"github.com/mongodb/mongodb-kubernetes/controllers/search_controller"
mcoConstruct "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/controllers/construct"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/annotations"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/configmap"
Expand All @@ -52,6 +58,7 @@ import (
"github.com/mongodb/mongodb-kubernetes/pkg/util/architectures"
"github.com/mongodb/mongodb-kubernetes/pkg/util/env"
util_int "github.com/mongodb/mongodb-kubernetes/pkg/util/int"
"github.com/mongodb/mongodb-kubernetes/pkg/util/maputil"
"github.com/mongodb/mongodb-kubernetes/pkg/vault"
"github.com/mongodb/mongodb-kubernetes/pkg/vault/vaultwatcher"
)
Expand Down Expand Up @@ -219,6 +226,8 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err)), log)
}

shouldMirrorKeyfile := r.applySearchOverrides(ctx, rs, log)

sts := construct.DatabaseStatefulSet(*rs, rsConfig, log)
if status := r.ensureRoles(ctx, rs.Spec.DbCommonSpec, r.enableClusterMongoDBRoles, conn, kube.ObjectKeyFromApiObject(rs), log); !status.IsOK() {
return r.updateStatus(ctx, rs, status, log)
Expand All @@ -238,7 +247,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
// See CLOUDP-189433 and CLOUDP-229222 for more details.
if recovery.ShouldTriggerRecovery(rs.Status.Phase != mdbstatus.PhaseRunning, rs.Status.LastTransition) {
log.Warnf("Triggering Automatic Recovery. The MongoDB resource %s/%s is in %s state since %s", rs.Namespace, rs.Name, rs.Status.Phase, rs.Status.LastTransition)
automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, true, shouldMirrorKeyfile).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
deploymentError := create.DatabaseInKubernetes(ctx, r.client, *rs, sts, rsConfig, log)
if deploymentError != nil {
log.Errorf("Recovery failed because of deployment errors, %w", deploymentError)
Expand All @@ -254,7 +263,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
}
status = workflow.RunInGivenOrder(publishAutomationConfigFirst(ctx, r.client, *rs, lastSpec, rsConfig, log),
func() workflow.Status {
return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, sts, log, caFilePath, agentCertSecretName, prometheusCertHash, false, shouldMirrorKeyfile).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):")
},
func() workflow.Status {
workflowStatus := create.HandlePVCResize(ctx, r.client, &sts, log)
Expand Down Expand Up @@ -408,14 +417,24 @@ func AddReplicaSetController(ctx context.Context, mgr manager.Manager, imageUrls
zap.S().Errorf("Failed to watch for vault secret changes: %w", err)
}
}

err = c.Watch(source.Kind(mgr.GetCache(), &searchv1.MongoDBSearch{},
handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, search *searchv1.MongoDBSearch) []reconcile.Request {
source := search.GetMongoDBResourceRef()
return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: source.Namespace, Name: source.Name}}}
})))
if err != nil {
return err
}

zap.S().Infof("Registered controller %s", util.MongoDbReplicaSetController)

return nil
}

// updateOmDeploymentRs performs OM registration operation for the replicaset. So the changes will be finally propagated
// to automation agents in containers
func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, rs *mdbv1.MongoDB, set appsv1.StatefulSet, log *zap.SugaredLogger, caFilePath string, agentCertSecretName string, prometheusCertHash string, isRecovering bool) workflow.Status {
func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, rs *mdbv1.MongoDB, set appsv1.StatefulSet, log *zap.SugaredLogger, caFilePath string, agentCertSecretName string, prometheusCertHash string, isRecovering bool, shouldMirrorKeyfileForMongot bool) workflow.Status {
log.Debug("Entering UpdateOMDeployments")
// Only "concrete" RS members should be observed
// - if scaling down, let's observe only members that will remain after scale-down operation
Expand Down Expand Up @@ -469,6 +488,11 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c

err = conn.ReadUpdateDeployment(
func(d om.Deployment) error {
if shouldMirrorKeyfileForMongot {
if err := r.mirrorKeyfileIntoSecretForMongot(ctx, d, rs, log); err != nil {
return err
}
}
return ReconcileReplicaSetAC(ctx, d, rs.Spec.DbCommonSpec, lastRsConfig.ToMap(), rs.Name, replicaSet, caFilePath, internalClusterPath, &p, log)
},
log,
Expand Down Expand Up @@ -609,3 +633,70 @@ func getAllHostsRs(set appsv1.StatefulSet, clusterName string, membersCount int,
hostnames, _ := dns.GetDnsForStatefulSetReplicasSpecified(set, clusterName, membersCount, externalDomain)
return hostnames
}

func (r *ReconcileMongoDbReplicaSet) applySearchOverrides(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger) bool {
search := r.lookupCorrespondingSearchResource(ctx, rs, log)
if search == nil {
log.Debugf("No MongoDBSearch resource found, skipping search overrides")
return false
}

log.Infof("Applying search overrides from MongoDBSearch %s", search.NamespacedName())

if rs.Spec.AdditionalMongodConfig == nil {
rs.Spec.AdditionalMongodConfig = mdbv1.NewEmptyAdditionalMongodConfig()
}
searchMongodConfig := search_controller.GetMongodConfigParameters(search)
rs.Spec.AdditionalMongodConfig.AddOption("setParameter", searchMongodConfig["setParameter"])

mdbVersion, err := semver.ParseTolerant(rs.Spec.Version)
if err != nil {
log.Warnf("Failed to parse MongoDB version %q: %w. Proceeding without the automatic creation of the searchCoordinator role that's necessary for MongoDB <8.2", rs.Spec.Version, err)
} else if semver.MustParse("8.2.0").GT(mdbVersion) {
log.Infof("Polyfilling the searchCoordinator role for MongoDB %s", rs.Spec.Version)

if rs.Spec.Security == nil {
rs.Spec.Security = &mdbv1.Security{}
}
rs.Spec.Security.Roles = append(rs.Spec.Security.Roles, search_controller.SearchCoordinatorRole())
}

return true
}

func (r *ReconcileMongoDbReplicaSet) mirrorKeyfileIntoSecretForMongot(ctx context.Context, d om.Deployment, rs *mdbv1.MongoDB, log *zap.SugaredLogger) error {
keyfileContents := maputil.ReadMapValueAsString(d, "auth", "key")
keyfileSecret := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%s-keyfile", rs.Name), Namespace: rs.Namespace}}

log.Infof("Mirroring the replicaset %s's keyfile into the secret %s", rs.ObjectKey(), kube.ObjectKeyFromApiObject(keyfileSecret))

_, err := controllerutil.CreateOrUpdate(ctx, r.client, keyfileSecret, func() error {
keyfileSecret.StringData = map[string]string{"keyfile": keyfileContents}
return controllerutil.SetOwnerReference(rs, keyfileSecret, r.client.Scheme())
})
if err != nil {
return xerrors.Errorf("Failed to mirror the replicaset's keyfile into a secret: %w", err)
} else {
return nil
}
}

func (r *ReconcileMongoDbReplicaSet) lookupCorrespondingSearchResource(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger) *searchv1.MongoDBSearch {
var search *searchv1.MongoDBSearch
searchList := &searchv1.MongoDBSearchList{}
if err := r.client.List(ctx, searchList, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(search_controller.MongoDBSearchIndexFieldName, rs.GetNamespace()+"/"+rs.GetName()),
}); err != nil {
log.Debugf("Failed to list MongoDBSearch resources: %v", err)
}
// this validates that there is exactly one MongoDBSearch pointing to this resource,
// and that this resource passes search validations. If either fails, proceed without a search target
// for the mongod automation config.
if len(searchList.Items) == 1 {
searchSource := search_controller.NewEnterpriseResourceSearchSource(rs)
if searchSource.Validate() == nil {
search = &searchList.Items[0]
}
}
return search
}
47 changes: 36 additions & 11 deletions controllers/operator/mongodbsearch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,35 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"

mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb"
searchv1 "github.com/mongodb/mongodb-kubernetes/api/v1/search"
"github.com/mongodb/mongodb-kubernetes/controllers/search_controller"
mdbcv1 "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/api/v1"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/controllers/watch"
kubernetesClient "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/client"
"github.com/mongodb/mongodb-kubernetes/pkg/kube"
"github.com/mongodb/mongodb-kubernetes/pkg/kube/commoncontroller"
"github.com/mongodb/mongodb-kubernetes/pkg/util"
"github.com/mongodb/mongodb-kubernetes/pkg/util/env"
)

type MongoDBSearchReconciler struct {
kubeClient kubernetesClient.Client
mdbcWatcher *watch.ResourceWatcher
mdbcWatcher watch.ResourceWatcher
mdbWatcher watch.ResourceWatcher
secretWatcher watch.ResourceWatcher
operatorSearchConfig search_controller.OperatorSearchConfig
}

func newMongoDBSearchReconciler(client client.Client, operatorSearchConfig search_controller.OperatorSearchConfig) *MongoDBSearchReconciler {
mdbcWatcher := watch.New()
return &MongoDBSearchReconciler{
kubeClient: kubernetesClient.NewClient(client),
mdbcWatcher: &mdbcWatcher,
mdbcWatcher: watch.New(),
mdbWatcher: watch.New(),
secretWatcher: watch.New(),
operatorSearchConfig: operatorSearchConfig,
}
}
Expand All @@ -51,26 +57,43 @@ func (r *MongoDBSearchReconciler) Reconcile(ctx context.Context, request reconci
return result, err
}

sourceResource, err := getSourceMongoDBForSearch(ctx, r.kubeClient, mdbSearch)
sourceResource, err := r.getSourceMongoDBForSearch(ctx, r.kubeClient, mdbSearch, log)
if err != nil {
return reconcile.Result{RequeueAfter: time.Second * util.RetryTimeSec}, err
}

r.mdbcWatcher.Watch(ctx, sourceResource.NamespacedName(), request.NamespacedName)
r.secretWatcher.Watch(ctx, kube.ObjectKey(sourceResource.GetNamespace(), sourceResource.KeyfileSecretName()), mdbSearch.NamespacedName())

reconcileHelper := search_controller.NewMongoDBSearchReconcileHelper(kubernetesClient.NewClient(r.kubeClient), mdbSearch, sourceResource, r.operatorSearchConfig)

return reconcileHelper.Reconcile(ctx, log).ReconcileResult()
}

func getSourceMongoDBForSearch(ctx context.Context, kubeClient client.Client, search *searchv1.MongoDBSearch) (search_controller.SearchSourceDBResource, error) {
func (r *MongoDBSearchReconciler) getSourceMongoDBForSearch(ctx context.Context, kubeClient client.Client, search *searchv1.MongoDBSearch, log *zap.SugaredLogger) (search_controller.SearchSourceDBResource, error) {
sourceMongoDBResourceRef := search.GetMongoDBResourceRef()
mdbcName := types.NamespacedName{Namespace: search.GetNamespace(), Name: sourceMongoDBResourceRef.Name}
sourceName := types.NamespacedName{Namespace: search.GetNamespace(), Name: sourceMongoDBResourceRef.Name}
log.Infof("Looking up Search source %s", sourceName)

mdb := &mdbv1.MongoDB{}
if err := kubeClient.Get(ctx, sourceName, mdb); err != nil {
if !apierrors.IsNotFound(err) {
return nil, xerrors.Errorf("error getting MongoDB %s: %w", sourceName, err)
}
} else {
r.mdbWatcher.Watch(ctx, sourceName, search.NamespacedName())
return search_controller.NewEnterpriseResourceSearchSource(mdb), nil
}

mdbc := &mdbcv1.MongoDBCommunity{}
if err := kubeClient.Get(ctx, mdbcName, mdbc); err != nil {
return nil, xerrors.Errorf("error getting MongoDBCommunity %s: %w", mdbcName, err)
if err := kubeClient.Get(ctx, sourceName, mdbc); err != nil {
if !apierrors.IsNotFound(err) {
return nil, xerrors.Errorf("error getting MongoDBCommunity %s: %w", sourceName, err)
}
} else {
r.mdbcWatcher.Watch(ctx, sourceName, search.NamespacedName())
return search_controller.NewCommunityResourceSearchSource(mdbc), nil
}
return search_controller.NewSearchSourceDBResourceFromMongoDBCommunity(mdbc), nil

return nil, xerrors.Errorf("No database resource named %s found", sourceName)
}

func mdbcSearchIndexBuilder(rawObj client.Object) []string {
Expand All @@ -88,7 +111,9 @@ func AddMongoDBSearchController(ctx context.Context, mgr manager.Manager, operat
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{MaxConcurrentReconciles: env.ReadIntOrDefault(util.MaxConcurrentReconcilesEnv, 1)}). // nolint:forbidigo
For(&searchv1.MongoDBSearch{}).
Watches(&mdbv1.MongoDB{}, r.mdbWatcher).
Watches(&mdbcv1.MongoDBCommunity{}, r.mdbcWatcher).
Watches(&corev1.Secret{}, r.secretWatcher).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Secret{}).
Complete(r)
Expand Down
12 changes: 11 additions & 1 deletion controllers/operator/mongodbsearch_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
mdbcv1 "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/api/v1"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/api/v1/common"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/mongot"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/util/constants"
)

func newMongoDBCommunity(name, namespace string) *mdbcv1.MongoDBCommunity {
Expand Down Expand Up @@ -62,7 +63,16 @@ func newSearchReconcilerWithOperatorConfig(
builder.WithIndex(&searchv1.MongoDBSearch{}, search_controller.MongoDBSearchIndexFieldName, mdbcSearchIndexBuilder)

if mdbc != nil {
builder.WithObjects(mdbc)
keyfileSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: mdbc.GetAgentKeyfileSecretNamespacedName().Name,
Namespace: mdbc.Namespace,
},
StringData: map[string]string{
constants.AgentKeyfileKey: "keyfile",
},
}
builder.WithObjects(mdbc, keyfileSecret)
}

for _, search := range searches {
Expand Down
Loading