Skip to content

Commit 23d7807

Browse files
committed
feat: parallel WAL upload (#45)
While parallel upload was documented as supported, it actually wasn't. Added some e2e tests for it although this kind of concurrency is tricky to validate. Signed-off-by: Szymon Soloch <[email protected]>
1 parent 36d0ba2 commit 23d7807

File tree

11 files changed

+603
-7
lines changed

11 files changed

+603
-7
lines changed

internal/cnpgi/common/wal.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,41 @@ func (w WALServiceImplementation) Archive(
136136
if err != nil {
137137
return nil, err
138138
}
139-
walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), 1)
139+
140+
maxParallel := 1
141+
if archive.Spec.Configuration.Wal != nil && archive.Spec.Configuration.Wal.MaxParallel > 1 {
142+
maxParallel = archive.Spec.Configuration.Wal.MaxParallel
143+
}
144+
145+
walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), maxParallel)
146+
147+
// Log the batch of WAL files prepared for archiving
148+
contextLogger.Info("WAL archive batch prepared",
149+
"requestedWalFile", request.GetSourceFileName(),
150+
"maxParallel", maxParallel,
151+
"walFiles", walList)
152+
140153
result := arch.ArchiveList(ctx, walList, options)
154+
successfulArchives := 0
155+
var lastErr error
141156
for _, archiverResult := range result {
142-
if archiverResult.Err != nil {
143-
return nil, archiverResult.Err
157+
if archiverResult.Err == nil {
158+
successfulArchives++
159+
} else {
160+
lastErr = archiverResult.Err
144161
}
145162
}
146163

164+
contextLogger.Info("WAL archive batch completed",
165+
"requestedWalFile", request.GetSourceFileName(),
166+
"maxParallel", maxParallel,
167+
"successfulArchives", successfulArchives,
168+
"failedArchives", len(walList)-successfulArchives)
169+
170+
if lastErr != nil {
171+
return nil, lastErr
172+
}
173+
147174
return &wal.WALArchiveResult{}, nil
148175
}
149176

internal/pgbackrest/archiver/command_test.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package archiver
1919

