99 "strings"
1010 "time"
1111
12+ "github.com/codeGROOVE-dev/slacker/internal/state"
1213 "github.com/codeGROOVE-dev/sprinkler/pkg/client"
1314)
1415
@@ -57,12 +58,23 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
     // Try to claim this event atomically using the persistent store (Datastore transaction).
     // This is the single source of truth for cross-instance deduplication.
     if err := c.stateStore.MarkProcessed(eventKey, 24*time.Hour); err != nil {
-        slog.Info("skipping duplicate event",
-            "organization", organization,
-            "type", event.Type,
-            "url", event.URL,
-            "event_key", eventKey,
-            "reason", "already_processed")
+        // Distinguish a deduplication race from a database error.
+        if errors.Is(err, state.ErrAlreadyProcessed) {
+            slog.Info("skipping duplicate event - claimed by this or another instance",
+                "organization", organization,
+                "type", event.Type,
+                "url", event.URL,
+                "event_key", eventKey,
+                "reason", "deduplication_race")
+        } else {
+            slog.Warn("failed to mark event as processed - database error",
+                "organization", organization,
+                "type", event.Type,
+                "url", event.URL,
+                "event_key", eventKey,
+                "error", err,
+                "impact", "will_skip_event")
+        }
         return
     }

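The internal/state package is not part of this diff, so the exact MarkProcessed semantics are inferred from the call site and the hunk's comments. A minimal sketch of the contract the caller relies on, assuming a Datastore-backed store (the Store type, the "ProcessedEvent" entity kind, and the processedEvent layout are illustrative guesses, not the real internals):

```go
// Package state sketch; the real internal/state source is not in this diff.
package state

import (
    "context"
    "errors"
    "time"

    "cloud.google.com/go/datastore"
)

// ErrAlreadyProcessed signals that a live claim for the key already exists.
var ErrAlreadyProcessed = errors.New("event already processed")

type processedEvent struct {
    ExpiresAt time.Time
}

// Store wraps a Datastore client (field names here are assumptions).
type Store struct {
    ds *datastore.Client
}

// MarkProcessed atomically claims eventKey for ttl, returning
// ErrAlreadyProcessed when the key is already claimed and still live.
func (s *Store) MarkProcessed(eventKey string, ttl time.Duration) error {
    ctx := context.Background() // the real store may thread a context differently
    _, err := s.ds.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
        key := datastore.NameKey("ProcessedEvent", eventKey, nil)
        var existing processedEvent
        getErr := tx.Get(key, &existing)
        if getErr == nil && time.Now().Before(existing.ExpiresAt) {
            return ErrAlreadyProcessed // lost the race: another instance claimed it
        }
        if getErr != nil && !errors.Is(getErr, datastore.ErrNoSuchEntity) {
            return getErr // genuine database error
        }
        _, putErr := tx.Put(key, &processedEvent{ExpiresAt: time.Now().Add(ttl)})
        return putErr
    })
    return err
}
```

Running the claim inside a transaction is what makes the get-then-put atomic across instances; the caller above only needs errors.Is to separate the expected race from real failures.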
@@ -76,7 +88,11 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
     // Process event asynchronously after deduplication checks pass.
     // This allows the event handler to return immediately and accept the next event.
     // Semaphore limits concurrent processing to prevent overwhelming APIs.
+    // WaitGroup tracks in-flight events for graceful shutdown.
+    c.processingEvents.Add(1)
     go func() {
+        defer c.processingEvents.Done()
+
         // Acquire semaphore slot (blocks if 10 events already processing).
         c.eventSemaphore <- struct{}{}
         defer func() { <-c.eventSemaphore }() // Release slot when done
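For orientation, here is a sketch of the Coordinator fields this hunk relies on, reconstructed from usage (the package name, constructor, and field layout are assumptions; the channel capacity of 10 comes from the semaphore comment above):

```go
package events // package name is a guess; not shown in the diff

import (
    "sync"

    "github.com/codeGROOVE-dev/slacker/internal/state"
)

// Coordinator fields assumed by the hunk above (other fields elided).
type Coordinator struct {
    stateStore       *state.Store   // persistent dedup store
    eventSemaphore   chan struct{}  // bounds concurrent event processing
    processingEvents sync.WaitGroup // new in this change: counts in-flight goroutines
}

// newCoordinator is illustrative, not the real constructor.
func newCoordinator(store *state.Store) *Coordinator {
    return &Coordinator{
        stateStore:     store,
        eventSemaphore: make(chan struct{}, 10), // "blocks if 10 events already processing"
    }
}
```

Note that Add(1) happens before the goroutine starts, not inside it, so the WaitGroup count can never lag behind a spawned-but-unscheduled goroutine.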
@@ -166,6 +182,47 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
     }() // Close the goroutine
 }

+// waitForEventProcessing waits for all in-flight events to complete during shutdown.
+// It returns immediately if no events are being processed.
+func (c *Coordinator) waitForEventProcessing(organization string, maxWait time.Duration) {
+    // Check if any events are being processed.
+    queueLen := len(c.eventSemaphore)
+    if queueLen == 0 {
+        slog.Info("no events in processing queue, shutdown can proceed immediately",
+            "organization", organization)
+        return
+    }
+
+    slog.Warn("waiting for in-flight events to complete before shutdown",
+        "organization", organization,
+        "events_in_queue", queueLen,
+        "max_wait_seconds", maxWait.Seconds())
+
+    // Create a channel to signal when all events are done.
+    done := make(chan struct{})
+    go func() {
+        c.processingEvents.Wait()
+        close(done)
+    }()
+
+    // Wait for events to complete or time out.
+    select {
+    case <-done:
+        slog.Info("all in-flight events completed successfully",
+            "organization", organization,
+            "graceful_shutdown", true)
+    case <-time.After(maxWait):
+        remaining := len(c.eventSemaphore)
+        slog.Warn("shutdown timeout reached, proceeding with remaining events in queue",
+            "organization", organization,
+            "events_still_processing", remaining,
+            "waited_seconds", maxWait.Seconds(),
+            "impact", "these events may be incomplete",
+            "recovery", "polling will catch them in next 5min cycle",
+            "graceful_shutdown", false)
+    }
+}
+
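The done-channel idiom in this function exists because sync.WaitGroup.Wait has no deadline parameter. A self-contained sketch of the same pattern in isolation (note the waiter goroutine leaks if the timeout fires first, which is acceptable at process shutdown):

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// waitTimeout reports whether wg completed before d elapsed.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
    done := make(chan struct{})
    go func() {
        wg.Wait()   // returns immediately if the counter is already zero
        close(done) // broadcast completion to the select below
    }()
    select {
    case <-done:
        return true
    case <-time.After(d):
        return false // waiter goroutine is abandoned; fine at shutdown
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() { defer wg.Done(); time.Sleep(50 * time.Millisecond) }()
    fmt.Println("completed:", waitTimeout(&wg, time.Second)) // completed: true
}
```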
 // handleAuthError handles authentication errors by refreshing the token and recreating the client.
 func (c *Coordinator) handleAuthError(
     ctx context.Context,
@@ -302,6 +359,8 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
         slog.Info("sprinkler client context cancelled, stopping gracefully",
             "organization", organization,
             "total_attempts", attempts)
+        // Wait for in-flight events (up to 8 seconds, leaving 2s for HTTP shutdown).
+        c.waitForEventProcessing(organization, 8*time.Second)
         return nil
     }
307366
@@ -311,6 +370,8 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
         slog.Info("context cancelled, stopping sprinkler client",
             "organization", organization,
             "context_error", ctxErr)
+        // Wait for in-flight events (up to 8 seconds).
+        c.waitForEventProcessing(organization, 8*time.Second)
         return ctxErr
     }
316377
@@ -347,6 +408,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
347408 "will_retry" , true )
348409 select {
349410 case <- ctx .Done ():
411+ c .waitForEventProcessing (organization , 8 * time .Second )
350412 return ctx .Err ()
351413 case <- time .After (retryDelay ):
352414 continue
@@ -371,6 +433,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
         select {
         case <-ctx.Done():
             slog.Info("context cancelled during retry wait", "organization", organization)
+            c.waitForEventProcessing(organization, 8*time.Second)
             return ctx.Err()
         case <-time.After(retryDelay):
             // Exponential backoff capped at maxRetryDelay
@@ -391,6 +454,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
         if ctxErr := ctx.Err(); ctxErr != nil {
             slog.Info("sprinkler client stopped cleanly due to context cancellation",
                 "organization", organization)
+            c.waitForEventProcessing(organization, 8*time.Second)
             return ctxErr
         }
@@ -407,6 +471,7 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
         // This might be a network hiccup or a server restart.
         select {
         case <-ctx.Done():
+            c.waitForEventProcessing(organization, 8*time.Second)
             return ctx.Err()
         case <-time.After(5 * time.Second):
             slog.Info("restarting after unexpected clean disconnect",
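The diff does not show how the 8-second event drain is coordinated with HTTP shutdown at the top level. A sketch of plausible main wiring, assuming a roughly 10-second termination grace period; srv and runSprinkler are stand-ins, not names from this codebase:

```go
package main

import (
    "context"
    "errors"
    "log/slog"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    srv := &http.Server{Addr: ":8080"}

    // Cancel ctx on SIGTERM/SIGINT; the platform then allows ~10s to exit.
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    go func() {
        if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
            slog.Error("http server failed", "error", err)
        }
    }()

    // Blocks until ctx is cancelled; on the way out, the coordinator's
    // waitForEventProcessing spends up to 8s draining in-flight events.
    runSprinkler(ctx)

    // Spend the remaining ~2 seconds draining HTTP connections.
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    _ = srv.Shutdown(shutdownCtx)
}

// runSprinkler is a stand-in for coordinator.RunWithSprinklerClient.
func runSprinkler(ctx context.Context) { <-ctx.Done() }
```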