diff --git a/e2e-tests/demand-backup-incremental-sharded/run b/e2e-tests/demand-backup-incremental-sharded/run index 88f3ded75f..d8a9bcbff2 100755 --- a/e2e-tests/demand-backup-incremental-sharded/run +++ b/e2e-tests/demand-backup-incremental-sharded/run @@ -49,14 +49,15 @@ run_recovery_check() { # we don't wait for cluster readiness here because the annotation gets removed then wait_restore "${backup_name}" "${cluster}" "ready" "0" "3000" - kubectl_bin get psmdb ${cluster} -o yaml - if [ $(kubectl_bin get psmdb ${cluster} -o yaml | yq '.metadata.annotations."percona.com/resync-pbm"') == null ]; then + + if [ "$(kubectl_bin get psmdb ${cluster} -o yaml | yq '.metadata.annotations."percona.com/resync-pbm"')" == null ] && [ "$(kubectl_bin get psmdb ${cluster} -o yaml | yq '.metadata.annotations."percona.com/resync-in-progress"')" == null ]; then log "psmdb/${cluster} should be annotated with percona.com/resync-pbm after a incremental restore" exit 1 fi echo wait_cluster_consistency ${cluster} 60 + sleep 5 wait_for_pbm_operations ${cluster} if [[ $base == true ]]; then diff --git a/e2e-tests/demand-backup-incremental/run b/e2e-tests/demand-backup-incremental/run index 283f3f30ed..6a07825ed8 100755 --- a/e2e-tests/demand-backup-incremental/run +++ b/e2e-tests/demand-backup-incremental/run @@ -50,13 +50,14 @@ run_recovery_check() { # we don't wait for cluster readiness here because the annotation gets removed then wait_restore "${backup_name}" "${cluster}" "ready" "0" "1800" - if [ $(kubectl_bin get psmdb ${cluster} -o yaml | yq '.metadata.annotations."percona.com/resync-pbm"') == null ]; then + if [ "$(kubectl_bin get psmdb ${cluster} -o yaml | yq '.metadata.annotations."percona.com/resync-pbm"')" == null ] && [ "$(kubectl_bin get psmdb ${cluster} -o yaml | yq '.metadata.annotations."percona.com/resync-in-progress"')" == null ]; then log "psmdb/${cluster} should be annotated with percona.com/resync-pbm after a incremental restore" exit 1 fi echo wait_cluster_consistency ${cluster} + sleep 5 wait_for_pbm_operations ${cluster} if [[ $base == true ]]; then diff --git a/pkg/controller/perconaservermongodb/balancer.go b/pkg/controller/perconaservermongodb/balancer.go index 65aa7159ce..c5ca9be22c 100644 --- a/pkg/controller/perconaservermongodb/balancer.go +++ b/pkg/controller/perconaservermongodb/balancer.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/percona/percona-server-mongodb-operator/pkg/psmdb" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" @@ -12,6 +11,7 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" + "github.com/percona/percona-server-mongodb-operator/pkg/psmdb" ) func (r *ReconcilePerconaServerMongoDB) enableBalancerIfNeeded(ctx context.Context, cr *api.PerconaServerMongoDB) error { @@ -85,7 +85,7 @@ func (r *ReconcilePerconaServerMongoDB) enableBalancerIfNeeded(ctx context.Conte } } - mongosSession, err := r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin) + mongosSession, err := r.MongoClient().Mongos(ctx, cr, api.RoleClusterAdmin) if err != nil { return errors.Wrap(err, "failed to get mongos connection") } @@ -141,7 +141,7 @@ func (r *ReconcilePerconaServerMongoDB) disableBalancer(ctx context.Context, cr return errors.Wrapf(err, "get mongos statefulset %s", msSts.Name) } - mongosSession, err := r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin) + mongosSession, err := r.MongoClient().Mongos(ctx, cr, api.RoleClusterAdmin) if err != nil { return errors.Wrap(err, "failed to get mongos connection") } diff --git a/pkg/controller/perconaservermongodb/connections.go b/pkg/controller/perconaservermongodb/connections.go deleted file mode 100644 index 697f6372df..0000000000 --- a/pkg/controller/perconaservermongodb/connections.go +++ /dev/null @@ -1,73 +0,0 @@ -package perconaservermongodb - -import ( - "context" - - "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" - "github.com/percona/percona-server-mongodb-operator/pkg/psmdb" - "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/mongo" -) - -type MongoClientProvider interface { - Mongo(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole) (mongo.Client, error) - Mongos(ctx context.Context, cr *api.PerconaServerMongoDB, role api.SystemUserRole) (mongo.Client, error) - Standalone(ctx context.Context, cr *api.PerconaServerMongoDB, role api.SystemUserRole, host string, tlsEnabled bool) (mongo.Client, error) -} - -func (r *ReconcilePerconaServerMongoDB) getMongoClientProvider() MongoClientProvider { - if r.mongoClientProvider == nil { - return &mongoClientProvider{r.client} - } - return r.mongoClientProvider -} - -type mongoClientProvider struct { - k8sclient client.Client -} - -func (p *mongoClientProvider) Mongo(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole) (mongo.Client, error) { - c, err := getInternalCredentials(ctx, p.k8sclient, cr, role) - if err != nil { - return nil, errors.Wrap(err, "failed to get credentials") - } - - return psmdb.MongoClient(ctx, p.k8sclient, cr, rs, c) -} - -func (p *mongoClientProvider) Mongos(ctx context.Context, cr *api.PerconaServerMongoDB, role api.SystemUserRole) (mongo.Client, error) { - c, err := getInternalCredentials(ctx, p.k8sclient, cr, role) - if err != nil { - return nil, errors.Wrap(err, "failed to get credentials") - } - - return psmdb.MongosClient(ctx, p.k8sclient, cr, c) -} - -func (p *mongoClientProvider) Standalone(ctx context.Context, cr *api.PerconaServerMongoDB, role api.SystemUserRole, host string, tlsEnabled bool) (mongo.Client, error) { - c, err := getInternalCredentials(ctx, p.k8sclient, cr, role) - if err != nil { - return nil, errors.Wrap(err, "failed to get credentials") - } - - return psmdb.StandaloneClient(ctx, p.k8sclient, cr, c, host, tlsEnabled) -} - -func (r *ReconcilePerconaServerMongoDB) mongoClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole) (mongo.Client, error) { - return r.getMongoClientProvider().Mongo(ctx, cr, rs, role) -} - -func (r *ReconcilePerconaServerMongoDB) mongosClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, role api.SystemUserRole) (mongo.Client, error) { - return r.getMongoClientProvider().Mongos(ctx, cr, role) -} - -func (r *ReconcilePerconaServerMongoDB) standaloneClientWithRole(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole, pod corev1.Pod) (mongo.Client, error) { - host, err := psmdb.MongoHost(ctx, r.client, cr, cr.Spec.ClusterServiceDNSMode, rs, rs.Expose.Enabled, pod) - if err != nil { - return nil, errors.Wrap(err, "failed to get mongo host") - } - return r.getMongoClientProvider().Standalone(ctx, cr, role, host, cr.TLSEnabled()) -} diff --git a/pkg/controller/perconaservermongodb/connections_test.go b/pkg/controller/perconaservermongodb/connections_test.go index 823c85e656..974db799f5 100644 --- a/pkg/controller/perconaservermongodb/connections_test.go +++ b/pkg/controller/perconaservermongodb/connections_test.go @@ -158,7 +158,7 @@ func TestConnectionLeaks(t *testing.T) { connectionCount := new(int) r := buildFakeClient(obj...) - r.mongoClientProvider = &fakeMongoClientProvider{pods: rsPods, cr: cr, connectionCount: connectionCount} + r.MongoProviderBase = psmdb.NewProviderBase(r.client, &fakeMongoClientProvider{pods: rsPods, cr: cr, connectionCount: connectionCount}) r.serverVersion = &version.ServerVersion{Platform: version.PlatformKubernetes} r.crons = NewCronRegistry() @@ -395,18 +395,18 @@ func (g *fakeMongoClientProvider) Mongos(ctx context.Context, cr *api.PerconaSer return &fakeMongoClient{pods: g.pods, cr: g.cr, connectionCount: g.connectionCount, Client: fakeClient}, nil } -func (g *fakeMongoClientProvider) Standalone(ctx context.Context, cr *api.PerconaServerMongoDB, role api.SystemUserRole, host string, tlsEnabled bool) (mongo.Client, error) { +func (g *fakeMongoClientProvider) Standalone(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole, pod corev1.Pod) (mongo.Client, error) { *g.connectionCount++ fakeClient := mongoFake.NewClient() - return &fakeMongoClient{pods: g.pods, cr: g.cr, connectionCount: g.connectionCount, Client: fakeClient, host: host}, nil + return &fakeMongoClient{pods: g.pods, cr: g.cr, connectionCount: g.connectionCount, Client: fakeClient, pod: &pod}, nil } type fakeMongoClient struct { pods []client.Object cr *api.PerconaServerMongoDB connectionCount *int - host string + pod *corev1.Pod mongo.Client } @@ -522,7 +522,7 @@ func (c *fakeMongoClient) IsMaster(ctx context.Context) (*mongo.IsMasterResp, er if err := c.cr.CheckNSetDefaults(ctx, version.PlatformKubernetes); err != nil { return nil, err } - if c.host == psmdb.GetAddr(c.cr, c.pods[0].GetName(), c.cr.Spec.Replsets[0].Name, c.cr.Spec.Replsets[0].GetPort()) { + if c.pod.GetName() == c.pods[0].GetName() { isMaster = true } return &mongo.IsMasterResp{ diff --git a/pkg/controller/perconaservermongodb/custom_users.go b/pkg/controller/perconaservermongodb/custom_users.go index 05f8eb81c4..38795df9f1 100644 --- a/pkg/controller/perconaservermongodb/custom_users.go +++ b/pkg/controller/perconaservermongodb/custom_users.go @@ -35,9 +35,9 @@ func (r *ReconcilePerconaServerMongoDB) reconcileCustomUsers(ctx context.Context var err error var mongoCli mongo.Client if cr.Spec.Sharding.Enabled { - mongoCli, err = r.mongosClientWithRole(ctx, cr, api.RoleUserAdmin) + mongoCli, err = r.MongoClient().Mongos(ctx, cr, api.RoleUserAdmin) } else { - mongoCli, err = r.mongoClientWithRole(ctx, cr, cr.Spec.Replsets[0], api.RoleUserAdmin) + mongoCli, err = r.MongoClient().Mongo(ctx, cr, cr.Spec.Replsets[0], api.RoleUserAdmin) } if err != nil { return errors.Wrap(err, "failed to get mongo client") @@ -310,7 +310,8 @@ func updatePass( user *api.User, userInfo *mongo.User, secret *corev1.Secret, - annotationKey, passKey string) error { + annotationKey, passKey string, +) error { log := logf.FromContext(ctx) if userInfo == nil || user.IsExternalDB() { @@ -395,7 +396,8 @@ func createUser( mongoCli mongo.Client, user *api.User, secret *corev1.Secret, - annotationKey, passKey string) error { + annotationKey, passKey string, +) error { log := logf.FromContext(ctx) roles := make([]mongo.Role, 0) diff --git a/pkg/controller/perconaservermongodb/fcv.go b/pkg/controller/perconaservermongodb/fcv.go index 50ca8e2fc9..95fef67fd3 100644 --- a/pkg/controller/perconaservermongodb/fcv.go +++ b/pkg/controller/perconaservermongodb/fcv.go @@ -12,7 +12,7 @@ import ( ) func (r *ReconcilePerconaServerMongoDB) getFCV(ctx context.Context, cr *api.PerconaServerMongoDB) (string, error) { - c, err := r.mongoClientWithRole(ctx, cr, cr.Spec.Replsets[0], api.RoleClusterAdmin) + c, err := r.MongoClient().Mongo(ctx, cr, cr.Spec.Replsets[0], api.RoleClusterAdmin) if err != nil { return "", errors.Wrap(err, "failed to get connection") } @@ -40,9 +40,9 @@ func (r *ReconcilePerconaServerMongoDB) setFCV(ctx context.Context, cr *api.Perc var connErr error if cr.Spec.Sharding.Enabled { - cli, connErr = r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin) + cli, connErr = r.MongoClient().Mongos(ctx, cr, api.RoleClusterAdmin) } else { - cli, connErr = r.mongoClientWithRole(ctx, cr, cr.Spec.Replsets[0], api.RoleClusterAdmin) + cli, connErr = r.MongoClient().Mongo(ctx, cr, cr.Spec.Replsets[0], api.RoleClusterAdmin) } if connErr != nil { diff --git a/pkg/controller/perconaservermongodb/finalizers.go b/pkg/controller/perconaservermongodb/finalizers.go index 544d475da2..1419a326ec 100644 --- a/pkg/controller/perconaservermongodb/finalizers.go +++ b/pkg/controller/perconaservermongodb/finalizers.go @@ -85,7 +85,7 @@ func (r *ReconcilePerconaServerMongoDB) checkFinalizers(ctx context.Context, cr } func (r *ReconcilePerconaServerMongoDB) deleteAllPITRChunks(ctx context.Context, cr *api.PerconaServerMongoDB) error { - pbmc, err := r.newPBM(ctx, r.client, cr) + pbmc, err := r.newPBMFunc(ctx, r.client, cr) if err != nil { return errors.Wrap(err, "new pbm") } diff --git a/pkg/controller/perconaservermongodb/mgo.go b/pkg/controller/perconaservermongodb/mgo.go index 783f6488ae..fa56ed8844 100644 --- a/pkg/controller/perconaservermongodb/mgo.go +++ b/pkg/controller/perconaservermongodb/mgo.go @@ -87,7 +87,7 @@ func (r *ReconcilePerconaServerMongoDB) reconcileCluster(ctx context.Context, cr } } - cli, err := r.mongoClientWithRole(ctx, cr, replset, api.RoleClusterAdmin) + cli, err := r.MongoClient().Mongo(ctx, cr, replset, api.RoleClusterAdmin) if err != nil { if cr.Spec.Unmanaged { return api.AppStateInit, nil, nil @@ -185,7 +185,7 @@ func (r *ReconcilePerconaServerMongoDB) reconcileCluster(ctx context.Context, cr replset.ClusterRole == api.ClusterRoleShardSvr && len(mongosPods) > 0 && cr.Spec.Sharding.Mongos.Size > 0 { - mongosSession, err := r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin) + mongosSession, err := r.MongoClient().Mongos(ctx, cr, api.RoleClusterAdmin) if err != nil { return api.AppStateError, nil, errors.Wrap(err, "failed to get mongos connection") } @@ -569,7 +569,7 @@ func (r *ReconcilePerconaServerMongoDB) removeRSFromShard(ctx context.Context, c return nil } - cli, err := r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin) + cli, err := r.MongoClient().Mongos(ctx, cr, api.RoleClusterAdmin) if err != nil { return errors.Errorf("failed to get mongos connection: %v", err) } @@ -619,7 +619,7 @@ func (r *ReconcilePerconaServerMongoDB) handleRsAddToShard(ctx context.Context, return errors.Wrapf(err, "get rsPod %s host", rspod.Name) } - cli, err := r.mongosClientWithRole(ctx, cr, api.RoleClusterAdmin) + cli, err := r.MongoClient().Mongos(ctx, cr, api.RoleClusterAdmin) if err != nil { return errors.Wrap(err, "failed to get mongos client") } @@ -722,7 +722,7 @@ func (r *ReconcilePerconaServerMongoDB) handleReplsetInit(ctx context.Context, c time.Sleep(time.Second * 5) log.Info("creating user admin", "replset", replsetName, "pod", pod.Name, "user", api.RoleUserAdmin) - userAdmin, err := getInternalCredentials(ctx, r.client, cr, api.RoleUserAdmin) + userAdmin, err := psmdb.GetCredentials(ctx, r.client, cr, api.RoleUserAdmin) if err != nil { return nil, nil, errors.Wrap(err, "failed to get userAdmin credentials") } @@ -755,7 +755,7 @@ func (r *ReconcilePerconaServerMongoDB) handleReplicaSetNoPrimary(ctx context.Co } log.Info("Connecting to pod", "pod", pod.Name, "user", api.RoleClusterAdmin) - cli, err := r.standaloneClientWithRole(ctx, cr, replset, api.RoleClusterAdmin, pod) + cli, err := r.MongoClient().Standalone(ctx, cr, replset, api.RoleClusterAdmin, pod) if err != nil { return errors.Wrap(err, "get standalone mongo client") } @@ -920,7 +920,7 @@ func compareRoles(x []mongo.Role, y []mongo.Role) bool { func (r *ReconcilePerconaServerMongoDB) createOrUpdateSystemUsers(ctx context.Context, cr *api.PerconaServerMongoDB, replset *api.ReplsetSpec) error { log := logf.FromContext(ctx) - cli, err := r.mongoClientWithRole(ctx, cr, replset, api.RoleUserAdmin) + cli, err := r.MongoClient().Mongo(ctx, cr, replset, api.RoleUserAdmin) if err != nil { return errors.Wrap(err, "failed to get mongo client") } @@ -1011,7 +1011,7 @@ func (r *ReconcilePerconaServerMongoDB) createOrUpdateSystemUsers(ctx context.Co } for _, role := range users { - creds, err := getInternalCredentials(ctx, r.client, cr, role) + creds, err := psmdb.GetCredentials(ctx, r.client, cr, role) if err != nil { log.Error(err, "failed to get credentials", "role", role) continue diff --git a/pkg/controller/perconaservermongodb/psmdb_controller.go b/pkg/controller/perconaservermongodb/psmdb_controller.go index 47d8632a27..2a1867206d 100644 --- a/pkg/controller/perconaservermongodb/psmdb_controller.go +++ b/pkg/controller/perconaservermongodb/psmdb_controller.go @@ -88,13 +88,14 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { } return &ReconcilePerconaServerMongoDB{ + MongoProviderBase: psmdb.NewProviderBase(client, nil), client: client, - scheme: mgr.GetScheme(), + scheme: client.Scheme(), + newPBMFunc: backup.NewPBM, serverVersion: sv, reconcileIn: time.Second * 5, crons: NewCronRegistry(), lockers: newLockStore(), - newPBM: backup.NewPBM, restConfig: mgr.GetConfig(), newCertManagerCtrlFunc: tls.NewCertManagerController, @@ -172,21 +173,21 @@ func NewCronRegistry() CronRegistry { // ReconcilePerconaServerMongoDB reconciles a PerconaServerMongoDB object type ReconcilePerconaServerMongoDB struct { + psmdb.MongoProviderBase // This client, initialized using mgr.Client() above, is a split client // that reads objects from the cache and writes to the apiserver client client.Client scheme *runtime.Scheme restConfig *rest.Config - crons CronRegistry - clientcmd *clientcmd.Client - serverVersion *version.ServerVersion - reconcileIn time.Duration - mongoClientProvider MongoClientProvider + crons CronRegistry + clientcmd *clientcmd.Client + serverVersion *version.ServerVersion + reconcileIn time.Duration - newCertManagerCtrlFunc tls.NewCertManagerControllerFunc + newPBMFunc backup.NewPBMFunc - newPBM backup.NewPBMFunc + newCertManagerCtrlFunc tls.NewCertManagerControllerFunc initImage string @@ -871,7 +872,7 @@ func (r *ReconcilePerconaServerMongoDB) checkIfUserDataExistInRS(ctx context.Con return errors.Wrap(err, "failed to set port") } - mc, err := r.mongoClientWithRole(ctx, cr, rs, api.RoleClusterAdmin) + mc, err := r.MongoClient().Mongo(ctx, cr, rs, api.RoleClusterAdmin) if err != nil { return errors.Wrap(err, "dial:") } diff --git a/pkg/controller/perconaservermongodb/secrets.go b/pkg/controller/perconaservermongodb/secrets.go index c618b65089..d3e98395f4 100644 --- a/pkg/controller/perconaservermongodb/secrets.go +++ b/pkg/controller/perconaservermongodb/secrets.go @@ -10,58 +10,12 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" "github.com/percona/percona-server-mongodb-operator/pkg/naming" - "github.com/percona/percona-server-mongodb-operator/pkg/psmdb" "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/secret" ) -func getUserSecret(ctx context.Context, cl client.Reader, cr *api.PerconaServerMongoDB, name string) (corev1.Secret, error) { - secrets := corev1.Secret{} - err := cl.Get(ctx, types.NamespacedName{Name: name, Namespace: cr.Namespace}, &secrets) - return secrets, errors.Wrap(err, "get user secrets") -} - -func getInternalCredentials(ctx context.Context, cl client.Reader, cr *api.PerconaServerMongoDB, role api.SystemUserRole) (psmdb.Credentials, error) { - return getCredentials(ctx, cl, cr, api.UserSecretName(cr), role) -} - -func getCredentials(ctx context.Context, cl client.Reader, cr *api.PerconaServerMongoDB, name string, role api.SystemUserRole) (psmdb.Credentials, error) { - creds := psmdb.Credentials{} - usersSecret, err := getUserSecret(ctx, cl, cr, name) - if err != nil { - return creds, errors.Wrap(err, "failed to get user secret") - } - - switch role { - case api.RoleDatabaseAdmin: - creds.Username = string(usersSecret.Data[api.EnvMongoDBDatabaseAdminUser]) - creds.Password = string(usersSecret.Data[api.EnvMongoDBDatabaseAdminPassword]) - case api.RoleClusterAdmin: - creds.Username = string(usersSecret.Data[api.EnvMongoDBClusterAdminUser]) - creds.Password = string(usersSecret.Data[api.EnvMongoDBClusterAdminPassword]) - case api.RoleUserAdmin: - creds.Username = string(usersSecret.Data[api.EnvMongoDBUserAdminUser]) - creds.Password = string(usersSecret.Data[api.EnvMongoDBUserAdminPassword]) - case api.RoleClusterMonitor: - creds.Username = string(usersSecret.Data[api.EnvMongoDBClusterMonitorUser]) - creds.Password = string(usersSecret.Data[api.EnvMongoDBClusterMonitorPassword]) - case api.RoleBackup: - creds.Username = string(usersSecret.Data[api.EnvMongoDBBackupUser]) - creds.Password = string(usersSecret.Data[api.EnvMongoDBBackupPassword]) - default: - return creds, errors.Errorf("not implemented for role: %s", role) - } - - if creds.Username == "" || creds.Password == "" { - return creds, errors.Errorf("can't find credentials for role %s", role) - } - - return creds, nil -} - func (r *ReconcilePerconaServerMongoDB) reconcileUsersSecret(ctx context.Context, cr *api.PerconaServerMongoDB) error { secretObj := corev1.Secret{} err := r.client.Get(ctx, diff --git a/pkg/controller/perconaservermongodb/smart.go b/pkg/controller/perconaservermongodb/smart.go index 08040f9335..c9cb2a7837 100644 --- a/pkg/controller/perconaservermongodb/smart.go +++ b/pkg/controller/perconaservermongodb/smart.go @@ -151,7 +151,7 @@ func (r *ReconcilePerconaServerMongoDB) smartUpdate(ctx context.Context, cr *api return nil } - hasActiveJobs, err := backup.HasActiveJobs(ctx, r.newPBM, r.client, cr, backup.Job{}, backup.NotPITRLock) + hasActiveJobs, err := backup.HasActiveJobs(ctx, r.newPBMFunc, r.client, cr, backup.Job{}, backup.NotPITRLock) if err != nil { if cr.Status.State == api.AppStateError { log.Info("Failed to check active jobs. Proceeding with Smart Update because the cluster is in an error state", "error", err.Error()) @@ -202,7 +202,7 @@ func (r *ReconcilePerconaServerMongoDB) smartUpdate(ctx context.Context, cr *api if component != naming.ComponentNonVoting && component != naming.ComponentHidden && len(primaryPod.Name) > 0 { forceStepDown := replset.Size == 1 log.Info("doing step down...", "force", forceStepDown) - client, err := r.mongoClientWithRole(ctx, cr, replset, api.RoleClusterAdmin) + client, err := r.MongoClient().Mongo(ctx, cr, replset, api.RoleClusterAdmin) if err != nil { return fmt.Errorf("failed to get mongo client: %v", err) } @@ -346,7 +346,7 @@ func (r *ReconcilePerconaServerMongoDB) setPrimary(ctx context.Context, cr *api. func (r *ReconcilePerconaServerMongoDB) stepDownPod(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, pod corev1.Pod, seconds int) error { log := logf.FromContext(ctx) - mgoClient, err := r.standaloneClientWithRole(ctx, cr, rs, api.RoleClusterAdmin, pod) + mgoClient, err := r.MongoClient().Standalone(ctx, cr, rs, api.RoleClusterAdmin, pod) if err != nil { return errors.Wrap(err, "failed to create standalone client") } @@ -365,7 +365,7 @@ func (r *ReconcilePerconaServerMongoDB) stepDownPod(ctx context.Context, cr *api func (r *ReconcilePerconaServerMongoDB) freezePod(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, pod corev1.Pod, seconds int) error { log := logf.FromContext(ctx) - mgoClient, err := r.standaloneClientWithRole(ctx, cr, rs, api.RoleClusterAdmin, pod) + mgoClient, err := r.MongoClient().Standalone(ctx, cr, rs, api.RoleClusterAdmin, pod) if err != nil { return errors.Wrap(err, "failed to create standalone client") } @@ -384,7 +384,7 @@ func (r *ReconcilePerconaServerMongoDB) freezePod(ctx context.Context, cr *api.P func (r *ReconcilePerconaServerMongoDB) isPodPrimary(ctx context.Context, cr *api.PerconaServerMongoDB, pod corev1.Pod, rs *api.ReplsetSpec) (bool, error) { log := logf.FromContext(ctx) - mgoClient, err := r.standaloneClientWithRole(ctx, cr, rs, api.RoleClusterAdmin, pod) + mgoClient, err := r.MongoClient().Standalone(ctx, cr, rs, api.RoleClusterAdmin, pod) if err != nil { return false, errors.Wrap(err, "failed to create standalone client") } @@ -435,7 +435,7 @@ func (r *ReconcilePerconaServerMongoDB) smartMongosUpdate(ctx context.Context, c return nil } - hasActiveJobs, err := backup.HasActiveJobs(ctx, r.newPBM, r.client, cr, backup.Job{}, backup.NotPITRLock) + hasActiveJobs, err := backup.HasActiveJobs(ctx, r.newPBMFunc, r.client, cr, backup.Job{}, backup.NotPITRLock) if err != nil { return errors.Wrap(err, "failed to check active jobs") } diff --git a/pkg/controller/perconaservermongodb/status_test.go b/pkg/controller/perconaservermongodb/status_test.go index e2d6765d94..d519ff7679 100644 --- a/pkg/controller/perconaservermongodb/status_test.go +++ b/pkg/controller/perconaservermongodb/status_test.go @@ -14,6 +14,7 @@ import ( mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" + "github.com/percona/percona-server-mongodb-operator/pkg/psmdb" fakeBackup "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup/fake" faketls "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/tls/fake" "github.com/percona/percona-server-mongodb-operator/pkg/version" @@ -38,10 +39,11 @@ func buildFakeClient(objs ...client.Object) *ReconcilePerconaServerMongoDB { cl := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).WithStatusSubresource(objs...).Build() return &ReconcilePerconaServerMongoDB{ + MongoProviderBase: psmdb.NewProviderBase(cl, nil), + newPBMFunc: fakeBackup.NewPBM, client: cl, scheme: s, lockers: newLockStore(), - newPBM: fakeBackup.NewPBM, newCertManagerCtrlFunc: faketls.NewCertManagerController, } } diff --git a/pkg/controller/perconaservermongodb/suite_test.go b/pkg/controller/perconaservermongodb/suite_test.go index 1d89bb4248..adf65efab4 100644 --- a/pkg/controller/perconaservermongodb/suite_test.go +++ b/pkg/controller/perconaservermongodb/suite_test.go @@ -80,11 +80,11 @@ func reconciler() *ReconcilePerconaServerMongoDB { return (&ReconcilePerconaServerMongoDB{ client: k8sClient, scheme: k8sClient.Scheme(), + newPBMFunc: backup.NewPBM, crons: NewCronRegistry(), lockers: newLockStore(), clientcmd: cli, restConfig: cfg, - newPBM: backup.NewPBM, newCertManagerCtrlFunc: tls.NewCertManagerController, serverVersion: &version.ServerVersion{ Platform: version.PlatformKubernetes, diff --git a/pkg/controller/perconaservermongodb/users.go b/pkg/controller/perconaservermongodb/users.go index 05105bd8d3..21eda2b6a6 100644 --- a/pkg/controller/perconaservermongodb/users.go +++ b/pkg/controller/perconaservermongodb/users.go @@ -319,7 +319,7 @@ func (r *ReconcilePerconaServerMongoDB) updateUsers(ctx context.Context, cr *api for i := range repls { replset := repls[i] grp.Go(func() error { - client, err := r.mongoClientWithRole(gCtx, cr, replset, api.RoleUserAdmin) + client, err := r.MongoClient().Mongo(gCtx, cr, replset, api.RoleUserAdmin) if err != nil { return errors.Wrap(err, "dial:") } diff --git a/pkg/controller/perconaservermongodb/version.go b/pkg/controller/perconaservermongodb/version.go index cc89fee9c0..69f8ad36d9 100644 --- a/pkg/controller/perconaservermongodb/version.go +++ b/pkg/controller/perconaservermongodb/version.go @@ -579,7 +579,7 @@ func (r *ReconcilePerconaServerMongoDB) fetchVersionFromMongo(ctx context.Contex return nil } - session, err := r.mongoClientWithRole(ctx, cr, replset, api.RoleClusterAdmin) + session, err := r.MongoClient().Mongo(ctx, cr, replset, api.RoleClusterAdmin) if err != nil { return errors.Wrap(err, "dial") } diff --git a/pkg/controller/perconaservermongodbbackup/perconaservermongodbbackup_controller.go b/pkg/controller/perconaservermongodbbackup/perconaservermongodbbackup_controller.go index 48cc4480be..f5bc4682b3 100644 --- a/pkg/controller/perconaservermongodbbackup/perconaservermongodbbackup_controller.go +++ b/pkg/controller/perconaservermongodbbackup/perconaservermongodbbackup_controller.go @@ -59,8 +59,8 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { client: mgr.GetClient(), apiReader: mgr.GetAPIReader(), scheme: mgr.GetScheme(), - newPBMFunc: backup.NewPBM, clientcmd: cli, + newPBMFunc: backup.NewPBM, }, nil } diff --git a/pkg/controller/perconaservermongodbrestore/perconaservermongodbrestore_controller.go b/pkg/controller/perconaservermongodbrestore/perconaservermongodbrestore_controller.go index 38e8904100..510d8b1041 100644 --- a/pkg/controller/perconaservermongodbrestore/perconaservermongodbrestore_controller.go +++ b/pkg/controller/perconaservermongodbrestore/perconaservermongodbrestore_controller.go @@ -52,10 +52,11 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { } return &ReconcilePerconaServerMongoDBRestore{ - client: mgr.GetClient(), - scheme: mgr.GetScheme(), - clientcmd: cli, - newPBMFunc: backup.NewPBM, + MongoProviderBase: psmdb.NewProviderBase(mgr.GetClient(), nil), + client: mgr.GetClient(), + scheme: mgr.GetScheme(), + clientcmd: cli, + newPBMFunc: backup.NewPBM, }, nil } @@ -81,6 +82,8 @@ var _ reconcile.Reconciler = &ReconcilePerconaServerMongoDBRestore{} type ReconcilePerconaServerMongoDBRestore struct { // This client, initialized using mgr.Client() above, is a split client // that reads objects from the cache and writes to the apiserver + psmdb.MongoProviderBase + client client.Client scheme *runtime.Scheme clientcmd *clientcmd.Client diff --git a/pkg/controller/perconaservermongodbrestore/physical.go b/pkg/controller/perconaservermongodbrestore/physical.go index 6f522f9077..7bb233d896 100644 --- a/pkg/controller/perconaservermongodbrestore/physical.go +++ b/pkg/controller/perconaservermongodbrestore/physical.go @@ -4,15 +4,18 @@ import ( "bytes" "context" "encoding/json" + stdError "errors" "fmt" "strconv" "strings" "time" "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/x/mongo/driver/topology" "gopkg.in/yaml.v2" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -90,42 +93,43 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcilePhysicalRestore( } status.State = psmdbv1.RestoreStateWaiting + return status, nil } - if err := r.prepareStatefulSetsForPhysicalRestore(ctx, cluster); err != nil { - return status, errors.Wrap(err, "prepare statefulsets for physical restore") - } - - sfsReady, err := r.checkIfStatefulSetsAreReadyForPhysicalRestore(ctx, cluster) - if err != nil { - return status, errors.Wrap(err, "check if statefulsets are ready for physical restore") - } + stdoutBuf := &bytes.Buffer{} + stderrBuf := &bytes.Buffer{} - if (!sfsReady && cr.Status.State != psmdbv1.RestoreStateRunning) || cr.Status.State == psmdbv1.RestoreStateNew { - log.Info("Waiting for statefulsets to be ready before restore", "ready", sfsReady) - return status, nil - } + if cr.Status.State == psmdbv1.RestoreStateWaiting { + if err := r.prepareStatefulSetsForPhysicalRestore(ctx, cluster); err != nil { + return status, errors.Wrap(err, "prepare statefulsets for physical restore") + } - if cr.Status.State == psmdbv1.RestoreStateWaiting && sfsReady && cr.Spec.PITR != nil { - rsReady, err := r.checkIfReplsetsAreReadyForPhysicalRestore(ctx, cluster) + sfsReady, err := r.checkIfStatefulSetsAreReadyForPhysicalRestore(ctx, cluster) if err != nil { - return status, errors.Wrap(err, "check if replsets are ready for physical restore") + return status, errors.Wrap(err, "check if statefulsets are ready for physical restore") } - if !rsReady { - if err := r.prepareReplsetsForPhysicalRestore(ctx, cluster); err != nil { - return status, errors.Wrap(err, "prepare replsets for physical restore") - } - - log.Info("Waiting for replsets to be ready before restore", "ready", rsReady) + if !sfsReady { + log.Info("Waiting for statefulsets to be ready before restore", "ready", sfsReady) return status, nil } - } - stdoutBuf := &bytes.Buffer{} - stderrBuf := &bytes.Buffer{} + if cr.Spec.PITR != nil { + rsReady, err := r.checkIfReplsetsAreReadyForPhysicalRestore(ctx, cluster) + if err != nil { + return status, errors.Wrap(err, "check if replsets are ready for physical restore") + } + + if !rsReady { + if err := r.prepareReplsetsForPhysicalRestore(ctx, cluster); err != nil { + return status, errors.Wrap(err, "prepare replsets for physical restore") + } + + log.Info("Waiting for replsets to be ready before restore", "ready", rsReady) + return status, nil + } + } - if cr.Status.State == psmdbv1.RestoreStateWaiting { rs := replsets[0] pbmAgentsReady, err := r.checkIfPBMAgentsReadyForPhysicalRestore(ctx, cluster) @@ -187,171 +191,318 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcilePhysicalRestore( return status, nil } - meta := backup.BackupMeta{} + pod := corev1.Pod{} + if err := r.client.Get(ctx, types.NamespacedName{Name: replsets[0].PodName(cluster, 0), Namespace: cluster.Namespace}, &pod); err != nil { + if k8serrors.IsNotFound(err) { + return status, nil + } + return status, errors.Wrap(err, "get pod") + } + if !pod.DeletionTimestamp.IsZero() { + return status, nil + } - err = retry.OnError(retry.DefaultBackoff, func(err error) bool { - return strings.Contains(err.Error(), "container is not created or running") || - strings.Contains(err.Error(), "error dialing backend: No agent available") || - strings.Contains(err.Error(), "unable to upgrade connection") || - strings.Contains(err.Error(), "unmarshal PBM describe-restore output") - }, func() error { - stdoutBuf.Reset() - stderrBuf.Reset() + if !hasContainerName(pod.Spec.Containers, naming.ContainerBackupAgent) { + meta := backup.BackupMeta{} + notFound := false - command := []string{ - "/opt/percona/pbm", "describe-restore", cr.Status.PBMname, - "--config", "/etc/pbm/pbm_config.yaml", - "--out", "json", - } + err = retry.OnError(retry.DefaultBackoff, func(err error) bool { + return (strings.Contains(err.Error(), "container is not created or running") || + strings.Contains(err.Error(), "error dialing backend: No agent available") || + strings.Contains(err.Error(), "unable to upgrade connection") || + strings.Contains(err.Error(), "unmarshal PBM describe-restore output")) + }, func() error { + stdoutBuf.Reset() + stderrBuf.Reset() - pod := corev1.Pod{} - if err := r.client.Get(ctx, types.NamespacedName{Name: replsets[0].PodName(cluster, 0), Namespace: cluster.Namespace}, &pod); err != nil { - return errors.Wrap(err, "get pod") + command := []string{ + "/opt/percona/pbm", "describe-restore", cr.Status.PBMname, + "--config", "/etc/pbm/pbm_config.yaml", + "--out", "json", + } + + log.V(1).Info("Check restore status", "command", command, "pod", pod.Name) + + if err := r.clientcmd.Exec(ctx, &pod, "mongod", command, nil, stdoutBuf, stderrBuf, false); err != nil { + return errors.Wrapf(err, "describe restore stderr: %s stdout: %s", stderrBuf.String(), stdoutBuf.String()) + } + + return nil + }) + if err != nil { + return status, err } - log.V(1).Info("Check restore status", "command", command, "pod", pod.Name) + if notFound { + return status, nil + } - if err := r.clientcmd.Exec(ctx, &pod, "mongod", command, nil, stdoutBuf, stderrBuf, false); err != nil { - return errors.Wrapf(err, "describe restore stderr: %s stdout: %s", stderrBuf.String(), stdoutBuf.String()) + if err := json.Unmarshal(stdoutBuf.Bytes(), &meta); err != nil { + return status, errors.Wrap(err, "unmarshal PBM describe-restore output") } - return nil - }) - if err != nil { - return status, err - } + log.V(1).Info("PBM restore status", "status", meta) - if err := json.Unmarshal(stdoutBuf.Bytes(), &meta); err != nil { - return status, errors.Wrap(err, "unmarshal PBM describe-restore output") - } + restoreIsDone := false + switch meta.Status { + case defs.StatusStarting: + for _, rs := range meta.Replsets { + if rs.Status == defs.StatusRunning { + status.State = psmdbv1.RestoreStateRunning + return status, nil + } + } + case defs.StatusError: + status.State = psmdbv1.RestoreStateError + status.Error = meta.Err + case defs.StatusRunning: + status.State = psmdbv1.RestoreStateRunning + case defs.StatusDone: + for _, rs := range meta.Replsets { + if rs.Status == defs.StatusDone { + continue + } - log.V(1).Info("PBM restore status", "status", meta) + log.Info("Waiting replset restore to finish", "replset", rs.Name, "status", rs.Status) - switch meta.Status { - case defs.StatusStarting: - for _, rs := range meta.Replsets { - if rs.Status == defs.StatusRunning { status.State = psmdbv1.RestoreStateRunning return status, nil } - } - case defs.StatusError: - status.State = psmdbv1.RestoreStateError - status.Error = meta.Err - case defs.StatusRunning: - status.State = psmdbv1.RestoreStateRunning - case defs.StatusDone: - for _, rs := range meta.Replsets { - if rs.Status == defs.StatusDone { - continue - } - log.Info("Waiting replset restore to finish", "replset", rs.Name, "status", rs.Status) + restoreIsDone = true + } - status.State = psmdbv1.RestoreStateRunning + if !restoreIsDone { return status, nil } - status.State = psmdbv1.RestoreStateReady + if err := r.iterateOverMongodSts(ctx, cluster, func(s *appsv1.StatefulSet) error { + if err := r.client.Delete(ctx, s); err != nil { + return errors.Wrapf(err, "delete statefulset %s", s) + } + return nil + }); client.IgnoreNotFound(err) != nil { + return status, err + } + return status, nil + } + + finished, err := r.finishPhysicalRestore(ctx, cluster) + if err != nil { + log.Error(err, "Failed to recover the cluster after the restore") + return status, nil + } + if !finished { + return status, nil } - if status.State == psmdbv1.RestoreStateReady { - replsets := cluster.Spec.Replsets - if cluster.Spec.Sharding.Enabled { - replsets = append(replsets, cluster.Spec.Sharding.ConfigsvrReplSet) - } + status.State = psmdbv1.RestoreStateReady - for _, rs := range replsets { - stsName := naming.MongodStatefulSetName(cluster, rs) + return status, nil +} - log.Info("Deleting statefulset", "statefulset", stsName) +func hasContainerName(containers []corev1.Container, name string) bool { + for _, c := range containers { + if c.Name == name { + return true + } + } + return false +} - sts := appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: stsName, - Namespace: cluster.Namespace, - }, +func (r *ReconcilePerconaServerMongoDBRestore) finishPhysicalRestore(ctx context.Context, cluster *api.PerconaServerMongoDB) (bool, error) { + stsIsUpdated := true + if err := r.iterateOverMongodSts(ctx, cluster, func(s *appsv1.StatefulSet) error { + sts := new(appsv1.StatefulSet) + if err := r.client.Get(ctx, client.ObjectKeyFromObject(s), sts); err != nil { + return err + } + if !sts.DeletionTimestamp.IsZero() { + return nil + } + + if sts.Labels[naming.LabelKubernetesComponent] != naming.ComponentArbiter { + if !hasContainerName(sts.Spec.Template.Spec.Containers, naming.ContainerBackupAgent) { + return errors.Errorf("statefulset %s wasn't deleted", sts.Name) } + } - if err := r.client.Delete(ctx, &sts); err != nil { - return status, errors.Wrapf(err, "delete statefulset %s", stsName) + if sts.Annotations[psmdbv1.AnnotationRestoreInProgress] == "true" { + return nil + } + + stsIsUpdated = false + + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := r.client.Get(ctx, client.ObjectKeyFromObject(s), sts); err != nil { + return err } - if rs.NonVoting.Enabled { - stsName := naming.NonVotingStatefulSetName(cluster, rs) + sts.Annotations[psmdbv1.AnnotationRestoreInProgress] = "true" - log.Info("Deleting statefulset", "statefulset", stsName) + return r.client.Update(ctx, sts) + }) + }); client.IgnoreNotFound(err) != nil { + return false, errors.Wrap(err, "delete restore in progress annotation") + } + if !stsIsUpdated { + return false, nil + } - sts := appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: stsName, - Namespace: cluster.Namespace, - }, - } + ready := true + err := r.iterateOverMongodSts(ctx, cluster, func(s *appsv1.StatefulSet) error { + if !ready { + return nil + } + + var err error + ready, err = r.isStatefulSetReady(ctx, cluster, s) + return err + }) + if client.IgnoreNotFound(err) != nil { + return false, err + } + if !ready || k8serrors.IsNotFound(err) { + return false, err + } - if err := r.client.Delete(ctx, &sts); err != nil { - return status, errors.Wrapf(err, "delete statefulset %s", stsName) + wait := false + if err := r.iterateOverMongodSts(ctx, cluster, func(s *appsv1.StatefulSet) error { + if s.Labels[naming.LabelKubernetesComponent] != naming.ComponentMongod { + return nil + } + rs := cluster.Spec.Replset(s.Labels[naming.LabelKubernetesReplset]) + c, err := r.MongoClient().Mongo(ctx, cluster, rs, api.RoleClusterAdmin) + if err != nil { + wait = true + + if errors.Is(err, topology.ErrServerSelectionTimeout) && strings.Contains(err.Error(), "ReplicaSetNoPrimary") { + pods, err := r.getReplsetPods(ctx, cluster, rs, s.Labels[naming.LabelKubernetesComponent]) + if err != nil { + return err + } + updateConfig := func(pod corev1.Pod) error { + cli, err := r.MongoClient().Standalone(ctx, cluster, rs, api.RoleClusterAdmin, pod) + if err != nil { + return nil + } + defer func() { _ = cli.Disconnect(ctx) }() + + cfg, err := cli.ReadConfig(ctx) + if err != nil { + return errors.Wrap(err, "read replset config") + } + + if err := cli.WriteConfig(ctx, cfg, true); err != nil { + return errors.Wrap(err, "reconfigure replset") + } + return nil + } + for _, pod := range pods.Items { + if err := updateConfig(pod); err != nil { + return err + } } + return nil + } else { + return err } + } + return c.Disconnect(ctx) + }); err != nil { + return false, err + } + if wait { + return false, nil + } - if rs.Hidden.Enabled { - stsName := naming.HiddenStatefulSetName(cluster, rs) + if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + c := new(psmdbv1.PerconaServerMongoDB) + if err := r.client.Get(ctx, client.ObjectKeyFromObject(cluster), c); err != nil { + return err + } - log.Info("Deleting statefulset", "statefulset", stsName) + if c.Annotations == nil { + c.Annotations = make(map[string]string) + } + if c.Annotations[psmdbv1.AnnotationResyncPBM] == "true" { + return nil + } - sts := appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: stsName, - Namespace: cluster.Namespace, - }, - } + c.Annotations[psmdbv1.AnnotationResyncPBM] = "true" - if err := r.client.Delete(ctx, &sts); err != nil { - return status, errors.Wrapf(err, "delete statefulset %s", stsName) - } - } + return r.client.Update(ctx, c) + }); err != nil { + return false, errors.Wrapf(err, "annotate psmdb/%s for PBM resync", cluster.Name) + } + + if err := r.updateMongodSts(ctx, cluster, func(sts *appsv1.StatefulSet) error { + if sts.Annotations[psmdbv1.AnnotationRestoreInProgress] == "true" { + delete(sts.Annotations, psmdbv1.AnnotationRestoreInProgress) + } + return nil + }); err != nil { + return false, errors.Wrap(err, "delete restore in progress annotation") + } - if rs.Arbiter.Enabled { - stsName := naming.ArbiterStatefulSetName(cluster, rs) + return true, nil +} - log.Info("Deleting statefulset", "statefulset", stsName) +func (r *ReconcilePerconaServerMongoDBRestore) iterateOverMongodSts(ctx context.Context, cluster *psmdbv1.PerconaServerMongoDB, itFunc func(s *appsv1.StatefulSet) error) error { + replsets := cluster.Spec.Replsets + if cluster.Spec.Sharding.Enabled { + replsets = append(replsets, cluster.Spec.Sharding.ConfigsvrReplSet) + } - sts := appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: stsName, - Namespace: cluster.Namespace, - }, - } + var errList []error + for _, rs := range replsets { + stsList := []string{naming.MongodStatefulSetName(cluster, rs)} + if rs.NonVoting.Enabled { + stsList = append(stsList, naming.NonVotingStatefulSetName(cluster, rs)) + } + if rs.Arbiter.Enabled { + stsList = append(stsList, naming.ArbiterStatefulSetName(cluster, rs)) + } + if rs.Hidden.Enabled { + stsList = append(stsList, naming.HiddenStatefulSetName(cluster, rs)) + } - if err := r.client.Delete(ctx, &sts); err != nil { - return status, errors.Wrapf(err, "delete statefulset %s", stsName) - } + var rsErrList []error + for _, sts := range stsList { + s := new(appsv1.StatefulSet) + if err := r.client.Get(ctx, types.NamespacedName{Name: sts, Namespace: cluster.Namespace}, s); err != nil { + return err + } + if err := itFunc(s); err != nil { + rsErrList = append(rsErrList, err) } } - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - c := &psmdbv1.PerconaServerMongoDB{} - err := r.client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, c) - if err != nil { + if len(rsErrList) > 0 { + errList = append(errList, errors.Wrapf(stdError.Join(rsErrList...), "failed to edit statefulsets for replset %s", rs.Name)) + } + } + if len(errList) > 0 { + return stdError.Join(errList...) + } + return nil +} + +func (r *ReconcilePerconaServerMongoDBRestore) updateMongodSts(ctx context.Context, cluster *psmdbv1.PerconaServerMongoDB, updateFunc func(s *appsv1.StatefulSet) error) error { + return r.iterateOverMongodSts(ctx, cluster, func(s *appsv1.StatefulSet) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + sts := new(appsv1.StatefulSet) + if err := r.client.Get(ctx, client.ObjectKeyFromObject(s), sts); err != nil { return err } - orig := c.DeepCopy() - - if c.Annotations == nil { - c.Annotations = make(map[string]string) + if err := updateFunc(sts); err != nil { + return err } - c.Annotations[psmdbv1.AnnotationResyncPBM] = "true" - return r.client.Patch(ctx, c, client.MergeFrom(orig)) + return r.client.Update(ctx, sts) }) - if err != nil { - return status, errors.Wrapf(err, "annotate psmdb/%s for PBM resync", cluster.Name) - } - - } - - return status, nil + }) } // updateStatefulSetForPhysicalRestore updates the StatefulSet to prepare it for a physical restore of PerconaServerMongoDB. @@ -363,7 +514,8 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcilePhysicalRestore( // - Adjusting the primary container's command, environment variables, and volume mounts for the restore process. // It returns an error if there's any issue during the update or if the backup-agent container is not found. func (r *ReconcilePerconaServerMongoDBRestore) updateStatefulSetForPhysicalRestore( - ctx context.Context, cluster *psmdbv1.PerconaServerMongoDB, namespacedName types.NamespacedName, port int32) error { + ctx context.Context, cluster *psmdbv1.PerconaServerMongoDB, namespacedName types.NamespacedName, port int32, +) error { log := logf.FromContext(ctx) sts := appsv1.StatefulSet{} @@ -519,98 +671,29 @@ func (r *ReconcilePerconaServerMongoDBRestore) updateStatefulSetForPhysicalResto func (r *ReconcilePerconaServerMongoDBRestore) prepareStatefulSetsForPhysicalRestore(ctx context.Context, cluster *psmdbv1.PerconaServerMongoDB) error { log := logf.FromContext(ctx) - replsets := cluster.Spec.Replsets - if cluster.Spec.Sharding.Enabled { - replsets = append(replsets, cluster.Spec.Sharding.ConfigsvrReplSet) - } - - for _, rs := range replsets { - stsName := naming.MongodStatefulSetName(cluster, rs) - - sts := appsv1.StatefulSet{} - nn := types.NamespacedName{Namespace: cluster.Namespace, Name: stsName} - err := r.client.Get(ctx, nn, &sts) - if err != nil { - return err - } - + if err := r.updateMongodSts(ctx, cluster, func(sts *appsv1.StatefulSet) error { _, ok := sts.Annotations[psmdbv1.AnnotationRestoreInProgress] if ok { - continue - } - - log.Info("Preparing statefulset for physical restore", "name", stsName) - - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { - return r.updateStatefulSetForPhysicalRestore(ctx, cluster, types.NamespacedName{Namespace: cluster.Namespace, Name: stsName}, rs.GetPort()) - }) - if err != nil { - return errors.Wrapf(err, "prepare statefulset %s for physical restore", stsName) - } - - if rs.NonVoting.Enabled { - stsName := naming.NonVotingStatefulSetName(cluster, rs) - nn := types.NamespacedName{Namespace: cluster.Namespace, Name: stsName} - - log.Info("Preparing statefulset for physical restore", "name", stsName) - - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { - return r.updateStatefulSetForPhysicalRestore(ctx, cluster, nn, rs.GetPort()) - }) - if err != nil { - return errors.Wrapf(err, "prepare statefulset %s for physical restore", stsName) - } + return nil } + log.Info("Preparing statefulset for physical restore", "name", sts.Name) - if rs.Hidden.Enabled { - stsName := naming.HiddenStatefulSetName(cluster, rs) - nn := types.NamespacedName{Namespace: cluster.Namespace, Name: stsName} + if sts.Labels[naming.LabelKubernetesComponent] == naming.ComponentArbiter { + zero := int32(0) - log.Info("Preparing statefulset for physical restore", "name", stsName) + sts.Spec.Replicas = &zero - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { - return r.updateStatefulSetForPhysicalRestore(ctx, cluster, nn, rs.GetPort()) - }) - if err != nil { - return errors.Wrapf(err, "prepare statefulset %s for physical restore", stsName) + if sts.Annotations == nil { + sts.Annotations = make(map[string]string) } + sts.Annotations[psmdbv1.AnnotationRestoreInProgress] = "true" + return nil } - if rs.Arbiter.Enabled { - stsName := naming.ArbiterStatefulSetName(cluster, rs) - nn := types.NamespacedName{Namespace: cluster.Namespace, Name: stsName} - - log.Info("Preparing statefulset for physical restore", "name", stsName) - - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { - sts := appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: stsName, - Namespace: cluster.Namespace, - }, - } - - err := r.client.Get(ctx, nn, &sts) - if err != nil { - return err - } - - orig := sts.DeepCopy() - zero := int32(0) - - sts.Spec.Replicas = &zero - - if sts.Annotations == nil { - sts.Annotations = make(map[string]string) - } - sts.Annotations[psmdbv1.AnnotationRestoreInProgress] = "true" - - return r.client.Patch(ctx, &sts, client.MergeFrom(orig)) - }) - if err != nil { - return errors.Wrapf(err, "prepare statefulset %s for physical restore", stsName) - } - } + rs := cluster.Spec.Replset(sts.Labels[naming.LabelKubernetesReplset]) + return r.updateStatefulSetForPhysicalRestore(ctx, cluster, client.ObjectKeyFromObject(sts), rs.GetPort()) + }); err != nil { + return errors.Wrap(err, "update replset sts") } return nil @@ -873,91 +956,68 @@ func (r *ReconcilePerconaServerMongoDBRestore) getReplsetPods( } func (r *ReconcilePerconaServerMongoDBRestore) checkIfStatefulSetsAreReadyForPhysicalRestore(ctx context.Context, cluster *psmdbv1.PerconaServerMongoDB) (bool, error) { - replsets := cluster.Spec.Replsets - if cluster.Spec.Sharding.Enabled { - replsets = append(replsets, cluster.Spec.Sharding.ConfigsvrReplSet) - } - - for _, rs := range replsets { - ready, err := r.checkStatefulSetForPhysicalRestore(ctx, cluster, rs, naming.ComponentMongod) - if err != nil { - return false, errors.Wrapf(err, "check %s %s statefulset", rs.Name, naming.ComponentMongod) - } - - if !ready { - return false, nil + ready := true + if err := r.iterateOverMongodSts(ctx, cluster, func(s *appsv1.StatefulSet) error { + if s.Labels[naming.LabelKubernetesComponent] == naming.ComponentArbiter || !ready { + return nil } - - if rs.NonVoting.Enabled { - ready, err := r.checkStatefulSetForPhysicalRestore(ctx, cluster, rs, naming.ComponentNonVoting) - if err != nil { - return false, errors.Wrapf(err, "check %s %s statefulset", rs.Name, naming.ComponentNonVoting) - } - - if !ready { - return false, nil - } + var err error + ready, err = r.checkStatefulSetForPhysicalRestore(ctx, cluster, s) + if err != nil { + return errors.Wrapf(err, "check %s %s statefulset", s.Labels[naming.LabelKubernetesReplset], s.Labels[naming.LabelKubernetesComponent]) } + return nil + }); err != nil { + return false, err + } + return ready, nil +} - if rs.Hidden.Enabled { - ready, err := r.checkStatefulSetForPhysicalRestore(ctx, cluster, rs, naming.ComponentHidden) - if err != nil { - return false, errors.Wrapf(err, "check %s %s statefulset", rs.Name, naming.ComponentHidden) - } +func (r *ReconcilePerconaServerMongoDBRestore) isStatefulSetReady(ctx context.Context, cluster *psmdbv1.PerconaServerMongoDB, sts *appsv1.StatefulSet) (bool, error) { + if sts.Status.Replicas != sts.Status.ReadyReplicas { + return false, nil + } - if !ready { - return false, nil - } + rs := cluster.Spec.Replset(sts.Labels[naming.LabelKubernetesReplset]) + podList, err := r.getReplsetPods(ctx, cluster, rs, sts.Labels[naming.LabelKubernetesComponent]) + if err != nil { + return false, errors.Wrapf(err, "get replset %s pods", sts.Labels[naming.LabelKubernetesReplset]) + } + for _, pod := range podList.Items { + if pod.Labels["controller-revision-hash"] != sts.Status.UpdateRevision { + return false, nil } } - return true, nil } func (r *ReconcilePerconaServerMongoDBRestore) checkStatefulSetForPhysicalRestore( ctx context.Context, cluster *psmdbv1.PerconaServerMongoDB, - rs *psmdbv1.ReplsetSpec, - component string, + sts *appsv1.StatefulSet, ) (bool, error) { log := logf.FromContext(ctx) - var stsName string - switch component { - case naming.ComponentMongod: - stsName = naming.MongodStatefulSetName(cluster, rs) - case naming.ComponentNonVoting: - stsName = naming.NonVotingStatefulSetName(cluster, rs) - case naming.ComponentHidden: - stsName = naming.HiddenStatefulSetName(cluster, rs) - } - - sts := appsv1.StatefulSet{} - nn := types.NamespacedName{Namespace: cluster.Namespace, Name: stsName} - err := r.client.Get(ctx, nn, &sts) - if err != nil { - return false, err - } - _, ok := sts.Annotations[psmdbv1.AnnotationRestoreInProgress] if !ok { return false, nil } - if sts.Status.Replicas != sts.Status.ReadyReplicas { - return false, nil + ready, err := r.isStatefulSetReady(ctx, cluster, sts) + if err != nil { + return false, err + } + if !ready { + return ready, nil } - podList, err := r.getReplsetPods(ctx, cluster, rs, component) + rs := cluster.Spec.Replset(sts.Labels[naming.LabelKubernetesReplset]) + podList, err := r.getReplsetPods(ctx, cluster, rs, sts.Labels[naming.LabelKubernetesComponent]) if err != nil { - return false, errors.Wrapf(err, "get replset %s pods", rs.Name) + return false, errors.Wrapf(err, "get replset %s pods", sts.Labels[naming.LabelKubernetesReplset]) } for _, pod := range podList.Items { - if pod.ObjectMeta.Labels["controller-revision-hash"] != sts.Status.UpdateRevision { - return false, nil - } - for _, c := range pod.Spec.Containers { if c.Name == naming.ContainerBackupAgent { return false, nil @@ -967,7 +1027,7 @@ func (r *ReconcilePerconaServerMongoDBRestore) checkStatefulSetForPhysicalRestor log.V(1).Info("Pod is ready for physical restore", "pod", pod.Name) } - log.V(1).Info("Statefulset is ready for physical restore", "sts", sts.Name, "replset", rs.Name) + log.V(1).Info("Statefulset is ready for physical restore", "sts", sts.Name, "replset", sts.Labels[naming.LabelKubernetesReplset]) return true, nil } diff --git a/pkg/controller/perconaservermongodbrestore/validate.go b/pkg/controller/perconaservermongodbrestore/validate.go index 8d261e7c7f..43bee844f3 100644 --- a/pkg/controller/perconaservermongodbrestore/validate.go +++ b/pkg/controller/perconaservermongodbrestore/validate.go @@ -11,9 +11,7 @@ import ( "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup" ) -var ( - errWaitingPBM = errors.New("waiting for pbm-agent") -) +var errWaitingPBM = errors.New("waiting for pbm-agent") func (r *ReconcilePerconaServerMongoDBRestore) validate(ctx context.Context, cr *psmdbv1.PerconaServerMongoDBRestore, cluster *psmdbv1.PerconaServerMongoDB) error { if cluster.Spec.Unmanaged { diff --git a/pkg/psmdb/client.go b/pkg/psmdb/client.go index 3c093cf8f4..d7f12d9e81 100644 --- a/pkg/psmdb/client.go +++ b/pkg/psmdb/client.go @@ -62,7 +62,7 @@ func MongoClient(ctx context.Context, k8sClient client.Client, cr *api.PerconaSe return mongo.Dial(conf) } -func MongosClient(ctx context.Context, k8sclient client.Client, cr *api.PerconaServerMongoDB, c Credentials) (mongo.Client, error) { +func mongosClient(ctx context.Context, k8sclient client.Client, cr *api.PerconaServerMongoDB, c Credentials) (mongo.Client, error) { hosts, err := GetMongosAddrs(ctx, k8sclient, cr, true) if err != nil { return nil, errors.Wrap(err, "get mongos addrs") @@ -85,7 +85,7 @@ func MongosClient(ctx context.Context, k8sclient client.Client, cr *api.PerconaS return mongo.Dial(&conf) } -func StandaloneClient(ctx context.Context, k8sclient client.Client, cr *api.PerconaServerMongoDB, c Credentials, host string, tlsEnabled bool) (mongo.Client, error) { +func standaloneClient(ctx context.Context, k8sclient client.Client, cr *api.PerconaServerMongoDB, c Credentials, host string, tlsEnabled bool) (mongo.Client, error) { conf := mongo.Config{ Hosts: []string{host}, Username: c.Username, diff --git a/pkg/psmdb/provider.go b/pkg/psmdb/provider.go new file mode 100644 index 0000000000..049bc28c09 --- /dev/null +++ b/pkg/psmdb/provider.go @@ -0,0 +1,114 @@ +package psmdb + +import ( + "context" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1" + "github.com/percona/percona-server-mongodb-operator/pkg/psmdb/mongo" +) + +type MongoClientProvider interface { + Mongo(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole) (mongo.Client, error) + Mongos(ctx context.Context, cr *api.PerconaServerMongoDB, role api.SystemUserRole) (mongo.Client, error) + Standalone(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole, pod corev1.Pod) (mongo.Client, error) +} + +type mongoClientProvider struct { + k8sclient client.Client +} + +func (p *mongoClientProvider) Mongo(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole) (mongo.Client, error) { + c, err := GetCredentials(ctx, p.k8sclient, cr, role) + if err != nil { + return nil, errors.Wrap(err, "failed to get credentials") + } + + return MongoClient(ctx, p.k8sclient, cr, rs, c) +} + +func (p *mongoClientProvider) Mongos(ctx context.Context, cr *api.PerconaServerMongoDB, role api.SystemUserRole) (mongo.Client, error) { + c, err := GetCredentials(ctx, p.k8sclient, cr, role) + if err != nil { + return nil, errors.Wrap(err, "failed to get credentials") + } + + return mongosClient(ctx, p.k8sclient, cr, c) +} + +func (p *mongoClientProvider) Standalone(ctx context.Context, cr *api.PerconaServerMongoDB, rs *api.ReplsetSpec, role api.SystemUserRole, pod corev1.Pod) (mongo.Client, error) { + c, err := GetCredentials(ctx, p.k8sclient, cr, role) + if err != nil { + return nil, errors.Wrap(err, "failed to get credentials") + } + host, err := MongoHost(ctx, p.k8sclient, cr, cr.Spec.ClusterServiceDNSMode, rs, rs.Expose.Enabled, pod) + if err != nil { + return nil, errors.Wrap(err, "failed to get mongo host") + } + + return standaloneClient(ctx, p.k8sclient, cr, c, host, cr.TLSEnabled()) +} + +type MongoProviderBase struct { + cl client.Client + + provider MongoClientProvider +} + +func NewProviderBase(cl client.Client, provider MongoClientProvider) MongoProviderBase { + return MongoProviderBase{ + cl: cl, + provider: provider, + } +} + +func (provider *MongoProviderBase) MongoClient() MongoClientProvider { + if provider.provider == nil { + return &mongoClientProvider{k8sclient: provider.cl} + } + return provider.provider +} + +func getUserSecret(ctx context.Context, cl client.Reader, cr *api.PerconaServerMongoDB, name string) (corev1.Secret, error) { + secrets := corev1.Secret{} + err := cl.Get(ctx, types.NamespacedName{Name: name, Namespace: cr.Namespace}, &secrets) + return secrets, errors.Wrap(err, "get user secrets") +} + +func GetCredentials(ctx context.Context, cl client.Reader, cr *api.PerconaServerMongoDB, role api.SystemUserRole) (Credentials, error) { + creds := Credentials{} + usersSecret, err := getUserSecret(ctx, cl, cr, api.UserSecretName(cr)) + if err != nil { + return creds, errors.Wrap(err, "failed to get user secret") + } + + switch role { + case api.RoleDatabaseAdmin: + creds.Username = string(usersSecret.Data[api.EnvMongoDBDatabaseAdminUser]) + creds.Password = string(usersSecret.Data[api.EnvMongoDBDatabaseAdminPassword]) + case api.RoleClusterAdmin: + creds.Username = string(usersSecret.Data[api.EnvMongoDBClusterAdminUser]) + creds.Password = string(usersSecret.Data[api.EnvMongoDBClusterAdminPassword]) + case api.RoleUserAdmin: + creds.Username = string(usersSecret.Data[api.EnvMongoDBUserAdminUser]) + creds.Password = string(usersSecret.Data[api.EnvMongoDBUserAdminPassword]) + case api.RoleClusterMonitor: + creds.Username = string(usersSecret.Data[api.EnvMongoDBClusterMonitorUser]) + creds.Password = string(usersSecret.Data[api.EnvMongoDBClusterMonitorPassword]) + case api.RoleBackup: + creds.Username = string(usersSecret.Data[api.EnvMongoDBBackupUser]) + creds.Password = string(usersSecret.Data[api.EnvMongoDBBackupPassword]) + default: + return creds, errors.Errorf("not implemented for role: %s", role) + } + + if creds.Username == "" || creds.Password == "" { + return creds, errors.Errorf("can't find credentials for role %s", role) + } + + return creds, nil +}