Skip to content

Commit 19aa9e5

Browse files
committed
feat: parallel WAL upload (#45)
Although parallel WAL upload was documented as supported, it was not actually supported. Added some e2e tests for it, although this kind of concurrency is tricky to validate. Signed-off-by: Szymon Soloch <[email protected]>
1 parent 36d0ba2 commit 19aa9e5

File tree

11 files changed

+597
-5
lines changed

11 files changed

+597
-5
lines changed

internal/cnpgi/common/wal.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,41 @@ func (w WALServiceImplementation) Archive(
136136
if err != nil {
137137
return nil, err
138138
}
139-
walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), 1)
139+
140+
maxParallel := 1
141+
if archive.Spec.Configuration.Wal != nil && archive.Spec.Configuration.Wal.MaxParallel > 1 {
142+
maxParallel = archive.Spec.Configuration.Wal.MaxParallel
143+
}
144+
145+
walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), maxParallel)
146+
147+
// Log the batch of WAL files prepared for archiving
148+
contextLogger.Info("WAL archive batch prepared",
149+
"requestedWalFile", request.GetSourceFileName(),
150+
"maxParallel", maxParallel,
151+
"walFiles", walList)
152+
140153
result := arch.ArchiveList(ctx, walList, options)
141154
for _, archiverResult := range result {
142155
if archiverResult.Err != nil {
143156
return nil, archiverResult.Err
144157
}
145158
}
146159

160+
// Log the completion of the archive batch
161+
successfulArchives := 0
162+
for _, archiverResult := range result {
163+
if archiverResult.Err == nil {
164+
successfulArchives++
165+
}
166+
}
167+
168+
contextLogger.Info("WAL archive batch completed",
169+
"requestedWalFile", request.GetSourceFileName(),
170+
"maxParallel", maxParallel,
171+
"successfulArchives", successfulArchives,
172+
"failedArchives", len(walList)-successfulArchives)
173+
147174
return &wal.WALArchiveResult{}, nil
148175
}
149176

internal/pgbackrest/archiver/command_test.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package archiver
1919

2020
import (
2121
"os"
22+
"path/filepath"
2223
"strings"
2324

2425
pgbackrestApi "github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/api"
@@ -75,3 +76,157 @@ var _ = Describe("pgbackrestWalArchiveOptions", func() {
7576
))
7677
})
7778
})
79+
80+
var _ = Describe("GatherWALFilesToArchive", func() {
81+
var tempPgData string
82+
var archiveStatusDir string
83+
var archiver *WALArchiver
84+
85+
BeforeEach(func(ctx SpecContext) {
86+
var err error
87+
88+
tempPgData, err = os.MkdirTemp("", "pgdata-test-*")
89+
Expect(err).ToNot(HaveOccurred())
90+
91+
// Archiver uses the env variable to determine directory root.
92+
os.Setenv("PGDATA", tempPgData)
93+
94+
archiveStatusDir = filepath.Join(tempPgData, "pg_wal", "archive_status")
95+
err = os.MkdirAll(archiveStatusDir, 0755)
96+
Expect(err).ToNot(HaveOccurred())
97+
98+
tempEmptyWalArchivePath := filepath.Join(tempPgData, "empty-wal-archive")
99+
_, err = os.Create(tempEmptyWalArchivePath)
100+
Expect(err).ToNot(HaveOccurred())
101+
102+
archiver, err = New(ctx, nil, filepath.Join(tempPgData, "spool"), tempPgData, tempEmptyWalArchivePath)
103+
Expect(err).ToNot(HaveOccurred())
104+
})
105+
106+
AfterEach(func() {
107+
// Clean up temp directory
108+
if tempPgData != "" {
109+
os.RemoveAll(tempPgData)
110+
}
111+
os.Unsetenv("PGDATA")
112+
})
113+
114+
// Helper function to create .ready files
115+
createReadyFile := func(walName string) {
116+
path := filepath.Join(archiveStatusDir, walName+".ready")
117+
err := os.WriteFile(path, []byte{}, 0644)
118+
Expect(err).ToNot(HaveOccurred())
119+
}
120+
121+
Context("when parallel=1", func() {
122+
It("should gather only the requested file", func(ctx SpecContext) {
123+
// Create .ready files for multiple WAL files
124+
createReadyFile("000000010000000000000001")
125+
createReadyFile("000000010000000000000002")
126+
createReadyFile("000000010000000000000003")
127+
128+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 1)
129+
130+
Expect(walList).To(HaveLen(1))
131+
Expect(walList[0]).To(Equal("pg_wal/000000010000000000000001"))
132+
})
133+
134+
It("should handle when no other .ready files exist", func(ctx SpecContext) {
135+
// Only create the requested file
136+
createReadyFile("000000010000000000000001")
137+
138+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 1)
139+
140+
Expect(walList).To(HaveLen(1))
141+
Expect(walList[0]).To(Equal("pg_wal/000000010000000000000001"))
142+
})
143+
})
144+
145+
Context("when parallel>1", func() {
146+
It("should gather multiple files when parallel=4", func(ctx SpecContext) {
147+
// Create .ready files for multiple WAL files
148+
createReadyFile("000000010000000000000001")
149+
createReadyFile("000000010000000000000002")
150+
createReadyFile("000000010000000000000003")
151+
createReadyFile("000000010000000000000004")
152+
153+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 4)
154+
155+
Expect(walList).To(HaveLen(4))
156+
Expect(walList[0]).To(Equal("pg_wal/000000010000000000000001"))
157+
})
158+
159+
It("should not exceed parallel limit even when more files are ready", func(ctx SpecContext) {
160+
// Create many .ready files
161+
for i := 1; i <= 10; i++ {
162+
createReadyFile("00000001000000000000000" + string(rune('0'+i)))
163+
}
164+
165+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 3)
166+
167+
Expect(walList).To(HaveLen(3))
168+
})
169+
170+
It("should handle when fewer files exist than parallel limit", func(ctx SpecContext) {
171+
// Create only 2 .ready files but request parallel=5
172+
createReadyFile("000000010000000000000001")
173+
createReadyFile("000000010000000000000002")
174+
175+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 5)
176+
177+
// Should only get the files that exist
178+
Expect(walList).To(HaveLen(2))
179+
Expect(walList[0]).To(Equal("pg_wal/000000010000000000000001"))
180+
})
181+
})
182+
183+
Context("edge cases", func() {
184+
It("should handle empty archive_status directory", func(ctx SpecContext) {
185+
// Don't create any .ready files
186+
187+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 3)
188+
189+
// Should still return the requested file
190+
Expect(walList).To(HaveLen(1))
191+
Expect(walList[0]).To(Equal("pg_wal/000000010000000000000001"))
192+
})
193+
194+
})
195+
196+
Context("other files in directory", func() {
197+
It("should ignore non-.ready files in archive_status", func(ctx SpecContext) {
198+
// Create .ready files
199+
createReadyFile("000000010000000000000001")
200+
createReadyFile("000000010000000000000002")
201+
202+
// Create .done files (should be ignored)
203+
donePath := filepath.Join(archiveStatusDir, "000000010000000000000003.done")
204+
err := os.WriteFile(donePath, []byte{}, 0644)
205+
Expect(err).ToNot(HaveOccurred())
206+
207+
// Create a random file (should be ignored)
208+
randomPath := filepath.Join(archiveStatusDir, "random.txt")
209+
err = os.WriteFile(randomPath, []byte{}, 0644)
210+
Expect(err).ToNot(HaveOccurred())
211+
212+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 5)
213+
214+
// Should only get .ready files, not .done or other files
215+
Expect(len(walList)).To(BeNumerically("<=", 2))
216+
for _, wal := range walList {
217+
Expect(wal).NotTo(ContainSubstring("000000010000000000000003"))
218+
Expect(wal).NotTo(ContainSubstring("random"))
219+
}
220+
})
221+
It("should handle timeline history files", func(ctx SpecContext) {
222+
// Create timeline history file
223+
createReadyFile("00000002.history")
224+
createReadyFile("000000010000000000000001")
225+
226+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/00000002.history", 2)
227+
228+
Expect(walList).To(HaveLen(2))
229+
Expect(walList[0]).To(Equal("pg_wal/00000002.history"))
230+
})
231+
})
232+
})

test/e2e/e2e_suite_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/operasoftware/cnpg-plugin-pgbackrest/test/e2e/internal/kustomize"
3535

3636
_ "github.com/operasoftware/cnpg-plugin-pgbackrest/test/e2e/internal/tests/backup"
37+
_ "github.com/operasoftware/cnpg-plugin-pgbackrest/test/e2e/internal/tests/parallelarchive"
3738
_ "github.com/operasoftware/cnpg-plugin-pgbackrest/test/e2e/internal/tests/replicacluster"
3839

3940
. "github.com/onsi/ginkgo/v2"

test/e2e/internal/logs/doc.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
Copyright 2025, Opera Norway AS
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package logs provides utilities for retrieving and parsing pod logs in e2e tests.
18+
package logs

