|
1 | | -from typing import TYPE_CHECKING, TypeVar, Union |
| 1 | +from typing import TYPE_CHECKING, Deque, TypeVar, Union |
2 | 2 |
|
3 | 3 |
|
4 | 4 | # Re-exported for compat, since code out there in the wild might use this variable. |
@@ -81,18 +81,53 @@ def substituted_because_contains_sensitive_data(cls): |
81 | 81 | }, |
82 | 82 | ) |
83 | 83 |
|
| 84 | +T = TypeVar("T") |
| 85 | +Annotated = Union[AnnotatedValue, T] |
| 86 | +class AnnotatedDeque(AnnotatedValue): |
| 87 | + """ |
| 88 | + Meta information for a data field in the event payload. |
| 89 | + This is to tell Relay that we have tampered with the fields value. |
| 90 | + See: |
| 91 | + https://github.com/getsentry/relay/blob/be12cd49a0f06ea932ed9b9f93a655de5d6ad6d1/relay-general/src/types/meta.rs#L407-L423 |
| 92 | + """ |
| 93 | + |
| 94 | + __slots__ = ("value", "metadata") |
| 95 | + |
| 96 | + def __init__(self, value, metadata): |
| 97 | + # type: (Deque[Any], Dict[str, Any]) -> None |
| 98 | + self.value = value |
| 99 | + self.metadata = metadata |
| 100 | + |
| 101 | + def __eq__(self, other): |
| 102 | + # type: (Any) -> bool |
| 103 | + if not isinstance(other, AnnotatedValue): |
| 104 | + return False |
| 105 | + |
| 106 | + return self.value == other.value and self.metadata == other.metadata |
| 107 | + |
| 108 | + def append(self, other): |
| 109 | + # type: (Any) -> None |
| 110 | + self.value.append(other) |
| 111 | + |
| 112 | + def extend(self, other): |
| 113 | + # type: (Any) -> None |
| 114 | + self.value.extend(other) |
| 115 | + |
| 116 | + def popleft(self): |
| 117 | + self.value.popleft() |
| 118 | + |
| 119 | + def __len__(self): |
| 120 | + return len(self.value) |
| 121 | + |
84 | 122 | @classmethod |
85 | | - def truncated_breadcrumbs(cls, breadcrumbs, n_truncated): |
86 | | - # type: (list[Breadcrumb], int) -> AnnotatedValue |
87 | | - """Breadcrumbs were removed because the number of breadcrumbs exceeded their maximum limit.""" |
88 | | - return AnnotatedValue( |
89 | | - value=breadcrumbs, |
| 123 | + def truncated(cls, value, n_truncated): |
| 124 | + # type: (Deque[Any], int) -> AnnotatedValue |
| 125 | + """Data was removed because the number of elements exceeded the maximum limit.""" |
| 126 | + return AnnotatedDeque( |
| 127 | + value=value, |
90 | 128 | metadata={"len": [n_truncated]}, # Remark |
91 | 129 | ) |
92 | 130 |
|
93 | | -T = TypeVar("T") |
94 | | -Annotated = Union[AnnotatedValue, T] |
95 | | - |
96 | 131 | if TYPE_CHECKING: |
97 | 132 | from collections.abc import Container, MutableMapping, Sequence |
98 | 133 |
|
|
0 commit comments