Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes/unreleased/Added-20250606-180846.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Added
body: Enabled volume validation during database creation and update to verify that specified extra_volumes paths exist and are accessible.
time: 2025-06-06T18:08:46.195576+05:30
37 changes: 37 additions & 0 deletions server/internal/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/url"
"strings"

"github.com/google/uuid"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -171,6 +172,11 @@ func (s *Service) CreateDatabase(ctx context.Context, req *api.CreateDatabaseReq
return nil, api.MakeInvalidInput(err)
}

err = s.ValidateVolumes(ctx, spec)
if err != nil {
return nil, api.MakeInvalidInput(fmt.Errorf("%w", err))
}

db, err := s.dbSvc.CreateDatabase(ctx, spec)
if errors.Is(err, database.ErrDatabaseAlreadyExists) {
return nil, api.MakeDatabaseAlreadyExists(err)
Expand Down Expand Up @@ -211,6 +217,11 @@ func (s *Service) UpdateDatabase(ctx context.Context, req *api.UpdateDatabasePay
return nil, api.MakeInvalidInput(err)
}

err = s.ValidateVolumes(ctx, spec)
if err != nil {
return nil, api.MakeInvalidInput(fmt.Errorf("%w", err))
}

db, err := s.dbSvc.UpdateDatabase(ctx, database.DatabaseStateModifying, spec)
if errors.Is(err, database.ErrDatabaseNotFound) {
return nil, api.MakeNotFound(fmt.Errorf("database %s not found", *req.DatabaseID))
Expand Down Expand Up @@ -475,3 +486,29 @@ func (s *Service) InitCluster(ctx context.Context) (*api.ClusterJoinToken, error
// JoinCluster unconditionally returns ErrAlreadyInitialized. Neither ctx nor
// token is consulted by this implementation.
func (s *Service) JoinCluster(ctx context.Context, token *api.ClusterJoinToken) error {
return ErrAlreadyInitialized
}

// ValidateVolumes asks the workflow service to validate the extra volumes
// declared on spec.
//
// It returns an error when spec is nil, when the workflow service yields no
// result, or when the result is marked invalid (in which case the reported
// errors are joined into the message). A spec with no ExtraVolumes is
// accepted without contacting the workflow service.
//
// NOTE(review): only the database-level ExtraVolumes are inspected before
// skipping; if individual nodes can declare their own extra volumes, this
// early return would bypass validating them. Confirm against the spec model.
func (s *Service) ValidateVolumes(ctx context.Context, spec *database.Spec) error {
	switch {
	case spec == nil:
		return errors.New("spec cannot be nil")
	case len(spec.ExtraVolumes) == 0:
		s.logger.Info().Msg("No volumes to validate, skipping volume validation")
		return nil
	}

	result := s.workflowSvc.ValidateVolumes(ctx, spec)
	if result == nil {
		return errors.New("failed to validate volumes")
	}

	if result.Valid {
		s.logger.Info().Msg("Volume validation succeeded")
		return nil
	}

	// Surface every reported problem to the caller in a single message.
	details := strings.Join(result.Errors, " ")
	return fmt.Errorf(
		"volume validation failed. Please ensure that the paths provided in 'extra_volumes' exist and are accessible on the host system.\nDetails: %s",
		details,
	)
}
6 changes: 6 additions & 0 deletions server/internal/database/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type InstanceResources struct {
Resources []*resource.ResourceData
}

// ValidationResult is the outcome returned by Orchestrator.ValidateVolumes.
type ValidationResult struct {
// Success reports whether the validation passed.
Success bool
// Reason is a human-readable explanation of the outcome.
Reason string
}

func NewInstanceResources(instance *InstanceResource, resources []resource.Resource) (*InstanceResources, error) {
data := make([]*resource.ResourceData, len(resources))
for i, res := range resources {
Expand Down Expand Up @@ -83,4 +88,5 @@ type Orchestrator interface {
GenerateInstanceRestoreResources(spec *InstanceSpec, taskID uuid.UUID) (*InstanceResources, error)
GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID uuid.UUID) (*ConnectionInfo, error)
CreatePgBackRestBackup(ctx context.Context, w io.Writer, instanceID uuid.UUID, options *pgbackrest.BackupOptions) error
ValidateVolumes(ctx context.Context, spec *InstanceSpec) (*ValidationResult, error)
}
7 changes: 5 additions & 2 deletions server/internal/database/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,10 @@ func (s *Spec) NodeInstances() ([]*NodeInstances, error) {
// Create a merged PostgreSQL configuration with node-level overrides
postgresqlConf := maps.Clone(s.PostgreSQLConf)
maps.Copy(node.PostgreSQLConf, postgresqlConf)
extrernalVolumes := slices.Clone(s.ExtraVolumes)
extraVolumes := s.ExtraVolumes
if len(node.ExtraVolumes) > 0 {
extraVolumes = node.ExtraVolumes
}

instances := make([]*InstanceSpec, len(node.HostIDs))
for hostIdx, hostID := range node.HostIDs {
Expand Down Expand Up @@ -388,7 +391,7 @@ func (s *Spec) NodeInstances() ([]*NodeInstances, error) {
// cluster into this decision when we implement updates.
EnableBackups: backupConfig != nil && hostIdx == len(node.HostIDs)-1,
ClusterSize: clusterSize,
ExtraVolumes: extrernalVolumes,
ExtraVolumes: extraVolumes,
}
}

Expand Down
18 changes: 18 additions & 0 deletions server/internal/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/system"
Expand Down Expand Up @@ -139,6 +140,14 @@ func (d *Docker) ContainerRemove(ctx context.Context, containerID string, opts c
return nil
}

// ContainerStop stops the container identified by containerID through the
// underlying Docker client. timeoutSeconds is passed through unchanged as the
// stop timeout. Any client error is translated with errTranslate and wrapped.
func (d *Docker) ContainerStop(ctx context.Context, containerID string, timeoutSeconds *int) error {
	stopOpts := container.StopOptions{Timeout: timeoutSeconds}
	if stopErr := d.client.ContainerStop(ctx, containerID, stopOpts); stopErr != nil {
		return fmt.Errorf("failed to stop container: %w", errTranslate(stopErr))
	}
	return nil
}

func (d *Docker) Info(ctx context.Context) (system.Info, error) {
info, err := d.client.Info(ctx)
if err != nil {
Expand Down Expand Up @@ -480,3 +489,12 @@ func errTranslate(err error) error {
}
return err
}

// BuildMount returns a bind-type mount that maps source onto target, marked
// read-only when readOnly is true.
func BuildMount(source, target string, readOnly bool) mount.Mount {
	var m mount.Mount
	m.Type = mount.TypeBind
	m.Source = source
	m.Target = target
	m.ReadOnly = readOnly
	return m
}
81 changes: 81 additions & 0 deletions server/internal/orchestrator/swarm/orchestrator.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package swarm

import (
"bytes"
"context"
"fmt"
"io"
"net/netip"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/cschleiden/go-workflows/workflow"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/go-connections/nat"
Expand Down Expand Up @@ -436,3 +441,79 @@ func (o *Orchestrator) CreatePgBackRestBackup(ctx context.Context, w io.Writer,

return nil
}

// ValidateVolumes checks the extra volumes declared on spec by bind-mounting
// each one into a short-lived container and verifying that every destination
// path is a directory inside it.
//
// It returns an error when spec is nil, when the image for the requested
// version cannot be resolved, or when the validation container cannot be run.
// Otherwise it returns a ValidationResult: Success is true when the container
// output ends in "OK"; on failure Reason carries the container output.
// A spec without extra volumes is reported as valid without running anything.
func (o *Orchestrator) ValidateVolumes(ctx context.Context, spec *database.InstanceSpec) (*database.ValidationResult, error) {
	if spec == nil {
		return nil, fmt.Errorf("instance spec is required for volume validation")
	}
	// Nothing to mount means nothing to verify; avoid paying for a container.
	if len(spec.ExtraVolumes) == 0 {
		return &database.ValidationResult{Success: true, Reason: "All volumes are valid"}, nil
	}

	specVersion := spec.PgEdgeVersion
	if specVersion == nil {
		o.logger.Warn().Msg("PostgresVersion not provided, using default version")
		specVersion = defaultVersion
	}

	images, err := GetImages(o.cfg, specVersion)
	if err != nil {
		return nil, fmt.Errorf("image fetch error: %w", err)
	}

	// Mount every extra volume exactly as the database service would, and
	// remember the in-container targets so the check script can probe them.
	mounts := make([]mount.Mount, 0, len(spec.ExtraVolumes))
	mountTargets := make([]string, 0, len(spec.ExtraVolumes))
	for _, vol := range spec.ExtraVolumes {
		mounts = append(mounts, docker.BuildMount(vol.HostPath, vol.DestinationPath, false))
		mountTargets = append(mountTargets, vol.DestinationPath)
	}

	cmd := buildVolumeCheckCommand(mountTargets)
	output, err := o.runVolumeValidationContainer(ctx, images.PgEdgeImage, cmd, mounts)
	if err != nil {
		return nil, err
	}

	// The check script prints "OK" last only when every target passed.
	if strings.HasSuffix(output, "OK") {
		return &database.ValidationResult{Success: true, Reason: "All volumes are valid"}, nil
	}
	return &database.ValidationResult{Success: false, Reason: output}, nil
}

// runVolumeValidationContainer runs cmd as the entrypoint of a container
// created from image with the given mounts, waits for it to finish, and
// returns its whitespace-trimmed stdout.
//
// The container is always force-removed before returning. An error is
// returned when the container cannot be started, when waiting for it fails,
// or when its logs cannot be fetched.
func (o *Orchestrator) runVolumeValidationContainer(ctx context.Context, image string, cmd []string, mounts []mount.Mount) (string, error) {
	// Start container
	containerID, err := o.docker.ContainerRun(ctx, docker.ContainerRunOptions{
		Config: &container.Config{
			Image:      image,
			Entrypoint: cmd,
		},
		Host: &container.HostConfig{
			Mounts: mounts,
		},
	})
	if err != nil {
		return "", fmt.Errorf("failed to start container: %w", err)
	}

	// Ensure container is removed afterward. Cleanup must still run when ctx
	// has been cancelled or has expired (e.g. the wait below timed out), so
	// it uses a context detached from ctx's cancellation with its own bound;
	// otherwise the removal would fail immediately and leak the container.
	defer func() {
		cleanupCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
		defer cancel()
		if err := o.docker.ContainerRemove(cleanupCtx, containerID, container.RemoveOptions{Force: true}); err != nil {
			o.logger.Error().Err(err).Msg("container cleanup failed")
		}
	}()

	// Wait for the container to complete
	if err := o.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning, 30*time.Second); err != nil {
		return "", fmt.Errorf("container wait failed: %w", err)
	}

	// Capture logs (stdout only; the check script reports there)
	buf := new(bytes.Buffer)
	if err := o.docker.ContainerLogs(ctx, buf, containerID, container.LogsOptions{ShowStdout: true}); err != nil {
		return "", fmt.Errorf("log fetch failed: %w", err)
	}

	// Best-effort graceful stop; a failure here is only logged because the
	// deferred force-removal takes care of the container regardless.
	timeoutSeconds := 5
	if err := o.docker.ContainerStop(ctx, containerID, &timeoutSeconds); err != nil {
		o.logger.Warn().Err(err).Msg("graceful stop failed")
	}

	return strings.TrimSpace(buf.String()), nil
}

// buildVolumeCheckCommand returns an argv that checks, via sh, that every
// path in mountTargets is a directory. The script prints "FAIL: <path> not
// found" and exits 1 at the first path that is not a directory, and prints
// "OK" when all paths pass.
//
// The targets are handed to the shell as positional parameters and iterated
// with "$@" rather than being spliced into the script text. They originate
// from user-supplied volume definitions, so interpolating them would let
// spaces or shell metacharacters in a path alter or break the command.
func buildVolumeCheckCommand(mountTargets []string) []string {
	const script = `for d in "$@"; do [ -d "$d" ] || { echo "FAIL: $d not found"; exit 1; }; done; echo OK`

	cmd := make([]string, 0, len(mountTargets)+4)
	// The argument following the script becomes $0; the targets follow as $1..$N.
	cmd = append(cmd, "sh", "-c", script, "sh")
	return append(cmd, mountTargets...)
}
18 changes: 5 additions & 13 deletions server/internal/orchestrator/swarm/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/docker/docker/api/types/swarm"

"github.com/pgEdge/control-plane/server/internal/database"
"github.com/pgEdge/control-plane/server/internal/docker"
)

type Paths struct {
Expand Down Expand Up @@ -41,16 +42,16 @@ func DatabaseServiceSpec(
}

mounts := []mount.Mount{
buildMount(options.Paths.Configs, "/opt/pgedge/configs", true),
docker.BuildMount(options.Paths.Configs, "/opt/pgedge/configs", true),
// We're using a mount for the certificates instead of
// a secret because secrets can't be rotated without
// restarting the container.
buildMount(options.Paths.Certificates, "/opt/pgedge/certificates", true),
buildMount(options.Paths.Data, "/opt/pgedge/data", false),
docker.BuildMount(options.Paths.Certificates, "/opt/pgedge/certificates", true),
docker.BuildMount(options.Paths.Data, "/opt/pgedge/data", false),
}

for _, vol := range instance.ExtraVolumes {
mounts = append(mounts, buildMount(vol.HostPath, vol.DestinationPath, false))
mounts = append(mounts, docker.BuildMount(vol.HostPath, vol.DestinationPath, false))
}

return swarm.ServiceSpec{
Expand Down Expand Up @@ -119,12 +120,3 @@ func DatabaseServiceSpec(
},
}, nil
}

// buildMount returns a bind-type mount that maps source onto target, marked
// read-only when readOnly is true.
func buildMount(source, target string, readOnly bool) mount.Mount {
	var m mount.Mount
	m.Type = mount.TypeBind
	m.Source = source
	m.Target = target
	m.ReadOnly = readOnly
	return m
}
1 change: 1 addition & 0 deletions server/internal/workflows/activities/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (a *Activities) Register(work *worker.Worker) error {
work.RegisterActivity(a.RestoreSpec),
work.RegisterActivity(a.UpdateDbState),
work.RegisterActivity(a.UpdateTask),
work.RegisterActivity(a.ValidateVolumes),
}
return errors.Join(errs...)
}
77 changes: 77 additions & 0 deletions server/internal/workflows/activities/validate_volumes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package activities

import (
"context"
"errors"

"github.com/cschleiden/go-workflows/activity"
"github.com/cschleiden/go-workflows/workflow"
"github.com/google/uuid"
"github.com/pgEdge/control-plane/server/internal/database"
"github.com/samber/do"
)

// ValidateVolumesInput is the input to the ValidateVolumes activity.
type ValidateVolumesInput struct {
// DatabaseID identifies the database being validated; the activity rejects
// the nil UUID.
DatabaseID uuid.UUID `json:"database_id"`
// Spec is the instance spec handed to the orchestrator for validation; it
// must not be nil.
Spec *database.InstanceSpec `json:"spec"`
}

// ValidateVolumesOutput is the result of the ValidateVolumes activity.
type ValidateVolumesOutput struct {
// Valid reports whether volume validation succeeded.
Valid bool `json:"valid"`
// Errors holds human-readable failure messages; it is omitted from the
// JSON encoding when empty.
Errors []string `json:"errors,omitempty"`
}

// ExecuteValidateVolumes schedules the ValidateVolumes activity from within a
// workflow and returns a future for its output. The activity is placed on the
// queue named after hostID and is limited to a single attempt.
func (a *Activities) ExecuteValidateVolumes(
	ctx workflow.Context,
	hostID uuid.UUID,
	input *ValidateVolumesInput,
) workflow.Future[*ValidateVolumesOutput] {
	var opts workflow.ActivityOptions
	opts.Queue = workflow.Queue(hostID.String())
	opts.RetryOptions.MaxAttempts = 1

	return workflow.ExecuteActivity[*ValidateVolumesOutput](ctx, opts, a.ValidateVolumes, input)
}

// ValidateVolumes is the activity that validates the volumes of input.Spec by
// delegating to the database.Orchestrator resolved from the injector.
//
// It returns a non-nil error (alongside an output describing the problem)
// when input is nil, the database ID is the nil UUID, the spec is nil, the
// orchestrator cannot be resolved, or the orchestrator call fails or yields
// no result. When the orchestrator completes but reports a failed validation,
// the error is nil, Valid is false, and the orchestrator's reason is carried
// in Errors so callers can show why validation failed.
func (a *Activities) ValidateVolumes(ctx context.Context, input *ValidateVolumesInput) (*ValidateVolumesOutput, error) {
	logger := activity.Logger(ctx)

	// fail logs the problem and builds the matching output/error pair.
	fail := func(err error, msg string) (*ValidateVolumesOutput, error) {
		logger.Error(msg, "error", err)
		return &ValidateVolumesOutput{
			Valid:  false,
			Errors: []string{msg + ": " + err.Error()},
		}, err
	}

	if input == nil {
		return fail(errors.New("input is nil"), "input cannot be nil")
	}
	if input.DatabaseID == uuid.Nil {
		return fail(errors.New("invalid UUID"), "invalid database ID")
	}

	logger = logger.With("database_id", input.DatabaseID.String())
	logger.Info("starting volume validation")

	if input.Spec == nil {
		return fail(errors.New("spec is nil"), "spec is required for volume validation")
	}

	orch, err := do.Invoke[database.Orchestrator](a.Injector)
	if err != nil {
		return fail(err, "failed to resolve orchestrator")
	}

	result, err := orch.ValidateVolumes(ctx, input.Spec)
	if err != nil {
		return fail(err, "volume validation failed")
	}
	// Guard against an implementation that returns neither a result nor an error.
	if result == nil {
		return fail(errors.New("orchestrator returned no result"), "volume validation failed")
	}

	logger.Info("volume validation completed", "success", result.Success)

	output := &ValidateVolumesOutput{Valid: result.Success}
	// Propagate the failure reason; without it the caller only learns that
	// validation failed, not why.
	if !result.Success && result.Reason != "" {
		output.Errors = []string{result.Reason}
	}
	return output, nil
}
Loading