test/e2e/internal/logs/logs.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
Copyright 2025, Opera Norway AS
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package logs
18+
19+
import (
20+
"bufio"
21+
"context"
22+
"encoding/json"
23+
"fmt"
24+
"strings"
25+
26+
corev1 "k8s.io/api/core/v1"
27+
"k8s.io/client-go/kubernetes"
28+
)
29+
30+
// GetPodContainerLogs retrieves logs from a specific container in a pod and parses them as JSON.
31+
// All logs are expected to be valid structured JSON. Returns an error if any line is not valid JSON.
32+
func GetPodContainerLogs(
33+
ctx context.Context,
34+
clientSet *kubernetes.Clientset,
35+
namespace, podName, containerName string,
36+
sinceSeconds *int64,
37+
) ([]map[string]any, error) {
38+
podLogOpts := corev1.PodLogOptions{
39+
Container: containerName,
40+
}
41+
if sinceSeconds != nil {
42+
podLogOpts.SinceSeconds = sinceSeconds
43+
}
44+
45+
req := clientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts)
46+
stream, err := req.Stream(ctx)
47+
if err != nil {
48+
return nil, fmt.Errorf("error opening log stream: %w", err)
49+
}
50+
defer stream.Close()
51+
52+
var logEntries []map[string]any
53+
scanner := bufio.NewScanner(stream)
54+
lineNum := 0
55+
for scanner.Scan() {
56+
lineNum++
57+
line := scanner.Text()
58+
59+
// Skip empty lines
60+
if strings.TrimSpace(line) == "" {
61+
continue
62+
}
63+
64+
// Check if line is JSON
65+
if !strings.HasPrefix(line, "{") {
66+
return nil, fmt.Errorf("non-JSON line at line %d: %s", lineNum, line)
67+
}
68+
69+
var logEntry map[string]any
70+
if err := json.Unmarshal([]byte(line), &logEntry); err != nil {
71+
return nil, fmt.Errorf("invalid JSON at line %d: %w", lineNum, err)
72+
}
73+
74+
logEntries = append(logEntries, logEntry)
75+
}
76+
77+
if err := scanner.Err(); err != nil {
78+
return nil, fmt.Errorf("error reading logs: %w", err)
79+
}
80+
81+
return logEntries, nil
82+
}
83+
84+
// Messages of the structured log entries that mark the start and the end of a
// WAL archive batch. The filters below compare the "msg" field for exact
// equality against these values.
const (
	archiveBatchPreparedMsg  = "WAL archive batch prepared"
	archiveBatchCompletedMsg = "WAL archive batch completed"
)

// FindArchiveBatches finds "WAL archive batch prepared" log entries and returns the parsed data.
// Each returned map contains the structured log fields. The result preserves
// the input order and is nil when nothing matches.
func FindArchiveBatches(logEntries []map[string]any) []map[string]any {
	return filterByMessage(logEntries, archiveBatchPreparedMsg)
}

// FindArchiveBatchCompletions finds "WAL archive batch completed" log entries and returns the parsed data.
// Each returned map contains the structured log fields. The result preserves
// the input order and is nil when nothing matches.
func FindArchiveBatchCompletions(logEntries []map[string]any) []map[string]any {
	return filterByMessage(logEntries, archiveBatchCompletedMsg)
}

// filterByMessage returns the entries whose "msg" field is a string equal to
// want. Entries without a "msg" field, or with a non-string one, are skipped.
func filterByMessage(logEntries []map[string]any, want string) []map[string]any {
	var matches []map[string]any

	for _, entry := range logEntries {
		if msg, ok := entry["msg"].(string); ok && msg == want {
			matches = append(matches, entry)
		}
	}

	return matches
}

test/e2e/internal/objectstore/minio.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ func newMinioPVC(namespace, name string) *corev1.PersistentVolumeClaim {
300300
}
301301

302302
// NewMinioArchive creates a new Archive configured to use the Minio object store.
303-
func NewMinioArchive(namespace, name, minioOSName string) *pluginPgbackrestV1.Archive {
303+
func NewMinioArchive(namespace, name, minioOSName string, maxParallel int) *pluginPgbackrestV1.Archive {
304304
return &pluginPgbackrestV1.Archive{
305305
TypeMeta: metav1.TypeMeta{
306306
Kind: "Archive",
@@ -312,6 +312,9 @@ func NewMinioArchive(namespace, name, minioOSName string) *pluginPgbackrestV1.Ar
312312
},
313313
Spec: pluginPgbackrestV1.ArchiveSpec{
314314
Configuration: pgbackrestApi.PgbackrestConfiguration{
315+
Wal: &pgbackrestApi.WalBackupConfiguration{
316+
MaxParallel: maxParallel,
317+
},
315318
Repositories: []pgbackrestApi.PgbackrestRepository{
316319
{
317320
PgbackrestCredentials: pgbackrestApi.PgbackrestCredentials{

test/e2e/internal/tests/backup/fixtures.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (s s3BackupPluginBackupPluginRestore) createBackupRestoreTestResources(
6868
result := backupRestoreTestResources{}
6969

7070
result.ObjectStoreResources = objectstore.NewMinioObjectStoreResources(namespace, minio)
71-
result.Archive = objectstore.NewMinioArchive(namespace, archiveName, minio)
71+
result.Archive = objectstore.NewMinioArchive(namespace, archiveName, minio, 1)
7272
result.SrcCluster = newSrcClusterWithPlugin(namespace)
7373
result.SrcBackup = newSrcPluginBackup(namespace)
7474
result.DstCluster = newDstClusterWithPlugin(namespace)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
Copyright 2025, Opera Norway AS
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package parallelarchive contains e2e tests for parallel WAL archiving functionality.
18+
package parallelarchive

0 commit comments

Comments
 (0)