Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes/unreleased/Added-20250606-180846.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Added
body: Enabled volume validation during database creation and update to verify that specified extra_volumes paths exist and are accessible.
time: 2025-06-06T18:08:46.195576+05:30
48 changes: 48 additions & 0 deletions server/internal/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/url"
"strings"

"github.com/google/uuid"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -171,6 +172,16 @@ func (s *Service) CreateDatabase(ctx context.Context, req *api.CreateDatabaseReq
return nil, api.MakeInvalidInput(err)
}

err = s.dbSvc.PopulateSpecDefaults(ctx, spec)
if err != nil {
return nil, api.MakeInvalidInput(fmt.Errorf("failed to validate database spec: %w", err))
}

err = s.ValidateSpec(ctx, spec)
if err != nil {
return nil, api.MakeInvalidInput(fmt.Errorf("%w", err))
}

db, err := s.dbSvc.CreateDatabase(ctx, spec)
if errors.Is(err, database.ErrDatabaseAlreadyExists) {
return nil, api.MakeDatabaseAlreadyExists(err)
Expand Down Expand Up @@ -211,6 +222,16 @@ func (s *Service) UpdateDatabase(ctx context.Context, req *api.UpdateDatabasePay
return nil, api.MakeInvalidInput(err)
}

err = s.dbSvc.PopulateSpecDefaults(ctx, spec)
if err != nil {
return nil, api.MakeInvalidInput(fmt.Errorf("failed to validate database spec: %w", err))
}

err = s.ValidateSpec(ctx, spec)
if err != nil {
return nil, api.MakeInvalidInput(fmt.Errorf("%w", err))
}

db, err := s.dbSvc.UpdateDatabase(ctx, database.DatabaseStateModifying, spec)
if errors.Is(err, database.ErrDatabaseNotFound) {
return nil, api.MakeNotFound(fmt.Errorf("database %s not found", *req.DatabaseID))
Expand Down Expand Up @@ -424,6 +445,12 @@ func (s *Service) RestoreDatabase(ctx context.Context, req *api.RestoreDatabaseP
// Remove backup configuration from nodes that are being restored and
// persist the updated spec.
db.Spec.RemoveBackupConfigFrom(targetNodes...)

err = s.dbSvc.PopulateSpecDefaults(ctx, db.Spec)
if err != nil {
return nil, api.MakeInvalidInput(fmt.Errorf("failed to validate database spec: %w", err))
}

db, err = s.dbSvc.UpdateDatabase(ctx, database.DatabaseStateRestoring, db.Spec)
if err != nil {
return nil, fmt.Errorf("failed to persist db spec updates: %w", err)
Expand Down Expand Up @@ -475,3 +502,24 @@ func (s *Service) InitCluster(ctx context.Context) (*api.ClusterJoinToken, error
func (s *Service) JoinCluster(ctx context.Context, token *api.ClusterJoinToken) error {
return ErrAlreadyInitialized
}

func (s *Service) ValidateSpec(ctx context.Context, spec *database.Spec) error {
if spec == nil {
return errors.New("spec cannot be nil")
}

output := s.workflowSvc.ValidateSpec(ctx, spec)
if output == nil {
return errors.New("failed to validate spec")

}
if !output.Valid {
return fmt.Errorf(
"spec validation failed. Please ensure all required fields in the provided spec are valid.\nDetails: %s",
strings.Join(output.Errors, " "),
)
}
s.logger.Info().Msg("Spec validation succeeded")

return nil
}
6 changes: 6 additions & 0 deletions server/internal/database/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type InstanceResources struct {
Resources []*resource.ResourceData
}

type ValidationResult struct {
Success bool
Reason string
}

func NewInstanceResources(instance *InstanceResource, resources []resource.Resource) (*InstanceResources, error) {
data := make([]*resource.ResourceData, len(resources))
for i, res := range resources {
Expand Down Expand Up @@ -83,4 +88,5 @@ type Orchestrator interface {
GenerateInstanceRestoreResources(spec *InstanceSpec, taskID uuid.UUID) (*InstanceResources, error)
GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID uuid.UUID) (*ConnectionInfo, error)
CreatePgBackRestBackup(ctx context.Context, w io.Writer, instanceID uuid.UUID, options *pgbackrest.BackupOptions) error
ValidateVolumes(ctx context.Context, spec *InstanceSpec) (*ValidationResult, error)
}
9 changes: 1 addition & 8 deletions server/internal/database/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ func (s *Service) CreateDatabase(ctx context.Context, spec *Spec) (*Database, er
return nil, ErrDatabaseAlreadyExists
}

if err := s.populateSpecDefaults(ctx, spec); err != nil {
return nil, fmt.Errorf("failed to validate database spec: %w", err)
}

now := time.Now()
db := &Database{
DatabaseID: spec.DatabaseID,
Expand Down Expand Up @@ -87,9 +83,6 @@ func (s *Service) UpdateDatabase(ctx context.Context, state DatabaseState, spec
if !DatabaseStateModifiable(currentDB.State) {
return nil, ErrDatabaseNotModifiable
}
if err := s.populateSpecDefaults(ctx, spec); err != nil {
return nil, fmt.Errorf("failed to validate database spec: %w", err)
}

instances, err := s.GetInstances(ctx, spec.DatabaseID)
if err != nil {
Expand Down Expand Up @@ -314,7 +307,7 @@ func (s *Service) GetAllInstances(ctx context.Context) ([]*Instance, error) {
return instances, nil
}

func (s *Service) populateSpecDefaults(ctx context.Context, spec *Spec) error {
func (s *Service) PopulateSpecDefaults(ctx context.Context, spec *Spec) error {
var hostIDs []uuid.UUID
// First pass to build out hostID list
for _, node := range spec.Nodes {
Expand Down
7 changes: 5 additions & 2 deletions server/internal/database/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,10 @@ func (s *Spec) NodeInstances() ([]*NodeInstances, error) {
// Create a merged PostgreSQL configuration with node-level overrides
postgresqlConf := maps.Clone(s.PostgreSQLConf)
maps.Copy(node.PostgreSQLConf, postgresqlConf)
extrernalVolumes := slices.Clone(s.ExtraVolumes)
extraVolumes := s.ExtraVolumes
if len(node.ExtraVolumes) > 0 {
extraVolumes = node.ExtraVolumes
}

instances := make([]*InstanceSpec, len(node.HostIDs))
for hostIdx, hostID := range node.HostIDs {
Expand Down Expand Up @@ -388,7 +391,7 @@ func (s *Spec) NodeInstances() ([]*NodeInstances, error) {
// cluster into this decision when we implement updates.
EnableBackups: backupConfig != nil && hostIdx == len(node.HostIDs)-1,
ClusterSize: clusterSize,
ExtraVolumes: extrernalVolumes,
ExtraVolumes: extraVolumes,
}
}

Expand Down
18 changes: 18 additions & 0 deletions server/internal/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/system"
Expand Down Expand Up @@ -139,6 +140,14 @@ func (d *Docker) ContainerRemove(ctx context.Context, containerID string, opts c
return nil
}

func (d *Docker) ContainerStop(ctx context.Context, containerID string, timeoutSeconds *int) error {
err := d.client.ContainerStop(ctx, containerID, container.StopOptions{Timeout: timeoutSeconds})
if err != nil {
return fmt.Errorf("failed to stop container: %w", errTranslate(err))
}
return nil
}

func (d *Docker) Info(ctx context.Context) (system.Info, error) {
info, err := d.client.Info(ctx)
if err != nil {
Expand Down Expand Up @@ -480,3 +489,12 @@ func errTranslate(err error) error {
}
return err
}

func BuildMount(source, target string, readOnly bool) mount.Mount {
return mount.Mount{
Type: mount.TypeBind,
Source: source,
Target: target,
ReadOnly: readOnly,
}
}
81 changes: 81 additions & 0 deletions server/internal/orchestrator/swarm/orchestrator.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package swarm

import (
"bytes"
"context"
"fmt"
"io"
"net/netip"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/cschleiden/go-workflows/workflow"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/go-connections/nat"
Expand Down Expand Up @@ -436,3 +441,79 @@ func (o *Orchestrator) CreatePgBackRestBackup(ctx context.Context, w io.Writer,

return nil
}

func (o *Orchestrator) ValidateVolumes(ctx context.Context, spec *database.InstanceSpec) (*database.ValidationResult, error) {
specVersion := spec.PgEdgeVersion
if specVersion == nil {
o.logger.Warn().Msg("PostgresVersion not provided, using default version")
specVersion = defaultVersion
}

images, err := GetImages(o.cfg, specVersion)
if err != nil {
return nil, fmt.Errorf("image fetch error: %w", err)
}

var mounts []mount.Mount
var mountTargets []string
for _, vol := range spec.ExtraVolumes {
mounts = append(mounts, docker.BuildMount(vol.HostPath, vol.DestinationPath, false))
mountTargets = append(mountTargets, vol.DestinationPath)
}

cmd := buildVolumeCheckCommand(mountTargets)
output, err := o.runVolumeValidationContainer(ctx, images.PgEdgeImage, cmd, mounts)
if err != nil {
return nil, err
}

if strings.HasSuffix(output, "OK") {
return &database.ValidationResult{Success: true, Reason: "All volumes are valid"}, nil
}
return &database.ValidationResult{Success: false, Reason: output}, nil
}

func (o *Orchestrator) runVolumeValidationContainer(ctx context.Context, image string, cmd []string, mounts []mount.Mount) (string, error) {
// Start container
containerID, err := o.docker.ContainerRun(ctx, docker.ContainerRunOptions{
Config: &container.Config{
Image: image,
Entrypoint: cmd,
},
Host: &container.HostConfig{
Mounts: mounts,
},
})
if err != nil {
return "", fmt.Errorf("failed to start container: %w", err)
}
// Ensure container is removed afterward
defer func() {
if err := o.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{Force: true}); err != nil {
o.logger.Error().Err(err).Msg("container cleanup failed")
}
}()

// Wait for the container to complete
if err := o.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning, 30*time.Second); err != nil {
return "", fmt.Errorf("container wait failed: %w", err)
}

// Capture logs
buf := new(bytes.Buffer)
if err := o.docker.ContainerLogs(ctx, buf, containerID, container.LogsOptions{ShowStdout: true}); err != nil {
return "", fmt.Errorf("log fetch failed: %w", err)
}

// Stop the container gracefully
timeoutSeconds := 5
if err := o.docker.ContainerStop(ctx, containerID, &timeoutSeconds); err != nil {
o.logger.Warn().Err(err).Msg("graceful stop failed")
}

return strings.TrimSpace(buf.String()), nil
}

func buildVolumeCheckCommand(mountTargets []string) []string {
return []string{"sh", "-c", fmt.Sprintf("for d in %s; do [ -d \"$d\" ] || { echo \"FAIL: $d not found\"; exit 1; }; done; echo OK", strings.Join(mountTargets, " "))}
}
18 changes: 5 additions & 13 deletions server/internal/orchestrator/swarm/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/docker/docker/api/types/swarm"

"github.com/pgEdge/control-plane/server/internal/database"
"github.com/pgEdge/control-plane/server/internal/docker"
)

type Paths struct {
Expand Down Expand Up @@ -41,16 +42,16 @@ func DatabaseServiceSpec(
}

mounts := []mount.Mount{
buildMount(options.Paths.Configs, "/opt/pgedge/configs", true),
docker.BuildMount(options.Paths.Configs, "/opt/pgedge/configs", true),
// We're using a mount for the certificates instead of
// a secret because secrets can't be rotated without
// restarting the container.
buildMount(options.Paths.Certificates, "/opt/pgedge/certificates", true),
buildMount(options.Paths.Data, "/opt/pgedge/data", false),
docker.BuildMount(options.Paths.Certificates, "/opt/pgedge/certificates", true),
docker.BuildMount(options.Paths.Data, "/opt/pgedge/data", false),
}

for _, vol := range instance.ExtraVolumes {
mounts = append(mounts, buildMount(vol.HostPath, vol.DestinationPath, false))
mounts = append(mounts, docker.BuildMount(vol.HostPath, vol.DestinationPath, false))
}

return swarm.ServiceSpec{
Expand Down Expand Up @@ -119,12 +120,3 @@ func DatabaseServiceSpec(
},
}, nil
}

func buildMount(source, target string, readOnly bool) mount.Mount {
return mount.Mount{
Type: mount.TypeBind,
Source: source,
Target: target,
ReadOnly: readOnly,
}
}
1 change: 1 addition & 0 deletions server/internal/workflows/activities/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (a *Activities) Register(work *worker.Worker) error {
work.RegisterActivity(a.RestoreSpec),
work.RegisterActivity(a.UpdateDbState),
work.RegisterActivity(a.UpdateTask),
work.RegisterActivity(a.ValidateVolumes),
}
return errors.Join(errs...)
}
Loading