Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/internal/database/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,5 @@ type Orchestrator interface {
GenerateInstanceRestoreResources(spec *InstanceSpec, taskID uuid.UUID) (*InstanceResources, error)
GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID uuid.UUID) (*ConnectionInfo, error)
CreatePgBackRestBackup(ctx context.Context, w io.Writer, instanceID uuid.UUID, options *pgbackrest.BackupOptions) error
ValidateVolumes(ctx context.Context, spec *Spec) (*ValidationResult, error)
ValidateVolumes(ctx context.Context, spec *InstanceSpec) (*ValidationResult, error)
}
2 changes: 1 addition & 1 deletion server/internal/database/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (s *Spec) NodeInstances() ([]*NodeInstances, error) {
// Create a merged PostgreSQL configuration with node-level overrides
postgresqlConf := maps.Clone(s.PostgreSQLConf)
maps.Copy(node.PostgreSQLConf, postgresqlConf)
extrernalVolumes := slices.Clone(s.ExtraVolumes)
extrernalVolumes := slices.Clone(node.ExtraVolumes)

instances := make([]*InstanceSpec, len(node.HostIDs))
for hostIdx, hostID := range node.HostIDs {
Expand Down
20 changes: 16 additions & 4 deletions server/internal/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/system"
Expand Down Expand Up @@ -139,10 +140,12 @@ func (d *Docker) ContainerRemove(ctx context.Context, containerID string, opts c
return nil
}

func (d *Docker) ContainerStop(ctx context.Context, containerID string, timeout *int) error {
return d.client.ContainerStop(ctx, containerID, container.StopOptions{
Timeout: timeout, // Default timeout for stopping a container}
})
func (d *Docker) ContainerStop(ctx context.Context, containerID string, timeoutSeconds *int) error {
err := d.client.ContainerStop(ctx, containerID, container.StopOptions{Timeout: timeoutSeconds})
if err != nil {
return fmt.Errorf("failed to stop container: %w", errTranslate(err))
}
return nil
}

func (d *Docker) Info(ctx context.Context) (system.Info, error) {
Expand Down Expand Up @@ -486,3 +489,12 @@ func errTranslate(err error) error {
}
return err
}

func BuildMount(source, target string, readOnly bool) mount.Mount {
return mount.Mount{
Type: mount.TypeBind,
Source: source,
Target: target,
ReadOnly: readOnly,
}
}
21 changes: 7 additions & 14 deletions server/internal/orchestrator/swarm/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pgEdge/control-plane/server/internal/patroni"
"github.com/pgEdge/control-plane/server/internal/pgbackrest"
"github.com/pgEdge/control-plane/server/internal/resource"
"github.com/pgEdge/control-plane/server/internal/utils"
)

var defaultVersion = host.MustPgEdgeVersion("17", "4")
Expand Down Expand Up @@ -443,17 +442,11 @@ func (o *Orchestrator) CreatePgBackRestBackup(ctx context.Context, w io.Writer,
return nil
}

func (o *Orchestrator) ValidateVolumes(ctx context.Context, spec *database.Spec) (*database.ValidationResult, error) {
var specVersion *host.PgEdgeVersion
if spec.PostgresVersion == "" || spec.SpockVersion == "" {
o.logger.Warn().Msg("PostgresVersion or SpockVersion not provided, using default version")
func (o *Orchestrator) ValidateVolumes(ctx context.Context, spec *database.InstanceSpec) (*database.ValidationResult, error) {
specVersion := spec.PgEdgeVersion
if specVersion == nil {
o.logger.Warn().Msg("PostgresVersion not provided, using default version")
specVersion = defaultVersion
} else {
var err error
specVersion, err = host.NewPgEdgeVersion(spec.PostgresVersion, spec.SpockVersion)
if err != nil {
return nil, fmt.Errorf("invalid version: %w", err)
}
}

images, err := GetImages(o.cfg, specVersion)
Expand All @@ -464,7 +457,7 @@ func (o *Orchestrator) ValidateVolumes(ctx context.Context, spec *database.Spec)
var mounts []mount.Mount
var mountTargets []string
for _, vol := range spec.ExtraVolumes {
mounts = append(mounts, utils.BuildMount(vol.HostPath, vol.DestinationPath, false))
mounts = append(mounts, docker.BuildMount(vol.HostPath, vol.DestinationPath, false))
mountTargets = append(mountTargets, vol.DestinationPath)
}

Expand Down Expand Up @@ -513,8 +506,8 @@ func (o *Orchestrator) runVolumeValidationContainer(ctx context.Context, image s
}

// Stop the container gracefully
timeout := 5
if err := o.docker.ContainerStop(ctx, containerID, &timeout); err != nil {
timeoutSeconds := 5
if err := o.docker.ContainerStop(ctx, containerID, &timeoutSeconds); err != nil {
o.logger.Warn().Err(err).Msg("graceful stop failed")
}

Expand Down
10 changes: 5 additions & 5 deletions server/internal/orchestrator/swarm/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/docker/docker/api/types/swarm"

"github.com/pgEdge/control-plane/server/internal/database"
"github.com/pgEdge/control-plane/server/internal/utils"
"github.com/pgEdge/control-plane/server/internal/docker"
)

type Paths struct {
Expand Down Expand Up @@ -42,16 +42,16 @@ func DatabaseServiceSpec(
}

mounts := []mount.Mount{
utils.BuildMount(options.Paths.Configs, "/opt/pgedge/configs", true),
docker.BuildMount(options.Paths.Configs, "/opt/pgedge/configs", true),
// We're using a mount for the certificates instead of
// a secret because secrets can't be rotated without
// restarting the container.
utils.BuildMount(options.Paths.Certificates, "/opt/pgedge/certificates", true),
utils.BuildMount(options.Paths.Data, "/opt/pgedge/data", false),
docker.BuildMount(options.Paths.Certificates, "/opt/pgedge/certificates", true),
docker.BuildMount(options.Paths.Data, "/opt/pgedge/data", false),
}

for _, vol := range instance.ExtraVolumes {
mounts = append(mounts, utils.BuildMount(vol.HostPath, vol.DestinationPath, false))
mounts = append(mounts, docker.BuildMount(vol.HostPath, vol.DestinationPath, false))
}

return swarm.ServiceSpec{
Expand Down
11 changes: 0 additions & 11 deletions server/internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"strings"
"time"
"unicode"

"github.com/docker/docker/api/types/mount"
)

var ErrTimedOut = errors.New("operation timed out")
Expand Down Expand Up @@ -102,12 +100,3 @@ func Clean(s string) string {
return -1
}, s)
}

func BuildMount(source, target string, readOnly bool) mount.Mount {
return mount.Mount{
Type: mount.TypeBind,
Source: source,
Target: target,
ReadOnly: readOnly,
}
}
7 changes: 4 additions & 3 deletions server/internal/workflows/activities/validate_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
)

type ValidateVolumesInput struct {
DatabaseID uuid.UUID `json:"database_id"`
Spec *database.Spec `json:"spec"`
DatabaseID uuid.UUID `json:"database_id"`
Spec *database.InstanceSpec `json:"spec"`
}

type ValidateVolumesOutput struct {
Expand All @@ -23,10 +23,11 @@ type ValidateVolumesOutput struct {

func (a *Activities) ExecuteValidateVolumes(
ctx workflow.Context,
hostID uuid.UUID,
input *ValidateVolumesInput,
) workflow.Future[*ValidateVolumesOutput] {
options := workflow.ActivityOptions{
Queue: workflow.Queue(a.Config.HostID.String()),
Queue: workflow.Queue(hostID.String()),
RetryOptions: workflow.RetryOptions{
MaxAttempts: 1,
},
Expand Down
48 changes: 36 additions & 12 deletions server/internal/workflows/validate_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,48 @@ type ValidateVolumesInput struct {
}

func (w *Workflows) ValidateSpec(ctx workflow.Context, input *ValidateVolumesInput) (*activities.ValidateVolumesOutput, error) {
logger := workflow.Logger(ctx).With("database_id", input.DatabaseID.String())
databaseID := input.DatabaseID
logger := workflow.Logger(ctx).With("database_id", databaseID.String())
logger.Info("Starting volume validation")

activityInput := &activities.ValidateVolumesInput{
DatabaseID: input.DatabaseID,
Spec: input.Spec,
nodeInstances, err := input.Spec.NodeInstances()
if err != nil {
logger.Error("Failed to get node instances", "error", err)
return nil, fmt.Errorf("failed to get node instances: %w", err)
}
var instanceFutures []workflow.Future[*activities.ValidateVolumesOutput]
for _, nodeInstance := range nodeInstances {
for _, instance := range nodeInstance.Instances {
instanceFuture := w.Activities.ExecuteValidateVolumes(ctx, instance.HostID, &activities.ValidateVolumesInput{
DatabaseID: databaseID,
Spec: instance,
})
instanceFutures = append(instanceFutures, instanceFuture)
}
}

output, err := w.Activities.ExecuteValidateVolumes(ctx, activityInput).Get(ctx)
if err != nil {
logger.Error("Volume validation activity failed", "error", err)
return output, fmt.Errorf("volume validation activity error: %w", err)
var allErrors []string
for _, instanceFuture := range instanceFutures {
output, err := instanceFuture.Get(ctx)
if err != nil {
logger.Error("Volume validation activity failed", "error", err)
allErrors = append(allErrors, fmt.Sprintf("activity error: %v", err))
continue
}

if !output.Valid {
logger.Error("Volume validation failed", "errors", output.Errors)
allErrors = append(allErrors, output.Errors...)
}
}
if !output.Valid {
logger.Error("Volume validation failed", "errors", output.Errors)
return output, fmt.Errorf("volume validation errors: %v", output.Errors)

if len(allErrors) > 0 {
return &activities.ValidateVolumesOutput{
Valid: false,
Errors: allErrors,
}, fmt.Errorf("volume validation encountered %d issues", len(allErrors))
}

logger.Info("Volume validation succeeded")
return output, nil
return &activities.ValidateVolumesOutput{Valid: true}, nil
}