Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 34 additions & 56 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec cpov1.Postgresq
if !ok {
passwordEncryption = "scram-sha-256"
}
if pgSpec.Spec.Monitoring != nil {
flg := cpov1.UserFlags{constants.RoleFlagLogin}
if pgSpec.Spec.Users != nil {
pgSpec.Spec.Users[monitorUsername] = flg
} else {
users := make(map[string]cpov1.UserFlags)
pgSpec.Spec.Users = users
pgSpec.Spec.Users[monitorUsername] = flg
}
}

cluster := &Cluster{
Config: cfg,
Postgresql: pgSpec,
Expand Down Expand Up @@ -359,12 +350,6 @@ func (c *Cluster) Create() (err error) {
}
c.logger.Info("a TDE secret was successfully created")
}
if c.Postgresql.Spec.Monitoring != nil {
if err := c.createMonitoringSecret(); err != nil {
return fmt.Errorf("could not create the monitoring secret: %v", err)
}
c.logger.Info("a monitoring secret was successfully created")
}

if specHasPgbackrestClone(&c.Postgresql.Spec) {
if err := c.createPgbackrestCloneConfig(); err != nil {
Expand Down Expand Up @@ -440,27 +425,8 @@ func (c *Cluster) Create() (err error) {
// something fails, report warning
c.createConnectionPooler(c.installLookupFunction)

//Setup cpo monitoring related sql statements
if c.Spec.Monitoring != nil {
c.logger.Info("setting up CPO monitoring")

// Open a new connection to the postgres db tp setup monitoring struc and permissions
if err := c.initDbConnWithName("postgres"); err != nil {
return fmt.Errorf("could not init database connection")
}
defer func() {
if c.connectionIsClosed() {
return
}

if err := c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection: %v", err)
}
}()
_, err := c.pgDb.Exec(CPOmonitoring)
if err != nil {
return fmt.Errorf("CPO monitoring could not be setup: %v", err)
}
c.addMonitoringPermissions()
}

// remember slots to detect deletion from manifest
Expand Down Expand Up @@ -961,22 +927,10 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {
updateFailed = true
}
}
//Add monitoring user if required
if newSpec.Spec.Monitoring != nil {
flags := []string{constants.RoleFlagLogin}
monitorUser := map[string]spec.PgUser{
monitorUsername: {
Origin: spec.RoleOriginInfrastructure,
Name: monitorUsername,
Namespace: c.Namespace,
Flags: flags,
},
}
c.pgUsers[monitorUsername] = monitorUser[monitorUsername]
}
//Check if monitoring user is added in manifest
if _, ok := newSpec.Spec.Users["cpo-exporter"]; ok {
c.logger.Error("creating user of name cpo-exporter is not allowed as it is reserved for monitoring")
updateFailed = true
}

// Users
Expand All @@ -990,11 +944,14 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {
// only when disabled in oldSpec and enabled in newSpec
needPoolerUser := c.needConnectionPoolerUser(&oldSpec.Spec, &newSpec.Spec)

// Check if Monitor-User needs to be created
needMonitoring := newSpec.Spec.Monitoring != nil && oldSpec.Spec.Monitoring == nil

// streams new replication user created who is initialized in initUsers
// only when streams were not specified in oldSpec but in newSpec
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0

if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser {
if !sameUsers || !sameRotatedUsers || needPoolerUser || needMonitoring || needStreamUser {
c.logger.Debugf("initialize users")
if err := c.initUsers(); err != nil {
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
Expand Down Expand Up @@ -1024,12 +981,6 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {
syncStatefulSet = true
}

//sync monitoring container
if !reflect.DeepEqual(oldSpec.Spec.Monitoring, newSpec.Spec.Monitoring) {
syncStatefulSet = true
c.syncMonitoringSecret(oldSpec, newSpec)
}

//sync sts when there is a change in the pgbackrest secret, since we need to mount this
if newSpec.Spec.Backup != nil && oldSpec.Spec.Backup != nil &&
newSpec.Spec.Backup.Pgbackrest != nil && oldSpec.Spec.Backup.Pgbackrest != nil &&
Expand Down Expand Up @@ -1213,6 +1164,17 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {
updateFailed = true
}

// Check if we need to call addMonitoringPermissions-func
if c.Spec.Monitoring != nil && newSpec.Spec.Monitoring != nil && oldSpec.Spec.Monitoring == nil {
c.addMonitoringPermissions()
}
// Check if Monitoring-Secret needs to be removed
if newSpec.Spec.Monitoring == nil && oldSpec.Spec.Monitoring != nil {
if err := c.deleteMonitoringSecret(); err != nil {
return fmt.Errorf("could not remove the Monitoring secret: %v", err)
}
}

// streams
if len(newSpec.Spec.Streams) > 0 {
if err := c.syncStreams(); err != nil {
Expand Down Expand Up @@ -1460,6 +1422,22 @@ func (c *Cluster) initSystemUsers() error {
}
}

// if the monitor object has been created, a monitoring user is required.
if c.Spec.Monitoring != nil {

MonitoringUser := spec.PgUser{
Origin: spec.RoleMonitoring,
Name: constants.MonitoringUserKeyName,
Namespace: c.Namespace,
Flags: []string{constants.RoleFlagLogin},
Password: util.RandomPassword(constants.PasswordLength),
}

if _, exists := c.systemUsers[constants.MonitoringUserKeyName]; !exists {
c.systemUsers[constants.MonitoringUserKeyName] = MonitoringUser
}
}

// replication users for event streams are another exception
// the operator will create one replication user for all streams
if len(c.Spec.Streams) > 0 {
Expand Down
12 changes: 8 additions & 4 deletions pkg/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"strings"
"testing"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
cpov1 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/apis/cpo.opensource.cybertec.at/v1"
fakecpov1 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/generated/clientset/versioned/fake"
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/spec"
Expand All @@ -17,6 +15,8 @@ import (
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/constants"
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/k8sutil"
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/teams"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
Expand Down Expand Up @@ -944,16 +944,20 @@ func TestServiceAnnotations(t *testing.T) {
}

func TestInitSystemUsers(t *testing.T) {
// reset system users, pooler and stream section
// reset system users, pooler, monitoring and stream section
cl.systemUsers = make(map[string]spec.PgUser)
cl.Spec.EnableConnectionPooler = boolToPointer(false)
cl.Spec.Monitoring = nil
cl.Spec.Streams = []cpov1.Stream{}

// default cluster without connection pooler and event streams
// default cluster without connection pooler, monitoring and event streams
cl.initSystemUsers()
if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; exist {
t.Errorf("%s, connection pooler user is present", t.Name())
}
if _, exist := cl.systemUsers[constants.MonitoringUserKeyName]; exist {
t.Errorf("%s, Monitoring user is present", t.Name())
}
if _, exist := cl.systemUsers[constants.EventStreamUserKeyName]; exist {
t.Errorf("%s, stream user is present", t.Name())
}
Expand Down
27 changes: 26 additions & 1 deletion pkg/cluster/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ const (
TO {{.pooler_user}};
GRANT USAGE ON SCHEMA {{.pooler_schema}} TO {{.pooler_user}};
`
CPOmonitoring = `

cpoMonitoring = `
GRANT pg_monitor TO cpo_exporter;
GRANT SELECT ON TABLE pg_authid TO cpo_exporter;

Expand Down Expand Up @@ -793,3 +794,27 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error {

return nil
}

// Creates the needes structure and grant needed permissions for the Monitoring
func (c *Cluster) addMonitoringPermissions() error {
c.logger.Info("setting up CPO monitoring")

// Open a new connection to the postgres db tp setup monitoring struc and permissions
if err := c.initDbConnWithName("postgres"); err != nil {
return fmt.Errorf("could not init database connection")
}
defer func() {
if c.connectionIsClosed() {
return
}

if err := c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection: %v", err)
}
}()
_, err := c.pgDb.Exec(cpoMonitoring)
if err != nil {
return fmt.Errorf("CPO monitoring could not be setup: %v", err)
}
return nil
}
11 changes: 0 additions & 11 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

cpov1 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/apis/cpo.opensource.cybertec.at/v1"
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util"
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/constants"
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/k8sutil"
"github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/retryutil"
)
Expand Down Expand Up @@ -95,16 +94,6 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) {
Env: c.generateMonitoringEnvVars(),
}
c.Spec.Sidecars = append(c.Spec.Sidecars, *sidecar) //populate the sidecar spec so that the sidecar is automatically created

//Add monitoring user
flg := cpov1.UserFlags{constants.RoleFlagLogin}
if c.Spec.Users != nil {
c.Spec.Users[monitorUsername] = flg
} else {
users := make(map[string]cpov1.UserFlags)
c.Spec.Users = users
c.Spec.Users[monitorUsername] = flg
}
}

statefulSetSpec, err := c.generateStatefulSet(&c.Spec)
Expand Down
84 changes: 7 additions & 77 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,6 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error {
return fmt.Errorf("error refreshing restore configmap: %v", err)
}

// sync monitoring
if err = c.syncMonitoringSecret(&oldSpec, newSpec); err != nil {
return fmt.Errorf("could not sync monitoring: %v", err)
}

if err = c.initUsers(); err != nil {
err = fmt.Errorf("could not init users: %v", err)
return err
Expand Down Expand Up @@ -1032,6 +1027,13 @@ func (c *Cluster) updateSecret(
userMap = c.systemUsers
}
}
// use system user when Monitoring is enabled and Monitoring user is specfied in manifest
if _, exists := c.systemUsers[constants.MonitoringUserKeyName]; exists {
if secretUsername == c.systemUsers[constants.MonitoringUserKeyName].Name {
userKey = constants.MonitoringUserKeyName
userMap = c.systemUsers
}
}
// use system user when streams are defined and fes_user is specfied in manifest
if _, exists := c.systemUsers[constants.EventStreamUserKeyName]; exists {
if secretUsername == c.systemUsers[constants.EventStreamUserKeyName].Name {
Expand Down Expand Up @@ -1682,37 +1684,6 @@ func (c *Cluster) createTDESecret() error {
return nil
}

func (c *Cluster) createMonitoringSecret() error {
c.logger.Info("creating Monitoring secret")
c.setProcessName("creating Monitoring secret")
generatedKey := make([]byte, 16)
rand.Read(generatedKey)

generatedSecret := v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: c.getMonitoringSecretName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
},
Type: v1.SecretTypeOpaque,
Data: map[string][]byte{
"username": []byte(monitorUsername),
"password": []byte(fmt.Sprintf("%x", generatedKey)),
},
}
secret, err := c.KubeClient.Secrets(generatedSecret.Namespace).Create(context.TODO(), &generatedSecret, metav1.CreateOptions{})
if err == nil {
c.Secrets[secret.UID] = secret
c.logger.Debugf("created new secret %s, namespace: %s, uid: %s", util.NameFromMeta(secret.ObjectMeta), generatedSecret.Namespace, secret.UID)
} else {
if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not create secret for Monitoring %s: in namespace %s: %v", util.NameFromMeta(secret.ObjectMeta), generatedSecret.Namespace, err)
}
}

return nil
}

// delete monitoring secret
func (c *Cluster) deleteMonitoringSecret() (err error) {
// Repeat the same for the secret object
Expand All @@ -1732,47 +1703,6 @@ func (c *Cluster) deleteMonitoringSecret() (err error) {
return nil
}

// Sync monitoring
// In case of monitoring is added/deleted, we need to
// 1. Update sts to in/exclude the exporter contianer
// 2. Add/Delete the respective user
// 3. Add/Delete the respective secret
func (c *Cluster) syncMonitoringSecret(oldSpec, newSpec *cpov1.Postgresql) error {
c.logger.Info("syncing Monitoring secret")
c.setProcessName("syncing Monitoring secret")

if newSpec.Spec.Monitoring != nil && oldSpec.Spec.Monitoring == nil {
// Create monitoring secret
if err := c.createMonitoringSecret(); err != nil {
return fmt.Errorf("could not create the monitoring secret: %v", err)
} else {
flags := []string{constants.RoleFlagLogin}
monitorUser := map[string]spec.PgUser{
monitorUsername: {
Origin: spec.RoleOriginInfrastructure,
Name: monitorUsername,
Namespace: c.Namespace,
Flags: flags,
},
}
c.pgUsers[monitorUsername] = monitorUser[monitorUsername]
}
c.logger.Info("monitoring secret was successfully created")
} else if newSpec.Spec.Monitoring == nil && oldSpec.Spec.Monitoring != nil {
// Delete the monitoring secret
if err := c.deleteMonitoringSecret(); err != nil {
return fmt.Errorf("could not delete the monitoring secret: %v", err)
} else {
// Delete the monitoring user
monitorUser := c.pgUsers[monitorUsername]
monitorUser.Deleted = true
c.pgUsers[monitorUsername] = monitorUser
}
c.logger.Info("monitoring secret was successfully deleted")
}
return nil
}

func generateRootCertificate(
privateKey *ecdsa.PrivateKey, serialNumber *big.Int,
) (*x509.Certificate, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/generated/clientset/versioned/clientset.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/generated/clientset/versioned/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading