Skip to content

Commit 61d2fcb

Browse files
github-actions[bot]Apmatselasticmachine
authored
[9.3] Fix PostgreSQL IP address serialization error (#3900) (#3903)
Backports the following commits to 9.3: - Fix PostgreSQL IP address serialization error (#3900) --------- Co-authored-by: Apostolos Matsagkas <Apmats@users.noreply.github.com> Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
1 parent a2287af commit 61d2fcb

File tree

5 files changed

+676
-4
lines changed

5 files changed

+676
-4
lines changed

app/connectors_service/NOTICE.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2615,7 +2615,7 @@ SOFTWARE.
26152615

26162616

26172617
build
2618-
1.3.0
2618+
1.4.0
26192619
UNKNOWN
26202620
Copyright © 2019 Filipe Laíns <filipe.lains@gmail.com>
26212621

@@ -8230,7 +8230,7 @@ made under the terms of *both* these licenses.
82308230

82318231

82328232
urllib3
8233-
2.6.2
8233+
2.6.3
82348234
UNKNOWN
82358235
MIT License
82368236

app/connectors_service/connectors/sources/postgresql/datasource.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,27 @@
33
# or more contributor license agreements. Licensed under the Elastic License 2.0;
44
# you may not use this file except in compliance with the Elastic License 2.0.
55
#
6+
from ipaddress import (
7+
IPv4Address,
8+
IPv4Interface,
9+
IPv4Network,
10+
IPv6Address,
11+
IPv6Interface,
12+
IPv6Network,
13+
)
14+
from uuid import UUID
15+
616
from asyncpg.exceptions._base import InternalClientError
17+
from asyncpg.types import (
18+
BitString,
19+
Box,
20+
Circle,
21+
Line,
22+
LineSegment,
23+
Path,
24+
Point,
25+
Polygon,
26+
)
727
from connectors_sdk.source import BaseDataSource
828
from connectors_sdk.utils import iso_utc
929
from sqlalchemy.exc import ProgrammingError
@@ -141,6 +161,76 @@ async def ping(self):
141161
msg = f"Can't connect to Postgresql on {self.postgresql_client.host}."
142162
raise Exception(msg) from e
143163

164+
def serialize(self, doc):
    """Stringify PostgreSQL-specific values in ``doc``, then run the base serializer.

    Values of the following Python types are converted before the document
    is handed to the parent class:
    - ``ipaddress`` address / interface / network objects and ``uuid.UUID``
      become their ``str()`` form
    - asyncpg geometric objects (Point, LineSegment, Box, Polygon, Path,
      Line, Circle) become text
    - asyncpg ``BitString`` becomes the result of ``as_string()``
    Lists, tuples and dicts are walked recursively (tuples come back as lists).

    Args:
        doc (Dict): Dictionary to be serialized; its values are replaced in place

    Returns:
        doc (Dict): Whatever the parent ``serialize`` returns for the converted dictionary
    """
    # Types whose plain ``str()`` is the desired representation.
    plain_str_types = (
        IPv4Address,
        IPv6Address,
        IPv4Interface,
        IPv6Interface,
        IPv4Network,
        IPv6Network,
        UUID,
    )

    def _xy_pairs(points):
        """Return the ``(x, y)`` tuples for a sequence of point objects."""
        return [(pt.x, pt.y) for pt in points]

    def _convert(item):
        """Return ``item`` converted according to its type.

        Args:
            item (Any): Value to be converted

        Returns:
            Any: Converted value; unrecognised types are returned untouched.
        """
        if isinstance(item, plain_str_types):
            return str(item)
        if isinstance(item, Point):
            return f"({item.x}, {item.y})"
        if isinstance(item, LineSegment):
            start, end = item.p1, item.p2
            return f"[({start.x}, {start.y}), ({end.x}, {end.y})]"
        if isinstance(item, Box):
            upper, lower = item.high, item.low
            return f"[({upper.x}, {upper.y}), ({lower.x}, {lower.y})]"
        # Polygon must be tested before Path: the original code notes that
        # Polygon inherits from Path.
        if isinstance(item, Polygon):
            return str(_xy_pairs(item.points))
        if isinstance(item, Path):
            pairs = _xy_pairs(item.points)
            prefix = "closed" if item.is_closed else "open"
            return f"{prefix} {pairs}"
        if isinstance(item, (Line, Circle)):
            return str(item)
        if isinstance(item, BitString):
            return item.as_string()
        # Generic containers come last so the specific types above win.
        if isinstance(item, (list, tuple)):
            return [_convert(element) for element in item]
        if isinstance(item, dict):
            return {name: _convert(inner) for name, inner in item.items()}
        return item

    # Only existing keys are reassigned, so the dict size never changes
    # while it is being iterated.
    for field in doc:
        doc[field] = _convert(doc[field])

    return super().serialize(doc)
233+
144234
def row2doc(self, row, doc_id, table, timestamp):
145235
row.update(
146236
{

app/connectors_service/tests/sources/fixtures/postgresql/fixture.py

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@
99
import random
1010

1111
import asyncpg
12+
from asyncpg.types import (
13+
BitString,
14+
Box,
15+
Circle,
16+
Line,
17+
LineSegment,
18+
Path,
19+
Point,
20+
Polygon,
21+
)
1222

1323
from tests.commons import WeightedFakeProvider
1424

@@ -36,9 +46,12 @@
3646

3747
event_loop = asyncio.get_event_loop()
3848

49+
# Number of test rows in special_types table for serialization testing
50+
SPECIAL_TYPES_TEST_ROWS = 3
51+
3952

4053
def get_num_docs():
41-
print(NUM_TABLES * (RECORD_COUNT - RECORDS_TO_DELETE))
54+
print(NUM_TABLES * (RECORD_COUNT - RECORDS_TO_DELETE) + SPECIAL_TYPES_TEST_ROWS)
4255

4356

4457
async def load():
@@ -81,6 +94,104 @@ async def inject_lines(table, connect, lines):
8194
inserted += batch_size
8295
print(f"Inserting batch #{batch} of {batch_size} documents.")
8396

97+
async def create_special_types_table():
    """Create a table with PostgreSQL special types that require serialization.

    Opens a connection using ``CONNECTION_STRING``, creates the
    ``special_types`` table if it does not exist and inserts the fixed test
    rows below. The connection is closed even when one of the statements
    raises, so a failed fixture load does not leave a connection open.
    """
    connect = await asyncpg.connect(CONNECTION_STRING)

    try:
        print("Creating special_types table for serialization testing...")
        create_table_sql = """
            CREATE TABLE IF NOT EXISTS special_types (
                id SERIAL PRIMARY KEY,
                ip_inet INET,
                ip_cidr CIDR,
                uuid_col UUID,
                point_col POINT,
                line_col LINE,
                lseg_col LSEG,
                box_col BOX,
                path_col PATH,
                polygon_col POLYGON,
                circle_col CIRCLE,
                bit_col BIT(8),
                varbit_col VARBIT(16),
                inet_array INET[],
                uuid_array UUID[]
            )
        """
        await connect.execute(create_table_sql)

        print("Inserting special type test data...")
        insert_sql = """
            INSERT INTO special_types (
                ip_inet, ip_cidr, uuid_col,
                point_col, line_col, lseg_col, box_col, path_col, polygon_col, circle_col,
                bit_col, varbit_col,
                inet_array, uuid_array
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
        """

        # One tuple per row; positions line up with the $1..$14 placeholders.
        test_rows = [
            (
                "192.168.1.1",
                "10.0.0.0/8",
                "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
                Point(1.5, 2.5),
                Line(1, -1, 0),
                LineSegment((0, 0), (1, 1)),
                Box((2, 2), (0, 0)),
                Path((0, 0), (1, 1), (2, 0)),
                Polygon((0, 0), (1, 0), (1, 1), (0, 1)),
                Circle((0, 0), 5),
                BitString("10101010"),
                BitString("1101"),
                ["192.168.1.1", "10.0.0.1"],
                [
                    "550e8400-e29b-41d4-a716-446655440000",
                    "f47ac10b-58cc-4372-a567-0e02b2c3d479",
                ],
            ),
            (
                "2001:db8::1",
                "2001:db8::/32",
                "123e4567-e89b-12d3-a456-426614174000",
                Point(-3.14, 2.71),
                Line(2, 3, -6),
                LineSegment((-1, -1), (1, 1)),
                Box((10, 10), (-10, -10)),
                Path((0, 0), (3, 0), (3, 3), (0, 3), is_closed=True),
                Polygon((-1, -1), (1, -1), (1, 1), (-1, 1)),
                Circle((5, 5), 2.5),
                BitString("11110000"),
                BitString("101010101010"),
                ["::1", "fe80::1"],
                ["aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"],
            ),
            (
                "10.30.0.9/24",
                "172.16.0.0/12",
                "00000000-0000-0000-0000-000000000000",
                Point(0, 0),
                Line(0, 1, 0),
                LineSegment((5, 5), (10, 10)),
                Box((100, 100), (50, 50)),
                Path((0, 0), (5, 0), (5, 5)),
                Polygon((0, 0), (4, 0), (4, 3), (0, 3)),
                Circle((0, 0), 10),
                BitString("00000000"),
                BitString("1111111111111111"),
                ["192.0.2.1", "198.51.100.1", "203.0.113.1"],
                [
                    "12345678-1234-5678-1234-567812345678",
                    "87654321-4321-8765-4321-876543218765",
                ],
            ),
        ]

        await connect.executemany(insert_sql, test_rows)
        print(f"Inserted {len(test_rows)} rows with special types")
    finally:
        # Release the connection on both the success and the failure path.
        await connect.close()
194+
84195
async def load_rows():
85196
"""N tables of 10001 rows each. each row is ~ 1024*20 bytes"""
86197
connect = await asyncpg.connect(CONNECTION_STRING)
@@ -93,6 +204,7 @@ async def load_rows():
93204

94205
await create_readonly_user()
95206
await load_rows()
207+
await create_special_types_table()
96208

97209

98210
async def remove():

0 commit comments

Comments
 (0)