@@ -65,49 +65,126 @@ func NewWorker(
6565 return w , nil
6666}
6767
68- func (w * Worker ) Run (ctx context.Context ) error {
69- w .logger .Info ("starting to consume" , zap .String ("url" , envs .NatsURL ), zap .String ("consumer" , envs .NatsConsumer ),
70- zap .String ("stream" , envs .StreamName ), zap .String ("topic" , envs .TopicName ))
71-
72- consumeCtx , err := w .jq .ConsumeWithConfig (ctx , envs .NatsConsumer , envs .StreamName , []string {envs .TopicName }, jetstream.ConsumerConfig {
73- Replicas : 1 ,
74- AckPolicy : jetstream .AckExplicitPolicy ,
75- DeliverPolicy : jetstream .DeliverAllPolicy ,
76- MaxAckPending : - 1 ,
77- AckWait : time .Minute * 30 ,
78- InactiveThreshold : time .Hour ,
79- }, []jetstream.PullConsumeOpt {
80- jetstream .PullMaxMessages (1 ),
81- }, func (msg jetstream.Msg ) {
82- w .logger .Info ("received a new job" )
83-
84- err := w .ProcessMessage (ctx , msg )
85- if err != nil {
86- // Log error from ProcessMessage itself (e.g., initial setup failure)
87- // Note: Errors during task.RunTask are handled within ProcessMessage's defer
88- w .logger .Error ("failed during message processing setup" , zap .Error (err ))
89- }
68+ func (w * Worker ) Run (ctx context.Context , idleTimeout time.Duration ) error {
69+ w .logger .Info ("Starting Run loop" ,
70+ zap .String ("nats_consumer" , envs .NatsConsumer ),
71+ zap .String ("stream" , envs .StreamName ),
72+ zap .String ("topic" , envs .TopicName ),
73+ zap .Duration ("idleTimeout" , idleTimeout ))
74+
75+ idleTimer := time .NewTimer (idleTimeout )
9076
91- // Ack is always sent by Run after ProcessMessage finishes or fails setup
92- if ackErr := msg .Ack (); ackErr != nil {
93- w .logger .Error ("failed to send the ack message" , zap .Error (ackErr ))
77+ if ! idleTimer .Stop () {
78+ select {
79+ case <- idleTimer .C :
80+ default :
9481 }
82+ }
83+ idleTimer .Reset (idleTimeout )
9584
96- w .logger .Info ("processing a job completed" )
97- })
98- if err != nil {
99- w .logger .Error ("failed to start consuming messages" , zap .Error (err ))
100- return err
85+ var consumer jetstream.ConsumeContext
86+
87+ consumerCtx , cancelConsumer := context .WithCancel (ctx )
88+ defer cancelConsumer ()
89+
90+ consumerErrChan := make (chan error , 1 )
91+ activityChan := make (chan struct {}, 1 )
92+
93+ go func () {
94+ var consErr error
95+ consumer , consErr = w .jq .ConsumeWithConfig (
96+ consumerCtx ,
97+ envs .NatsConsumer ,
98+ envs .StreamName ,
99+ []string {envs .TopicName },
100+ jetstream.ConsumerConfig {
101+ Replicas : 1 ,
102+ AckPolicy : jetstream .AckExplicitPolicy ,
103+ DeliverPolicy : jetstream .DeliverAllPolicy ,
104+ MaxAckPending : - 1 ,
105+ AckWait : time .Minute * 30 ,
106+ InactiveThreshold : time .Hour ,
107+ },
108+ []jetstream.PullConsumeOpt {
109+ jetstream .PullMaxMessages (1 ), // Process one message at a time
110+ },
111+ func (msg jetstream.Msg ) {
112+ w .logger .Info ("Received a new job" )
113+ err := w .ProcessMessage (consumerCtx , msg )
114+ if err != nil {
115+ w .logger .Error ("Failed during message processing" , zap .Error (err ))
116+ }
117+
118+ if ackErr := msg .Ack (); ackErr != nil {
119+ w .logger .Error ("Failed to send the ack message" , zap .Error (ackErr ))
120+
121+ } else {
122+ w .logger .Info ("Message Ack'd successfully." )
123+ }
124+
125+ w .logger .Info ("Processing a job completed, signaling activity." )
126+
127+ select {
128+ case activityChan <- struct {}{}:
129+ default :
130+ w .logger .Warn ("Activity channel buffer full?" )
131+ }
132+ })
133+
134+ consumerErrChan <- consErr
135+
136+ <- consumerCtx .Done ()
137+ w .logger .Info ("Consumer goroutine exiting." )
138+
139+ }()
140+
141+ setupErr := <- consumerErrChan
142+ if setupErr != nil {
143+ w .logger .Error ("Failed to start NATS consumer" , zap .Error (setupErr ))
144+ if ! idleTimer .Stop () {
145+ select {
146+ case <- idleTimer .C :
147+ default :
148+ }
149+ }
150+ return setupErr
101151 }
102152
103- w .logger .Info ("consuming messages..." )
153+ w .logger .Info ("NATS consumer started successfully. Waiting for messages or idle timeout..." )
154+
155+ for {
156+ select {
157+ case <- ctx .Done ():
158+ w .logger .Info ("Worker Run context cancelled. Initiating shutdown." )
159+ cancelConsumer ()
160+ if consumer != nil {
161+ w .logger .Info ("Draining consumer due to context cancellation..." )
162+ consumer .Drain ()
163+ w .logger .Info ("Consumer drained." )
164+ }
165+ return ctx .Err ()
104166
105- <- ctx .Done ()
106- w .logger .Info ("Main context cancelled, draining consumer..." )
107- consumeCtx .Drain ()
108- w .logger .Info ("Consumer stopped." )
167+ case <- activityChan :
168+ w .logger .Debug ("Activity detected, resetting idle timer." )
169+ if ! idleTimer .Stop () {
170+ select {
171+ case <- idleTimer .C :
172+ default :
173+ }
174+ }
175+ idleTimer .Reset (idleTimeout )
109176
110- return nil
177+ case <- idleTimer .C :
178+ w .logger .Info ("Idle timeout reached. Initiating shutdown." )
179+ cancelConsumer ()
180+ if consumer != nil {
181+ w .logger .Info ("Draining consumer due to idle timeout..." )
182+ consumer .Drain ()
183+ w .logger .Info ("Consumer drained." )
184+ }
185+ return nil
186+ }
187+ }
111188}
112189
113190func (w * Worker ) ProcessMessage (ctx context.Context , msg jetstream.Msg ) (err error ) {
0 commit comments