
Conversation

@hussam789 commented Oct 30, 2025

User description

PR #6


PR Type

Enhancement


Description

  • Implement multiprocess span flusher with per-shard process management

  • Add configurable flusher_processes parameter to limit concurrent processes

  • Distribute shards across multiple processes for parallel processing (see the sketch after this list)

  • Add shard-level health monitoring and metrics tagging

  • Update tests to verify process limiting and shard distribution
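
A minimal sketch of the round-robin shard-to-process distribution described above; the function and variable names are hypothetical, not SpanFlusher internals, but the 4-shards/2-processes split matches the sequence diagram further down:

def build_shard_map(num_shards: int, max_processes: int | None) -> dict[int, list[int]]:
    # Cap the process count at the shard count; None means one process per shard.
    num_processes = min(num_shards, max_processes or num_shards)
    mapping: dict[int, list[int]] = {i: [] for i in range(num_processes)}
    for shard in range(num_shards):
        mapping[shard % num_processes].append(shard)
    return mapping

# 4 shards limited to 2 processes -> {0: [0, 2], 1: [1, 3]}
print(build_shard_map(4, 2))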


Diagram Walkthrough

flowchart LR
  A["CLI Option: flusher_processes"] --> B["ProcessSpansStrategyFactory"]
  B --> C["SpanFlusher: Multiple Processes"]
  C --> D["Process-to-Shards Mapping"]
  D --> E["Parallel Shard Processing"]
  E --> F["Health Monitoring per Process"]
  F --> G["Metrics with Shard Tags"]

File Walkthrough

Relevant files
Configuration changes
__init__.py
Add flusher processes CLI option                                                 

src/sentry/consumers/__init__.py

  • Add --flusher-processes CLI option to span consumer configuration (sketched below)
  • Allow configuration of maximum number of flusher processes
  • Default value set to 1 process
+9/-1     
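
A hedged sketch of what this option plausibly looks like in the consumer definition's clickOptions list; the help text and surrounding structure are assumptions:

import click

# Hypothetical reconstruction of the new option for the "process-spans" consumer.
flusher_processes_option = click.Option(
    ["--flusher-processes"],
    type=int,
    default=1,
    help="Maximum number of processes the span flusher may spawn.",
)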
Enhancement
factory.py
Pass flusher processes config to SpanFlusher                         

src/sentry/spans/consumers/process/factory.py

  • Add flusher_processes parameter to factory initialization
  • Store flusher_processes configuration as instance variable
  • Pass max_processes parameter to SpanFlusher during creation
+3/-0     
flusher.py
Implement multiprocess flusher with shard distribution     

src/sentry/spans/consumers/process/flusher.py

  • Refactor from single process to multiprocess architecture with
    process-to-shards mapping
  • Create separate processes for each shard or share processes based on
    max_processes limit
  • Implement per-process health monitoring, backpressure tracking, and
    restart counters (condensed in the sketch below)
  • Add shard tagging to metrics for better observability
  • Update process lifecycle management to handle multiple processes
  • Add ServiceMemory import for memory tracking across multiple buffers
+127/-47
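
A condensed sketch of that per-process liveness check, based on the _ensure_processes_alive excerpt quoted later in this review (the helper name is hypothetical):

import time

def find_unhealthy_processes(processes, healthy_since, max_unhealthy_seconds):
    # Map process index -> restart cause, mirroring the real loop: dead
    # processes report their exit code, live-but-silent ones report "hang".
    causes = {}
    for index, process in processes.items():
        if not process.is_alive():
            causes[index] = f"no_process_{process.exitcode}"
        elif int(time.time()) - healthy_since[index].value > max_unhealthy_seconds:
            causes[index] = "hang"
    return causes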
Tests
test_consumer.py
Add multiprocess flusher tests and transaction handling   

tests/sentry/spans/consumers/process/test_consumer.py

  • Add time import for sleep operations in tests
  • Change @pytest.mark.django_db to use transaction=True for proper
    transaction handling
  • Add sleep delay after drift change to allow flusher threads to process
  • Add new test test_flusher_processes_limit to verify process limiting
    and shard distribution
+48/-1   
test_flusher.py
Update backpressure test for multiprocess                               

tests/sentry/spans/consumers/process/test_flusher.py

  • Update backpressure assertion to check across all process backpressure
    values
  • Change from single backpressure_since to dictionary-based
    process_backpressure_since
+1/-1     
Documentation
CLAUDE.md
Add union type checking best practices                                     

CLAUDE.md

  • Add documentation example showing correct use of isinstance() for type
    unions
  • Contrast with incorrect use of hasattr() for type checking
+11/-0   

Summary by CodeRabbit

Release Notes

  • New Features

    • Added --flusher-processes CLI option to configure the number of span processing workers (default: 1)
    • Upgraded span flushing with multi-process support, enabling parallel per-shard processing for improved performance
  • Bug Fixes

    • Enhanced process health monitoring with per-process restart capabilities for improved reliability
  • Documentation

    • Clarified type-checking best practices in guidance documentation


@qodo-code-review

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Process restart robustness

Description: Process liveness and restart logic may terminate and immediately respawn processes
handling multiple shards without graceful drain or exponential backoff, risking message
duplication or processing gaps; add backoff/jitter and ensure safe shutdown of producers
before kill.
flusher.py [219-260]

Referred Code
max_unhealthy_seconds = options.get("spans.buffer.flusher.max-unhealthy-seconds")

for process_index, process in self.processes.items():
    if not process:
        continue

    shards = self.process_to_shards_map[process_index]

    cause = None
    if not process.is_alive():
        exitcode = getattr(process, "exitcode", "unknown")
        cause = f"no_process_{exitcode}"
    elif (
        int(time.time()) - self.process_healthy_since[process_index].value
        > max_unhealthy_seconds
    ):
        # Check if any shard handled by this process is unhealthy
        cause = "hang"

    if cause is None:
        continue  # healthy


 ... (clipped 21 lines)
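
A minimal sketch of the backoff-with-jitter restart policy this check calls for, keyed off a per-process restart counter like the one the PR adds (all names hypothetical):

import random

def restart_delay(restart_count: int, base: float = 1.0, cap: float = 60.0) -> float:
    # Exponential backoff capped at `cap` seconds, with full jitter so that
    # processes restarted together do not respawn in lockstep.
    return random.uniform(0, min(cap, base * (2 ** restart_count)))

# e.g. time.sleep(restart_delay(process_restarts[process_index])) before respawning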
Data loss on shutdown

Description: join terminates processes after timeout without ensuring in-flight producer futures are
drained, which could lead to dropped spans; ensure producer flush or coordinated shutdown
before terminate.
flusher.py [337-347]

Referred Code
for process_index, process in self.processes.items():
    if deadline is not None:
        remaining_time = deadline - time.time()
        if remaining_time <= 0:
            break

    while process.is_alive() and (deadline is None or deadline > time.time()):
        time.sleep(0.1)

    if isinstance(process, multiprocessing.Process):
        process.terminate()
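
A sketch of the coordinated shutdown this check suggests: drain in-flight produce futures before terminating, mirroring the wait_produce loop elsewhere in the flusher (function name and timeout handling are illustrative only):

import multiprocessing

def drain_before_terminate(process, producer_futures, timeout: float = 10.0) -> None:
    for future in producer_futures:
        # Surfaces produce errors instead of silently dropping spans.
        future.result(timeout)
    if isinstance(process, multiprocessing.Process):
        process.terminate()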
Ticket Compliance
🎫 No ticket provided
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance

🟢 Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Action logging: New process lifecycle actions (creation, restarts, unhealthy detection) are not explicitly
audit-logged with user/context, making it unclear if critical operational events are
captured for audit trails.

Referred Code
def _create_processes(self):
    # Create processes based on shard mapping
    for process_index, shards in self.process_to_shards_map.items():
        self._create_process_for_shards(process_index, shards)

def _create_process_for_shards(self, process_index: int, shards: list[int]):
    # Optimistically reset healthy_since to avoid a race between the
    # starting process and the next flush cycle. Keep back pressure across
    # the restart, however.
    self.process_healthy_since[process_index].value = int(time.time())

    # Create a buffer for these specific shards
    shard_buffer = SpansBuffer(shards)

    make_process: Callable[..., multiprocessing.context.SpawnProcess | threading.Thread]
    if self.produce_to_pipe is None:
        target = run_with_initialized_sentry(
            SpanFlusher.main,
            # unpickling buffer will import sentry, so it needs to be
            # pickled separately. at the same time, pickling
            # synchronization primitives like multiprocessing.Value can


 ... (clipped 25 lines)
Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Process restart logic: When processes are unhealthy, the restart path lacks contextual error logging about shards
and exit codes beyond a metric, which may hinder debugging of edge cases in multiprocess
management.

Referred Code
def _ensure_processes_alive(self) -> None:
    max_unhealthy_seconds = options.get("spans.buffer.flusher.max-unhealthy-seconds")

    for process_index, process in self.processes.items():
        if not process:
            continue

        shards = self.process_to_shards_map[process_index]

        cause = None
        if not process.is_alive():
            exitcode = getattr(process, "exitcode", "unknown")
            cause = f"no_process_{exitcode}"
        elif (
            int(time.time()) - self.process_healthy_since[process_index].value
            > max_unhealthy_seconds
        ):
            # Check if any shard handled by this process is unhealthy
            cause = "hang"

        if cause is None:


 ... (clipped 22 lines)
Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Tag exposure risk: Adding shard identifiers to Sentry tags and metrics increases observability but may
require validation that shard IDs are non-sensitive and sanitized; no explicit validation
is shown in the new code.

Referred Code
shard_tag = ",".join(map(str, shards))
sentry_sdk.set_tag("sentry_spans_buffer_component", "flusher")
sentry_sdk.set_tag("sentry_spans_buffer_shards", shard_tag)

try:
    producer_futures = []

    if produce_to_pipe is not None:
        produce = produce_to_pipe
        producer = None
    else:
        cluster_name = get_topic_definition(Topic.BUFFERED_SEGMENTS)["cluster"]

        producer_config = get_kafka_producer_cluster_options(cluster_name)
        producer = KafkaProducer(build_kafka_configuration(default_config=producer_config))
        topic = ArroyoTopic(
            get_topic_definition(Topic.BUFFERED_SEGMENTS)["real_topic_name"]
        )

        def produce(payload: KafkaPayload) -> None:
            producer_futures.append(producer.produce(topic, payload))


 ... (clipped 35 lines)
Compliance status legend
🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-code-review

PR Code Suggestions ✨

Explore these optional code suggestions:

Category: Possible issue
Fix zombie processes on shutdown

Remove the break statement from the join method's shutdown loop to prevent
zombie processes. Ensure all flusher processes are properly terminated even if
the timeout is reached.

src/sentry/spans/consumers/process/flusher.py [336-347]

 # Wait for all processes to finish
 for process_index, process in self.processes.items():
     if deadline is not None:
         remaining_time = deadline - time.time()
-        if remaining_time <= 0:
-            break
+        if remaining_time > 0:
+            # The while loop condition is sufficient, but this is a small optimization
+            # to avoid the sleep if timeout is already passed.
+            while process.is_alive() and time.time() < deadline:
+                time.sleep(0.1)
+    else:
+        while process.is_alive():
+            time.sleep(0.1)
 
-    while process.is_alive() and (deadline is None or deadline > time.time()):
-        time.sleep(0.1)
-
-    if isinstance(process, multiprocessing.Process):
+    if process.is_alive() and isinstance(process, multiprocessing.Process):
         process.terminate()
Suggestion importance[1-10]: 9


Why: The suggestion correctly identifies a critical bug where a break statement would cause the shutdown loop to exit prematurely, leaving zombie processes and leading to resource leaks.

Impact: High

Category: High-level
Refactor to simplify process management

Simplify the SpanFlusher by replacing the complex manual process management
logic with a standard library like multiprocessing.Pool. This would abstract
away process creation, health checks, and restarts, improving maintainability.

Examples:

src/sentry/spans/consumers/process/flusher.py [81-125]
    def _create_processes(self):
        # Create processes based on shard mapping
        for process_index, shards in self.process_to_shards_map.items():
            self._create_process_for_shards(process_index, shards)

    def _create_process_for_shards(self, process_index: int, shards: list[int]):
        # Optimistically reset healthy_since to avoid a race between the
        # starting process and the next flush cycle. Keep back pressure across
        # the restart, however.
        self.process_healthy_since[process_index].value = int(time.time())

 ... (clipped 35 lines)
src/sentry/spans/consumers/process/flusher.py [218-259]
    def _ensure_processes_alive(self) -> None:
        max_unhealthy_seconds = options.get("spans.buffer.flusher.max-unhealthy-seconds")

        for process_index, process in self.processes.items():
            if not process:
                continue

            shards = self.process_to_shards_map[process_index]

            cause = None

 ... (clipped 32 lines)

Solution Walkthrough:

Before:

class SpanFlusher:
    def __init__(self, ...):
        self.processes = {}
        self.process_healthy_since = {}
        self.process_backpressure_since = {}
        self.process_restarts = {}
        self._create_processes()

    def _create_process_for_shards(self, process_index, shards):
        # manual process creation
        process = self.mp_context.Process(...)
        process.start()
        self.processes[process_index] = process

    def _ensure_processes_alive(self):
        for process_index, process in self.processes.items():
            if not process.is_alive() or is_hanging(process):
                # manual restart logic
                process.kill()
                self._create_process_for_shards(...)

    def join(self, timeout):
        for process in self.processes.values():
            # manual join logic
            process.terminate()

After:

from multiprocessing.pool import Pool

def flusher_worker(shards, ...):
    # Worker logic for a set of shards
    ...

class SpanFlusher:
    def __init__(self, ...):
        self.pool = Pool(processes=self.num_processes)
        self._start_workers()

    def _start_workers(self):
        for process_index, shards in self.process_to_shards_map.items():
            self.pool.apply_async(flusher_worker, args=(shards, ...))
        # Health/backpressure monitoring might still be needed,
        # but worker lifecycle is managed by the pool.

    def _ensure_processes_alive(self):
        # This can be simplified or removed, as the pool handles
        # worker lifecycle and can restart them.
        # Custom hang detection might still be layered on top.
        pass

    def join(self, timeout):
        self.pool.close()
        self.pool.join()
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies that the manual process management in SpanFlusher adds significant complexity, and proposing a process pool is a valid architectural alternative to improve maintainability.

Impact: Medium

Category: General
Remove unused private helper method

Remove the unused private method _create_process_for_shard to eliminate dead
code and improve maintainability.

src/sentry/spans/consumers/process/flusher.py [127-132]

-def _create_process_for_shard(self, shard: int):
-    # Find which process this shard belongs to and restart that process
-    for process_index, shards in self.process_to_shards_map.items():
-        if shard in shards:
-            self._create_process_for_shards(process_index, shards)
-            break
 
+
Suggestion importance[1-10]: 5


Why: The suggestion correctly identifies that the _create_process_for_shard method is dead code and removing it improves code maintainability.

Impact: Low
Remove redundant list clearing operation

Remove the redundant producer_futures.clear() call from the main method's loop,
as the list is already re-initialized on each iteration.

src/sentry/spans/consumers/process/flusher.py [199-203]

 with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
     for future in producer_futures:
         future.result()
 
-producer_futures.clear()
-
Suggestion importance[1-10]: 4


Why: The suggestion correctly points out that producer_futures.clear() is redundant since the list is re-initialized in each loop iteration, offering a minor code simplification.

Impact: Low

@bar-qodo

@coderabbitai review

@coderabbitai

coderabbitai bot commented Nov 23, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai

coderabbitai bot commented Nov 23, 2025

Walkthrough

The changes implement multi-process support for Sentry's span flusher consumer. A new --flusher-processes CLI option configures the number of parallel flusher processes. ProcessSpansStrategyFactory accepts and passes this parameter to SpanFlusher, which has been substantially refactored to support multiple processes with per-shard work distribution, independent buffers, and individual health and backpressure tracking per process.
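
In sketch form, that parameter threading looks roughly like this (both classes are heavily simplified; only the flusher_processes-to-max_processes hand-off is taken from the PR):

class SpanFlusher:
    def __init__(self, max_processes: int | None = None) -> None:
        self.max_processes = max_processes

class ProcessSpansStrategyFactory:
    def __init__(self, flusher_processes: int | None = None) -> None:
        self.flusher_processes = flusher_processes

    def make_flusher(self) -> SpanFlusher:
        # Hypothetical helper; the real factory does this inside create_with_partitions.
        return SpanFlusher(max_processes=self.flusher_processes)

factory = ProcessSpansStrategyFactory(flusher_processes=2)
assert factory.make_flusher().max_processes == 2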

Changes

Cohort / File(s) Summary
Configuration and CLI
src/sentry/consumers/__init__.py
Adds --flusher-processes CLI option (default 1) to the "process-spans" consumer's clickOptions, enabling configuration of the number of span flusher processes.
Factory and Initialization
src/sentry/spans/consumers/process/factory.py
Adds flusher_processes: int | None = None parameter to ProcessSpansStrategyFactory.__init__, stores it as instance attribute, and passes it as max_processes when constructing SpanFlusher.
Flusher Core Logic
src/sentry/spans/consumers/process/flusher.py
Refactors SpanFlusher from single-process to multi-process architecture: adds max_processes parameter to __init__, replaces single backpressure_since with per-process process_backpressure_since mapping, introduces per-process buffers and shard-to-process mappings, adds _create_processes and _create_process_for_shards methods, updates main signature to accept shards: list[int] with per-shard metrics tagging, adds per-process health checks via _ensure_processes_alive, and extends termination/join logic for all processes.
Test Updates
tests/sentry/spans/consumers/process/test_consumer.py
Adds time module import, changes test_basic to use @pytest.mark.django_db(transaction=True), adds synchronization after clock drift, and introduces new test_flusher_processes_limit test verifying process creation and shard distribution with flusher_processes=2.
Test Backpressure Logic
tests/sentry/spans/consumers/process/test_flusher.py
Updates backpressure detection from flusher.backpressure_since.value to any(x.value for x in flusher.process_backpressure_since.values()) to check across all processes.
Documentation
CLAUDE.md
Adds guidance blocks illustrating incorrect use of hasattr for unions and correct use of isinstance.

Sequence Diagram

sequenceDiagram
    participant CLI as CLI Config
    participant Factory as ProcessSpansStrategyFactory
    participant Flusher as SpanFlusher
    participant PM as Process Manager
    participant P1 as Process 1<br/>(Shards 0,2)
    participant P2 as Process 2<br/>(Shards 1,3)
    participant Kafka as Kafka

    CLI->>Factory: flusher_processes=2
    Factory->>Flusher: max_processes=2
    Flusher->>PM: _create_processes(shards=[0,1,2,3])
    PM->>P1: main(shards=[0,2])
    PM->>P2: main(shards=[1,3])
    
    rect rgb(200, 220, 255)
        note over P1,P2: Parallel Processing
        P1->>P1: buffer spans for shards 0,2<br/>health & backpressure tracking
        P2->>P2: buffer spans for shards 1,3<br/>health & backpressure tracking
    end
    
    P1->>Kafka: flush segments (shards 0,2)
    P2->>Kafka: flush segments (shards 1,3)
    
    rect rgb(200, 255, 200)
        note over Flusher: Health Checks
        Flusher->>P1: _ensure_processes_alive
        Flusher->>P2: _ensure_processes_alive
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • src/sentry/spans/consumers/process/flusher.py: Primary concern—substantial refactoring from single-process to multi-process architecture with complex per-process state management, health tracking, and backpressure logic across multiple processes. Requires careful review of process lifecycle, shard distribution, and concurrency semantics.
  • Multi-process state management: New per-process attributes (process_backpressure_since, process_healthy_since, process_restarts, process_buffers, shards_to_process) and their interaction across _create_processes, _ensure_processes_alive, and flushing paths.
  • Process lifecycle changes: Updates to terminate, join, and process creation flow warrant attention to ensure proper cleanup and restart semantics.
  • Test coverage for new multiprocessing behavior: New test test_flusher_processes_limit validates shard distribution and process count, but review should verify correctness of shard-to-process mapping logic.

Poem

🐰 Parallel processes now hop with grace,
Each shard finds its own dedicated space,
From one flusher to many, we leap and bound,
Health checks and buffers all around,
Kafka flows faster than ever before! 🚀

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 6.25%, below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve coverage.

✅ Passed checks (2 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title accurately summarizes the main objective: implementing multiprocess enhancement with health monitoring for the span buffer/flusher component.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch span-flusher-multiprocess

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
src/sentry/spans/consumers/process/flusher.py (2)

127-132: Verify usage of _create_process_for_shard method.

This method is defined but not called anywhere in the provided code. The health check logic (line 259) uses _create_process_for_shards instead. Consider removing this method if it's unused, or document its intended future use.

#!/bin/bash
# Search for any calls to _create_process_for_shard
rg -nP '\b_create_process_for_shard\s*\(' --type=py

247-251: Consider extracting long exception message.

The exception message is quite long. Per static analysis (TRY003), consider defining the message format in the exception class or as a constant for maintainability.

This is a minor style issue and can be deferred.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c2c41b0 and 9bf83b0.

📒 Files selected for processing (6)
  • CLAUDE.md (1 hunks)
  • src/sentry/consumers/__init__.py (1 hunks)
  • src/sentry/spans/consumers/process/factory.py (3 hunks)
  • src/sentry/spans/consumers/process/flusher.py (8 hunks)
  • tests/sentry/spans/consumers/process/test_consumer.py (4 hunks)
  • tests/sentry/spans/consumers/process/test_flusher.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/sentry/spans/consumers/process/test_consumer.py (3)
src/sentry/spans/consumers/process/flusher.py (2)
  • poll (215-216)
  • join (328-347)
src/sentry/spans/consumers/process/factory.py (2)
  • ProcessSpansStrategyFactory (24-120)
  • create_with_partitions (58-116)
tests/sentry/spans/consumers/process/test_flusher.py (1)
  • append (24-26)
src/sentry/spans/consumers/process/flusher.py (1)
src/sentry/processing/backpressure/memory.py (1)
  • ServiceMemory (12-24)
🪛 Ruff (0.14.5)
tests/sentry/spans/consumers/process/test_consumer.py

106-106: Unused function argument: force

(ARG001)

src/sentry/spans/consumers/process/flusher.py

248-250: Avoid specifying long messages outside the exception class

(TRY003)


337-337: Loop control variable process_index not used within loop body

Rename unused process_index to _process_index

(B007)

🔇 Additional comments (8)
src/sentry/spans/consumers/process/flusher.py (2)

43-79: LGTM! Clean per-process state initialization.

The round-robin shard distribution and per-process state tracking are well-structured. Defaulting max_processes to the shard count ensures backward compatibility.


261-316: LGTM! Correct multi-process coordination.

The backpressure and memory checks properly aggregate across all processes. The logic correctly rejects messages if any process is under backpressure or if aggregate memory exceeds limits.
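
Reduced to a sketch, that gate is along these lines (the attribute name comes from the diff context; using arroyo's MessageRejected as the backpressure signal is an assumption):

from arroyo.processing.strategies import MessageRejected

def check_backpressure(process_backpressure_since) -> None:
    # Reject new work if any flusher process reports backpressure,
    # matching the any(...) aggregation used in the updated tests.
    if any(since.value for since in process_backpressure_since.values()):
        raise MessageRejected()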

CLAUDE.md (1)

448-458: LGTM! Helpful type-checking guidance.

The added example clearly demonstrates the correct use of isinstance() over hasattr() for union type guards. This is valuable documentation for maintaining type safety.
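
A hedged reconstruction of the pattern that guidance promotes (the actual CLAUDE.md wording may differ); it matches how this PR narrows its process-or-thread union before calling terminate():

import multiprocessing
import threading

def stop(worker: multiprocessing.Process | threading.Thread) -> None:
    # Incorrect: hasattr-based duck typing defeats static type narrowing.
    #   if hasattr(worker, "terminate"): worker.terminate()
    # Correct: isinstance() narrows the union explicitly.
    if isinstance(worker, multiprocessing.Process):
        worker.terminate()
    worker.join()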

tests/sentry/spans/consumers/process/test_flusher.py (1)

83-83: LGTM! Correct assertion for multiprocess backpressure.

The updated assertion properly checks if any process has entered backpressure state by iterating over the per-process dictionary values.

src/sentry/consumers/__init__.py (1)

430-438: LGTM! Clear CLI option with sensible default.

The --flusher-processes option is well-documented and defaults to 1 for backward compatibility. The help text clearly explains its purpose.

tests/sentry/spans/consumers/process/test_consumer.py (2)

1-1: LGTM! Necessary test adjustments for multiprocessing.

The addition of time.sleep(0.1) after advancing drift and transaction=True on the decorator ensure proper synchronization and database isolation for the multiprocess flusher behavior.

Also applies to: 12-12, 60-62


84-123: LGTM! Comprehensive test for process limiting.

The test correctly verifies that:

  1. Only 2 processes are created despite 4 shards
  2. All 4 shards are distributed across the 2 processes
  3. The flusher respects the max_processes limit

Note: The unused force parameter on line 106 is part of the commit callback signature and cannot be removed (false positive from static analysis).

src/sentry/spans/consumers/process/factory.py (1)

41-41: LGTM! Clean parameter threading.

The flusher_processes parameter is cleanly passed through from the CLI to SpanFlusher, enabling configurable process limits for the span flusher.

Also applies to: 52-52, 74-74

Comment on lines +185 to +199
 with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
     for flushed_segment in flushed_segments.values():
         if not flushed_segment.spans:
             continue

         spans = [span.payload for span in flushed_segment.spans]
         kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
-        metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
+        metrics.timing(
+            "spans.buffer.segment_size_bytes",
+            len(kafka_payload.value),
+            tags={"shard": shard_tag},
+        )
         produce(kafka_payload)

-with metrics.timer("spans.buffer.flusher.wait_produce"):
+with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):

⚠️ Potential issue | 🟡 Minor

Fix inconsistent metric tag names.

Line 185 uses tags={"shard": shard_tag} while line 199 uses tags={"shards": shard_tag}. The inconsistency (singular vs plural) makes querying metrics more difficult. Use the same tag name for both.

Apply this diff to standardize on "shard":

-                with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+                with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
    for flushed_segment in flushed_segments.values():
        if not flushed_segment.spans:
            continue

        spans = [span.payload for span in flushed_segment.spans]
        kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
        metrics.timing(
            "spans.buffer.segment_size_bytes",
            len(kafka_payload.value),
            tags={"shard": shard_tag},
        )
        produce(kafka_payload)

with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
🤖 Prompt for AI Agents
In src/sentry/spans/consumers/process/flusher.py around lines 185 to 199, the
metric tag names are inconsistent: one timer uses tags={"shard": shard_tag}
while the later timer uses tags={"shards": shard_tag}; change the later timer to
use tags={"shard": shard_tag} so both timers use the same "shard" tag name, and
run tests or a quick local check to ensure metrics emit with the unified tag.

Comment on lines +337 to +344
for process_index, process in self.processes.items():
    if deadline is not None:
        remaining_time = deadline - time.time()
        if remaining_time <= 0:
            break

    while process.is_alive() and (deadline is None or deadline > time.time()):
        time.sleep(0.1)

🛠️ Refactor suggestion | 🟠 Major

Remove unused loop variable.

The process_index variable is not used within the loop body. Either rename it to _process_index to indicate it's intentionally unused, or iterate over self.processes.values() directly.

Apply this diff to iterate over values only:

         # Wait for all processes to finish
-        for process_index, process in self.processes.items():
+        for process in self.processes.values():
             if deadline is not None:
                 remaining_time = deadline - time.time()
                 if remaining_time <= 0:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

-for process_index, process in self.processes.items():
+for process in self.processes.values():
     if deadline is not None:
         remaining_time = deadline - time.time()
         if remaining_time <= 0:
             break

     while process.is_alive() and (deadline is None or deadline > time.time()):
         time.sleep(0.1)
🧰 Tools
🪛 Ruff (0.14.5)

337-337: Loop control variable process_index not used within loop body

Rename unused process_index to _process_index

(B007)

🤖 Prompt for AI Agents
In src/sentry/spans/consumers/process/flusher.py around lines 337 to 344, the
for-loop currently unpacks an unused process_index variable; change the
iteration to loop over self.processes.values() (or rename process_index to
_process_index) so the unused variable is removed, leaving the loop body to
operate on process only and keeping semantics unchanged.

