Skip to content
This repository was archived by the owner on Mar 31, 2025. It is now read-only.

Commit e814579

Browse files
leonardocearmru
andauthored
feat: initial implementation of the WAL service (#5)
Signed-off-by: Leonardo Cecchi <[email protected]> Signed-off-by: Armando Ruocco <[email protected]> Co-authored-by: Armando Ruocco <[email protected]>
1 parent 3f942ff commit e814579

File tree

12 files changed

+411
-2
lines changed

12 files changed

+411
-2
lines changed

.dockerignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
cloudnative-pg
2+
bin
3+
.github
4+
.git

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ RUN go mod download
88

99
# Compile the application
1010
COPY . /app
11-
RUN ./scripts/build.sh
11+
RUN --mount=type=cache,target=/root/.cache/go-build ./scripts/build.sh
1212

1313
# Step 2: build the image to be actually run
1414
FROM alpine:3.18.4

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ toolchain go1.21.6
66

77
require (
88
github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8
9-
github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f
9+
github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426
1010
github.com/evanphx/json-patch/v5 v5.8.1
1111
github.com/go-logr/logr v1.3.0
1212
github.com/go-logr/zapr v1.2.4

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8 h
5454
github.com/cloudnative-pg/cloudnative-pg v1.22.1-0.20240123130737-a22a155b9eb8/go.mod h1:r6blheO2ihiuqKbk6rqPN5//PPJnYtKCGT2OxpXtk2o=
5555
github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f h1:ypwPq45y8ezzwxUTHL0VkzkT2+pcHnE4yRoeGTP8fp8=
5656
github.com/cloudnative-pg/cnpg-i v0.0.0-20240122164555-5215ff219c8f/go.mod h1:0G5GXQVj09KvONIcYURyroL74zOFGjv4eI5OXz7/G/0=
57+
github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426 h1:eW94u+AQoFR+KDyIenekcHWCE6Kc48mo8CgGB+VOzKU=
58+
github.com/cloudnative-pg/cnpg-i v0.0.0-20240124144003-4c0a1ac46426/go.mod h1:0G5GXQVj09KvONIcYURyroL74zOFGjv4eI5OXz7/G/0=
5759
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
5860
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
5961
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=

internal/fileutils/cp.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package fileutils
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"os"
8+
"os/user"
9+
"path/filepath"
10+
"strings"
11+
)
12+
13+
// This implementation is based on https://github.com/nmrshll/go-cp/blob/master/cp.go
14+
15+
func replaceHomeFolder(path string) (string, error) {
16+
if !strings.HasPrefix(path, "~") {
17+
return path, nil
18+
}
19+
var buffer bytes.Buffer
20+
usr, err := user.Current()
21+
if err != nil {
22+
return "", err
23+
}
24+
_, err = buffer.WriteString(usr.HomeDir)
25+
if err != nil {
26+
return "", err
27+
}
28+
_, err = buffer.WriteString(strings.TrimPrefix(path, "~"))
29+
if err != nil {
30+
return "", err
31+
}
32+
33+
return buffer.String(), nil
34+
}
35+
36+
// AbsolutePath converts a path (relative or absolute) into an absolute one.
37+
// Supports '~' notation for $HOME directory of the current user.
38+
func AbsolutePath(path string) (string, error) {
39+
homeReplaced, err := replaceHomeFolder(path)
40+
if err != nil {
41+
return "", err
42+
}
43+
return filepath.Abs(homeReplaced)
44+
}
45+
46+
// CopyFile copies a file from src to dst. If src and dst files exist, and are
47+
// the same, then return success. Otherwise, attempt to create a hard link
48+
// between the two files. If that fails, copy the file contents from src to dst.
49+
// Creates any missing directories. Supports '~' notation for $HOME directory of the current user.
50+
func CopyFile(src, dst string) error {
51+
srcAbs, err := AbsolutePath(src)
52+
if err != nil {
53+
return err
54+
}
55+
dstAbs, err := AbsolutePath(dst)
56+
if err != nil {
57+
return err
58+
}
59+
60+
// open source file
61+
sfi, err := os.Stat(srcAbs)
62+
if err != nil {
63+
return err
64+
}
65+
if !sfi.Mode().IsRegular() {
66+
// cannot copy non-regular files (e.g., directories,
67+
// symlinks, devices, etc.)
68+
return fmt.Errorf("CopyFile: non-regular source file %s (%q)", sfi.Name(), sfi.Mode().String())
69+
}
70+
71+
// open dest file
72+
dfi, err := os.Stat(dstAbs)
73+
if err != nil && !os.IsNotExist(err) {
74+
return err
75+
}
76+
77+
if err != nil {
78+
// file doesn't exist
79+
err := os.MkdirAll(filepath.Dir(dst), 0o750)
80+
if err != nil {
81+
return err
82+
}
83+
} else {
84+
if !(dfi.Mode().IsRegular()) {
85+
return fmt.Errorf("CopyFile: non-regular destination file %s (%q)", dfi.Name(), dfi.Mode().String())
86+
}
87+
if os.SameFile(sfi, dfi) {
88+
return err
89+
}
90+
}
91+
if err = os.Link(src, dst); err == nil {
92+
return err
93+
}
94+
return copyFileContents(src, dst)
95+
}
96+
97+
// copyFileContents copies the contents of the file named src to the file named
98+
// by dst. The file will be created if it does not already exist. If the
99+
// destination file exists, all it's contents will be replaced by the contents
100+
// of the source file.
101+
func copyFileContents(src, dst string) error {
102+
// Open the source file for reading
103+
srcFile, err := os.Open(src) // nolint:gosec
104+
if err != nil {
105+
return err
106+
}
107+
defer func() {
108+
_ = srcFile.Close()
109+
}()
110+
111+
// Open the destination file for writing
112+
dstFile, err := os.Create(dst) // nolint:gosec
113+
if err != nil {
114+
return err
115+
}
116+
// Return any errors that result from closing the destination file
117+
// Will return nil if no errors occurred
118+
defer func() {
119+
cerr := dstFile.Close()
120+
if err == nil {
121+
err = cerr
122+
}
123+
}()
124+
125+
// Copy the contents of the source file into the destination files
126+
const size = 1024 * 1024
127+
buf := make([]byte, size)
128+
if _, err = io.CopyBuffer(dstFile, srcFile, buf); err != nil {
129+
return err
130+
}
131+
err = dstFile.Sync()
132+
return err
133+
}

internal/fileutils/doc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package fileutils contains a set of useful functions to manage files
2+
package fileutils

internal/wal/doc.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
// Package wal contains the implementation of the
2+
// WAL Manager server
3+
package wal

internal/wal/impl.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package wal
2+
3+
import (
4+
"context"
5+
6+
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
7+
)
8+
9+
// Implementation is the implementation of the identity service
10+
type Implementation struct {
11+
wal.WALServer
12+
}
13+
14+
// GetCapabilities gets the capabilities of the WAL service
15+
func (Implementation) GetCapabilities(
16+
context.Context,
17+
*wal.WALCapabilitiesRequest,
18+
) (*wal.WALCapabilitiesResult, error) {
19+
return &wal.WALCapabilitiesResult{
20+
Capabilities: []*wal.WALCapability{
21+
{
22+
Type: &wal.WALCapability_Rpc{
23+
Rpc: &wal.WALCapability_RPC{
24+
Type: wal.WALCapability_RPC_TYPE_ARCHIVE_WAL,
25+
},
26+
},
27+
},
28+
{
29+
Type: &wal.WALCapability_Rpc{
30+
Rpc: &wal.WALCapability_RPC{
31+
Type: wal.WALCapability_RPC_TYPE_RESTORE_WAL,
32+
},
33+
},
34+
},
35+
{
36+
Type: &wal.WALCapability_Rpc{
37+
Rpc: &wal.WALCapability_RPC{
38+
Type: wal.WALCapability_RPC_TYPE_STATUS,
39+
},
40+
},
41+
},
42+
{
43+
Type: &wal.WALCapability_Rpc{
44+
Rpc: &wal.WALCapability_RPC{
45+
Type: wal.WALCapability_RPC_TYPE_SET_FIRST_REQUIRED,
46+
},
47+
},
48+
},
49+
},
50+
}, nil
51+
}

internal/wal/status.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package wal
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io/fs"
7+
"os"
8+
"path"
9+
10+
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
11+
12+
"github.com/cloudnative-pg/plugin-pvc-backup/pkg/logging"
13+
"github.com/cloudnative-pg/plugin-pvc-backup/pkg/metadata"
14+
"github.com/cloudnative-pg/plugin-pvc-backup/pkg/pluginhelper"
15+
)
16+
17+
type walStatMode string
18+
19+
const (
20+
walStatModeFirst = "first"
21+
walStatModeLast = "last"
22+
)
23+
24+
// Status gets the statistics of the WAL file archive
25+
func (Implementation) Status(
26+
ctx context.Context,
27+
request *wal.WALStatusRequest,
28+
) (*wal.WALStatusResult, error) {
29+
logging := logging.FromContext(ctx)
30+
31+
helper, err := pluginhelper.NewFromCluster(metadata.Data.Name, request.ClusterDefinition)
32+
if err != nil {
33+
logging.Error(err, "Error while decoding cluster definition from CNPG")
34+
return nil, err
35+
}
36+
37+
walPath := getWALPath(helper.GetCluster().Name)
38+
logging = logging.WithValues(
39+
"walPath", walPath,
40+
"clusterName", helper.GetCluster().Name,
41+
)
42+
43+
walDirEntries, err := os.ReadDir(walPath)
44+
if err != nil {
45+
logging.Error(err, "Error while reading WALs directory")
46+
return nil, err
47+
}
48+
49+
firstWal, err := getWALStat(helper.GetCluster().Name, walDirEntries, walStatModeFirst)
50+
if err != nil {
51+
logging.Error(err, "Error while reading WALs directory (getting first WAL)")
52+
return nil, err
53+
}
54+
55+
lastWal, err := getWALStat(helper.GetCluster().Name, walDirEntries, walStatModeLast)
56+
if err != nil {
57+
logging.Error(err, "Error while reading WALs directory (getting first WAL)")
58+
return nil, err
59+
}
60+
61+
return &wal.WALStatusResult{
62+
FirstWal: firstWal,
63+
LastWal: lastWal,
64+
}, nil
65+
}
66+
67+
func getWALStat(clusterName string, entries []fs.DirEntry, mode walStatMode) (string, error) {
68+
entry, ok := getEntry(entries, mode)
69+
if !ok {
70+
return "", nil
71+
}
72+
73+
if !entry.IsDir() {
74+
return "", fmt.Errorf("%s is not a directory", entry)
75+
}
76+
77+
entryAbsolutePath := path.Join(getWALPath(clusterName), entry.Name())
78+
subFolderEntries, err := os.ReadDir(entryAbsolutePath)
79+
if err != nil {
80+
return "", fmt.Errorf("while reading %s entries: %w", entry, err)
81+
}
82+
83+
selectSubFolderEntry, ok := getEntry(subFolderEntries, mode)
84+
if !ok {
85+
return "", nil
86+
}
87+
88+
return selectSubFolderEntry.Name(), nil
89+
}
90+
91+
func getEntry(entries []fs.DirEntry, mode walStatMode) (fs.DirEntry, bool) {
92+
if len(entries) == 0 {
93+
return nil, false
94+
}
95+
96+
switch mode {
97+
case walStatModeFirst:
98+
return entries[0], true
99+
100+
case walStatModeLast:
101+
return entries[len(entries)-1], true
102+
103+
default:
104+
return nil, false
105+
}
106+
}

internal/wal/utils.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package wal
2+
3+
import "path"
4+
5+
func getWalPrefix(walName string) string {
6+
return walName[0:16]
7+
}
8+
9+
func getClusterPath(clusterName string) string {
10+
return path.Join(basePath, clusterName)
11+
}
12+
13+
func getWALPath(clusterName string) string {
14+
return path.Join(
15+
getClusterPath(clusterName),
16+
walsDirectory,
17+
)
18+
}
19+
20+
func getWALFilePath(clusterName string, walName string) string {
21+
return path.Join(
22+
getWALPath(clusterName),
23+
getWalPrefix(walName),
24+
walName,
25+
)
26+
}

0 commit comments

Comments
 (0)