Skip to content

Commit c38604e

Browse files
committed
Merge branch '314-sync-status' into 'master'
feat(engine): show continuous PGDATA synchronization details in API+CLI (#314) Closes #314 See merge request postgres-ai/database-lab!562
2 parents 75ceff7 + 0d2e1d6 commit c38604e

File tree

10 files changed

+385
-14
lines changed

10 files changed

+385
-14
lines changed

engine/api/swagger-spec/dblab_server_swagger.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,8 @@ definitions:
610610
$ref: "#/definitions/Retrieving"
611611
provisioner:
612612
$ref: "#/definitions/Provisioner"
613+
synchronization:
614+
$ref: "#/definitions/Synchronization"
613615

614616
Status:
615617
type: "object"
@@ -717,6 +719,24 @@ definitions:
717719
containerConfig:
718720
type: "object"
719721

722+
Synchronization:
723+
type: "object"
724+
properties:
725+
status:
726+
$ref: "#/definitions/Status"
727+
startedAt:
728+
type: "string"
729+
format: "date-time"
730+
lastReplayedLsn:
731+
type: "string"
732+
lastReplayedLsnAt:
733+
type: "string"
734+
format: "date-time"
735+
replicationLag:
736+
type: "string"
737+
replicationUptime:
738+
type: "integer"
739+
720740
Snapshot:
721741
type: "object"
722742
properties:

engine/internal/retrieval/engine/postgres/physical/physical.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@ import (
1111
"fmt"
1212
"io"
1313
"os"
14+
"path/filepath"
1415
"strings"
1516
"time"
1617

1718
"github.com/docker/docker/api/types"
1819
"github.com/docker/docker/api/types/container"
1920
"github.com/docker/docker/api/types/filters"
21+
"github.com/docker/docker/api/types/mount"
2022
"github.com/docker/docker/client"
2123

2224
"github.com/pkg/errors"
@@ -207,7 +209,12 @@ func (r *RestoreJob) Run(ctx context.Context) (err error) {
207209
return errors.Wrap(err, "failed to generate PostgreSQL password")
208210
}
209211

210-
contID, err := r.startContainer(ctx, r.restoreContainerName(), r.buildContainerConfig(cont.DBLabRestoreLabel, pwd))
212+
hostConfig, err := cont.BuildHostConfig(ctx, r.dockerClient, r.fsPool.DataDir(), r.CopyOptions.ContainerConfig)
213+
if err != nil {
214+
return errors.Wrap(err, "failed to build container host config")
215+
}
216+
217+
contID, err := r.startContainer(ctx, r.restoreContainerName(), r.buildContainerConfig(cont.DBLabRestoreLabel, pwd), hostConfig)
211218
if err != nil {
212219
return err
213220
}
@@ -288,12 +295,8 @@ func (r *RestoreJob) Run(ctx context.Context) (err error) {
288295
return nil
289296
}
290297

291-
func (r *RestoreJob) startContainer(ctx context.Context, containerName string, containerConfig *container.Config) (string, error) {
292-
hostConfig, err := cont.BuildHostConfig(ctx, r.dockerClient, r.fsPool.DataDir(), r.CopyOptions.ContainerConfig)
293-
if err != nil {
294-
return "", errors.Wrap(err, "failed to build container host config")
295-
}
296-
298+
func (r *RestoreJob) startContainer(ctx context.Context, containerName string, containerConfig *container.Config,
299+
hostConfig *container.HostConfig) (string, error) {
297300
if err := tools.PullImage(ctx, r.dockerClient, r.CopyOptions.DockerImage); err != nil {
298301
return "", err
299302
}
@@ -354,7 +357,24 @@ func (r *RestoreJob) runSyncInstance(ctx context.Context) (err error) {
354357

355358
log.Msg("Starting sync instance: ", r.syncInstanceName())
356359

357-
syncInstanceID, err := r.startContainer(ctx, r.syncInstanceName(), syncInstanceConfig)
360+
hostConfig, err := cont.BuildHostConfig(ctx, r.dockerClient, r.fsPool.DataDir(), r.CopyOptions.ContainerConfig)
361+
if err != nil {
362+
return fmt.Errorf("failed to build container host config: %w", err)
363+
}
364+
365+
path := filepath.Join(r.fsPool.SocketDir(), r.syncInstanceName())
366+
if err := os.MkdirAll(path, 0755); err != nil {
367+
return fmt.Errorf("failed to make socket directory: %w", err)
368+
}
369+
370+
hostConfig.Mounts = append(hostConfig.Mounts, mount.Mount{
371+
Type: mount.TypeBind,
372+
Source: path,
373+
Target: cont.DefaultPostgresSocket,
374+
ReadOnly: false,
375+
})
376+
377+
syncInstanceID, err := r.startContainer(ctx, r.syncInstanceName(), syncInstanceConfig, hostConfig)
358378
if err != nil {
359379
return err
360380
}

engine/internal/retrieval/engine/postgres/tools/cont/container.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ const (
6262

6363
// DBLabRunner defines a label to mark runner containers.
6464
DBLabRunner = "dblab_runner"
65+
66+
// DefaultPostgresSocket defines default path for Postgres socket
67+
DefaultPostgresSocket = "/var/run/postgresql"
6568
)
6669

6770
// TODO(akartasov): Control container manager.

engine/internal/retrieval/retrieval.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ import (
99
"context"
1010
"fmt"
1111
"os"
12+
"path/filepath"
1213
"strings"
1314
"time"
1415

16+
"github.com/docker/docker/api/types"
17+
"github.com/docker/docker/api/types/filters"
1518
"github.com/docker/docker/client"
1619
"github.com/pkg/errors"
1720
"github.com/robfig/cron/v3"
@@ -26,9 +29,11 @@ import (
2629
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/logical"
2730
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/physical"
2831
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/snapshot"
32+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools"
2933
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/cont"
3034
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/engine/postgres/tools/db"
3135
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/options"
36+
"gitlab.com/postgres-ai/database-lab/v3/internal/retrieval/status"
3237
"gitlab.com/postgres-ai/database-lab/v3/internal/telemetry"
3338
"gitlab.com/postgres-ai/database-lab/v3/pkg/util"
3439

@@ -690,3 +695,85 @@ func (r *Retrieval) GetStageSpec(stage string) (config.JobSpec, error) {
690695

691696
return stageSpec, nil
692697
}
698+
699+
// ReportSyncStatus return status of sync containers.
700+
func (r *Retrieval) ReportSyncStatus(ctx context.Context) (*models.Sync, error) {
701+
if r.State.Mode != models.Physical {
702+
return &models.Sync{
703+
Status: models.Status{Code: models.SyncStatusNotAvailable},
704+
}, nil
705+
}
706+
707+
filterArgs := filters.NewArgs(
708+
filters.KeyValuePair{Key: "label",
709+
Value: fmt.Sprintf("%s=%s", cont.DBLabControlLabel, cont.DBLabSyncLabel)})
710+
711+
filterArgs.Add("label", fmt.Sprintf("%s=%s", cont.DBLabInstanceIDLabel, r.engineProps.InstanceID))
712+
713+
ids, err := tools.ListContainersByLabel(ctx, r.docker, filterArgs)
714+
if err != nil {
715+
return &models.Sync{
716+
Status: models.Status{Code: models.SyncStatusError, Message: err.Error()},
717+
}, fmt.Errorf("failed to list containers by label %w", err)
718+
}
719+
720+
if len(ids) != 1 {
721+
return &models.Sync{
722+
Status: models.Status{Code: models.SyncStatusError},
723+
}, fmt.Errorf("failed to match sync container")
724+
}
725+
726+
id := ids[0]
727+
728+
sync, err := r.reportContainerSyncStatus(ctx, id)
729+
730+
return sync, err
731+
}
732+
733+
func (r *Retrieval) reportContainerSyncStatus(ctx context.Context, containerID string) (*models.Sync, error) {
734+
resp, err := r.docker.ContainerInspect(ctx, containerID)
735+
736+
if err != nil {
737+
return nil, fmt.Errorf("failed to inspect container %w", err)
738+
}
739+
740+
if resp.State == nil {
741+
return nil, fmt.Errorf("failed to read container state")
742+
}
743+
744+
if resp.State.Health != nil && resp.State.Health.Status == types.Unhealthy {
745+
// in case of Unhealthy state, add health check output to status
746+
var healthCheckOutput = ""
747+
748+
if healthCheckLength := len(resp.State.Health.Log); healthCheckLength > 0 {
749+
if lastHealthCheck := resp.State.Health.Log[healthCheckLength-1]; lastHealthCheck.ExitCode > 1 {
750+
healthCheckOutput = lastHealthCheck.Output
751+
}
752+
}
753+
754+
return &models.Sync{
755+
Status: models.Status{
756+
Code: models.SyncStatusDown,
757+
Message: healthCheckOutput,
758+
},
759+
}, nil
760+
}
761+
762+
socketPath := filepath.Join(r.poolManager.First().Pool().SocketDir(), resp.Name)
763+
value, err := status.FetchSyncMetrics(ctx, r.global, socketPath)
764+
765+
if err != nil {
766+
log.Warn("Failed to fetch synchronization metrics", err)
767+
768+
return &models.Sync{
769+
Status: models.Status{
770+
Code: models.SyncStatusError,
771+
Message: err.Error(),
772+
},
773+
}, nil
774+
}
775+
776+
value.StartedAt = resp.State.StartedAt
777+
778+
return value, nil
779+
}

engine/internal/retrieval/retrieval_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package retrieval
22

33
import (
4+
"context"
45
"os"
56
"path"
67
"testing"
@@ -117,3 +118,15 @@ func TestPendingMarker(t *testing.T) {
117118
assert.Equal(t, models.Finished, r.State.Status)
118119
})
119120
}
121+
122+
func TestSyncStatusNotReportedForLogicalMode(t *testing.T) {
123+
var r = Retrieval{
124+
State: State{
125+
Mode: models.Logical,
126+
},
127+
}
128+
status, err := r.ReportSyncStatus(context.TODO())
129+
assert.NoError(t, err)
130+
assert.NotNil(t, status)
131+
assert.Equal(t, models.SyncStatusNotAvailable, status.Status.Code)
132+
}

0 commit comments

Comments
 (0)