Skip to content

Commit 7dbc8d1

Browse files
authored
fix: improve error handling for scheduled slot and log retention tasks (#3552)
1 parent 092819b commit 7dbc8d1

File tree

1 file changed

+22
-10
lines changed

1 file changed

+22
-10
lines changed

flow/activities/flowable.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,8 +1105,12 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
11051105
return err
11061106
}
11071107
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
1108-
a.recordSlotInformation(timeoutCtx, info, slotMetricGauges)
1109-
a.emitLogRetentionHours(timeoutCtx, info, a.OtelManager.Metrics.LogRetentionGauge)
1108+
if err := a.recordSlotInformation(timeoutCtx, info, slotMetricGauges); err != nil {
1109+
logger.Error("Failed to record slot information", slog.Any("error", err))
1110+
}
1111+
if err := a.emitLogRetentionHours(timeoutCtx, info, a.OtelManager.Metrics.LogRetentionGauge); err != nil {
1112+
logger.Error("Failed to emit log retention hours", slog.Any("error", err))
1113+
}
11101114
cancel()
11111115
}
11121116
logger.Info("Finished emitting Slot Information", slog.Int("flows", len(infos)))
@@ -1117,7 +1121,7 @@ func (a *FlowableActivity) recordSlotInformation(
11171121
ctx context.Context,
11181122
info flowInformation,
11191123
slotMetricGauges otel_metrics.SlotMetricGauges,
1120-
) {
1124+
) error {
11211125
logger := internal.LoggerFromCtx(ctx)
11221126
flowMetadata, err := a.GetFlowMetadata(ctx, &protos.FlowContextMetadataInput{
11231127
FlowName: info.config.FlowJobName,
@@ -1126,15 +1130,20 @@ func (a *FlowableActivity) recordSlotInformation(
11261130
})
11271131
if err != nil {
11281132
logger.Error("Failed to get flow metadata", slog.Any("error", err))
1129-
return
1133+
return err
11301134
}
1135+
1136+
if flowMetadata.Source.Type != protos.DBType_POSTGRES {
1137+
return nil
1138+
}
1139+
11311140
ctx = context.WithValue(ctx, internal.FlowMetadataKey, flowMetadata)
1132-
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, nil, a.CatalogPool, info.config.SourceName)
1141+
srcConn, err := connectors.GetPostgresConnectorByName(ctx, nil, a.CatalogPool, info.config.SourceName)
11331142
if err != nil {
11341143
if !errors.Is(err, errors.ErrUnsupported) {
11351144
logger.Error("Failed to create connector to handle slot info", slog.Any("error", err))
11361145
}
1137-
return
1146+
return err
11381147
}
11391148
defer connectors.CloseConnector(ctx, srcConn)
11401149

@@ -1151,13 +1160,15 @@ func (a *FlowableActivity) recordSlotInformation(
11511160
}, slotMetricGauges); err != nil {
11521161
logger.Error("Failed to handle slot info", slog.Any("error", err))
11531162
}
1163+
1164+
return nil
11541165
}
11551166

11561167
func (a *FlowableActivity) emitLogRetentionHours(
11571168
ctx context.Context,
11581169
info flowInformation,
11591170
logRetentionGauge metric.Float64Gauge,
1160-
) {
1171+
) error {
11611172
logger := internal.LoggerFromCtx(ctx)
11621173
flowMetadata, err := a.GetFlowMetadata(ctx, &protos.FlowContextMetadataInput{
11631174
FlowName: info.config.FlowJobName,
@@ -1166,15 +1177,15 @@ func (a *FlowableActivity) emitLogRetentionHours(
11661177
})
11671178
if err != nil {
11681179
logger.Error("Failed to get flow metadata", slog.Any("error", err))
1169-
return
1180+
return err
11701181
}
11711182
ctx = context.WithValue(ctx, internal.FlowMetadataKey, flowMetadata)
11721183
srcConn, err := connectors.GetByNameAs[connectors.GetLogRetentionConnector](ctx, nil, a.CatalogPool, info.config.SourceName)
11731184
if err != nil {
11741185
if !errors.Is(err, errors.ErrUnsupported) {
11751186
logger.Error("Failed to create connector to emit log retention", slog.Any("error", err))
11761187
}
1177-
return
1188+
return err
11781189
}
11791190
defer connectors.CloseConnector(ctx, srcConn)
11801191

@@ -1187,11 +1198,12 @@ func (a *FlowableActivity) emitLogRetentionHours(
11871198
if logRetentionHours > 0 {
11881199
logRetentionGauge.Record(ctx, logRetentionHours)
11891200
logger.Info("Emitted log retention hours", slog.String("peerName", peerName), slog.Float64("logRetentionHours", logRetentionHours))
1190-
return
1201+
return nil
11911202
}
11921203

11931204
logger.Warn("Log retention hours is not set or is zero, skipping emission",
11941205
slog.String("peerName", peerName), slog.Float64("logRetentionHours", logRetentionHours))
1206+
return nil
11951207
}
11961208

11971209
var activeFlowStatuses = map[protos.FlowStatus]struct{}{

0 commit comments

Comments
 (0)