Skip to content

Commit 9f3eddf

Browse files
armruleonardoceNiccoloFei
authored
fix(backup): clean unused backup connections (cloudnative-pg#6882)
An error while taking a snapshot backup could leave an unused connection open until the next backup. This patch ensures that backup connections are explicitly closed when no longer needed, preventing potential connection leaks. Partially closes cloudnative-pg#6761 Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com> Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com> Signed-off-by: Niccolò Fei <niccolo.fei@enterprisedb.com> Co-authored-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com> Co-authored-by: Niccolò Fei <niccolo.fei@enterprisedb.com>
1 parent 50f0980 commit 9f3eddf

File tree

3 files changed

+165
-2
lines changed

3 files changed

+165
-2
lines changed

pkg/management/postgres/webserver/remote.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ import (
2727
"os"
2828
"os/exec"
2929
"path"
30+
"time"
3031

3132
"github.com/cloudnative-pg/machinery/pkg/execlog"
3233
"github.com/cloudnative-pg/machinery/pkg/fileutils"
3334
"github.com/cloudnative-pg/machinery/pkg/log"
35+
apierrs "k8s.io/apimachinery/pkg/api/errors"
3436
"sigs.k8s.io/controller-runtime/pkg/client"
3537

3638
apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
@@ -111,7 +113,84 @@ func NewRemoteWebServer(
111113
}
112114
}
113115

114-
return NewWebServer(server), nil
116+
srv := NewWebServer(server)
117+
118+
srv.routines = append(srv.routines, endpoints.cleanupStaleCollections)
119+
120+
return srv, nil
121+
}
122+
123+
func (ws *remoteWebserverEndpoints) cleanupStaleCollections(ctx context.Context) {
124+
closeBackupConnection := func(bc *backupConnection) {
125+
log := log.WithValues(
126+
"backupName", bc.data.BackupName,
127+
"phase", bc.data.Phase,
128+
)
129+
log.Warning("Closing stale PostgreSQL backup connection")
130+
131+
if err := bc.conn.Close(); err != nil {
132+
log.Error(err, "Error while closing stale PostgreSQL backup connection")
133+
}
134+
bc.data.Phase = Completed
135+
}
136+
137+
innerRoutine := func() {
138+
if ws == nil {
139+
return
140+
}
141+
bc := ws.currentBackup
142+
if bc == nil || bc.conn == nil {
143+
return
144+
}
145+
146+
if bc.data.Phase == Completed || bc.data.BackupName == "" {
147+
return
148+
}
149+
150+
bc.sync.Lock()
151+
defer bc.sync.Unlock()
152+
153+
if bc.err != nil {
154+
closeBackupConnection(bc)
155+
return
156+
}
157+
158+
if err := bc.conn.PingContext(ctx); err != nil {
159+
bc.err = fmt.Errorf("error while pinging: %w", err)
160+
closeBackupConnection(bc)
161+
return
162+
}
163+
164+
var backup apiv1.Backup
165+
166+
err := ws.typedClient.Get(ctx, client.ObjectKey{
167+
Namespace: ws.instance.GetNamespaceName(),
168+
Name: bc.data.BackupName,
169+
}, &backup)
170+
if apierrs.IsNotFound(err) {
171+
bc.err = fmt.Errorf("backup %s not found", bc.data.BackupName)
172+
closeBackupConnection(bc)
173+
return
174+
}
175+
if err != nil {
176+
return
177+
}
178+
179+
if backup.Status.IsDone() {
180+
bc.err = fmt.Errorf("backup %s is done", bc.data.BackupName)
181+
closeBackupConnection(bc)
182+
return
183+
}
184+
}
185+
186+
for {
187+
select {
188+
case <-ctx.Done():
189+
return
190+
case <-time.After(1 * time.Minute):
191+
innerRoutine()
192+
}
193+
}
115194
}
116195

