Skip to content

Commit 4ac0465

Browse files
committed
feat: port conflict validation
Adapts the `ValidateInstanceSpec` activity to take and validate multiple specs in order to detect when a port has been allocated multiple times on a single host. Note that this will not consistently detect a port that has been allocated multiple times when multiple control-plane instances with different host IDs run on the same machine, as in our development configuration. End users should not run the control plane this way, so we don't need to handle that case explicitly. PLAT-86
1 parent e46b22d commit 4ac0465

File tree

8 files changed

+220
-139
lines changed

8 files changed

+220
-139
lines changed

server/internal/database/orchestrator.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ type InstanceResources struct {
2020
}
2121

2222
type ValidationResult struct {
23-
Valid bool
24-
Error string
23+
InstanceID string `json:"instance_id"`
24+
HostID string `json:"host_id"`
25+
NodeName string `json:"node_name"`
26+
Valid bool `json:"valid"`
27+
Errors []string `json:"errors"`
2528
}
2629

2730
func NewInstanceResources(instance *InstanceResource, resources []resource.Resource) (*InstanceResources, error) {
@@ -88,5 +91,5 @@ type Orchestrator interface {
8891
GenerateInstanceRestoreResources(spec *InstanceSpec, taskID uuid.UUID) (*InstanceResources, error)
8992
GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID string) (*ConnectionInfo, error)
9093
CreatePgBackRestBackup(ctx context.Context, w io.Writer, instanceID string, options *pgbackrest.BackupOptions) error
91-
ValidateInstanceSpec(ctx context.Context, spec *InstanceSpec) (*ValidationResult, error)
94+
ValidateInstanceSpecs(ctx context.Context, specs []*InstanceSpec) ([]*ValidationResult, error)
9295
}

server/internal/docker/docker.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"net/netip"
9+
"regexp"
910
"strings"
1011
"time"
1112

@@ -527,20 +528,28 @@ func ExtractBindMountErrorMsg(err error) string {
527528
return strings.TrimPrefix(msg[idx:], bindMountErrPrefix)
528529
}
529530

530-
// Another internal error message:
531+
// ExtractPortErrorMsg extracts the port bind error message from the given error
532+
// if it is a port bind or allocation error. Otherwise, returns an empty string.
533+
func ExtractPortErrorMsg(err error) string {
534+
if err == nil {
535+
return ""
536+
}
537+
if msg := extractPortBindErrorMsg(err); msg != "" {
538+
return msg
539+
}
540+
if msg := extractPortAlreadyAllocatedErrorMsg(err); msg != "" {
541+
return msg
542+
}
543+
return ""
544+
}
545+
546+
// More internal error messages:
531547
// https://github.com/moby/moby/blob/cab4ac834e8bf36aa38a2ca49599773df6e6805a/libnetwork/drivers/bridge/port_mapping_linux.go#L622-L627
532548
// This one is less stable, so we'll do our best. In the worst case, we return a
533549
// 500 with a longer error message, which will still be helpful to the user.
534550
const portBindErrPrefix = `failed to bind`
535551

