Skip to content

Commit 3efa5a8

Browse files
committed
[PostgreSQL] Add serialization support for PostgreSQL special types
Backport of PR #3900 to 9.2 PostgreSQL connector uses asyncpg which returns special Python objects for certain PostgreSQL data types. These need to be serialized to strings before indexing: - Network types (INET, CIDR) -> ipaddress module objects - UUID type -> uuid.UUID objects - Geometric types (POINT, LINE, POLYGON, etc.) -> asyncpg.types objects - BitString type (BIT, VARBIT) -> asyncpg.types.BitString objects
1 parent 417adc0 commit 3efa5a8

File tree

3 files changed

+672
-1
lines changed

3 files changed

+672
-1
lines changed

connectors/sources/postgresql.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,29 @@
77

88
import ssl
99
from functools import cached_property, partial
10+
from ipaddress import (
11+
IPv4Address,
12+
IPv4Interface,
13+
IPv4Network,
14+
IPv6Address,
15+
IPv6Interface,
16+
IPv6Network,
17+
)
1018
from urllib.parse import quote
19+
from uuid import UUID
1120

1221
import fastjsonschema
1322
from asyncpg.exceptions._base import InternalClientError
23+
from asyncpg.types import (
24+
BitString,
25+
Box,
26+
Circle,
27+
Line,
28+
LineSegment,
29+
Path,
30+
Point,
31+
Polygon,
32+
)
1433
from fastjsonschema import JsonSchemaValueException
1534
from sqlalchemy import text
1635
from sqlalchemy.exc import ProgrammingError
@@ -510,6 +529,76 @@ async def ping(self):
510529
msg = f"Can't connect to Postgresql on {self.postgresql_client.host}."
511530
raise Exception(msg) from e
512531

