@@ -65,126 +65,49 @@ func NewWorker(
6565 return w , nil
6666}
6767
68- func (w * Worker ) Run (ctx context.Context , idleTimeout time.Duration ) error {
69- w .logger .Info ("Starting Run loop" ,
70- zap .String ("nats_consumer" , envs .NatsConsumer ),
71- zap .String ("stream" , envs .StreamName ),
72- zap .String ("topic" , envs .TopicName ),
73- zap .Duration ("idleTimeout" , idleTimeout ))
74-
75- idleTimer := time .NewTimer (idleTimeout )
76-
77- if ! idleTimer .Stop () {
78- select {
79- case <- idleTimer .C :
80- default :
68+ func (w * Worker ) Run (ctx context.Context ) error {
69+ w .logger .Info ("starting to consume" , zap .String ("url" , envs .NatsURL ), zap .String ("consumer" , envs .NatsConsumer ),
70+ zap .String ("stream" , envs .StreamName ), zap .String ("topic" , envs .TopicName ))
71+
72+ consumeCtx , err := w .jq .ConsumeWithConfig (ctx , envs .NatsConsumer , envs .StreamName , []string {envs .TopicName }, jetstream.ConsumerConfig {
73+ Replicas : 1 ,
74+ AckPolicy : jetstream .AckExplicitPolicy ,
75+ DeliverPolicy : jetstream .DeliverAllPolicy ,
76+ MaxAckPending : - 1 ,
77+ AckWait : time .Minute * 30 ,
78+ InactiveThreshold : time .Hour ,
79+ }, []jetstream.PullConsumeOpt {
80+ jetstream .PullMaxMessages (1 ),
81+ }, func (msg jetstream.Msg ) {
82+ w .logger .Info ("received a new job" )
83+
84+ err := w .ProcessMessage (ctx , msg )
85+ if err != nil {
86+ // Log error from ProcessMessage itself (e.g., initial setup failure)
87+ // Note: Errors during task.RunTask are handled within ProcessMessage's defer
88+ w .logger .Error ("failed during message processing setup" , zap .Error (err ))
8189 }
82- }
83- idleTimer .Reset (idleTimeout )
84-
85- var consumer jetstream.ConsumeContext
86-
87- consumerCtx , cancelConsumer := context .WithCancel (ctx )
88- defer cancelConsumer ()
89-
90- consumerErrChan := make (chan error , 1 )
91- activityChan := make (chan struct {}, 1 )
92-
93- go func () {
94- var consErr error
95- consumer , consErr = w .jq .ConsumeWithConfig (
96- consumerCtx ,
97- envs .NatsConsumer ,
98- envs .StreamName ,
99- []string {envs .TopicName },
100- jetstream.ConsumerConfig {
101- Replicas : 1 ,
102- AckPolicy : jetstream .AckExplicitPolicy ,
103- DeliverPolicy : jetstream .DeliverAllPolicy ,
104- MaxAckPending : - 1 ,
105- AckWait : time .Minute * 30 ,
106- InactiveThreshold : time .Hour ,
107- },
108- []jetstream.PullConsumeOpt {
109- jetstream .PullMaxMessages (1 ), // Process one message at a time
110- },
111- func (msg jetstream.Msg ) {
112- w .logger .Info ("Received a new job" )
113- err := w .ProcessMessage (consumerCtx , msg )
114- if err != nil {
115- w .logger .Error ("Failed during message processing" , zap .Error (err ))
116- }
117-
118- if ackErr := msg .Ack (); ackErr != nil {
119- w .logger .Error ("Failed to send the ack message" , zap .Error (ackErr ))
120-
121- } else {
122- w .logger .Info ("Message Ack'd successfully." )
123- }
124-
125- w .logger .Info ("Processing a job completed, signaling activity." )
126-
127- select {
128- case activityChan <- struct {}{}:
129- default :
130- w .logger .Warn ("Activity channel buffer full?" )
131- }
132- })
13390
134- consumerErrChan <- consErr
135-
136- <- consumerCtx .Done ()
137- w .logger .Info ("Consumer goroutine exiting." )
138-
139- }()
140-
141- setupErr := <- consumerErrChan
142- if setupErr != nil {
143- w .logger .Error ("Failed to start NATS consumer" , zap .Error (setupErr ))
144- if ! idleTimer .Stop () {
145- select {
146- case <- idleTimer .C :
147- default :
148- }
91+ // Ack is always sent by Run after ProcessMessage finishes or fails setup
92+ if ackErr := msg .Ack (); ackErr != nil {
93+ w .logger .Error ("failed to send the ack message" , zap .Error (ackErr ))
14994 }
150- return setupErr
151- }
15295
153- w .logger .Info ("NATS consumer started successfully. Waiting for messages or idle timeout..." )
96+ w .logger .Info ("processing a job completed" )
97+ })
98+ if err != nil {
99+ w .logger .Error ("failed to start consuming messages" , zap .Error (err ))
100+ return err
101+ }
154102
155- for {
156- select {
157- case <- ctx .Done ():
158- w .logger .Info ("Worker Run context cancelled. Initiating shutdown." )
159- cancelConsumer ()
160- if consumer != nil {
161- w .logger .Info ("Draining consumer due to context cancellation..." )
162- consumer .Drain ()
163- w .logger .Info ("Consumer drained." )
164- }
165- return ctx .Err ()
103+ w .logger .Info ("consuming messages..." )
166104
167- case <- activityChan :
168- w .logger .Debug ("Activity detected, resetting idle timer." )
169- if ! idleTimer .Stop () {
170- select {
171- case <- idleTimer .C :
172- default :
173- }
174- }
175- idleTimer .Reset (idleTimeout )
105+ <- ctx .Done ()
106+ w .logger .Info ("Main context cancelled, draining consumer..." )
107+ consumeCtx .Drain ()
108+ w .logger .Info ("Consumer stopped." )
176109
177- case <- idleTimer .C :
178- w .logger .Info ("Idle timeout reached. Initiating shutdown." )
179- cancelConsumer ()
180- if consumer != nil {
181- w .logger .Info ("Draining consumer due to idle timeout..." )
182- consumer .Drain ()
183- w .logger .Info ("Consumer drained." )
184- }
185- return nil
186- }
187- }
110+ return nil
188111}
189112
190113func (w * Worker ) ProcessMessage (ctx context.Context , msg jetstream.Msg ) (err error ) {
0 commit comments