Skip to content

Commit c78e54f

Browse files
committed
Escape unprintable ASCII characters in received SQL queries
1 parent 1b1f242 commit c78e54f

File tree

2 files changed

+41
-7
lines changed

2 files changed

+41
-7
lines changed

data_rentgen/consumer/extractors/generic/operation.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from __future__ import annotations
44

5+
import codecs
6+
import re
57
from abc import ABC, abstractmethod
68
from textwrap import dedent
79

@@ -17,6 +19,13 @@
1719
OpenLineageRunEventType,
1820
)
1921

22+
# Matches ASCII control characters (0x00-0x1F and 0x7F, see https://www.ascii-code.com/),
# with the single exception of the line feed (0x0A), which is left intact.
ASCII_UNPRINTABLE = re.compile(r"[\x00-\x09\x0b-\x1f\x7f]", re.UNICODE)


def encode_char(char: re.Match[str]) -> str:
    """Return the matched text rewritten with Python's ``unicode-escape`` codec.

    Written to be used as the replacement callback of ``Pattern.sub``: the
    text of the whole match is encoded to its backslash-escaped form and
    handed back as ``str``.
    """
    matched_text = char.group(0)
    # The "unicode-escape" codec yields bytes, so decode them back to text.
    escaped_bytes = codecs.encode(matched_text, "unicode-escape")
    return escaped_bytes.decode("utf-8")
28+
2029

2130
class OperationExtractorMixin(ABC):
2231
@abstractmethod
@@ -74,5 +83,7 @@ def _enrich_operation_status(self, operation: OperationDTO, event: OpenLineageRu
7483
def _extract_sql_query(self, event: OpenLineageRunEvent) -> SQLQueryDTO | None:
    """Build a ``SQLQueryDTO`` from the SQL facet of the event's job.

    Returns ``None`` when ``event.job.facets.sql`` is falsy. Otherwise the
    query text is dedented, stripped of surrounding whitespace, and every
    match of ``ASCII_UNPRINTABLE`` is replaced via ``encode_char``.
    """
    sql_facet = event.job.facets.sql
    if not sql_facet:
        return None

    normalized_query = dedent(sql_facet.query).strip()
    # Escape control characters instead of passing them on as-is; background:
    # https://stackoverflow.com/questions/56237415/removing-encoding-utf8-0x00-chars-from-pandas-dataframe-for-psycopg2-cursor
    # NOTE(review): presumably the storage layer rejects characters like 0x00 - confirm.
    escaped_query = ASCII_UNPRINTABLE.sub(encode_char, normalized_query)
    return SQLQueryDTO(query=escaped_query)

tests/test_consumer/test_extractors/test_extractors_operation_spark.py

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,34 @@ def test_extractors_extract_operation_spark_job_finished(
310310
)
311311

312312

313-
def test_extractors_extract_operation_spark_job_sql_query():
313+
@pytest.mark.parametrize(
314+
["sql_query", "expected_query"],
315+
[
316+
pytest.param(
317+
"select id, name\nfrom schema.table\nwhere id = 1 \t\n\r",
318+
"select id, name\nfrom schema.table\nwhere id = 1",
319+
id="spaces and newlines",
320+
),
321+
pytest.param(
322+
"""
323+
select id, name
324+
from schema.table
325+
where id = 1
326+
""",
327+
"select id, name\nfrom schema.table\nwhere id = 1",
328+
id="indentation",
329+
),
330+
pytest.param(
331+
"[\x00-\x1f\x20abc\x7e\x7f\x80\xff]",
332+
"[\\x00-\\x1f\x20abc\x7e\\x7f\x80\xff]",
333+
id="ascii encoding",
334+
),
335+
],
336+
)
337+
def test_extractors_extract_operation_spark_job_sql_query(
338+
sql_query: str,
339+
expected_query: str,
340+
):
314341
now = datetime(2024, 7, 5, 9, 6, 29, 462000, tzinfo=timezone.utc)
315342
run_id = UUID("01908224-8410-79a2-8de6-a769ad6944c9")
316343
operation_id = UUID("01908225-1fd7-746b-910c-70d24f2898b1")
@@ -328,11 +355,7 @@ def test_extractors_extract_operation_spark_job_sql_query():
328355
jobType="SQL_JOB",
329356
),
330357
sql=OpenLineageSqlJobFacet(
331-
query="""
332-
select id, name
333-
from schema.table
334-
where id = 1
335-
""",
358+
query=sql_query,
336359
),
337360
),
338361
),
@@ -369,7 +392,7 @@ def test_extractors_extract_operation_spark_job_sql_query():
369392
position=None,
370393
description=None,
371394
status=OperationStatusDTO.STARTED,
372-
sql_query=SQLQueryDTO(query="select id, name\nfrom schema.table\nwhere id = 1"),
395+
sql_query=SQLQueryDTO(query=expected_query),
373396
started_at=now,
374397
ended_at=None,
375398
)

0 commit comments

Comments
 (0)