Skip to content
Open
9 changes: 5 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ NPROCS ?= 1
# to half the number of CPU cores.
GO_TEST_PARALLEL := $(shell echo $$(( $(NPROCS) / 2 )))

GOLANGCILINT_VERSION ?= 1.63.4
GOLANGCILINT_VERSION ?= 2.1.2
GO_STATIC_PACKAGES = $(GO_PROJECT)/cmd/provider
GO_LDFLAGS += -X $(GO_PROJECT)/pkg/version.Version=$(VERSION)
GO_SUBDIRS += cmd pkg apis
GO111MODULE = on
GOLANGCILINT_VERSION = 2.1.2
-include build/makelib/golang.mk

# ====================================================================================
# Setup Kubernetes tools
KIND_NODE_IMAGE_TAG ?= v1.23.4
DOCKER_REGISTRY ?= "xpkg.upbound.io"
KIND_NODE_IMAGE_TAG ?= v1.30.13
KIND_VERSION ?= v0.29.0
KUBECTL_VERSION ?= v1.30.13
CROSSPLANE_CLI_VERSION ?= v1.20.0
-include build/makelib/k8s_tools.mk

# ====================================================================================
Expand Down
2 changes: 2 additions & 0 deletions apis/postgresql/v1alpha1/role_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type RoleConfigurationParameter struct {

// A RoleObservation represents the observed state of a PostgreSQL role.
type RoleObservation struct {
// ConnectionLimit represents the applied connectionlimit
ConnectionLimit *int32 `json:"connectionLimit,omitempty"`
// PrivilegesAsClauses represents the applied privileges state, taking into account
// any defaults applied by Postgres, and expressed as a list of ROLE PRIVILEGE clauses.
PrivilegesAsClauses []string `json:"privilegesAsClauses,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions apis/postgresql/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions cluster/local/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ EOF

echo_step "tag controller image and load it into kind cluster"

docker tag "${CONTROLLER_IMAGE}" "xpkg.upbound.io/${PACKAGE_NAME}"
"${KIND}" load docker-image "xpkg.upbound.io/${PACKAGE_NAME}" --name="${K8S_CLUSTER}"
docker tag "${CONTROLLER_IMAGE}" "xpkg.crossplane.io/${PACKAGE_NAME}"
"${KIND}" load docker-image "xpkg.crossplane.io/${PACKAGE_NAME}" --name="${K8S_CLUSTER}"

echo_step "create crossplane-system namespace"

Expand Down Expand Up @@ -220,7 +220,7 @@ cleanup_provider() {
timeout=60
current=0
step=3
while [[ $(kubectl get providerrevision.pkg.crossplane.io -o name | wc -l | tr -d '[:space:]') != "0" ]]; do
while [[ $("${KUBECTL}" get providerrevision.pkg.crossplane.io -o name | wc -l | tr -d '[:space:]') != "0" ]]; do
echo "waiting another $step seconds"
current=$((current + step))
if [[ $current -ge $timeout ]]; then
Expand Down Expand Up @@ -491,4 +491,4 @@ echo_step "--- TESTING POSTGRESDB ---"
integration_tests_postgres
echo_step "--- INTEGRATION TESTS FOR POSTGRESDB ACCOMPLISHED SUCCESSFULLY ---"

integration_tests_end
integration_tests_end
45 changes: 43 additions & 2 deletions cluster/local/postgresdb_functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,46 @@ check_role_privileges() {
fi
}

check_all_conn_limits() {
# check if the role connection limit is working properly
echo_step "check if role connection update is working"

TARGET_DB='db1'
PARENT_ROLE='parent-role'
USER_ROLE='example-role'

roles="$PARENT_ROLE $USER_ROLE"
conn_limits="12 -1"

# Iterate over roles and expected connection limits
role_index=1
for role in $roles; do
expected_limits=$(echo "$conn_limits" | cut -d ' ' -f $role_index)
check_role_limits "$role" "$expected_limits" "${postgres_root_pw}" "$TARGET_DB"
role_index=$((role_index + 1))
done

echo_step_completed
}

check_role_limits() {
local role=$1
local expected_limits=$2
local target_db=$4

echo_info "Checking connection limits for role: $role (expected: $expected_limits)"
echo ""
result=$(PGPASSWORD="$3" psql -h localhost -p 5432 -U postgres -d postgres -wtAc" SELECT rolconnlimit FROM pg_roles WHERE rolname = '$role' " | tr '\n' ',' | sed 's/,$//')

if [ "$result" = "$expected_limits" ]; then
echo_info "Connection limits for $role are as expected: $result"
echo ""
else
echo_error "ERROR: Connection limits for $role do not match expected. Found: $result, Expected: $expected_limits"
echo ""
fi
}

check_schema_privileges(){
# check if schema privileges are set properly
echo_step "check if schema privileges are set properly"
Expand Down Expand Up @@ -149,7 +189,7 @@ check_observe_only_database(){
echo_step "check if observe only database is preserved after deletion"

# Delete the database kubernetes object, it should not delete the database
kubectl delete database.postgresql.sql.crossplane.io db-observe
"${KUBECTL}" delete database.postgresql.sql.crossplane.io db-observe

local datname
datname="$(PGPASSWORD="${postgres_root_pw}" psql -h localhost -p 5432 -U postgres -wtAc "SELECT datname FROM pg_database WHERE datname = 'db-observe';")"
Expand Down Expand Up @@ -196,6 +236,7 @@ integration_tests_postgres() {
setup_postgresdb_tests
check_observe_only_database
check_all_roles_privileges
check_all_conn_limits
check_schema_privileges
delete_postgresdb_resources
}
}
33 changes: 32 additions & 1 deletion examples/postgresql/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,35 @@ spec:
createDb: true
login: true
createRole: true
inherit: true
inherit: true
---
apiVersion: postgresql.sql.crossplane.io/v1alpha1
kind: Role
metadata:
name: example-role
spec:
forProvider:
connectionLimit: -1
privileges:
createDb: true
writeConnectionSecretToRef:
name: example-role-secret
namespace: default
---
apiVersion: postgresql.sql.crossplane.io/v1alpha1
kind: Role
metadata:
name: parent-role
spec:
forProvider:
connectionLimit: 12
privileges:
login: true
configurationParameters:
- name: 'statement_timeout'
value: '123'
- name: 'search_path'
value: '"$user",public'
writeConnectionSecretToRef:
name: example-parent-role-secret
namespace: default
4 changes: 4 additions & 0 deletions package/crds/postgresql.sql.crossplane.io_roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ spec:
type: string
type: object
type: array
connectionLimit:
description: ConnectionLimit represents the applied connectionlimit
format: int32
type: integer
privilegesAsClauses:
description: |-
PrivilegesAsClauses represents the applied privileges state, taking into account
Expand Down
71 changes: 59 additions & 12 deletions pkg/clients/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"net/url"
"time"

"github.com/crossplane-contrib/provider-sql/pkg/clients/xsql"
"github.com/lib/pq"
Expand All @@ -20,6 +21,8 @@ const (
)

type postgresDB struct {
db *sql.DB
err error
dsn string
endpoint string
port string
Expand All @@ -38,14 +41,46 @@ func New(creds map[string][]byte, database, sslmode string) xsql.DB {
password := string(creds[xpv1.ResourceCredentialsSecretPasswordKey])
dsn := DSN(username, password, endpoint, port, database, sslmode)

db, err := openDB(dsn, true)

return postgresDB{
db: db,
err: err,
dsn: dsn,
endpoint: endpoint,
port: port,
sslmode: sslmode,
}
}

// openDB returns a new database connection
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you do this in a separate PR?

Would rather have many smaller PRs which focus on one feature, thanks!

func openDB(dsn string, setLimits bool) (*sql.DB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}

// Since we are now using connection pooling, establish some sensible defaults for connections
// Ideally these parameters would be set in the config section for the provider, but that
// can be deferred to a later time.
if setLimits {
db.SetMaxOpenConns(5)
db.SetMaxIdleConns(2)
db.SetConnMaxIdleTime(2 * time.Minute)
db.SetConnMaxLifetime(10 * time.Minute)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err = db.PingContext(ctx)
if err != nil {
return nil, err
}

return db, nil
}

// DSN returns the DSN URL
func DSN(username, password, endpoint, port, database, sslmode string) string {
// Use net/url UserPassword to encode the username and password
Expand All @@ -63,20 +98,23 @@ func DSN(username, password, endpoint, port, database, sslmode string) string {
// ExecTx executes an array of queries, committing if all are successful and
// rolling back immediately on failure.
func (c postgresDB) ExecTx(ctx context.Context, ql []xsql.Query) error {
d, err := sql.Open("postgres", c.dsn)
if c.db == nil || c.err != nil {
return c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return err
}

tx, err := d.BeginTx(ctx, nil)
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return err
}

// Rollback or Commit based on error state. Defer close in defer to make
// sure the connection is always closed.
defer func() {
defer d.Close() //nolint:errcheck
if err != nil {
tx.Rollback() //nolint:errcheck
return
Expand All @@ -94,37 +132,46 @@ func (c postgresDB) ExecTx(ctx context.Context, ql []xsql.Query) error {

// Exec the supplied query.
func (c postgresDB) Exec(ctx context.Context, q xsql.Query) error {
d, err := sql.Open("postgres", c.dsn)
if c.db == nil || c.err != nil {
return c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return err
}
defer d.Close() //nolint:errcheck

_, err = d.ExecContext(ctx, q.String, q.Parameters...)
_, err = c.db.ExecContext(ctx, q.String, q.Parameters...)
return err
}

// Query the supplied query.
func (c postgresDB) Query(ctx context.Context, q xsql.Query) (*sql.Rows, error) {
d, err := sql.Open("postgres", c.dsn)
if c.err != nil || c.db == nil {
return nil, c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return nil, err
}
defer d.Close() //nolint:errcheck

rows, err := d.QueryContext(ctx, q.String, q.Parameters...)
rows, err := c.db.QueryContext(ctx, q.String, q.Parameters...)
return rows, err
}

// Scan the results of the supplied query into the supplied destination.
func (c postgresDB) Scan(ctx context.Context, q xsql.Query, dest ...interface{}) error {
db, err := sql.Open("postgres", c.dsn)
if c.db == nil || c.err != nil {
return c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return err
}
defer db.Close() //nolint:errcheck

return db.QueryRowContext(ctx, q.String, q.Parameters...).Scan(dest...)
return c.db.QueryRowContext(ctx, q.String, q.Parameters...).Scan(dest...)
}

// GetConnectionDetails returns the connection details for a user of this DB
Expand Down
10 changes: 7 additions & 3 deletions pkg/controller/postgresql/role/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
Replication: new(bool),
BypassRls: new(bool),
},
ConnectionLimit: new(int32),
}

query := "SELECT " +
Expand Down Expand Up @@ -259,6 +260,8 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
}
cr.Status.AtProvider.ConfigurationParameters = observed.ConfigurationParameters

cr.Status.AtProvider.ConnectionLimit = observed.ConnectionLimit

_, pwdChanged, err := c.getPassword(ctx, cr)
if err != nil {
return managed.ExternalObservation{}, err
Expand Down Expand Up @@ -405,10 +408,11 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext
// Update state to reflect the current configuration parameters
cr.Status.AtProvider.ConfigurationParameters = cr.Spec.ForProvider.ConfigurationParameters
}
cl := cr.Spec.ForProvider.ConnectionLimit
if cl != nil {
newCl := cr.Spec.ForProvider.ConnectionLimit
currCl := cr.Status.AtProvider.ConnectionLimit
if (newCl != nil && currCl != nil) && (int64(*currCl) != int64(*newCl)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the three casts to int64? Could we simply drop then?

Comment on lines +411 to +413
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default here is unlimited, represented as -1?

Could we do this instead, or do you see any issues with this?

Suggested change
newCl := cr.Spec.ForProvider.ConnectionLimit
currCl := cr.Status.AtProvider.ConnectionLimit
if (newCl != nil && currCl != nil) && (int64(*currCl) != int64(*newCl)) {
newCl := ptr.Deref(cr.Spec.ForProvider.ConnectionLimit, -1)
currCl := ptr.Deref(cr.Status.AtProvider.ConnectionLimit, -1)
if currCl != newCl {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you also need this ptr.Deref-ing in

func upToDate(observed *v1alpha1.RoleParameters, desired *v1alpha1.RoleParameters) bool {
	if observed.ConnectionLimit != desired.ConnectionLimit {
		return false
	}

to

func upToDate(observed *v1alpha1.RoleParameters, desired *v1alpha1.RoleParameters) bool {
	if ptr.Deref(observed.ConnectionLimit, -1) != ptr.Deref(desired.ConnectionLimit, -1) {
		return false
	}

Could you add some tests for Observe or upToDate covering this case?

if err := c.db.Exec(ctx, xsql.Query{
String: fmt.Sprintf("ALTER ROLE %s CONNECTION LIMIT %d", crn, int64(*cl)),
String: fmt.Sprintf("ALTER ROLE %s CONNECTION LIMIT %d", crn, int64(*newCl)),
}); err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, errUpdateRole)
}
Expand Down