Skip to content
Open
9 changes: 5 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ NPROCS ?= 1
# to half the number of CPU cores.
GO_TEST_PARALLEL := $(shell echo $$(( $(NPROCS) / 2 )))

GOLANGCILINT_VERSION ?= 1.63.4
GOLANGCILINT_VERSION ?= 2.1.2
GO_STATIC_PACKAGES = $(GO_PROJECT)/cmd/provider
GO_LDFLAGS += -X $(GO_PROJECT)/pkg/version.Version=$(VERSION)
GO_SUBDIRS += cmd pkg apis
GO111MODULE = on
GOLANGCILINT_VERSION = 2.1.2
-include build/makelib/golang.mk

# ====================================================================================
# Setup Kubernetes tools
KIND_NODE_IMAGE_TAG ?= v1.23.4
DOCKER_REGISTRY ?= "xpkg.upbound.io"
KIND_NODE_IMAGE_TAG ?= v1.30.13
KIND_VERSION ?= v0.29.0
KUBECTL_VERSION ?= v1.30.13
CROSSPLANE_CLI_VERSION ?= v1.20.0
-include build/makelib/k8s_tools.mk

# ====================================================================================
Expand Down
2 changes: 2 additions & 0 deletions apis/postgresql/v1alpha1/role_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type RoleConfigurationParameter struct {

// A RoleObservation represents the observed state of a PostgreSQL role.
type RoleObservation struct {
// ConnectionLimit represents the applied connectionlimit
ConnectionLimit *int32 `json:"connectionLimit,omitempty"`
// PrivilegesAsClauses represents the applied privileges state, taking into account
// any defaults applied by Postgres, and expressed as a list of ROLE PRIVILEGE clauses.
PrivilegesAsClauses []string `json:"privilegesAsClauses,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions apis/postgresql/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions cluster/local/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ EOF

echo_step "tag controller image and load it into kind cluster"

docker tag "${CONTROLLER_IMAGE}" "xpkg.upbound.io/${PACKAGE_NAME}"
"${KIND}" load docker-image "xpkg.upbound.io/${PACKAGE_NAME}" --name="${K8S_CLUSTER}"
docker tag "${CONTROLLER_IMAGE}" "xpkg.crossplane.io/${PACKAGE_NAME}"
"${KIND}" load docker-image "xpkg.crossplane.io/${PACKAGE_NAME}" --name="${K8S_CLUSTER}"

echo_step "create crossplane-system namespace"

Expand Down Expand Up @@ -220,7 +220,7 @@ cleanup_provider() {
timeout=60
current=0
step=3
while [[ $(kubectl get providerrevision.pkg.crossplane.io -o name | wc -l | tr -d '[:space:]') != "0" ]]; do
while [[ $("${KUBECTL}" get providerrevision.pkg.crossplane.io -o name | wc -l | tr -d '[:space:]') != "0" ]]; do
echo "waiting another $step seconds"
current=$((current + step))
if [[ $current -ge $timeout ]]; then
Expand Down Expand Up @@ -491,4 +491,4 @@ echo_step "--- TESTING POSTGRESDB ---"
integration_tests_postgres
echo_step "--- INTEGRATION TESTS FOR POSTGRESDB ACCOMPLISHED SUCCESSFULLY ---"

integration_tests_end
integration_tests_end
45 changes: 43 additions & 2 deletions cluster/local/postgresdb_functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,46 @@ check_role_privileges() {
fi
}

check_all_conn_limits() {
# check if the role connection limit is working properly
echo_step "check if role connection update is working"

TARGET_DB='db1'
PARENT_ROLE='parent-role'
USER_ROLE='example-role'

roles="$PARENT_ROLE $USER_ROLE"
conn_limits="12 -1"

# Iterate over roles and expected connection limits
role_index=1
for role in $roles; do
expected_limits=$(echo "$conn_limits" | cut -d ' ' -f $role_index)
check_role_limits "$role" "$expected_limits" "${postgres_root_pw}" "$TARGET_DB"
role_index=$((role_index + 1))
done

echo_step_completed
}

check_role_limits() {
local role=$1
local expected_limits=$2
local target_db=$4

echo_info "Checking connection limits for role: $role (expected: $expected_limits)"
echo ""
result=$(PGPASSWORD="$3" psql -h localhost -p 5432 -U postgres -d postgres -wtAc" SELECT rolconnlimit FROM pg_roles WHERE rolname = '$role' " | tr '\n' ',' | sed 's/,$//')

if [ "$result" = "$expected_limits" ]; then
echo_info "Connection limits for $role are as expected: $result"
echo ""
else
echo_error "ERROR: Connection limits for $role do not match expected. Found: $result, Expected: $expected_limits"
echo ""
fi
}

check_schema_privileges(){
# check if schema privileges are set properly
echo_step "check if schema privileges are set properly"
Expand Down Expand Up @@ -149,7 +189,7 @@ check_observe_only_database(){
echo_step "check if observe only database is preserved after deletion"

# Delete the database kubernetes object, it should not delete the database
kubectl delete database.postgresql.sql.crossplane.io db-observe
"${KUBECTL}" delete database.postgresql.sql.crossplane.io db-observe

local datname
datname="$(PGPASSWORD="${postgres_root_pw}" psql -h localhost -p 5432 -U postgres -wtAc "SELECT datname FROM pg_database WHERE datname = 'db-observe';")"
Expand Down Expand Up @@ -196,6 +236,7 @@ integration_tests_postgres() {
setup_postgresdb_tests
check_observe_only_database
check_all_roles_privileges
check_all_conn_limits
check_schema_privileges
delete_postgresdb_resources
}
}
33 changes: 32 additions & 1 deletion examples/postgresql/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,35 @@ spec:
createDb: true
login: true
createRole: true
inherit: true
inherit: true
---
apiVersion: postgresql.sql.crossplane.io/v1alpha1
kind: Role
metadata:
name: example-role
spec:
forProvider:
connectionLimit: -1
privileges:
createDb: true
writeConnectionSecretToRef:
name: example-role-secret
namespace: default
---
apiVersion: postgresql.sql.crossplane.io/v1alpha1
kind: Role
metadata:
name: parent-role
spec:
forProvider:
connectionLimit: 12
privileges:
login: true
configurationParameters:
- name: 'statement_timeout'
value: '123'
- name: 'search_path'
value: '"$user",public'
writeConnectionSecretToRef:
name: example-parent-role-secret
namespace: default
4 changes: 4 additions & 0 deletions package/crds/postgresql.sql.crossplane.io_roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ spec:
type: string
type: object
type: array
connectionLimit:
description: ConnectionLimit represents the applied connectionlimit
format: int32
type: integer
privilegesAsClauses:
description: |-
PrivilegesAsClauses represents the applied privileges state, taking into account
Expand Down
71 changes: 59 additions & 12 deletions pkg/clients/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"net/url"
"time"

"github.com/crossplane-contrib/provider-sql/pkg/clients/xsql"
"github.com/lib/pq"
Expand All @@ -20,6 +21,8 @@ const (
)

type postgresDB struct {
db *sql.DB
err error
dsn string
endpoint string
port string
Expand All @@ -38,14 +41,46 @@ func New(creds map[string][]byte, database, sslmode string) xsql.DB {
password := string(creds[xpv1.ResourceCredentialsSecretPasswordKey])
dsn := DSN(username, password, endpoint, port, database, sslmode)

db, err := openDB(dsn, true)

return postgresDB{
db: db,
err: err,
dsn: dsn,
endpoint: endpoint,
port: port,
sslmode: sslmode,
}
}

// openDB returns a new database connection
func openDB(dsn string, setLimits bool) (*sql.DB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}

// Since we are now using connection pooling, establish some sensible defaults for connections
// Ideally these parameters would be set in the config section for the provider, but that
// can be deferred to a later time.
if setLimits {
db.SetMaxOpenConns(5)
db.SetMaxIdleConns(2)
db.SetConnMaxIdleTime(2 * time.Minute)
db.SetConnMaxLifetime(10 * time.Minute)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err = db.PingContext(ctx)
if err != nil {
return nil, err
}

return db, nil
}

// DSN returns the DSN URL
func DSN(username, password, endpoint, port, database, sslmode string) string {
// Use net/url UserPassword to encode the username and password
Expand All @@ -63,20 +98,23 @@ func DSN(username, password, endpoint, port, database, sslmode string) string {
// ExecTx executes an array of queries, committing if all are successful and
// rolling back immediately on failure.
func (c postgresDB) ExecTx(ctx context.Context, ql []xsql.Query) error {
d, err := sql.Open("postgres", c.dsn)
if c.db == nil || c.err != nil {
return c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return err
}

tx, err := d.BeginTx(ctx, nil)
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return err
}

// Rollback or Commit based on error state. Defer close in defer to make
// sure the connection is always closed.
defer func() {
defer d.Close() //nolint:errcheck
if err != nil {
tx.Rollback() //nolint:errcheck
return
Expand All @@ -94,37 +132,46 @@ func (c postgresDB) ExecTx(ctx context.Context, ql []xsql.Query) error {

// Exec the supplied query.
func (c postgresDB) Exec(ctx context.Context, q xsql.Query) error {
d, err := sql.Open("postgres", c.dsn)
if c.db == nil || c.err != nil {
return c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return err
}
defer d.Close() //nolint:errcheck

_, err = d.ExecContext(ctx, q.String, q.Parameters...)
_, err = c.db.ExecContext(ctx, q.String, q.Parameters...)
return err
}

// Query the supplied query.
func (c postgresDB) Query(ctx context.Context, q xsql.Query) (*sql.Rows, error) {
d, err := sql.Open("postgres", c.dsn)
if c.err != nil || c.db == nil {
return nil, c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return nil, err
}
defer d.Close() //nolint:errcheck

rows, err := d.QueryContext(ctx, q.String, q.Parameters...)
rows, err := c.db.QueryContext(ctx, q.String, q.Parameters...)
return rows, err
}

// Scan the results of the supplied query into the supplied destination.
func (c postgresDB) Scan(ctx context.Context, q xsql.Query, dest ...interface{}) error {
db, err := sql.Open("postgres", c.dsn)
if c.db == nil || c.err != nil {
return c.err
}

err := c.db.PingContext(ctx)
if err != nil {
return err
}
defer db.Close() //nolint:errcheck

return db.QueryRowContext(ctx, q.String, q.Parameters...).Scan(dest...)
return c.db.QueryRowContext(ctx, q.String, q.Parameters...).Scan(dest...)
}

// GetConnectionDetails returns the connection details for a user of this DB
Expand Down
10 changes: 7 additions & 3 deletions pkg/controller/postgresql/role/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
Replication: new(bool),
BypassRls: new(bool),
},
ConnectionLimit: new(int32),
}

query := "SELECT " +
Expand Down Expand Up @@ -259,6 +260,8 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
}
cr.Status.AtProvider.ConfigurationParameters = observed.ConfigurationParameters

cr.Status.AtProvider.ConnectionLimit = observed.ConnectionLimit

_, pwdChanged, err := c.getPassword(ctx, cr)
if err != nil {
return managed.ExternalObservation{}, err
Expand Down Expand Up @@ -405,10 +408,11 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext
// Update state to reflect the current configuration parameters
cr.Status.AtProvider.ConfigurationParameters = cr.Spec.ForProvider.ConfigurationParameters
}
cl := cr.Spec.ForProvider.ConnectionLimit
if cl != nil {
newCl := cr.Spec.ForProvider.ConnectionLimit
currCl := cr.Status.AtProvider.ConnectionLimit
if (newCl != nil && currCl != nil) && (int64(*currCl) != int64(*newCl)) {
if err := c.db.Exec(ctx, xsql.Query{
String: fmt.Sprintf("ALTER ROLE %s CONNECTION LIMIT %d", crn, int64(*cl)),
String: fmt.Sprintf("ALTER ROLE %s CONNECTION LIMIT %d", crn, int64(*newCl)),
}); err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, errUpdateRole)
}
Expand Down