|
9 | 9 | from django.db.models import Q |
10 | 10 |
|
11 | 11 | import chdb |
| 12 | +import structlog |
12 | 13 |
|
13 | 14 | from posthog.schema import DatabaseSerializedFieldType, HogQLQueryModifiers |
14 | 15 |
|
@@ -184,30 +185,53 @@ def get_columns( |
184 | 185 | context=placeholder_context, |
185 | 186 | table_size_mib=self.size_in_s3_mib, |
186 | 187 | ) |
| 188 | + logger = structlog.get_logger(__name__) |
| 189 | + try: |
| 190 | + # chdb hangs in CI during tests |
| 191 | + if TEST: |
| 192 | + raise Exception() |
187 | 193 |
|
188 | | - tag_queries(team_id=self.team.pk, table_id=self.id, warehouse_query=True, name="describe_wh_table") |
| 194 | + quoted_placeholders = {k: f"'{v}'" for k, v in placeholder_context.values.items()} |
| 195 | + # chdb doesn't support parameterized queries |
| 196 | + chdb_query = f"DESCRIBE TABLE (SELECT * FROM {s3_table_func} LIMIT 1)" % quoted_placeholders |
189 | 197 |
|
190 | | - # The cluster is a little broken right now, and so this can intermittently fail. |
191 | | - # See https://posthog.slack.com/archives/C076R4753Q8/p1756901693184169 for context |
192 | | - attempts = 5 |
193 | | - result = None |
194 | | - for i in range(attempts): |
195 | | - try: |
196 | | - result = sync_execute( |
197 | | - f"""DESCRIBE TABLE {s3_table_func}""", |
198 | | - args=placeholder_context.values, |
199 | | - ) |
200 | | - break |
201 | | - except Exception as err: |
202 | | - if i >= attempts - 1: |
203 | | - capture_exception(err) |
204 | | - if safe_expose_ch_error: |
205 | | - self._safe_expose_ch_error(err) |
206 | | - else: |
207 | | - raise |
208 | | - |
209 | | - # Pause execution slightly to not overload clickhouse |
210 | | - time.sleep(2**i) |
| 198 | + # TODO: upgrade chdb once https://github.com/chdb-io/chdb/issues/342 is actually resolved |
| 199 | + # See https://github.com/chdb-io/chdb/pull/374 for the fix |
| 200 | + chdb_result = chdb.query(chdb_query, output_format="CSV") |
| 201 | + reader = csv.reader(StringIO(str(chdb_result))) |
| 202 | + result = [tuple(row) for row in reader] |
| 203 | + except Exception as chdb_error: |
| 204 | + if self._is_suppressed_chdb_error(chdb_error): |
| 205 | + logger.debug(chdb_error) |
| 206 | + else: |
| 207 | + capture_exception(chdb_error) |
| 208 | + |
| 209 | + tag_queries(team_id=self.team.pk, table_id=self.id, warehouse_query=True) |
| 210 | + |
| 211 | + # The cluster is a little broken right now, and so this can intermittently fail. |
| 212 | + # See https://posthog.slack.com/archives/C076R4753Q8/p1756901693184169 for context |
| 213 | + attempts = 5 |
| 214 | + for i in range(attempts): |
| 215 | + try: |
| 216 | + result = sync_execute( |
| 217 | + f"""DESCRIBE TABLE ( |
| 218 | + SELECT * |
| 219 | + FROM {s3_table_func} |
| 220 | + LIMIT 1 |
| 221 | + )""", |
| 222 | + args=placeholder_context.values, |
| 223 | + ) |
| 224 | + break |
| 225 | + except Exception as err: |
| 226 | + if i >= attempts - 1: |
| 227 | + capture_exception(err) |
| 228 | + if safe_expose_ch_error: |
| 229 | + self._safe_expose_ch_error(err) |
| 230 | + else: |
| 231 | + raise |
| 232 | + |
| 233 | + # Pause execution slightly to not overload clickhouse |
| 234 | + time.sleep(2**i) |
211 | 235 |
|
212 | 236 | if result is None or isinstance(result, int): |
213 | 237 | raise Exception("No columns types provided by clickhouse in get_columns") |
|
0 commit comments