11package pkg
22
33import (
4- "context"
5- "encoding/json"
6- "errors"
7- "github.com/opengovern/og-describer-kubernetes/discovery/pkg/orchestrator"
8- "github.com/opengovern/og-describer-kubernetes/global"
9- "os"
10- "runtime"
11- "time"
12-
13- "github.com/nats-io/nats.go/jetstream"
14-
15- "github.com/opengovern/og-util/pkg/describe"
164 esSinkClient "github.com/opengovern/og-util/pkg/es/ingest/client"
175 "github.com/opengovern/og-util/pkg/jq"
186 "github.com/opengovern/og-util/pkg/opengovernance-es-sdk"
197 "go.uber.org/zap"
8+ "os"
209)
2110
2211type Worker struct {
@@ -30,103 +19,3 @@ type Worker struct {
3019var (
3120 ManualTriggers = os .Getenv ("MANUAL_TRIGGERS" )
3221)
33-
34- func NewWorker (
35-
36- logger * zap.Logger ,
37- ctx context.Context ,
38- ) (* Worker , error ) {
39- url := os .Getenv ("NATS_URL" )
40- jq , err := jq .New (url , logger )
41- if err != nil {
42- logger .Error ("failed to create job queue" , zap .Error (err ), zap .String ("url" , url ))
43- return nil , err
44- }
45-
46- topic := global .JobQueueTopic
47- if ManualTriggers == "true" {
48- topic = global .JobQueueTopicManuals
49- }
50- if err := jq .Stream (ctx , global .StreamName , " describe job runner queue" , []string {topic }, 5000 ); err != nil {
51- logger .Error ("failed to create stream" , zap .Error (err ))
52- return nil , err
53- }
54-
55- w := & Worker {
56- logger : logger ,
57- jq : jq ,
58- }
59-
60- return w , nil
61- }
62-
63- func (w * Worker ) Run (ctx context.Context ) error {
64- w .logger .Info ("starting to consume" )
65- topic := global .JobQueueTopic
66- consumer := global .ConsumerGroup
67- if ManualTriggers == "true" {
68- topic = global .JobQueueTopicManuals
69- consumer = global .ConsumerGroupManuals
70- }
71- consumeCtx , err := w .jq .ConsumeWithConfig (ctx , consumer , global .StreamName , []string {topic }, jetstream.ConsumerConfig {
72- Replicas : 1 ,
73- AckPolicy : jetstream .AckExplicitPolicy ,
74- DeliverPolicy : jetstream .DeliverAllPolicy ,
75- MaxAckPending : - 1 ,
76- AckWait : time .Minute * 30 ,
77- InactiveThreshold : time .Hour ,
78- }, []jetstream.PullConsumeOpt {
79- jetstream .PullMaxMessages (1 ),
80- }, func (msg jetstream.Msg ) {
81- w .logger .Info ("received a new job" )
82-
83- defer msg .Ack ()
84-
85- ctx , cancel := context .WithTimeoutCause (ctx , time .Minute * 25 , errors .New ("describe worker timed out" ))
86- defer cancel ()
87-
88- if err := w .ProcessMessage (ctx , msg ); err != nil {
89- w .logger .Error ("failed to process message" , zap .Error (err ))
90- }
91- err := msg .Ack ()
92- if err != nil {
93- w .logger .Error ("failed to ack message" , zap .Error (err ))
94- }
95-
96- w .logger .Info ("processing a job completed" )
97- })
98- if err != nil {
99- return err
100- }
101-
102- w .logger .Info ("consuming" )
103-
104- <- ctx .Done ()
105- consumeCtx .Drain ()
106- consumeCtx .Stop ()
107-
108- return nil
109- }
110-
111- func (w * Worker ) ProcessMessage (ctx context.Context , msg jetstream.Msg ) error {
112- startTime := time .Now ()
113- var input describe.DescribeWorkerInput
114- err := json .Unmarshal (msg .Data (), & input )
115- if err != nil {
116- return err
117- }
118- runtime .GC ()
119-
120- w .logger .Info ("running job" , zap .Uint ("id" , input .DescribeJob .JobID ), zap .String ("type" , input .DescribeJob .ResourceType ), zap .String ("providerID" , input .DescribeJob .ProviderID ))
121-
122- err = orchestrator .DescribeHandler (ctx , w .logger , orchestrator .TriggeredByLocal , input )
123- endTime := time .Now ()
124-
125- w .logger .Info ("job completed" , zap .Uint ("id" , input .DescribeJob .JobID ), zap .String ("type" , input .DescribeJob .ResourceType ), zap .String ("providerID" , input .DescribeJob .ProviderID ), zap .Duration ("duration" , endTime .Sub (startTime )))
126- if err != nil {
127- w .logger .Error ("failure while running job" , zap .Error (err ))
128- return err
129- }
130-
131- return nil
132- }
0 commit comments