|
11 | 11 | # without introducing a hard dependency on `typing_extensions` |
12 | 12 | # from: https://stackoverflow.com/a/71944042/300572 |
13 | 13 | if TYPE_CHECKING: |
14 | | - from typing import ParamSpec, Callable |
| 14 | + from collections.abc import Iterator |
| 15 | + from typing import Any, ParamSpec, Callable |
15 | 16 | else: |
16 | 17 | # Fake ParamSpec |
17 | 18 | class ParamSpec: |
@@ -49,9 +50,7 @@ def setup_once() -> None: |
49 | 50 | ) |
50 | 51 |
|
51 | 52 | # If the query contains parameters then the send_data function is used to send those parameters to clickhouse |
52 | | - clickhouse_driver.client.Client.send_data = _wrap_send_data( |
53 | | - clickhouse_driver.client.Client.send_data |
54 | | - ) |
| 53 | + _wrap_send_data() |
55 | 54 |
|
56 | 55 | # Every query ends either with the Client's `receive_end_of_query` (no result expected) |
57 | 56 | # or its `receive_result` (result expected) |
@@ -128,23 +127,44 @@ def _inner_end(*args: P.args, **kwargs: P.kwargs) -> T: |
128 | 127 | return _inner_end |
129 | 128 |
|
130 | 129 |
|
131 | | -def _wrap_send_data(f: Callable[P, T]) -> Callable[P, T]: |
132 | | - def _inner_send_data(*args: P.args, **kwargs: P.kwargs) -> T: |
133 | | - instance = args[0] # type: clickhouse_driver.client.Client |
134 | | - data = args[2] |
135 | | - span = getattr(instance.connection, "_sentry_span", None) |
| 130 | +def _wrap_send_data() -> None: |
| 131 | + original_send_data = clickhouse_driver.client.Client.send_data |
| 132 | + |
| 133 | + def _inner_send_data( # type: ignore[no-untyped-def] # clickhouse-driver does not type send_data |
| 134 | + self, sample_block, data, types_check=False, columnar=False, *args, **kwargs |
| 135 | + ): |
| 136 | + span = getattr(self.connection, "_sentry_span", None) |
136 | 137 |
|
137 | 138 | if span is not None: |
138 | | - _set_db_data(span, instance.connection) |
| 139 | + _set_db_data(span, self.connection) |
139 | 140 |
|
140 | 141 | if should_send_default_pii(): |
141 | 142 | db_params = span._data.get("db.params", []) |
142 | | - db_params.extend(data) |
| 143 | + |
| 144 | + if isinstance(data, (list, tuple)): |
| 145 | + db_params.extend(data) |
| 146 | + |
| 147 | + else: # data is a generic iterator |
| 148 | + orig_data = data |
| 149 | + |
| 150 | + # Wrap the generator to add items to db.params as they are yielded. |
| 151 | + # This allows us to send the params to Sentry without needing to allocate |
| 152 | + # memory for the entire generator at once. |
| 153 | + def wrapped_generator() -> "Iterator[Any]": |
| 154 | + for item in orig_data: |
| 155 | + db_params.append(item) |
| 156 | + yield item |
| 157 | + |
| 158 | + # Replace the original iterator with the wrapped one. |
| 159 | + data = wrapped_generator() |
| 160 | + |
143 | 161 | span.set_data("db.params", db_params) |
144 | 162 |
|
145 | | - return f(*args, **kwargs) |
| 163 | + return original_send_data( |
| 164 | + self, sample_block, data, types_check, columnar, *args, **kwargs |
| 165 | + ) |
146 | 166 |
|
147 | | - return _inner_send_data |
| 167 | + clickhouse_driver.client.Client.send_data = _inner_send_data |
148 | 168 |
|
149 | 169 |
|
150 | 170 | def _set_db_data( |
|
0 commit comments