feat(spans): add option to rollout process_segment task #119003
lvthanh03 wants to merge 2 commits into
        flushed_segment.project_id,
        kafka_payload,
        len(message["spans"]),
    )
Redis cleared before task enqueue
Medium Severity
When the rollout sends a segment via process_segment_task.apply_async, that call is not tied to the existing producer_futures / wait_produce path. The flusher still calls done_flush_segments for the whole batch right after, so Redis cleanup can run before task delivery is confirmed, unlike the Kafka produce path that waits on futures first.
Reviewed by Cursor Bugbot for commit 4d899e7.
this is legit. i wonder if we should change taskbroker-client so that apply_async returns a future, and we can track it
yeah this is legit. Taskbroker client has wait_for_delivery, I think this would be a good use case?
It seems that with wait_for_delivery the taskbroker client catches a delivery failure and only logs it, so we still move on to done_flush_segments even if delivery fails, and the spans buffer does not crash. This matches the current behaviour; the difference is that the current path also tracks invalid outcomes if the produce fails.
If we want to track negative outcomes, we should find a way for apply_async to return the producer future so that the flusher can append it to the current producer_futures. But with outcomes being tracked in EAP instead, I doubt this is a concern?
i think we should return the future. with wait_for_delivery the problem is that we have many futures we want to await on in parallel, not block the flusher per-produce.
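To make the proposal above concrete, here is a minimal sketch of a flusher that collects one future per enqueue and waits on all of them together before cleaning up. It does not use the real taskbroker client: `enqueue` is a hypothetical callable that returns a `concurrent.futures.Future`, and `cleanup` stands in for done_flush_segments.

```python
from concurrent.futures import Future, wait
from typing import Callable, List


def flush_batch(
    payloads: List[bytes],
    enqueue: Callable[[bytes], Future],  # hypothetical: an apply_async that returns a future
    cleanup: Callable[[], None],         # stand-in for done_flush_segments
) -> int:
    """Enqueue every payload, wait for all deliveries at once, then clean up.

    Returns the number of deliveries that failed.
    """
    # Start every delivery first so they can proceed in parallel.
    producer_futures = [enqueue(payload) for payload in payloads]

    # A single wait over the whole batch, rather than blocking per produce.
    wait(producer_futures)

    # Count deliveries whose future resolved with an exception.
    failed = sum(1 for f in producer_futures if f.exception() is not None)

    # Cleanup runs only after every delivery has resolved.
    cleanup()
    return failed
```

In this shape, futures from the task path and from the Kafka path could share one list, so the flusher would only ever block once per batch.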
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 6579d20.
        flushed_segment.project_id,
        kafka_payload,
        len(message["spans"]),
    )
Task enqueue errors crash flusher
High Severity
When the rollout path enqueues via process_segment_task.apply_async, delivery or broker errors propagate uncaught out of the flush iteration and terminate the flusher process. Kafka handoffs catch produce failures in wait_produce, record outcomes, and still run cleanup, so the two paths behave inconsistently and a mixed batch can leave in-flight Kafka produces unawaited.
Reviewed by Cursor Bugbot for commit 6579d20.
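One way to make the two paths behave consistently is to guard each enqueue, still await the in-flight Kafka futures, and run cleanup in all cases. The sketch below is an illustration under assumptions, not the flusher's actual code: `enqueue_calls`, `kafka_futures`, and `cleanup` are hypothetical parameters, and the 5-second timeout is an arbitrary placeholder.

```python
from concurrent.futures import Future
from typing import Callable, Iterable


def flush_mixed_batch(
    enqueue_calls: Iterable[Callable[[], None]],  # hypothetical: one call per task enqueue
    kafka_futures: Iterable[Future],              # in-flight Kafka produce futures
    cleanup: Callable[[], None],                  # stand-in for done_flush_segments
    timeout: float = 5.0,                         # placeholder value, not from the PR
) -> int:
    """Handle a batch that mixes task enqueues and Kafka produces.

    Returns the number of failed handoffs across both paths.
    """
    failures = 0

    # Task path: an enqueue error is counted instead of escaping the flush loop.
    for enqueue in enqueue_calls:
        try:
            enqueue()
        except Exception:
            failures += 1

    # Kafka path: every in-flight produce is awaited, even if enqueues failed.
    for future in kafka_futures:
        try:
            future.result(timeout=timeout)
        except Exception:
            failures += 1

    # Cleanup runs once, after both paths have been accounted for.
    cleanup()
    return failures
```

Whether a failed handoff should also record an outcome, as wait_produce does for Kafka, is left open here.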


Refs STREAM-1308
Adds a rollout path for the spans buffer flusher to enqueue flushed segments directly to the process_segment task. When spans.buffer.process-segments-task-rollout-rate is enabled, segment bytes are sent to the new task instead of being produced to the buffered-segments topic.
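The routing decision described above might look roughly like the following sketch. It assumes the option holds a fraction between 0 and 1 that is sampled once per segment; the description does not state this, and `choose_path` is a hypothetical helper rather than a function from the PR.

```python
import random


def choose_path(rollout_rate: float, rng: random.Random) -> str:
    """Pick the handoff path for one flushed segment.

    Assumes rollout_rate is a fraction in [0, 1]; this is an assumption
    about how the option is interpreted, not something the PR states.
    """
    # rng.random() is in [0.0, 1.0), so a rate of 0 never selects the task
    # path and a rate of 1.0 always does.
    if rng.random() < rollout_rate:
        return "task"   # enqueue to the process_segment task
    return "kafka"      # produce to the buffered-segments topic
```

Passing the random generator in explicitly keeps the decision reproducible in tests.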