Skip to content

Commit cd53bb6

Browse files
authored
drop flow: ignore dns host lookup not found (#3528)
This tends to happen when the destination database no longer exists.
1 parent ee3493b commit cd53bb6

File tree

2 files changed, with 29 additions and 14 deletions.

2 files changed, with 29 additions and 14 deletions.

flow/activities/flowable.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"net"
89
"os"
910
"strconv"
1011
"sync/atomic"
@@ -657,12 +658,18 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropF
657658
defer connectors.CloseConnector(ctx, srcConn)
658659

659660
if err := srcConn.PullFlowCleanup(ctx, req.FlowJobName); err != nil {
660-
pullCleanupErr := exceptions.NewDropFlowError(fmt.Errorf("[DropFlowSource] failed to clean up source: %w", err))
661-
if !shared.IsSQLStateError(err, pgerrcode.ObjectInUse) {
662-
// don't alert when PID active
663-
_ = a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr)
661+
var dnsErr *net.DNSError
662+
if errors.As(err, &dnsErr) && dnsErr.IsNotFound {
663+
a.Alerter.LogFlowWarning(ctx, req.FlowJobName, fmt.Errorf("[DropFlowSource] hostname not found, skipping: %w", err))
664+
return nil
665+
} else {
666+
pullCleanupErr := exceptions.NewDropFlowError(fmt.Errorf("[DropFlowSource] failed to clean up source: %w", err))
667+
if !shared.IsSQLStateError(err, pgerrcode.ObjectInUse) {
668+
// don't alert when PID active
669+
_ = a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr)
670+
}
671+
return pullCleanupErr
664672
}
665-
return pullCleanupErr
666673
}
667674

668675
a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up source peer replication objects.")
@@ -674,15 +681,21 @@ func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.
674681
ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
675682
dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, nil, a.CatalogPool, req.PeerName)
676683
if err != nil {
677-
var notFound *exceptions.NotFoundError
678-
if errors.As(err, &notFound) {
679-
logger := internal.LoggerFromCtx(ctx)
680-
logger.Warn("peer missing, skipping", slog.String("peer", req.PeerName))
684+
var dnsErr *net.DNSError
685+
if errors.As(err, &dnsErr) && dnsErr.IsNotFound {
686+
a.Alerter.LogFlowWarning(ctx, req.FlowJobName, fmt.Errorf("[DropFlowDestination] hostname not found, skipping: %w", err))
681687
return nil
688+
} else {
689+
var notFound *exceptions.NotFoundError
690+
if errors.As(err, &notFound) {
691+
logger := internal.LoggerFromCtx(ctx)
692+
logger.Warn("peer missing, skipping", slog.String("peer", req.PeerName))
693+
return nil
694+
}
695+
return a.Alerter.LogFlowError(ctx, req.FlowJobName,
696+
exceptions.NewDropFlowError(fmt.Errorf("[DropFlowDestination] failed to get destination connector: %w", err)),
697+
)
682698
}
683-
return a.Alerter.LogFlowError(ctx, req.FlowJobName,
684-
exceptions.NewDropFlowError(fmt.Errorf("[DropFlowDestination] failed to get destination connector: %w", err)),
685-
)
686699
}
687700
defer connectors.CloseConnector(ctx, dstConn)
688701

flow/workflows/drop_flow.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput)
4747
if !sourceOk {
4848
sourceTries += 1
4949
var dropSourceFuture workflow.Future
50-
if sourceTries < 50 {
50+
var applicationError *temporal.ApplicationError
51+
if sourceTries < 50 && (!errors.As(sourceError, &applicationError) || !applicationError.NonRetryable()) {
5152
sleep := model.SleepFuture(ctx, time.Duration(sourceTries*sourceTries)*time.Second)
5253
selector.AddFuture(sleep, sleepSource)
5354
} else {
@@ -83,7 +84,8 @@ func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput)
8384
if !destinationOk {
8485
destinationTries += 1
8586
var dropDestinationFuture workflow.Future
86-
if destinationTries < 50 {
87+
var applicationError *temporal.ApplicationError
88+
if destinationTries < 50 && (!errors.As(destinationError, &applicationError) || !applicationError.NonRetryable()) {
8789
sleep := model.SleepFuture(ctx, time.Duration(destinationTries*destinationTries)*time.Second)
8890
selector.AddFuture(sleep, sleepDestination)
8991
} else {

0 commit comments

Comments
 (0)