@@ -36,87 +36,35 @@ func parsePRNumberFromURL(url string) (int, error) {
3636 return num , nil
3737}
3838
39- // handleSprinklerEvent processes a single event from sprinkler.
40- func (c * Coordinator ) handleSprinklerEvent (ctx context.Context , event client.Event , organization string ) {
41- // Deduplicate events using delivery_id if available, otherwise fall back to timestamp + URL + type
42- // delivery_id is unique per GitHub webhook and is the same across all instances receiving the event
43- var eventKey string
39+ // eventKey generates a unique key for event deduplication.
40+ // Uses delivery_id if available (GitHub's unique webhook ID),
41+ // otherwise falls back to timestamp + URL + type.
42+ func eventKey (event client.Event ) string {
4443 if event .Raw != nil {
45- if deliveryID , ok := event .Raw ["delivery_id" ].(string ); ok && deliveryID != "" {
46- eventKey = deliveryID
44+ if id , ok := event .Raw ["delivery_id" ].(string ); ok && id != "" {
45+ return id
4746 }
4847 }
49- if eventKey == "" {
50- // Fallback to timestamp-based key if delivery_id not available
51- eventKey = fmt .Sprintf ("%s:%s:%s" , event .Timestamp .Format (time .RFC3339Nano ), event .URL , event .Type )
52- }
53-
54- // Check persistent state first (survives restarts)
55- if c .stateStore .WasProcessed (eventKey ) {
56- slog .Info ("skipping duplicate event (persistent check)" ,
57- "organization" , organization ,
58- "type" , event .Type ,
59- "url" , event .URL ,
60- "timestamp" , event .Timestamp ,
61- "event_key" , eventKey )
62- return
63- }
64-
65- // Check if this event is currently being processed (prevents concurrent duplicates)
66- // This is critical when sprinkler delivers the same event twice in quick succession
67- c .processingEventMu .Lock ()
68- if c .processingEvents [eventKey ] {
69- c .processingEventMu .Unlock ()
70- slog .Info ("skipping duplicate event (currently processing)" ,
71- "organization" , organization ,
72- "type" , event .Type ,
73- "url" , event .URL ,
74- "timestamp" , event .Timestamp ,
75- "event_key" , eventKey )
76- return
77- }
78- // Mark as currently processing
79- c .processingEvents [eventKey ] = true
80- c .processingEventMu .Unlock ()
81-
82- // Ensure we clean up the processing flag when done
83- defer func () {
84- c .processingEventMu .Lock ()
85- delete (c .processingEvents , eventKey )
86- c .processingEventMu .Unlock ()
87- }()
48+ return fmt .Sprintf ("%s:%s:%s" , event .Timestamp .Format (time .RFC3339Nano ), event .URL , event .Type )
49+ }
8850
89- // Also check in-memory for fast deduplication during normal operation
90- c .processedEventMu .Lock ()
91- if processedTime , exists := c .processedEvents [eventKey ]; exists {
92- c .processedEventMu .Unlock ()
93- slog .Info ("skipping duplicate event (memory check)" ,
51+ // handleSprinklerEvent processes a single event from sprinkler.
52+ func (c * Coordinator ) handleSprinklerEvent (ctx context.Context , event client.Event , organization string ) {
53+ // Generate event key using delivery_id if available, otherwise timestamp + URL + type.
54+ // delivery_id is unique per GitHub webhook and is the same across all instances.
55+ eventKey := eventKey (event )
56+
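57+ // Try to claim this event atomically using the persistent store (Datastore transaction),
58+ // the single source of truth for cross-instance deduplication. NOTE(review): any error from MarkProcessed is treated as "already processed" below - confirm that store failures should also skip the event.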
59+ if err := c .stateStore .MarkProcessed (eventKey , 24 * time .Hour ); err != nil {
60+ slog .Info ("skipping duplicate event" ,
9461 "organization" , organization ,
9562 "type" , event .Type ,
9663 "url" , event .URL ,
97- "timestamp" , event .Timestamp ,
98- "first_processed" , processedTime ,
99- "event_key" , eventKey )
64+ "event_key" , eventKey ,
65+ "reason" , "already_processed" )
10066 return
10167 }
102- c .processedEvents [eventKey ] = time .Now ()
103-
104- // Cleanup old in-memory events (older than 1 hour - persistent store handles long-term)
105- cutoff := time .Now ().Add (- 1 * time .Hour )
106- cleanedCount := 0
107- for key , processedTime := range c .processedEvents {
108- if processedTime .Before (cutoff ) {
109- delete (c .processedEvents , key )
110- cleanedCount ++
111- }
112- }
113- if cleanedCount > 0 {
114- slog .Debug ("cleaned up old in-memory processed events" ,
115- "organization" , organization ,
116- "removed_count" , cleanedCount ,
117- "remaining_count" , len (c .processedEvents ))
118- }
119- c .processedEventMu .Unlock ()
12068
12169 slog .Info ("accepted event for async processing" ,
12270 "organization" , organization ,
@@ -204,18 +152,17 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
204152 "type" , event .Type ,
205153 "url" , event .URL ,
206154 "repo" , repo )
207- // Don't mark as processed if processing failed - allow retry
155+ // Event already marked as processed before goroutine started.
156+ // Failed processing won't be retried automatically.
157+ // This is intentional - we don't want infinite retries of broken events.
208158 return
209159 }
210160
211- // Mark event as processed in persistent state (survives restarts)
212- if err := c .stateStore .MarkProcessed (eventKey , 24 * time .Hour ); err != nil {
213- slog .Warn ("failed to mark event as processed" ,
214- "organization" , organization ,
215- "event_key" , eventKey ,
216- "error" , err )
217- // Continue anyway - in-memory dedup will prevent immediate duplicates
218- }
161+ slog .Info ("successfully processed sprinkler event" ,
162+ "organization" , organization ,
163+ "type" , event .Type ,
164+ "url" , event .URL ,
165+ "event_key" , eventKey )
219166 }() // Close the goroutine
220167}
221168
@@ -289,7 +236,10 @@ func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
289236 "organization" , organization )
290237 },
291238 OnEvent : func (event client.Event ) {
292- // Use background context for event processing to avoid losing events during shutdown.
239+ // SECURITY NOTE: Use detached context for event processing to prevent webhook
240+ // events from being lost during shutdown. Event processing has internal timeouts
241+ // (30s for turnclient, semaphore limits) to prevent resource exhaustion.
242+ // The detached context is not canceled when ctx is, so shutdown does not cancel event handling that has already started.
293243 // Note: No panic recovery - we want panics to propagate and restart the service.
294244 eventCtx := context .WithoutCancel (ctx )
295245 c .handleSprinklerEvent (eventCtx , event , organization )
0 commit comments