117196
func (ws *remoteWebserverEndpoints) isServerHealthy(w http.ResponseWriter, _ *http.Request) {

pkg/management/postgres/webserver/webserver.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ func (body Response[T]) EnsureDataIsPresent() error {
6767

6868
// Webserver wraps a webserver to make it a kubernetes Runnable
6969
type Webserver struct {
70-
server *http.Server
70+
server *http.Server
71+
routines []func(ctx context.Context)
7172
}
7273

7374
// NewWebServer creates a Webserver as a Kubernetes Runnable, given a http.Server
@@ -96,6 +97,13 @@ func (ws *Webserver) Start(ctx context.Context) error {
9697
}
9798
}()
9899

100+
subCtx, cancel := context.WithCancel(ctx)
101+
defer cancel()
102+
103+
for _, routine := range ws.routines {
104+
routine(subCtx)
105+
}
106+
99107
select {
100108
// we exit with error code, potentially we could do a retry logic, but rarely a webserver that doesn't start will run
101109
// on subsequent tries

tests/e2e/volume_snapshot_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,22 @@ import (
2020
"encoding/json"
2121
"fmt"
2222
"os"
23+
"strconv"
2324
"strings"
2425
"time"
2526

2627
volumesnapshot "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/types"
30+
"k8s.io/client-go/util/retry"
2931
k8client "sigs.k8s.io/controller-runtime/pkg/client"
3032

3133
apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
3234
"github.com/cloudnative-pg/cloudnative-pg/pkg/utils"
3335
"github.com/cloudnative-pg/cloudnative-pg/tests"
3436
"github.com/cloudnative-pg/cloudnative-pg/tests/utils/backups"
3537
"github.com/cloudnative-pg/cloudnative-pg/tests/utils/clusterutils"
38+
"github.com/cloudnative-pg/cloudnative-pg/tests/utils/exec"
3639
"github.com/cloudnative-pg/cloudnative-pg/tests/utils/minio"
3740
"github.com/cloudnative-pg/cloudnative-pg/tests/utils/postgres"
3841
"github.com/cloudnative-pg/cloudnative-pg/tests/utils/secrets"
@@ -66,6 +69,18 @@ var _ = Describe("Verify Volume Snapshot",
6669
return snapshotList, nil
6770
}
6871

72+
updateClusterSnapshotClass := func(namespace, clusterName, className string) {
73+
cluster := &apiv1.Cluster{}
74+
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
75+
var err error
76+
cluster, err = clusterutils.Get(env.Ctx, env.Client, namespace, clusterName)
77+
Expect(err).ToNot(HaveOccurred())
78+
cluster.Spec.Backup.VolumeSnapshot.ClassName = className
79+
return env.Client.Update(env.Ctx, cluster)
80+
})
81+
Expect(err).ToNot(HaveOccurred())
82+
}
83+
6984
var namespace string
7085

7186
Context("using the kubectl cnpg plugin", Ordered, func() {
@@ -842,5 +857,66 @@ var _ = Describe("Verify Volume Snapshot",
842857
AssertDataExpectedCount(env, tableLocator, 6)
843858
})
844859
})
860+
861+
It("should clean up unused backup connections", func() {
862+
By("setting a non-existing snapshotClass", func() {
863+
updateClusterSnapshotClass(namespace, clusterToSnapshotName, "wrongSnapshotClass")
864+
})
865+
866+
By("starting a new backup that will fail", func() {
867+
backupName := fmt.Sprintf("%s-failed", clusterToSnapshotName)
868+
failedBackup, err := backups.Create(
869+
env.Ctx, env.Client,
870+
apiv1.Backup{
871+
ObjectMeta: metav1.ObjectMeta{
872+
Namespace: namespace,
873+
Name: backupName,
874+
},
875+
Spec: apiv1.BackupSpec{
876+
Target: apiv1.BackupTargetPrimary,
877+
Method: apiv1.BackupMethodVolumeSnapshot,
878+
Cluster: apiv1.LocalObjectReference{Name: clusterToSnapshotName},
879+
},
880+
},
881+
)
882+
Expect(err).ToNot(HaveOccurred())
883+
884+
Eventually(func(g Gomega) {
885+
err = env.Client.Get(env.Ctx, types.NamespacedName{
886+
Namespace: namespace,
887+
Name: backupName,
888+
}, failedBackup)
889+
g.Expect(err).ToNot(HaveOccurred())
890+
g.Expect(failedBackup.Status.Phase).To(BeEquivalentTo(apiv1.BackupPhaseFailed))
891+
g.Expect(failedBackup.Status.Error).To(ContainSubstring("Failed to get snapshot class"))
892+
}, RetryTimeout).Should(Succeed())
893+
})
894+
895+
By("verifying that the backup connection is cleaned up", func() {
896+
primaryPod, err := clusterutils.GetPrimary(env.Ctx, env.Client, namespace,
897+
clusterToSnapshotName)
898+
Expect(err).ToNot(HaveOccurred())
899+
query := "SELECT count(*) FROM pg_stat_activity WHERE query ILIKE '%pg_backup_start%' " +
900+
"AND application_name = 'cnpg-instance-manager'"
901+
902+
Eventually(func() (int, error, error) {
903+
stdout, _, err := exec.QueryInInstancePod(
904+
env.Ctx, env.Client, env.Interface, env.RestClientConfig,
905+
exec.PodLocator{
906+
Namespace: primaryPod.Namespace,
907+
PodName: primaryPod.Name,
908+
},
909+
postgres.PostgresDBName,
910+
query)
911+
value, atoiErr := strconv.Atoi(strings.TrimSpace(stdout))
912+
return value, err, atoiErr
913+
}, RetryTimeout).Should(BeEquivalentTo(0),
914+
"Stale backup connection should have been dropped")
915+
})
916+
917+
By("resetting the snapshotClass value", func() {
918+
updateClusterSnapshotClass(namespace, clusterToSnapshotName, os.Getenv("E2E_CSI_STORAGE_CLASS"))
919+
})
920+
})
845921
})
846922
})

0 commit comments

Comments
 (0)