|
41 | 41 | ENABLE_TRACE_PAGINATION_DEFAULT, |
42 | 42 | ENDPOINT_GET_TRACE_PAGINATION_MAX_ITEMS, |
43 | 43 | ) |
| 44 | +from snuba.utils.metrics.util import with_span |
44 | 45 | from snuba.web.query import run_query |
45 | 46 | from snuba.web.rpc import RPCEndpoint |
46 | 47 | from snuba.web.rpc.common.common import ( |
@@ -164,6 +165,7 @@ def to_protobuf(self) -> PageToken: |
164 | 165 | return PageToken(filter_offset=filters) |
165 | 166 |
|
166 | 167 |
|
| 168 | +@with_span(op="function") |
167 | 169 | def _build_query( |
168 | 170 | request: GetTraceRequest, |
169 | 171 | item: GetTraceRequest.TraceItem, |
@@ -465,69 +467,84 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa |
465 | 467 | ) |
466 | 468 |
|
467 | 469 |
|
| 470 | +@with_span(op="function") |
468 | 471 | def _process_results( |
469 | 472 | data: Iterable[Dict[str, Any]], |
470 | 473 | ) -> ProcessedResults: |
471 | 474 | """ |
472 | | - Used to process the results returned from clickhouse in a single pass. |
473 | | - If you have more processing to do on the results, you can do it here and |
474 | | - return the results as another entry in the ProcessedResults named tuple. |
| 475 | + Used to process the results returned from clickhouse in two passes. |
| 476 | + The first pass adds attributes to each row, and the second pass sorts |
| 477 | + the attributes and assembles the final items. |
475 | 478 | """ |
476 | | - items: list[GetTraceResponse.Item] = [] |
477 | 479 | last_seen_timestamp_precise = 0.0 |
478 | 480 | last_seen_id = "" |
479 | 481 |
|
480 | | - for row in data: |
481 | | - id = row.pop("id") |
482 | | - ts = row.pop("timestamp") |
483 | | - arrays = row.pop("attributes_array", "{}") or "{}" |
484 | | - # We want to merge these values after to overwrite potential floats |
485 | | - # with the same name. |
486 | | - booleans = row.pop("attributes_bool", {}) or {} |
487 | | - integers = row.pop("attributes_int", {}) or {} |
488 | | - last_seen_timestamp_precise = float(ts) |
489 | | - last_seen_id = id |
490 | | - |
491 | | - timestamp = Timestamp() |
492 | | - # truncate to microseconds since we store microsecond precision only |
493 | | - # then transform to nanoseconds |
494 | | - timestamp.FromNanoseconds(int(ts * 1e6) * 1000) |
495 | | - |
496 | | - attributes: dict[str, GetTraceResponse.Item.Attribute] = {} |
497 | | - |
498 | | - def add_attribute(key: str, value: Any) -> None: |
499 | | - attribute_key, attribute_value = _value_to_attribute(key, value) |
500 | | - attributes[key] = GetTraceResponse.Item.Attribute( |
501 | | - key=attribute_key, |
502 | | - value=attribute_value, |
503 | | - ) |
| 482 | + # First pass: parse rows and build attribute dicts |
| 483 | + parsed_rows: list[tuple[str, Timestamp, dict[str, GetTraceResponse.Item.Attribute]]] = [] |
| 484 | + |
| 485 | + with sentry_sdk.start_span(op="function", description="add_attributes"): |
| 486 | + for row in data: |
| 487 | + id = row.pop("id") |
| 488 | + ts = row.pop("timestamp") |
| 489 | + arrays = row.pop("attributes_array", "{}") or "{}" |
| 490 | + # We want to merge these values after to overwrite potential floats |
| 491 | + # with the same name. |
| 492 | + booleans = row.pop("attributes_bool", {}) or {} |
| 493 | + integers = row.pop("attributes_int", {}) or {} |
| 494 | + last_seen_timestamp_precise = float(ts) |
| 495 | + last_seen_id = id |
| 496 | + |
| 497 | + timestamp = Timestamp() |
| 498 | + # truncate to microseconds since we store microsecond precision only |
| 499 | + # then transform to nanoseconds |
| 500 | + timestamp.FromNanoseconds(int(ts * 1e6) * 1000) |
| 501 | + |
| 502 | + attributes: dict[str, GetTraceResponse.Item.Attribute] = {} |
| 503 | + |
| 504 | + def add_attribute(key: str, value: Any) -> None: |
| 505 | + attribute_key, attribute_value = _value_to_attribute(key, value) |
| 506 | + attributes[key] = GetTraceResponse.Item.Attribute( |
| 507 | + key=attribute_key, |
| 508 | + value=attribute_value, |
| 509 | + ) |
504 | 510 |
|
505 | | - for row_key, row_value in row.items(): |
506 | | - if isinstance(row_value, dict): |
507 | | - for column_key, column_value in row_value.items(): |
508 | | - add_attribute(column_key, column_value) |
509 | | - else: |
510 | | - add_attribute(row_key, row_value) |
| 511 | + for row_key, row_value in row.items(): |
| 512 | + if isinstance(row_value, dict): |
| 513 | + for column_key, column_value in row_value.items(): |
| 514 | + add_attribute(column_key, column_value) |
| 515 | + else: |
| 516 | + add_attribute(row_key, row_value) |
511 | 517 |
|
512 | | - attributes_array = process_arrays(arrays) |
513 | | - for array_key, array_value in attributes_array.items(): |
514 | | - add_attribute(array_key, array_value) |
| 518 | + attributes_array = process_arrays(arrays) |
| 519 | + for array_key, array_value in attributes_array.items(): |
| 520 | + add_attribute(array_key, array_value) |
515 | 521 |
|
516 | | - for bool_key, bool_value in booleans.items(): |
517 | | - add_attribute(bool_key, bool_value) |
| 522 | + for bool_key, bool_value in booleans.items(): |
| 523 | + add_attribute(bool_key, bool_value) |
518 | 524 |
|
519 | | - for int_key, int_value in integers.items(): |
520 | | - add_attribute(int_key, int_value) |
| 525 | + for int_key, int_value in integers.items(): |
| 526 | + add_attribute(int_key, int_value) |
521 | 527 |
|
522 | | - item = GetTraceResponse.Item( |
523 | | - id=id, |
524 | | - timestamp=timestamp, |
525 | | - attributes=sorted( |
526 | | - attributes.values(), |
527 | | - key=attrgetter("key.name"), |
528 | | - ), |
529 | | - ) |
530 | | - items.append(item) |
| 528 | + parsed_rows.append((id, timestamp, attributes)) |
| 529 | + |
| 530 | + # Second pass: sort attributes and assemble items |
| 531 | + items: list[GetTraceResponse.Item] = [] |
| 532 | + |
| 533 | + with sentry_sdk.start_span(op="function", description="sort_attributes"): |
| 534 | + for id, timestamp, attributes in parsed_rows: |
| 535 | + item = GetTraceResponse.Item( |
| 536 | + id=id, |
| 537 | + timestamp=timestamp, |
| 538 | + attributes=sorted( |
| 539 | + attributes.values(), |
| 540 | + key=attrgetter("key.name"), |
| 541 | + ), |
| 542 | + ) |
| 543 | + items.append(item) |
| 544 | + |
| 545 | + current_span = sentry_sdk.get_current_span() |
| 546 | + if current_span is not None: |
| 547 | + current_span.set_data("rows_processed", len(parsed_rows)) |
531 | 548 |
|
532 | 549 | return ProcessedResults( |
533 | 550 | items=items, |
@@ -612,25 +629,27 @@ def _execute(self, in_msg: GetTraceRequest) -> GetTraceResponse: |
612 | 629 | page_token = EndpointGetTracePageToken(i, last_seen_timestamp_precise, last_seen_id) |
613 | 630 | break |
614 | 631 |
|
615 | | - response_meta = extract_response_meta( |
616 | | - in_msg.meta.request_id, |
617 | | - in_msg.meta.debug, |
618 | | - query_results, |
619 | | - [self._timer] * len(query_results), |
620 | | - ) |
621 | | - if not enable_pagination: |
622 | | - serialized_page_token = None |
623 | | - elif page_token is None: |
624 | | - serialized_page_token = PageToken(end_pagination=True) |
625 | | - else: |
626 | | - serialized_page_token = page_token.to_protobuf() |
627 | | - return GetTraceResponse( |
628 | | - item_groups=item_groups, |
629 | | - meta=response_meta, |
630 | | - trace_id=in_msg.trace_id, |
631 | | - page_token=serialized_page_token, |
632 | | - ) |
| 632 | + with sentry_sdk.start_span(op="function", description="assemble_response"): |
| 633 | + response_meta = extract_response_meta( |
| 634 | + in_msg.meta.request_id, |
| 635 | + in_msg.meta.debug, |
| 636 | + query_results, |
| 637 | + [self._timer] * len(query_results), |
| 638 | + ) |
| 639 | + if not enable_pagination: |
| 640 | + serialized_page_token = None |
| 641 | + elif page_token is None: |
| 642 | + serialized_page_token = PageToken(end_pagination=True) |
| 643 | + else: |
| 644 | + serialized_page_token = page_token.to_protobuf() |
| 645 | + return GetTraceResponse( |
| 646 | + item_groups=item_groups, |
| 647 | + meta=response_meta, |
| 648 | + trace_id=in_msg.trace_id, |
| 649 | + page_token=serialized_page_token, |
| 650 | + ) |
633 | 651 |
|
| 652 | + @with_span(op="function") |
634 | 653 | def _query_item_group( |
635 | 654 | self, |
636 | 655 | in_msg: GetTraceRequest, |
|
0 commit comments