@@ -7,9 +7,13 @@ abstract class QueueConsumer extends Consumer
77 protected $ type = "QueueConsumer " ;
88
99 protected $ queue ;
10+ protected $ failed_queue = array ();
1011 protected $ max_queue_size = 1000 ;
1112 protected $ batch_size = 100 ;
1213 protected $ maximum_backoff_duration = 10000 ; // Set maximum waiting limit to 10s
14+ protected $ max_retry_attempts = 3 ;
15+ protected $ max_failed_queue_size = 1000 ;
16+ protected $ initial_retry_delay = 60 ; // Initial retry delay in seconds
1317 protected $ host = "app.posthog.com " ;
1418 protected $ compress_request = false ;
1519
@@ -34,6 +38,18 @@ public function __construct($apiKey, $options = array())
3438 $ this ->maximum_backoff_duration = (int ) $ options ["maximum_backoff_duration " ];
3539 }
3640
41+ if (isset ($ options ["max_retry_attempts " ])) {
42+ $ this ->max_retry_attempts = (int ) $ options ["max_retry_attempts " ];
43+ }
44+
45+ if (isset ($ options ["max_failed_queue_size " ])) {
46+ $ this ->max_failed_queue_size = (int ) $ options ["max_failed_queue_size " ];
47+ }
48+
49+ if (isset ($ options ["initial_retry_delay " ])) {
50+ $ this ->initial_retry_delay = (int ) $ options ["initial_retry_delay " ];
51+ }
52+
3753 if (isset ($ options ["host " ])) {
3854 $ this ->host = $ options ["host " ];
3955
@@ -48,6 +64,7 @@ public function __construct($apiKey, $options = array())
4864 }
4965
5066 $ this ->queue = array ();
67+ $ this ->failed_queue = array ();
5168 }
5269
5370 public function __destruct ()
@@ -92,27 +109,168 @@ public function alias(array $message)
92109 /**
93110 * Flushes our queue of messages by batching them to the server
94111 */
95- public function flush ()
112+ public function flush (): bool
96113 {
97- $ count = count ($ this ->queue );
98- $ overallSuccess = true ;
114+ // First, try to retry any failed batches
115+ $ this ->retryFailedBatches ();
116+
117+ // If no new messages, we're done
118+ if (empty ($ this ->queue )) {
119+ return true ;
120+ }
99121
100- while ( $ count > 0 ) {
101- $ batch = array_splice ( $ this -> queue , 0 , min ( $ this -> batch_size , $ count )) ;
102- $ batchSuccess = $ this ->flushBatch ( $ batch );
122+ // Process messages batch by batch, maintaining transactional behavior
123+ $ overallSuccess = true ;
124+ $ initialQueueSize = count ( $ this ->queue );
103125
104- // Track overall success but continue processing remaining batches
105- // This ensures we attempt to send all queued events even if some batches fail
106- if (!$ batchSuccess ) {
126+ while (!empty ($ this ->queue )) {
127+ $ queueSizeBefore = count ($ this ->queue );
128+ $ batchSize = min ($ this ->batch_size , $ queueSizeBefore );
129+ $ batch = array_slice ($ this ->queue , 0 , $ batchSize );
130+
131+ if ($ this ->flushBatchWithRetry ($ batch )) {
132+ // Success: remove these messages from queue
133+ $ this ->queue = array_slice ($ this ->queue , $ batchSize );
134+ } else {
135+ // Failed: move to failed queue and remove from main queue
136+ $ this ->addToFailedQueue ($ batch );
137+ $ this ->queue = array_slice ($ this ->queue , $ batchSize );
107138 $ overallSuccess = false ;
108139 }
109140
110- $ count = count ($ this ->queue );
141+ // Safety check: ensure queue size is actually decreasing
142+ $ queueSizeAfter = count ($ this ->queue );
143+ if ($ queueSizeAfter >= $ queueSizeBefore ) {
144+ // This should never happen, but prevents infinite loops
145+ $ this ->handleError ('flush_safety_break ' ,
146+ sprintf ('Queue size not decreasing: before=%d, after=%d. Breaking to prevent infinite loop. ' ,
147+ $ queueSizeBefore , $ queueSizeAfter ));
148+ break ;
149+ }
111150 }
112151
113152 return $ overallSuccess ;
114153 }
115154
155+ /**
156+ * Flush a batch with immediate retry logic
157+ */
158+ protected function flushBatchWithRetry (array $ batch ): bool
159+ {
160+ $ backoff = 100 ; // Start with 100ms
161+
162+ for ($ attempt = 0 ; $ attempt < $ this ->max_retry_attempts ; $ attempt ++) {
163+ if ($ attempt > 0 ) {
164+ usleep ($ backoff * 1000 ); // Wait with exponential backoff
165+ $ backoff = min ($ backoff * 2 , $ this ->maximum_backoff_duration );
166+ }
167+
168+ if ($ this ->flushBatch ($ batch )) {
169+ return true ;
170+ }
171+ }
172+
173+ return false ;
174+ }
175+
176+ /**
177+ * Add batch to failed queue for later retry
178+ */
179+ protected function addToFailedQueue (array $ batch ): void
180+ {
181+ // Prevent memory issues by limiting failed queue size
182+ if (count ($ this ->failed_queue ) >= $ this ->max_failed_queue_size ) {
183+ array_shift ($ this ->failed_queue ); // Remove oldest
184+ $ this ->handleError ('failed_queue_overflow ' ,
185+ 'Failed queue size limit reached. Dropping oldest failed batch. ' );
186+ }
187+
188+ $ this ->failed_queue [] = [
189+ 'messages ' => $ batch ,
190+ 'attempts ' => 0 ,
191+ 'next_retry ' => time () + $ this ->initial_retry_delay ,
192+ 'created_at ' => time ()
193+ ];
194+ }
195+
196+ /**
197+ * Retry failed batches that are ready for retry
198+ */
199+ protected function retryFailedBatches (): void
200+ {
201+ if (empty ($ this ->failed_queue )) {
202+ return ;
203+ }
204+
205+ $ currentTime = time ();
206+ $ remainingFailed = [];
207+
208+ foreach ($ this ->failed_queue as $ failedBatch ) {
209+ if (!$ this ->isReadyForRetry ($ failedBatch , $ currentTime )) {
210+ $ remainingFailed [] = $ failedBatch ;
211+ continue ;
212+ }
213+
214+ if ($ this ->retryFailedBatch ($ failedBatch )) {
215+ // Success - don't add back to queue
216+ continue ;
217+ }
218+
219+ // Still failed - update for next retry or mark as permanent failure
220+ $ updatedBatch = $ this ->updateFailedBatch ($ failedBatch , $ currentTime );
221+ if ($ updatedBatch !== null ) {
222+ $ remainingFailed [] = $ updatedBatch ;
223+ }
224+ }
225+
226+ $ this ->failed_queue = $ remainingFailed ;
227+ }
228+
229+ /**
230+ * Check if a failed batch is ready for retry
231+ */
232+ private function isReadyForRetry (array $ failedBatch , int $ currentTime ): bool
233+ {
234+ return $ failedBatch ['next_retry ' ] <= $ currentTime &&
235+ $ failedBatch ['attempts ' ] < $ this ->max_retry_attempts ;
236+ }
237+
238+ /**
239+ * Attempt to retry a single failed batch
240+ */
241+ private function retryFailedBatch (array $ failedBatch ): bool
242+ {
243+ if ($ this ->flushBatch ($ failedBatch ['messages ' ])) {
244+ $ this ->handleError ('batch_retry_success ' ,
245+ sprintf ('Successfully retried batch after %d failed attempts ' , $ failedBatch ['attempts ' ]));
246+ return true ;
247+ }
248+ return false ;
249+ }
250+
251+ /**
252+ * Update failed batch for next retry or mark as permanently failed
253+ * @return array|null Updated batch or null if permanently failed
254+ */
255+ private function updateFailedBatch (array $ failedBatch , int $ currentTime ): ?array
256+ {
257+ $ failedBatch ['attempts ' ]++;
258+
259+ if ($ failedBatch ['attempts ' ] >= $ this ->max_retry_attempts ) {
260+ // Permanently failed
261+ $ this ->handleError ('batch_permanently_failed ' ,
262+ sprintf ('Batch permanently failed after %d attempts, %d messages lost ' ,
263+ $ this ->max_retry_attempts , count ($ failedBatch ['messages ' ])));
264+ return null ;
265+ }
266+
267+ // Calculate next retry time with exponential backoff (capped at 1 hour)
268+ $ backoffMinutes = min (pow (2 , $ failedBatch ['attempts ' ]), 60 );
269+ $ failedBatch ['next_retry ' ] = $ currentTime + ($ backoffMinutes * 60 );
270+
271+ return $ failedBatch ;
272+ }
273+
116274 /**
117275 * Adds an item to our queue.
118276 * @param mixed $item
@@ -149,4 +307,44 @@ protected function payload($batch)
149307 "api_key " => $ this ->apiKey ,
150308 );
151309 }
310+
311+ /**
312+ * Get statistics about failed queue for observability
313+ */
314+ public function getFailedQueueStats (): array
315+ {
316+ $ totalMessages = 0 ;
317+ $ oldestRetry = null ;
318+ $ attemptCounts = [];
319+
320+ foreach ($ this ->failed_queue as $ failedBatch ) {
321+ $ totalMessages += count ($ failedBatch ['messages ' ]);
322+
323+ if ($ oldestRetry === null || $ failedBatch ['next_retry ' ] < $ oldestRetry ) {
324+ $ oldestRetry = $ failedBatch ['next_retry ' ];
325+ }
326+
327+ $ attempts = $ failedBatch ['attempts ' ];
328+ $ attemptCounts [$ attempts ] = ($ attemptCounts [$ attempts ] ?? 0 ) + 1 ;
329+ }
330+
331+ return [
332+ 'failed_batches ' => count ($ this ->failed_queue ),
333+ 'total_failed_messages ' => $ totalMessages ,
334+ 'oldest_retry_time ' => $ oldestRetry ,
335+ 'attempt_distribution ' => $ attemptCounts ,
336+ 'current_queue_size ' => count ($ this ->queue ),
337+ 'max_failed_queue_size ' => $ this ->max_failed_queue_size ,
338+ ];
339+ }
340+
341+ /**
342+ * Clear all failed queues (useful for testing or manual recovery)
343+ */
344+ public function clearFailedQueue (): int
345+ {
346+ $ clearedCount = count ($ this ->failed_queue );
347+ $ this ->failed_queue = [];
348+ return $ clearedCount ;
349+ }
152350}
0 commit comments