2020
import (
2121
"os"
22+
"path/filepath"
2223
"strings"
2324

2425
pgbackrestApi "github.com/operasoftware/cnpg-plugin-pgbackrest/internal/pgbackrest/api"
@@ -75,3 +76,161 @@ var _ = Describe("pgbackrestWalArchiveOptions", func() {
7576
))
7677
})
7778
})
79+
80+
var _ = Describe("GatherWALFilesToArchive", func() {
81+
var tempPgData string
82+
var archiveStatusDir string
83+
var archiver *WALArchiver
84+
85+
BeforeEach(func(ctx SpecContext) {
86+
var err error
87+
88+
tempPgData, err = os.MkdirTemp("", "pgdata-test-*")
89+
Expect(err).ToNot(HaveOccurred())
90+
91+
// Archiver uses the env variable to determine directory root.
92+
os.Setenv("PGDATA", tempPgData)
93+
94+
archiveStatusDir = filepath.Join(tempPgData, "pg_wal", "archive_status")
95+
err = os.MkdirAll(archiveStatusDir, 0755)
96+
Expect(err).ToNot(HaveOccurred())
97+
98+
tempEmptyWalArchivePath := filepath.Join(tempPgData, "empty-wal-archive")
99+
_, err = os.Create(tempEmptyWalArchivePath)
100+
Expect(err).ToNot(HaveOccurred())
101+
102+
archiver, err = New(ctx, nil, filepath.Join(tempPgData, "spool"), tempPgData, tempEmptyWalArchivePath)
103+
Expect(err).ToNot(HaveOccurred())
104+
})
105+
106+
AfterEach(func() {
107+
// Clean up temp directory
108+
if tempPgData != "" {
109+
os.RemoveAll(tempPgData)
110+
}
111+
os.Unsetenv("PGDATA")
112+
})
113+
114+
// Helper function to create .ready files
115+
createReadyFile := func(walName string) {
116+
path := filepath.Join(archiveStatusDir, walName+".ready")
117+
err := os.WriteFile(path, []byte{}, 0644)
118+
Expect(err).ToNot(HaveOccurred())
119+
}
120+
121+
Context("when parallel=1", func() {
122+
It("should gather only the requested file", func(ctx SpecContext) {
123+
// Create .ready files for multiple WAL files
124+
createReadyFile("000000010000000000000001")
125+
createReadyFile("000000010000000000000002")
126+
createReadyFile("000000010000000000000003")
127+
128+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 1)
129+
130+
Expect(walList).To(ConsistOf("pg_wal/000000010000000000000001"))
131+
})
132+
133+
It("should handle when no other .ready files exist", func(ctx SpecContext) {
134+
// Only create the requested file
135+
createReadyFile("000000010000000000000001")
136+
137+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 1)
138+
139+
Expect(walList).To(ConsistOf("pg_wal/000000010000000000000001"))
140+
})
141+
})
142+
143+
Context("when parallel>1", func() {
144+
It("should gather multiple files when parallel=4", func(ctx SpecContext) {
145+
// Create .ready files for multiple WAL files
146+
createReadyFile("000000010000000000000001")
147+
createReadyFile("000000010000000000000002")
148+
createReadyFile("000000010000000000000003")
149+
createReadyFile("000000010000000000000004")
150+
151+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 4)
152+
153+
Expect(walList).To(ConsistOf(
154+
"pg_wal/000000010000000000000001",
155+
"pg_wal/000000010000000000000002",
156+
"pg_wal/000000010000000000000003",
157+
"pg_wal/000000010000000000000004",
158+
))
159+
})
160+
161+
It("should not exceed parallel limit even when more files are ready", func(ctx SpecContext) {
162+
// Create many .ready files
163+
for i := 1; i <= 10; i++ {
164+
createReadyFile("00000001000000000000000" + string(rune('0'+i)))
165+
}
166+
167+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 3)
168+
169+
Expect(walList).To(HaveLen(3))
170+
})
171+
172+
It("should handle when fewer files exist than parallel limit", func(ctx SpecContext) {
173+
// Create only 2 .ready files but request parallel=5
174+
createReadyFile("000000010000000000000001")
175+
createReadyFile("000000010000000000000002")
176+
177+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 5)
178+
179+
// Should only get the files that exist
180+
Expect(walList).To(ConsistOf(
181+
"pg_wal/000000010000000000000001",
182+
"pg_wal/000000010000000000000002",
183+
))
184+
})
185+
})
186+
187+
Context("edge cases", func() {
188+
It("should handle empty archive_status directory", func(ctx SpecContext) {
189+
// Don't create any .ready files
190+
191+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 3)
192+
193+
// Should still return the requested file
194+
Expect(walList).To(ConsistOf("pg_wal/000000010000000000000001"))
195+
})
196+
197+
})
198+
199+
Context("other files in directory", func() {
200+
It("should ignore non-.ready files in archive_status", func(ctx SpecContext) {
201+
// Create .ready files
202+
createReadyFile("000000010000000000000001")
203+
createReadyFile("000000010000000000000002")
204+
205+
// Create .done files (should be ignored)
206+
donePath := filepath.Join(archiveStatusDir, "000000010000000000000003.done")
207+
err := os.WriteFile(donePath, []byte{}, 0644)
208+
Expect(err).ToNot(HaveOccurred())
209+
210+
// Create a random file (should be ignored)
211+
randomPath := filepath.Join(archiveStatusDir, "random.txt")
212+
err = os.WriteFile(randomPath, []byte{}, 0644)
213+
Expect(err).ToNot(HaveOccurred())
214+
215+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/000000010000000000000001", 5)
216+
217+
// Should only get .ready files, not .done or other files
218+
Expect(walList).To(ConsistOf(
219+
"pg_wal/000000010000000000000001",
220+
"pg_wal/000000010000000000000002",
221+
))
222+
})
223+
It("should handle timeline history files", func(ctx SpecContext) {
224+
// Create timeline history file
225+
createReadyFile("00000002.history")
226+
createReadyFile("000000010000000000000001")
227+
228+
walList := archiver.GatherWALFilesToArchive(ctx, "pg_wal/00000002.history", 2)
229+
230+
Expect(walList).To(ConsistOf(
231+
"pg_wal/00000002.history",
232+
"pg_wal/000000010000000000000001",
233+
))
234+
})
235+
})
236+
})

test/e2e/e2e_suite_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/operasoftware/cnpg-plugin-pgbackrest/test/e2e/internal/kustomize"
3535

3636
_ "github.com/operasoftware/cnpg-plugin-pgbackrest/test/e2e/internal/tests/backup"
37+
_ "github.com/operasoftware/cnpg-plugin-pgbackrest/test/e2e/internal/tests/parallelarchive"
3738
_ "github.com/operasoftware/cnpg-plugin-pgbackrest/test/e2e/internal/tests/replicacluster"
3839

3940
. "github.com/onsi/ginkgo/v2"

test/e2e/internal/logs/doc.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
Copyright 2025, Opera Norway AS
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package logs provides utilities for retrieving and parsing pod logs in e2e tests
18+
package logs

