77 "log/slog"
88 "os"
99 "strconv"
10- "sync"
1110 "sync/atomic"
1211 "time"
1312
@@ -704,7 +703,6 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
704703
705704 // run above command for each Postgres peer
706705 for _ , pgPeer := range pgPeers {
707- activity .RecordHeartbeat (ctx , pgPeer .Name )
708706 if err := ctx .Err (); err != nil {
709707 return err
710708 }
@@ -719,7 +717,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
719717 }
720718 defer pgConn .Close ()
721719 if cmdErr := pgConn .ExecuteCommand (ctx , walHeartbeatStatement ); cmdErr != nil {
722- logger .Warn (fmt . Sprintf ( "could not send wal heartbeat to peer %s: %v" , pgPeer .Name , cmdErr ))
720+ logger .Warn ("could not send wal heartbeat to peer" , slog . String ( "peer" , pgPeer .Name ), slog . Any ( "error" , cmdErr ))
723721 }
724722 logger .Info ("sent wal heartbeat" , slog .String ("peer" , pgPeer .Name ))
725723 }()
@@ -734,20 +732,64 @@ func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error {
734732 defer shared .Interval (ctx , 20 * time .Second , func () {
735733 activity .RecordHeartbeat (ctx , "Running scheduled tasks" )
736734 })()
737- wrapWithLog := func (ctx context. Context , name string , fn func (context. Context ) error ) func () {
735+ wrapWithLog := func (name string , fn func () error ) func () {
738736 return func () {
739- logger := internal .LoggerFromCtx (ctx )
740737 now := time .Now ()
741738 logger .Info (name + " starting" )
742- if err := fn (ctx ); err != nil {
739+ if err := fn (); err != nil {
743740 logger .Error (name + " failed" , slog .Any ("error" , err ))
744741 }
745742 logger .Info (name + " completed" , slog .Duration ("duration" , time .Since (now )))
746743 }
747744 }
748- defer shared .Interval (ctx , 10 * time .Minute , wrapWithLog (ctx , "SendWALHeartbeat" , a .SendWALHeartbeat ))()
749- defer shared .Interval (ctx , 1 * time .Minute , wrapWithLog (ctx , "RecordMetrics" , a .RecordMetrics ))()
750- defer shared .Interval (ctx , 1 * time .Minute , wrapWithLog (ctx , "RecordSlotSizes" , a .RecordSlotSizes ))()
745+ var allFlows atomic.Pointer [[]flowInformation ]
746+ defer shared .Interval (ctx , 59 * time .Second , func () {
747+ rows , err := a .CatalogPool .Query (ctx , "SELECT DISTINCT ON (name) name, config_proto, workflow_id FROM flows WHERE query_string IS NULL" )
748+ if err != nil {
749+ logger .Error ("failed to query all flows" , slog .Any ("error" , err ))
750+ return
751+ }
752+
753+ infos , err := pgx .CollectRows (rows , func (row pgx.CollectableRow ) (flowInformation , error ) {
754+ var flowName string
755+ var configProto []byte
756+ var workflowID string
757+ if err := rows .Scan (& flowName , & configProto , & workflowID ); err != nil {
758+ return flowInformation {}, err
759+ }
760+
761+ var config protos.FlowConnectionConfigs
762+ if err := proto .Unmarshal (configProto , & config ); err != nil {
763+ return flowInformation {}, err
764+ }
765+
766+ return flowInformation {
767+ config : & config ,
768+ workflowID : workflowID ,
769+ }, nil
770+ })
771+ if err != nil {
772+ logger .Error ("failed to process result of all flows" , slog .Any ("error" , err ))
773+ }
774+ allFlows .Store (& infos )
775+ })
776+ defer shared .Interval (ctx , 10 * time .Minute , wrapWithLog ("SendWALHeartbeat" , func () error {
777+ return a .SendWALHeartbeat (ctx )
778+ }))()
779+ defer shared .Interval (ctx , 1 * time .Minute , wrapWithLog ("RecordMetrics" , func () error {
780+ infos := allFlows .Load ()
781+ if infos == nil {
782+ return nil
783+ }
784+ return a .RecordMetrics (ctx , * infos )
785+ }))()
786+ defer shared .Interval (ctx , 1 * time .Minute , wrapWithLog ("RecordSlotSizes" , func () error {
787+ infos := allFlows .Load ()
788+ if infos == nil {
789+ return nil
790+ }
791+ return a .RecordSlotSizes (ctx , * infos )
792+ }))()
751793 <- ctx .Done ()
752794 logger .Info ("Stopping scheduled tasks due to context done" , slog .Any ("error" , ctx .Err ()))
753795 return nil
@@ -760,15 +802,9 @@ type flowInformation struct {
760802 isActive bool
761803}
762804
763- func (a * FlowableActivity ) RecordMetrics (ctx context.Context ) error {
805+ func (a * FlowableActivity ) RecordMetrics (ctx context.Context , infos [] flowInformation ) error {
764806 logger := internal .LoggerFromCtx (ctx )
765807 logger .Info ("Started RecordMetrics" )
766- timeoutCtx , cancel := context .WithTimeout (ctx , 30 * time .Second )
767- defer cancel ()
768- infos , err := a .getAllFlows (timeoutCtx )
769- if err != nil {
770- return err
771- }
772808
773809 maintenanceEnabled , err := internal .PeerDBMaintenanceModeEnabled (ctx , nil )
774810 instanceStatus := otel_metrics .InstanceStatusReady
@@ -786,24 +822,25 @@ func (a *FlowableActivity) RecordMetrics(ctx context.Context) error {
786822 attribute .String (otel_metrics .DeploymentVersionKey , internal .PeerDBDeploymentVersion ()),
787823 )))
788824 logger .Info ("Emitting Instance and Flow Status" , slog .Int ("flows" , len (infos )))
789- activeFlows := make ([]* flowInformation , 0 , len (infos ))
825+ activeFlows := make ([]flowInformation , 0 , len (infos ))
790826 for _ , info := range infos {
791- func (ctx context.Context ) {
792- flowMetadata , err := a .GetFlowMetadata (ctx , & protos.FlowContextMetadataInput {
793- FlowName : info .config .FlowJobName ,
794- SourceName : info .config .SourceName ,
795- DestinationName : info .config .DestinationName ,
796- })
797- if err != nil {
798- logger .Error ("Failed to get flow metadata" , slog .Any ("error" , err ))
799- }
800- ctx = context .WithValue (ctx , internal .FlowMetadataKey , flowMetadata )
801- logger = internal .LoggerFromCtx (ctx )
802- status , sErr := internal .GetWorkflowStatus (ctx , a .CatalogPool , info .workflowID )
803- if sErr != nil {
804- logger .Error ("Failed to get workflow status" , slog .Any ("error" , sErr ), slog .String ("status" , status .String ()))
805- }
806- info .status = status
827+ flowMetadata , err := a .GetFlowMetadata (ctx , & protos.FlowContextMetadataInput {
828+ FlowName : info .config .FlowJobName ,
829+ SourceName : info .config .SourceName ,
830+ DestinationName : info .config .DestinationName ,
831+ })
832+ if err != nil {
833+ logger .Error ("Failed to get flow metadata" , slog .Any ("error" , err ))
834+ }
835+ ctx := context .WithValue (ctx , internal .FlowMetadataKey , flowMetadata )
836+ logger := internal .LoggerFromCtx (ctx )
837+ status , sErr := internal .GetWorkflowStatus (ctx , a .CatalogPool , info .workflowID )
838+ if sErr != nil {
839+ logger .Error ("Failed to get workflow status" , slog .Any ("error" , sErr ), slog .String ("status" , status .String ()))
840+ }
841+ info .status = status
842+ if info .status != protos .FlowStatus_STATUS_COMPLETED &&
843+ info .status != protos .FlowStatus_STATUS_TERMINATED {
807844 if _ , info .isActive = activeFlowStatuses [status ]; info .isActive {
808845 activeFlows = append (activeFlows , info )
809846 }
@@ -812,12 +849,7 @@ func (a *FlowableActivity) RecordMetrics(ctx context.Context) error {
812849 attribute .String (otel_metrics .FlowStatusKey , status .String ()),
813850 attribute .Bool (otel_metrics .IsFlowActiveKey , info .isActive ),
814851 )))
815-
816- if flowMetadata .Status == protos .FlowStatus_STATUS_COMPLETED ||
817- flowMetadata .Status == protos .FlowStatus_STATUS_TERMINATED {
818- return
819- }
820- }(ctx )
852+ }
821853 }
822854 logger .Info ("Finished emitting Instance and Flow Status" , slog .Int ("flows" , len (infos )))
823855 var totalCpuLimit float64
@@ -857,25 +889,22 @@ func (a *FlowableActivity) RecordMetrics(ctx context.Context) error {
857889 a .OtelManager .Metrics .ActiveFlowsGauge .Record (ctx , int64 (activeFlowCount ))
858890 if activeFlowCpuLimit > 0 || activeFlowMemoryLimit > 0 {
859891 for _ , info := range activeFlows {
860- func (ctx context.Context ) {
861- flowMetadata , err := a .GetFlowMetadata (ctx , & protos.FlowContextMetadataInput {
862- FlowName : info .config .FlowJobName ,
863- SourceName : info .config .SourceName ,
864- DestinationName : info .config .DestinationName ,
865- })
866- if err != nil {
867- logger .Error ("Failed to get flow metadata" , slog .Any ("error" , err ))
868- }
869- ctx = context .WithValue (ctx , internal .FlowMetadataKey , flowMetadata )
870- logger = internal .LoggerFromCtx (ctx )
892+ flowMetadata , err := a .GetFlowMetadata (ctx , & protos.FlowContextMetadataInput {
893+ FlowName : info .config .FlowJobName ,
894+ SourceName : info .config .SourceName ,
895+ DestinationName : info .config .DestinationName ,
896+ })
897+ if err != nil {
898+ logger .Error ("Failed to get flow metadata" , slog .Any ("error" , err ))
899+ }
900+ ctx := context .WithValue (ctx , internal .FlowMetadataKey , flowMetadata )
871901
872- if activeFlowMemoryLimit > 0 {
873- a .OtelManager .Metrics .MemoryLimitsPerActiveFlowGauge .Record (ctx , activeFlowMemoryLimit )
874- }
875- if activeFlowCpuLimit > 0 {
876- a .OtelManager .Metrics .CPULimitsPerActiveFlowGauge .Record (ctx , activeFlowCpuLimit )
877- }
878- }(ctx )
902+ if activeFlowMemoryLimit > 0 {
903+ a .OtelManager .Metrics .MemoryLimitsPerActiveFlowGauge .Record (ctx , activeFlowMemoryLimit )
904+ }
905+ if activeFlowCpuLimit > 0 {
906+ a .OtelManager .Metrics .CPULimitsPerActiveFlowGauge .Record (ctx , activeFlowCpuLimit )
907+ }
879908 }
880909 }
881910 }
@@ -884,7 +913,7 @@ func (a *FlowableActivity) RecordMetrics(ctx context.Context) error {
884913 return nil
885914}
886915
887- func (a * FlowableActivity ) RecordSlotSizes (ctx context.Context ) error {
916+ func (a * FlowableActivity ) RecordSlotSizes (ctx context.Context , infos [] flowInformation ) error {
888917 logger := internal .LoggerFromCtx (ctx )
889918 logger .Info ("Recording Slot Information" )
890919 slotMetricGauges := otel_metrics.SlotMetricGauges {}
@@ -894,63 +923,24 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
894923 slotMetricGauges .OpenConnectionsGauge = a .OtelManager .Metrics .OpenConnectionsGauge
895924 slotMetricGauges .OpenReplicationConnectionsGauge = a .OtelManager .Metrics .OpenReplicationConnectionsGauge
896925 slotMetricGauges .IntervalSinceLastNormalizeGauge = a .OtelManager .Metrics .IntervalSinceLastNormalizeGauge
897- infos , err := a .getAllFlows (ctx )
898- if err != nil {
899- return err
900- }
926+
901927 logger .Info ("Recording slot size and emitting log retention where applicable" , slog .Int ("flows" , len (infos )))
902- var wg sync.WaitGroup
903- maxParallel := 5
904- semaphore := make (chan struct {}, maxParallel )
905928 for _ , info := range infos {
906- wg .Add (1 )
907- go func (ctx context.Context , info * flowInformation ) {
908- defer wg .Done ()
909- semaphore <- struct {}{}
910- defer func () { <- semaphore }()
911-
912- timeoutCtx , cancel := context .WithTimeout (ctx , 30 * time .Second )
913- defer cancel ()
914- a .recordSlotInformation (timeoutCtx , info , slotMetricGauges )
915- a .emitLogRetentionHours (timeoutCtx , info , a .OtelManager .Metrics .LogRetentionGauge )
916- }(ctx , info )
917- }
918- logger .Info ("Waiting for Slot Information to be recorded" , slog .Int ("flows" , len (infos )))
919- wg .Wait ()
929+ if err := ctx .Err (); err != nil {
930+ return err
931+ }
932+ timeoutCtx , cancel := context .WithTimeout (ctx , 30 * time .Second )
933+ a .recordSlotInformation (timeoutCtx , info , slotMetricGauges )
934+ a .emitLogRetentionHours (timeoutCtx , info , a .OtelManager .Metrics .LogRetentionGauge )
935+ cancel ()
936+ }
920937 logger .Info ("Finished emitting Slot Information" , slog .Int ("flows" , len (infos )))
921938 return nil
922939}
923940
924- func (a * FlowableActivity ) getAllFlows (ctx context.Context ) ([]* flowInformation , error ) {
925- rows , err := a .CatalogPool .Query (ctx , "SELECT DISTINCT ON (name) name, config_proto, workflow_id FROM flows WHERE query_string IS NULL" )
926- if err != nil {
927- return nil , err
928- }
929-
930- infos , err := pgx .CollectRows (rows , func (row pgx.CollectableRow ) (* flowInformation , error ) {
931- var flowName string
932- var configProto []byte
933- var workflowID string
934- if err := rows .Scan (& flowName , & configProto , & workflowID ); err != nil {
935- return nil , err
936- }
937-
938- var config protos.FlowConnectionConfigs
939- if err := proto .Unmarshal (configProto , & config ); err != nil {
940- return nil , err
941- }
942-
943- return & flowInformation {
944- config : & config ,
945- workflowID : workflowID ,
946- }, nil
947- })
948- return infos , err
949- }
950-
951941func (a * FlowableActivity ) recordSlotInformation (
952942 ctx context.Context ,
953- info * flowInformation ,
943+ info flowInformation ,
954944 slotMetricGauges otel_metrics.SlotMetricGauges ,
955945) {
956946 logger := internal .LoggerFromCtx (ctx )
@@ -978,10 +968,6 @@ func (a *FlowableActivity) recordSlotInformation(
978968 }
979969 peerName := info .config .SourceName
980970
981- activity .RecordHeartbeat (ctx , fmt .Sprintf ("checking %s on %s" , slotName , peerName ))
982- if ctx .Err () != nil {
983- return
984- }
985971 if err := srcConn .HandleSlotInfo (ctx , a .Alerter , a .CatalogPool , & alerting.AlertKeys {
986972 FlowName : info .config .FlowJobName ,
987973 PeerName : peerName ,
@@ -993,7 +979,7 @@ func (a *FlowableActivity) recordSlotInformation(
993979
994980func (a * FlowableActivity ) emitLogRetentionHours (
995981 ctx context.Context ,
996- info * flowInformation ,
982+ info flowInformation ,
997983 logRetentionGauge metric.Float64Gauge ,
998984) {
999985 logger := internal .LoggerFromCtx (ctx )
@@ -1016,10 +1002,6 @@ func (a *FlowableActivity) emitLogRetentionHours(
10161002 defer connectors .CloseConnector (ctx , srcConn )
10171003
10181004 peerName := info .config .SourceName
1019- activity .RecordHeartbeat (ctx , "checking log retention on " + peerName )
1020- if ctx .Err () != nil {
1021- return
1022- }
10231005 logRetentionHours , err := srcConn .GetLogRetentionHours (ctx )
10241006 if err != nil {
10251007 logger .Error ("Failed to get log retention hours" , slog .Any ("error" , err ))
0 commit comments