536-
// ExtractPortBindError extracts the port bind error message from the given
537-
// error if it is a port bind error. Otherwise, returns an empty string.
538-
// ExtractBindError extracts the bind error message from the given error if it
539-
// is a bind error. Otherwise, returns an empty string.
540-
func ExtractPortBindErrorMsg(err error) string {
541-
if err == nil {
542-
return ""
543-
}
552+
func extractPortBindErrorMsg(err error) string {
544553
msg := err.Error()
545554
idx := strings.Index(msg, portBindErrPrefix)
546555
if idx < 0 {
@@ -550,6 +559,14 @@ func ExtractPortBindErrorMsg(err error) string {
550559
return msg[idx:]
551560
}
552561

562+
// https://github.com/moby/moby/blob/9b4f68d64cde951e5b985a0c589f16f1416d3968/libnetwork/portallocator/portallocator.go#L33
563+
// This message has been stable for 10 years.
564+
var portAlreadyAllocatedPattern = regexp.MustCompile(`Bind for .* failed: port is already allocated`)
565+
566+
func extractPortAlreadyAllocatedErrorMsg(err error) string {
567+
return portAlreadyAllocatedPattern.FindString(err.Error())
568+
}
569+
553570
// The docker errors are annoying to check further up in the stack since they
554571
// rely on type checks. Wrapping them in our own errors makes it easier for
555572
// callers to explicitly handle specific errors.

server/internal/orchestrator/swarm/orchestrator.go

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/pgEdge/control-plane/server/internal/config"
2727
"github.com/pgEdge/control-plane/server/internal/database"
2828
"github.com/pgEdge/control-plane/server/internal/docker"
29+
"github.com/pgEdge/control-plane/server/internal/ds"
2930
"github.com/pgEdge/control-plane/server/internal/filesystem"
3031
"github.com/pgEdge/control-plane/server/internal/host"
3132
"github.com/pgEdge/control-plane/server/internal/patroni"
@@ -448,10 +449,40 @@ func (o *Orchestrator) CreatePgBackRestBackup(ctx context.Context, w io.Writer,
448449
return nil
449450
}
450451

451-
func (o *Orchestrator) ValidateInstanceSpec(ctx context.Context, spec *database.InstanceSpec) (*database.ValidationResult, error) {
452+
func (o *Orchestrator) ValidateInstanceSpecs(ctx context.Context, specs []*database.InstanceSpec) ([]*database.ValidationResult, error) {
453+
results := make([]*database.ValidationResult, len(specs))
454+
455+
occupiedPorts := ds.NewSet[int]()
456+
for i, instance := range specs {
457+
result := &database.ValidationResult{
458+
InstanceID: instance.InstanceID,
459+
HostID: instance.HostID,
460+
NodeName: instance.NodeName,
461+
Valid: true,
462+
}
463+
if instance.Port > 0 {
464+
if occupiedPorts.Has(instance.Port) {
465+
result.Valid = false
466+
result.Errors = append(
467+
result.Errors,
468+
fmt.Sprintf("port %d allocated to multiple instances on this host", instance.Port),
469+
)
470+
}
471+
occupiedPorts.Add(instance.Port)
472+
}
473+
if err := o.validateInstanceSpec(ctx, instance, result); err != nil {
474+
return nil, err
475+
}
476+
results[i] = result
477+
}
478+
479+
return results, nil
480+
}
481+
482+
func (o *Orchestrator) validateInstanceSpec(ctx context.Context, spec *database.InstanceSpec, result *database.ValidationResult) error {
452483
// Short-circuit if there's nothing to validate
453484
if len(spec.ExtraVolumes) < 1 && spec.Port == 0 {
454-
return &database.ValidationResult{Valid: true}, nil
485+
return nil
455486
}
456487

457488
specVersion := spec.PgEdgeVersion
@@ -462,7 +493,7 @@ func (o *Orchestrator) ValidateInstanceSpec(ctx context.Context, spec *database.
462493

463494
images, err := GetImages(o.cfg, specVersion)
464495
if err != nil {
465-
return nil, fmt.Errorf("image fetch error: %w", err)
496+
return fmt.Errorf("image fetch error: %w", err)
466497
}
467498

468499
var mounts []mount.Mount
@@ -474,30 +505,23 @@ func (o *Orchestrator) ValidateInstanceSpec(ctx context.Context, spec *database.
474505

475506
cmd := buildVolumeCheckCommand(mountTargets)
476507
output, err := o.runValidationContainer(ctx, images.PgEdgeImage, cmd, mounts, spec.Port)
477-
if msg := docker.ExtractBindMountErrorMsg(err); msg != "" {
478-
return &database.ValidationResult{
479-
Valid: false,
480-
Error: msg,
481-
}, nil
482-
}
483-
if msg := docker.ExtractPortBindErrorMsg(err); msg != "" {
484-
return &database.ValidationResult{
485-
Valid: false,
486-
Error: msg,
487-
}, nil
488-
}
489-
if err != nil {
490-
return nil, err
491-
}
492-
493-
if len(output) > 0 {
494-
return &database.ValidationResult{
495-
Valid: false,
496-
Error: output,
497-
}, nil
508+
bindMsg := docker.ExtractBindMountErrorMsg(err)
509+
portMsg := docker.ExtractPortErrorMsg(err)
510+
switch {
511+
case bindMsg != "":
512+
result.Valid = false
513+
result.Errors = append(result.Errors, bindMsg)
514+
case portMsg != "":
515+
result.Valid = false
516+
result.Errors = append(result.Errors, portMsg)
517+
case err != nil:
518+
return err
519+
case len(output) > 0:
520+
result.Valid = false
521+
result.Errors = append(result.Errors, output)
498522
}
499523

500-
return &database.ValidationResult{Valid: true}, nil
524+
return nil
501525
}
502526

503527
func (o *Orchestrator) runValidationContainer(ctx context.Context, image string, cmd []string, mounts []mount.Mount, port int) (string, error) {
@@ -563,6 +587,18 @@ func validationContainerOpts(image string, cmd []string, mounts []mount.Mount, p
563587
return opts
564588
}
565589

590+
const cmdTemplate = `
591+
for d in %s; do
592+
if [ ! -d "$d" ]; then
593+
echo "$d is not a directory"
594+
fi
595+
done
596+
`
597+
566598
func buildVolumeCheckCommand(mountTargets []string) []string {
567-
return []string{"sh", "-c", fmt.Sprintf(`for d in %s; do if [ -d "$d" ]; then echo "$d is not a directory"; fi; done"`, strings.Join(mountTargets, " "))}
599+
if len(mountTargets) < 1 {
600+
return []string{"true"}
601+
}
602+
603+
return []string{"sh", "-c", fmt.Sprintf(cmdTemplate, strings.Join(mountTargets, " "))}
568604
}

server/internal/workflows/activities/activities.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (a *Activities) Register(work *worker.Worker) error {
3434
work.RegisterActivity(a.RestoreSpec),
3535
work.RegisterActivity(a.UpdateDbState),
3636
work.RegisterActivity(a.UpdateTask),
37-
work.RegisterActivity(a.ValidateInstanceSpec),
37+
work.RegisterActivity(a.ValidateInstanceSpecs),
3838
}
3939
return errors.Join(errs...)
4040
}

server/internal/workflows/activities/validate_instance_spec.go

Lines changed: 0 additions & 69 deletions
This file was deleted.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package activities
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"github.com/cschleiden/go-workflows/activity"
9+
"github.com/cschleiden/go-workflows/workflow"
10+
"github.com/pgEdge/control-plane/server/internal/database"
11+
)
12+
13+
type ValidateInstanceSpecsInput struct {
14+
DatabaseID string `json:"database_id"`
15+
Specs []*database.InstanceSpec `json:"spec"`
16+
}
17+
18+
type ValidateInstanceSpecsOutput struct {
19+
Results []*database.ValidationResult `json:"results"`
20+
}
21+
22+
func (a *Activities) ExecuteValidateInstanceSpecs(
23+
ctx workflow.Context,
24+
hostID string,
25+
input *ValidateInstanceSpecsInput,
26+
) workflow.Future[*ValidateInstanceSpecsOutput] {
27+
options := workflow.ActivityOptions{
28+
Queue: workflow.Queue(hostID),
29+
RetryOptions: workflow.RetryOptions{
30+
MaxAttempts: 1,
31+
},
32+
}
33+
return workflow.ExecuteActivity[*ValidateInstanceSpecsOutput](ctx, options, a.ValidateInstanceSpecs, input)
34+
}
35+
36+
func (a *Activities) ValidateInstanceSpecs(ctx context.Context, input *ValidateInstanceSpecsInput) (*ValidateInstanceSpecsOutput, error) {
37+
logger := activity.Logger(ctx)
38+
39+
if input == nil {
40+
return nil, errors.New("input is nil")
41+
}
42+
43+
logger = logger.With(
44+
"database_id", input.DatabaseID,
45+
"host_id", a.Config.HostID,
46+
)
47+
logger.Info("starting instance spec validation")
48+
49+
results, err := a.Orchestrator.ValidateInstanceSpecs(ctx, input.Specs)
50+
if err != nil {
51+
return nil, fmt.Errorf("instance spec validation failed: %w", err)
52+
}
53+
54+
logger.Info("instance spec validation completed")
55+
return &ValidateInstanceSpecsOutput{
56+
Results: results,
57+
}, nil
58+
}

server/internal/workflows/service.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -251,18 +251,14 @@ func (s *Service) ValidateSpec(ctx context.Context, spec *database.Spec) (*Valid
251251

252252
instance, err := s.client.CreateWorkflowInstance(ctx, opts, s.workflows.ValidateSpec, input)
253253
if err != nil {
254-
s.logger.Error().Err(err).Str("database_id", databaseID).Msg("failed to create volume validation workflow")
254+
s.logger.Error().Err(err).Str("database_id", databaseID).Msg("failed to create spec validation workflow")
255255
return nil, fmt.Errorf("failed to create workflow instance: %w", err)
256256
}
257257

258258
output, err := client.GetWorkflowResult[*ValidateSpecOutput](ctx, s.client, instance, 5*time.Minute)
259259
if err != nil {
260-
261-
}
262-
263-
if err != nil {
264-
s.logger.Error().Err(err).Str("database_id", databaseID).Msg("volume validation workflow failed")
265-
return nil, fmt.Errorf("volume validation workflow failed: %w", err)
260+
s.logger.Error().Err(err).Str("database_id", databaseID).Msg("spec validation workflow failed")
261+
return nil, fmt.Errorf("spec validation workflow failed: %w", err)
266262
}
267263

268264
return output, nil

0 commit comments

Comments
 (0)