test/e2e/internal/logs/logs.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
Copyright 2025, Opera Norway AS
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package logs
18+
19+
import (
20+
"bufio"
21+
"context"
22+
"encoding/json"
23+
"fmt"
24+
"strings"
25+
26+
corev1 "k8s.io/api/core/v1"
27+
"k8s.io/client-go/kubernetes"
28+
)
29+
30+
// GetPodContainerLogs retrieves logs from a specific container in a pod and parses them as JSON.
31+
// All logs are expected to be valid structured JSON. Returns an error if any line is not valid JSON.
32+
func GetPodContainerLogs(
33+
ctx context.Context,
34+
clientSet *kubernetes.Clientset,
35+
namespace, podName, containerName string,
36+
sinceSeconds *int64,
37+
) ([]map[string]any, error) {
38+
podLogOpts := corev1.PodLogOptions{
39+
Container: containerName,
40+
}
41+
if sinceSeconds != nil {
42+
podLogOpts.SinceSeconds = sinceSeconds
43+
}
44+
45+
req := clientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts)
46+
stream, err := req.Stream(ctx)
47+
if err != nil {
48+
return nil, fmt.Errorf("error opening log stream: %w", err)
49+
}
50+
defer stream.Close()
51+
52+
var logEntries []map[string]any
53+
scanner := bufio.NewScanner(stream)
54+
lineNum := 0
55+
for scanner.Scan() {
56+
lineNum++
57+
line := scanner.Text()
58+
59+
// Skip empty lines
60+
if strings.TrimSpace(line) == "" {
61+
continue
62+
}
63+
64+
// Check if line is JSON
65+
if !strings.HasPrefix(line, "{") {
66+
return nil, fmt.Errorf("non-JSON line at line %d: %s", lineNum, line)
67+
}
68+
69+
var logEntry map[string]any
70+
if err := json.Unmarshal([]byte(line), &logEntry); err != nil {
71+
return nil, fmt.Errorf("invalid JSON at line %d: %w", lineNum, err)
72+
}
73+
74+
logEntries = append(logEntries, logEntry)
75+
}
76+
77+
if err := scanner.Err(); err != nil {
78+
return nil, fmt.Errorf("error reading logs: %w", err)
79+
}
80+
81+
return logEntries, nil
82+
}
83+
84+
// findByMsg returns the log entries whose "msg" field is a string equal to
// wantMsg. Entries without a string "msg" field are ignored. Returns nil when
// nothing matches.
func findByMsg(logEntries []map[string]any, wantMsg string) []map[string]any {
	var matched []map[string]any
	for _, logEntry := range logEntries {
		if msg, ok := logEntry["msg"].(string); ok && msg == wantMsg {
			matched = append(matched, logEntry)
		}
	}
	return matched
}

// FindArchiveBatches finds "WAL archive batch prepared" log entries and returns the parsed data.
// Each returned map contains the structured log fields.
func FindArchiveBatches(logEntries []map[string]any) []map[string]any {
	return findByMsg(logEntries, "WAL archive batch prepared")
}

// FindArchiveBatchCompletions finds "WAL archive batch completed" log entries and returns the parsed data.
// Each returned map contains the structured log fields.
func FindArchiveBatchCompletions(logEntries []map[string]any) []map[string]any {
	return findByMsg(logEntries, "WAL archive batch completed")
}

test/e2e/internal/objectstore/minio.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ func newMinioPVC(namespace, name string) *corev1.PersistentVolumeClaim {
300300
}
301301

302302
// NewMinioArchive creates a new Archive configured to use the Minio object store.
303-
func NewMinioArchive(namespace, name, minioOSName string) *pluginPgbackrestV1.Archive {
303+
func NewMinioArchive(namespace, name, minioOSName string, maxParallel int) *pluginPgbackrestV1.Archive {
304304
return &pluginPgbackrestV1.Archive{
305305
TypeMeta: metav1.TypeMeta{
306306
Kind: "Archive",
@@ -312,6 +312,9 @@ func NewMinioArchive(namespace, name, minioOSName string) *pluginPgbackrestV1.Ar
312312
},
313313
Spec: pluginPgbackrestV1.ArchiveSpec{
314314
Configuration: pgbackrestApi.PgbackrestConfiguration{
315+
Wal: &pgbackrestApi.WalBackupConfiguration{
316+
MaxParallel: maxParallel,
317+
},
315318
Repositories: []pgbackrestApi.PgbackrestRepository{
316319
{
317320
PgbackrestCredentials: pgbackrestApi.PgbackrestCredentials{

test/e2e/internal/tests/backup/fixtures.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (s s3BackupPluginBackupPluginRestore) createBackupRestoreTestResources(
6868
result := backupRestoreTestResources{}
6969

7070
result.ObjectStoreResources = objectstore.NewMinioObjectStoreResources(namespace, minio)
71-
result.Archive = objectstore.NewMinioArchive(namespace, archiveName, minio)
71+
result.Archive = objectstore.NewMinioArchive(namespace, archiveName, minio, 1)
7272
result.SrcCluster = newSrcClusterWithPlugin(namespace)
7373
result.SrcBackup = newSrcPluginBackup(namespace)
7474
result.DstCluster = newDstClusterWithPlugin(namespace)

0 commit comments

Comments
 (0)