Skip to content

Conversation

@akshayutture-augment
Copy link

@akshayutture-augment akshayutture-augment commented Nov 14, 2025

Test 9


Replicated from ai-code-review-evaluation/sentry-cursor#9


Note

Introduces a queue-based threaded processing mode for uptime results that preserves per-group ordering and safely commits offsets, with full integration and tests.

  • Uptime Results Consumer:
    • Add new thread-queue-parallel mode to ResultsStrategyFactory and CLI uptime-options.
    • Integrate queue-based strategy creation (create_thread_queue_parallel_worker) and graceful shutdown.
  • Queue-based Processing (sentry/remote_subscriptions/consumers/queue_consumer.py):
    • Implement OffsetTracker for per-partition contiguous offset commits.
    • Implement FixedQueuePool with consistent group-to-queue mapping and single-thread workers to preserve order.
    • Implement SimpleQueueProcessingStrategy with background commit loop and metrics.
  • CLI Options (sentry/consumers/__init__.py):
    • Extend uptime_options to include thread-queue-parallel; clarify --max-workers help.
  • Tests:
    • Add unit tests for OffsetTracker, FixedQueuePool, and SimpleQueueProcessingStrategy.
    • Extend uptime consumer tests to cover ordering, concurrency, offset commit behavior, error handling, shutdown, and Kafka commit integration.

Written by Cursor Bugbot for commit 33f4fdd. Configure here.

One potential problem we have with batch processing is that any one slow
item will clog up the whole batch. This pr implements a queueing method
instead, where we keep N queues that each have their own workers.
There's still a chance of individual items backlogging a queue, but we
can try increased concurrency here to reduce the chances of that
happening

<!-- Describe your PR here. -->
@cursor
Copy link

cursor bot commented Nov 14, 2025

You have run out of free Bugbot PR reviews for this billing cycle. This will reset on December 14.

To receive reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial.

@akshayutture-augment
Copy link
Author

cursor review

for partition, offset in committable.items():
self.queue_pool.offset_tracker.mark_committed(partition, offset)
except Exception:
logger.exception("Error in commit loop")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Shutdown: Uncommitted Offsets Lost

The _commit_loop calls shutdown_event.wait(1.0) at the start of each iteration. When shutdown_event is set during close(), the wait returns immediately, causing the loop condition check to exit without performing a final commit. This means any offsets that became committable between the last commit and shutdown will be lost, potentially causing message reprocessing on restart.

Fix in Cursor Fix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants