Skip to content

Commit cb86d67

Browse files
[9.2] Fix PostgreSQL IP address serialization error (#3900) (#3904)
Manually backporting the work from the following PR to 9.2: - Fix PostgreSQL IP address serialization error (#3900) I think auto backport should work for these versions and so I can avoid seeking reviews for the other branches. 😁 --------- Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
1 parent f528bbe commit cb86d67

File tree

4 files changed

+675
-29
lines changed

4 files changed

+675
-29
lines changed

NOTICE.txt

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1602,7 +1602,7 @@ Apache License
16021602

16031603

16041604
anyio
1605-
4.12.0
1605+
4.12.1
16061606
MIT
16071607
The MIT License (MIT)
16081608

@@ -2605,31 +2605,6 @@ Redistribution and use in source and binary forms, with or without modification,
26052605
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26062606

26072607

2608-
cachetools
2609-
6.2.4
2610-
MIT
2611-
The MIT License (MIT)
2612-
2613-
Copyright (c) 2014-2025 Thomas Kemmer
2614-
2615-
Permission is hereby granted, free of charge, to any person obtaining a copy of
2616-
this software and associated documentation files (the "Software"), to deal in
2617-
the Software without restriction, including without limitation the rights to
2618-
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
2619-
the Software, and to permit persons to whom the Software is furnished to do so,
2620-
subject to the following conditions:
2621-
2622-
The above copyright notice and this permission notice shall be included in all
2623-
copies or substantial portions of the Software.
2624-
2625-
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
2626-
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
2627-
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
2628-
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
2629-
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
2630-
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2631-
2632-
26332608
certifi
26342609
2024.7.4
26352610
Mozilla Public License 2.0 (MPL 2.0)
@@ -4115,7 +4090,7 @@ Apache Software License
41154090

41164091

41174092
google-auth
4118-
2.45.0
4093+
2.47.0
41194094
Apache Software License
41204095
Apache License
41214096
Version 2.0, January 2004
@@ -8121,7 +8096,7 @@ made under the terms of *both* these licenses.
81218096

81228097

81238098
urllib3
8124-
2.6.2
8099+
2.6.3
81258100
MIT
81268101
MIT License
81278102

connectors/sources/postgresql.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,29 @@
77

88
import ssl
99
from functools import cached_property, partial
10+
from ipaddress import (
11+
IPv4Address,
12+
IPv4Interface,
13+
IPv4Network,
14+
IPv6Address,
15+
IPv6Interface,
16+
IPv6Network,
17+
)
1018
from urllib.parse import quote
19+
from uuid import UUID
1120

1221
import fastjsonschema
1322
from asyncpg.exceptions._base import InternalClientError
23+
from asyncpg.types import (
24+
BitString,
25+
Box,
26+
Circle,
27+
Line,
28+
LineSegment,
29+
Path,
30+
Point,
31+
Polygon,
32+
)
1433
from fastjsonschema import JsonSchemaValueException
1534
from sqlalchemy import text
1635
from sqlalchemy.exc import ProgrammingError
@@ -510,6 +529,76 @@ async def ping(self):
510529
msg = f"Can't connect to Postgresql on {self.postgresql_client.host}."
511530
raise Exception(msg) from e
512531

532+
def serialize(self, doc):
533+
"""Override base serialize to handle PostgreSQL-specific types.
534+
535+
PostgreSQL connector uses asyncpg which returns special Python objects for certain
536+
PostgreSQL data types that need to be serialized to strings:
537+
- Network types (INET, CIDR) -> ipaddress module objects
538+
- UUID type -> uuid.UUID objects
539+
- Geometric types (POINT, LINE, POLYGON, etc.) -> asyncpg.types objects
540+
- BitString type (BIT, VARBIT) -> asyncpg.types.BitString objects
541+
542+
Args:
543+
doc (Dict): Dictionary to be serialized
544+
545+
Returns:
546+
doc (Dict): Serialized version of dictionary
547+
"""
548+
549+
def _serialize(value):
550+
"""Serialize input value with respect to its datatype.
551+
552+
Args:
553+
value (Any): Value to be serialized
554+
555+
Returns:
556+
value (Any): Serialized version of input value.
557+
"""
558+
match value:
559+
case (
560+
IPv4Address()
561+
| IPv6Address()
562+
| IPv4Interface()
563+
| IPv6Interface()
564+
| IPv4Network()
565+
| IPv6Network()
566+
):
567+
return str(value)
568+
case UUID():
569+
return str(value)
570+
case Point():
571+
return f"({value.x}, {value.y})"
572+
case LineSegment():
573+
return (
574+
f"[({value.p1.x}, {value.p1.y}), ({value.p2.x}, {value.p2.y})]"
575+
)
576+
case Box():
577+
return f"[({value.high.x}, {value.high.y}), ({value.low.x}, {value.low.y})]"
578+
case Polygon():
579+
# Polygon inherits from Path, so check it first
580+
coords = [(p.x, p.y) for p in value.points]
581+
return str(coords)
582+
case Path():
583+
coords = [(p.x, p.y) for p in value.points]
584+
status = "closed" if value.is_closed else "open"
585+
return f"{status} {str(coords)}"
586+
case Line() | Circle():
587+
return str(value)
588+
case BitString():
589+
return value.as_string()
590+
case list() | tuple():
591+
return [_serialize(item) for item in value]
592+
case dict():
593+
return {k: _serialize(v) for k, v in value.items()}
594+
case _:
595+
return value
596+
597+
for key, value in doc.items():
598+
doc[key] = _serialize(value)
599+
600+
return super().serialize(doc)
601+
513602
def row2doc(self, row, doc_id, table, timestamp):
514603
row.update(
515604
{

tests/sources/fixtures/postgresql/fixture.py

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@
99
import random
1010

1111
import asyncpg
12+
from asyncpg.types import (
13+
BitString,
14+
Box,
15+
Circle,
16+
Line,
17+
LineSegment,
18+
Path,
19+
Point,
20+
Polygon,
21+
)
1222

1323
from tests.commons import WeightedFakeProvider
1424

@@ -36,9 +46,12 @@
3646

3747
event_loop = asyncio.get_event_loop()
3848

49+
# Number of test rows in special_types table for serialization testing
50+
SPECIAL_TYPES_TEST_ROWS = 3
51+
3952

4053
def get_num_docs():
41-
print(NUM_TABLES * (RECORD_COUNT - RECORDS_TO_DELETE))
54+
print(NUM_TABLES * (RECORD_COUNT - RECORDS_TO_DELETE) + SPECIAL_TYPES_TEST_ROWS)
4255

4356

4457
async def load():
@@ -81,6 +94,104 @@ async def inject_lines(table, connect, lines):
8194
inserted += batch_size
8295
print(f"Inserting batch #{batch} of {batch_size} documents.")
8396

97+
async def create_special_types_table():
98+
"""Create a table with PostgreSQL special types that require serialization."""
99+
connect = await asyncpg.connect(CONNECTION_STRING)
100+
101+
print("Creating special_types table for serialization testing...")
102+
create_table_sql = """
103+
CREATE TABLE IF NOT EXISTS special_types (
104+
id SERIAL PRIMARY KEY,
105+
ip_inet INET,
106+
ip_cidr CIDR,
107+
uuid_col UUID,
108+
point_col POINT,
109+
line_col LINE,
110+
lseg_col LSEG,
111+
box_col BOX,
112+
path_col PATH,
113+
polygon_col POLYGON,
114+
circle_col CIRCLE,
115+
bit_col BIT(8),
116+
varbit_col VARBIT(16),
117+
inet_array INET[],
118+
uuid_array UUID[]
119+
)
120+
"""
121+
await connect.execute(create_table_sql)
122+
123+
print("Inserting special type test data...")
124+
insert_sql = """
125+
INSERT INTO special_types (
126+
ip_inet, ip_cidr, uuid_col,
127+
point_col, line_col, lseg_col, box_col, path_col, polygon_col, circle_col,
128+
bit_col, varbit_col,
129+
inet_array, uuid_array
130+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
131+
"""
132+
133+
test_rows = [
134+
(
135+
"192.168.1.1",
136+
"10.0.0.0/8",
137+
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
138+
Point(1.5, 2.5),
139+
Line(1, -1, 0),
140+
LineSegment((0, 0), (1, 1)),
141+
Box((2, 2), (0, 0)),
142+
Path((0, 0), (1, 1), (2, 0)),
143+
Polygon((0, 0), (1, 0), (1, 1), (0, 1)),
144+
Circle((0, 0), 5),
145+
BitString("10101010"),
146+
BitString("1101"),
147+
["192.168.1.1", "10.0.0.1"],
148+
[
149+
"550e8400-e29b-41d4-a716-446655440000",
150+
"f47ac10b-58cc-4372-a567-0e02b2c3d479",
151+
],
152+
),
153+
(
154+
"2001:db8::1",
155+
"2001:db8::/32",
156+
"123e4567-e89b-12d3-a456-426614174000",
157+
Point(-3.14, 2.71),
158+
Line(2, 3, -6),
159+
LineSegment((-1, -1), (1, 1)),
160+
Box((10, 10), (-10, -10)),
161+
Path((0, 0), (3, 0), (3, 3), (0, 3), is_closed=True),
162+
Polygon((-1, -1), (1, -1), (1, 1), (-1, 1)),
163+
Circle((5, 5), 2.5),
164+
BitString("11110000"),
165+
BitString("101010101010"),
166+
["::1", "fe80::1"],
167+
["aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"],
168+
),
169+
(
170+
"10.30.0.9/24",
171+
"172.16.0.0/12",
172+
"00000000-0000-0000-0000-000000000000",
173+
Point(0, 0),
174+
Line(0, 1, 0),
175+
LineSegment((5, 5), (10, 10)),
176+
Box((100, 100), (50, 50)),
177+
Path((0, 0), (5, 0), (5, 5)),
178+
Polygon((0, 0), (4, 0), (4, 3), (0, 3)),
179+
Circle((0, 0), 10),
180+
BitString("00000000"),
181+
BitString("1111111111111111"),
182+
["192.0.2.1", "198.51.100.1", "203.0.113.1"],
183+
[
184+
"12345678-1234-5678-1234-567812345678",
185+
"87654321-4321-8765-4321-876543218765",
186+
],
187+
),
188+
]
189+
190+
await connect.executemany(insert_sql, test_rows)
191+
print(f"Inserted {len(test_rows)} rows with special types")
192+
193+
await connect.close()
194+
84195
async def load_rows():
85196
"""N tables of 10001 rows each. each row is ~ 1024*20 bytes"""
86197
connect = await asyncpg.connect(CONNECTION_STRING)
@@ -93,6 +204,7 @@ async def load_rows():
93204

94205
await create_readonly_user()
95206
await load_rows()
207+
await create_special_types_table()
96208

97209

98210
async def remove():

0 commit comments

Comments
 (0)