Skip to content

Commit 5e0a0d7

Browse files
authored
more logging, align connector close for source peer types (#3555)
1 parent 090fd0f commit 5e0a0d7

File tree

5 files changed

+33
-19
lines changed

5 files changed

+33
-19
lines changed

flow/connectors/mongo/mongo.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"crypto/tls"
66
"errors"
77
"fmt"
8+
"log/slog"
89
"net"
910
"sync/atomic"
1011
"time"
@@ -81,18 +82,22 @@ func NewMongoConnector(ctx context.Context, config *protos.MongoConfig) (*MongoC
8182
}
8283

8384
func (c *MongoConnector) Close() error {
85+
var errs []error
8486
if c != nil && c.client != nil {
8587
// Use a timeout to ensure the disconnect operation does not hang indefinitely
8688
timeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
8789
defer cancel()
88-
return c.client.Disconnect(timeout)
89-
}
90-
if c.ssh != nil && c.ssh.Client != nil {
91-
if err := c.ssh.Close(); err != nil {
92-
return fmt.Errorf("failed to close SSH tunnel: %w", err)
90+
if err := c.client.Disconnect(timeout); err != nil {
91+
c.logger.Error("failed to disconnect MongoDB client", slog.Any("error", err))
92+
errs = append(errs, fmt.Errorf("failed to disconnect MongoDB client: %w", err))
9393
}
9494
}
95-
return nil
95+
96+
if err := c.ssh.Close(); err != nil {
97+
c.logger.Error("[mongo] failed to close SSH tunnel", slog.Any("error", err))
98+
errs = append(errs, fmt.Errorf("[mongo] failed to close SSH tunnel: %w", err))
99+
}
100+
return errors.Join(errs...)
96101
}
97102

98103
func (c *MongoConnector) ConnectionActive(ctx context.Context) error {

flow/connectors/mysql/mysql.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,13 @@ func (c *MySqlConnector) Close() error {
131131
}
132132
if conn := c.conn.Swap(nil); conn != nil {
133133
if err := conn.Close(); err != nil {
134-
c.logger.Error("Failed to close MySQL connection", slog.Any("error", err))
135-
errs = append(errs, err)
134+
c.logger.Error("failed to close MySQL connection", slog.Any("error", err))
135+
errs = append(errs, fmt.Errorf("failed to close MySQL connection: %w", err))
136136
}
137137
}
138138
if err := c.ssh.Close(); err != nil {
139-
c.logger.Error("Failed to close SSH tunnel", slog.Any("error", err))
140-
errs = append(errs, err)
139+
c.logger.Error("[mysql] failed to close SSH tunnel", slog.Any("error", err))
140+
errs = append(errs, fmt.Errorf("[mysql] failed to close SSH tunnel: %w", err))
141141
}
142142
return errors.Join(errs...)
143143
}

flow/connectors/postgres/postgres.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -269,21 +269,30 @@ func (c *PostgresConnector) replicationOptions(publicationName string, pgVersion
269269

270270
// Close closes all connections.
271271
func (c *PostgresConnector) Close() error {
272-
var connerr, replerr error
272+
var errs []error
273273
if c != nil {
274274
timeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
275275
defer cancel()
276-
connerr = c.conn.Close(timeout)
276+
if err := c.conn.Close(timeout); err != nil {
277+
c.logger.Error("failed to close Postgres connection", slog.Any("error", err))
278+
errs = append(errs, fmt.Errorf("failed to close Postgres connection: %w", err))
279+
}
277280

278281
if c.replConn != nil {
279282
timeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
280283
defer cancel()
281-
replerr = c.replConn.Close(timeout)
284+
if err := c.replConn.Close(timeout); err != nil {
285+
c.logger.Error("failed to close Postgres replication connection", slog.Any("error", err))
286+
errs = append(errs, fmt.Errorf("failed to close Postgres replication connection: %w", err))
287+
}
282288
}
283289

284-
c.ssh.Close()
290+
if err := c.ssh.Close(); err != nil {
291+
c.logger.Error("[postgres] failed to close SSH tunnel", slog.Any("error", err))
292+
errs = append(errs, fmt.Errorf("[postgres] failed to close SSH tunnel: %w", err))
293+
}
285294
}
286-
return errors.Join(connerr, replerr)
295+
return errors.Join(errs...)
287296
}
288297

289298
func (c *PostgresConnector) Conn() *pgx.Conn {

flow/workflows/qrep_flow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -617,8 +617,8 @@ func QRepFlowWorkflow(
617617
}
618618

619619
q.logger.Info("Continuing as new workflow",
620-
slog.Any("Last Partition", state.LastPartition),
621-
slog.Uint64("Number of Partitions Processed", state.NumPartitionsProcessed))
620+
slog.Any("lastPartition", state.LastPartition),
621+
slog.Uint64("numPartitionsProcessed", state.NumPartitionsProcessed))
622622

623623
if q.activeSignal == model.PauseSignal {
624624
updateStatus(ctx, q.logger, state, protos.FlowStatus_STATUS_PAUSED)

flow/workflows/xmin_flow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ func XminFlowWorkflow(
118118
}
119119

120120
logger.Info("Continuing as new workflow",
121-
slog.Any("Last Partition", state.LastPartition),
122-
slog.Uint64("Number of Partitions Processed", state.NumPartitionsProcessed))
121+
slog.Any("lastPartition", state.LastPartition),
122+
slog.Uint64("numPartitionsProcessed", state.NumPartitionsProcessed))
123123

124124
if q.activeSignal == model.PauseSignal {
125125
updateStatus(ctx, q.logger, state, protos.FlowStatus_STATUS_PAUSED)

0 commit comments

Comments
 (0)