532+
def serialize(self, doc):
533+
"""Override base serialize to handle PostgreSQL-specific types.
534+
535+
PostgreSQL connector uses asyncpg which returns special Python objects for certain
536+
PostgreSQL data types that need to be serialized to strings:
537+
- Network types (INET, CIDR) -> ipaddress module objects
538+
- UUID type -> uuid.UUID objects
539+
- Geometric types (POINT, LINE, POLYGON, etc.) -> asyncpg.types objects
540+
- BitString type (BIT, VARBIT) -> asyncpg.types.BitString objects
541+
542+
Args:
543+
doc (Dict): Dictionary to be serialized
544+
545+
Returns:
546+
doc (Dict): Serialized version of dictionary
547+
"""
548+
549+
def _serialize(value):
550+
"""Serialize input value with respect to its datatype.
551+
552+
Args:
553+
value (Any): Value to be serialized
554+
555+
Returns:
556+
value (Any): Serialized version of input value.
557+
"""
558+
match value:
559+
case (
560+
IPv4Address()
561+
| IPv6Address()
562+
| IPv4Interface()
563+
| IPv6Interface()
564+
| IPv4Network()
565+
| IPv6Network()
566+
):
567+
return str(value)
568+
case UUID():
569+
return str(value)
570+
case Point():
571+
return f"({value.x}, {value.y})"
572+
case LineSegment():
573+
return (
574+
f"[({value.p1.x}, {value.p1.y}), ({value.p2.x}, {value.p2.y})]"
575+
)
576+
case Box():
577+
return f"[({value.high.x}, {value.high.y}), ({value.low.x}, {value.low.y})]"
578+
case Polygon():
579+
# Polygon inherits from Path, so check it first
580+
coords = [(p.x, p.y) for p in value.points]
581+
return str(coords)
582+
case Path():
583+
coords = [(p.x, p.y) for p in value.points]
584+
status = "closed" if value.is_closed else "open"
585+
return f"{status} {str(coords)}"
586+
case Line() | Circle():
587+
return str(value)
588+
case BitString():
589+
return value.as_string()
590+
case list() | tuple():
591+
return [_serialize(item) for item in value]
592+
case dict():
593+
return {k: _serialize(v) for k, v in value.items()}
594+
case _:
595+
return value
596+
597+
for key, value in doc.items():
598+
doc[key] = _serialize(value)
599+
600+
return super().serialize(doc)
601+
513602
def row2doc(self, row, doc_id, table, timestamp):
514603
row.update(
515604
{

tests/sources/fixtures/postgresql/fixture.py

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@
99
import random
1010

1111
import asyncpg
12+
from asyncpg.types import (
13+
BitString,
14+
Box,
15+
Circle,
16+
Line,
17+
LineSegment,
18+
Path,
19+
Point,
20+
Polygon,
21+
)
1222

1323
from tests.commons import WeightedFakeProvider
1424

@@ -36,9 +46,12 @@
3646

3747
event_loop = asyncio.get_event_loop()
3848

49+
# Number of test rows in special_types table for serialization testing
50+
SPECIAL_TYPES_TEST_ROWS = 3
51+
3952

4053
def get_num_docs():
41-
print(NUM_TABLES * (RECORD_COUNT - RECORDS_TO_DELETE))
54+
print(NUM_TABLES * (RECORD_COUNT - RECORDS_TO_DELETE) + SPECIAL_TYPES_TEST_ROWS)
4255

4356

4457
async def load():
@@ -81,6 +94,104 @@ async def inject_lines(table, connect, lines):
8194
inserted += batch_size
8295
print(f"Inserting batch #{batch} of {batch_size} documents.")
8396

97+
async def create_special_types_table():
98+
"""Create a table with PostgreSQL special types that require serialization."""
99+
connect = await asyncpg.connect(CONNECTION_STRING)
100+
101+
print("Creating special_types table for serialization testing...")
102+
create_table_sql = """
103+
CREATE TABLE IF NOT EXISTS special_types (
104+
id SERIAL PRIMARY KEY,
105+
ip_inet INET,
106+
ip_cidr CIDR,
107+
uuid_col UUID,
108+
point_col POINT,
109+
line_col LINE,
110+
lseg_col LSEG,
111+
box_col BOX,
112+
path_col PATH,
113+
polygon_col POLYGON,
114+
circle_col CIRCLE,
115+
bit_col BIT(8),
116+
varbit_col VARBIT(16),
117+
inet_array INET[],
118+
uuid_array UUID[]
119+
)
120+
"""
121+
await connect.execute(create_table_sql)
122+
123+
print("Inserting special type test data...")
124+
insert_sql = """
125+
INSERT INTO special_types (
126+
ip_inet, ip_cidr, uuid_col,
127+
point_col, line_col, lseg_col, box_col, path_col, polygon_col, circle_col,
128+
bit_col, varbit_col,
129+
inet_array, uuid_array
130+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
131+
"""
132+
133+
test_rows = [
134+
(
135+
"192.168.1.1",
136+
"10.0.0.0/8",
137+
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
138+
Point(1.5, 2.5),
139+
Line(1, -1, 0),
140+
LineSegment((0, 0), (1, 1)),
141+
Box((2, 2), (0, 0)),
142+
Path((0, 0), (1, 1), (2, 0)),
143+
Polygon((0, 0), (1, 0), (1, 1), (0, 1)),
144+
Circle((0, 0), 5),
145+
BitString("10101010"),
146+
BitString("1101"),
147+
["192.168.1.1", "10.0.0.1"],
148+
[
149+
"550e8400-e29b-41d4-a716-446655440000",
150+
"f47ac10b-58cc-4372-a567-0e02b2c3d479",
151+
],
152+
),
153+
(
154+
"2001:db8::1",
155+
"2001:db8::/32",
156+
"123e4567-e89b-12d3-a456-426614174000",
157+
Point(-3.14, 2.71),
158+
Line(2, 3, -6),
159+
LineSegment((-1, -1), (1, 1)),
160+
Box((10, 10), (-10, -10)),
161+
Path((0, 0), (3, 0), (3, 3), (0, 3), is_closed=True),
162+
Polygon((-1, -1), (1, -1), (1, 1), (-1, 1)),
163+
Circle((5, 5), 2.5),
164+
BitString("11110000"),
165+
BitString("101010101010"),
166+
["::1", "fe80::1"],
167+
["aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"],
168+
),
169+
(
170+
"10.30.0.9/24",
171+
"172.16.0.0/12",
172+
"00000000-0000-0000-0000-000000000000",
173+
Point(0, 0),
174+
Line(0, 1, 0),
175+
LineSegment((5, 5), (10, 10)),
176+
Box((100, 100), (50, 50)),
177+
Path((0, 0), (5, 0), (5, 5)),
178+
Polygon((0, 0), (4, 0), (4, 3), (0, 3)),
179+
Circle((0, 0), 10),
180+
BitString("00000000"),
181+
BitString("1111111111111111"),
182+
["192.0.2.1", "198.51.100.1", "203.0.113.1"],
183+
[
184+
"12345678-1234-5678-1234-567812345678",
185+
"87654321-4321-8765-4321-876543218765",
186+
],
187+
),
188+
]
189+
190+
await connect.executemany(insert_sql, test_rows)
191+
print(f"Inserted {len(test_rows)} rows with special types")
192+
193+
await connect.close()
194+
84195
async def load_rows():
85196
"""N tables of 10001 rows each. each row is ~ 1024*20 bytes"""
86197
connect = await asyncpg.connect(CONNECTION_STRING)
@@ -93,6 +204,7 @@ async def load_rows():
93204

94205
await create_readonly_user()
95206
await load_rows()
207+
await create_special_types_table()
96208

97209

98210
async def remove():

0 commit comments

Comments
 (0)