Skip to content

Commit 35926fe

Browse files
Apmatselasticmachine
authored andcommitted
Fix PostgreSQL IP address serialization error (#3900)
## Closes #3879 Problem: The PostgreSQL connector fails with serialization error because asyncpg returns PostgreSQL's INET and CIDR columns as Python ipaddress module objects (IPv4Address, IPv6Address, IPv4Network, etc.) rather than strings. Our serialization layer doesn't handle these types, causing the connector to crash when attempting to convert documents to JSON. Solution: The implementation converts these objects to strings before calling the parent serializer, which handles standard types. Recursive serialization handles nested structures (lists/dicts containing IP addresses). Followed the same pattern from the MongoDB connector. Only added plain tests for these changes, not ftest coverage. You, reviewer!, give me an opinion on this! <!--Provide a general description of the code changes in your pull request. If the change relates to a specific issue, include the link at the top. If this is an ad-hoc/trivial change and does not have a corresponding issue, please describe your changes in enough details, so that reviewers and other team members can understand the reasoning behind the pull request.--> ## Checklists <!--You can remove unrelated items from checklists below and/or add new items that may help during the review.--> #### Pre-Review Checklist - [x] this PR does NOT contain credentials of any kind, such as API keys or username/passwords (double check `config.yml.example`) - [x] this PR has a meaningful title - [x] this PR links to all relevant github issues that it fixes or partially addresses - [ ] if there is no GH issue, please create it. Each PR should have a link to an issue - [x] this PR has a thorough description - [x] Covered the changes with automated tests - [x] Tested the changes locally - [x] Added a label for each target release version (example: `v7.13.2`, `v7.14.0`, `v8.0.0`) - [x] For bugfixes: backport safely to all minor branches still receiving patch releases - [x] Considered corresponding documentation changes - [x] Contributed any configuration settings changes to the configuration reference - [x] if you added or changed Rich Configurable Fields for a Native Connector, you made a corresponding PR in [Kibana](https://github.com/elastic/kibana/blob/main/packages/kbn-search-connectors/types/native_connectors.ts) #### Changes Requiring Extra Attention <!--Please call out any changes that require special attention from the reviewers and/or increase the risk to availability or security of the system after deployment. Remove the ones that don't apply.--> - [ ] Security-related changes (encryption, TLS, SSRF, etc) - [ ] New external service dependencies added. ## Related Pull Requests <!--List any relevant PRs here or remove the section if this is a standalone PR. * https://github.com/elastic/.../pull/123--> ## Release Note <!--If you think this enhancement/fix should be included in the release notes, please write a concise user-facing description of the change here. You should also label the PR with `release_note` so the release notes author(s) can easily look it up.--> --------- Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
1 parent a2287af commit 35926fe

File tree

3 files changed

+673
-1
lines changed

3 files changed

+673
-1
lines changed

app/connectors_service/connectors/sources/postgresql/datasource.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,27 @@
33
# or more contributor license agreements. Licensed under the Elastic License 2.0;
44
# you may not use this file except in compliance with the Elastic License 2.0.
55
#
6+
from ipaddress import (
7+
IPv4Address,
8+
IPv4Interface,
9+
IPv4Network,
10+
IPv6Address,
11+
IPv6Interface,
12+
IPv6Network,
13+
)
14+
from uuid import UUID
15+
616
from asyncpg.exceptions._base import InternalClientError
17+
from asyncpg.types import (
18+
BitString,
19+
Box,
20+
Circle,
21+
Line,
22+
LineSegment,
23+
Path,
24+
Point,
25+
Polygon,
26+
)
727
from connectors_sdk.source import BaseDataSource
828
from connectors_sdk.utils import iso_utc
929
from sqlalchemy.exc import ProgrammingError
@@ -141,6 +161,76 @@ async def ping(self):
141161
msg = f"Can't connect to Postgresql on {self.postgresql_client.host}."
142162
raise Exception(msg) from e
143163

164+
def serialize(self, doc):
165+
"""Override base serialize to handle PostgreSQL-specific types.
166+
167+
PostgreSQL connector uses asyncpg which returns special Python objects for certain
168+
PostgreSQL data types that need to be serialized to strings:
169+
- Network types (INET, CIDR) -> ipaddress module objects
170+
- UUID type -> uuid.UUID objects
171+
- Geometric types (POINT, LINE, POLYGON, etc.) -> asyncpg.types objects
172+
- BitString type (BIT, VARBIT) -> asyncpg.types.BitString objects
173+
174+
Args:
175+
doc (Dict): Dictionary to be serialized
176+
177+
Returns:
178+
doc (Dict): Serialized version of dictionary
179+
"""
180+
181+
def _serialize(value):
182+
"""Serialize input value with respect to its datatype.
183+
184+
Args:
185+
value (Any): Value to be serialized
186+
187+
Returns:
188+
value (Any): Serialized version of input value.
189+
"""
190+
match value:
191+
case (
192+
IPv4Address()
193+
| IPv6Address()
194+
| IPv4Interface()
195+
| IPv6Interface()
196+
| IPv4Network()
197+
| IPv6Network()
198+
):
199+
return str(value)
200+
case UUID():
201+
return str(value)
202+
case Point():
203+
return f"({value.x}, {value.y})"
204+
case LineSegment():
205+
return (
206+
f"[({value.p1.x}, {value.p1.y}), ({value.p2.x}, {value.p2.y})]"
207+
)
208+
case Box():
209+
return f"[({value.high.x}, {value.high.y}), ({value.low.x}, {value.low.y})]"
210+
case Polygon():
211+
# Polygon inherits from Path, so check it first
212+
coords = [(p.x, p.y) for p in value.points]
213+
return str(coords)
214+
case Path():
215+
coords = [(p.x, p.y) for p in value.points]
216+
status = "closed" if value.is_closed else "open"
217+
return f"{status} {str(coords)}"
218+
case Line() | Circle():
219+
return str(value)
220+
case BitString():
221+
return value.as_string()
222+
case list() | tuple():
223+
return [_serialize(item) for item in value]
224+
case dict():
225+
return {k: _serialize(v) for k, v in value.items()}
226+
case _:
227+
return value
228+
229+
for key, value in doc.items():
230+
doc[key] = _serialize(value)
231+
232+
return super().serialize(doc)
233+
144234
def row2doc(self, row, doc_id, table, timestamp):
145235
row.update(
146236
{

app/connectors_service/tests/sources/fixtures/postgresql/fixture.py

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@
99
import random
1010

1111
import asyncpg
12+
from asyncpg.types import (
13+
BitString,
14+
Box,
15+
Circle,
16+
Line,
17+
LineSegment,
18+
Path,
19+
Point,
20+
Polygon,
21+
)
1222

1323
from tests.commons import WeightedFakeProvider
1424

@@ -36,9 +46,12 @@
3646

3747
event_loop = asyncio.get_event_loop()
3848

49+
# Number of test rows in special_types table for serialization testing
50+
SPECIAL_TYPES_TEST_ROWS = 3
51+
3952

4053
def get_num_docs():
41-
print(NUM_TABLES * (RECORD_COUNT - RECORDS_TO_DELETE))
54+
print(NUM_TABLES * (RECORD_COUNT - RECORDS_TO_DELETE) + SPECIAL_TYPES_TEST_ROWS)
4255

4356

4457
async def load():
@@ -81,6 +94,104 @@ async def inject_lines(table, connect, lines):
8194
inserted += batch_size
8295
print(f"Inserting batch #{batch} of {batch_size} documents.")
8396

97+
async def create_special_types_table():
98+
"""Create a table with PostgreSQL special types that require serialization."""
99+
connect = await asyncpg.connect(CONNECTION_STRING)
100+
101+
print("Creating special_types table for serialization testing...")
102+
create_table_sql = """
103+
CREATE TABLE IF NOT EXISTS special_types (
104+
id SERIAL PRIMARY KEY,
105+
ip_inet INET,
106+
ip_cidr CIDR,
107+
uuid_col UUID,
108+
point_col POINT,
109+
line_col LINE,
110+
lseg_col LSEG,
111+
box_col BOX,
112+
path_col PATH,
113+
polygon_col POLYGON,
114+
circle_col CIRCLE,
115+
bit_col BIT(8),
116+
varbit_col VARBIT(16),
117+
inet_array INET[],
118+
uuid_array UUID[]
119+
)
120+
"""
121+
await connect.execute(create_table_sql)
122+
123+
print("Inserting special type test data...")
124+
insert_sql = """
125+
INSERT INTO special_types (
126+
ip_inet, ip_cidr, uuid_col,
127+
point_col, line_col, lseg_col, box_col, path_col, polygon_col, circle_col,
128+
bit_col, varbit_col,
129+
inet_array, uuid_array
130+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
131+
"""
132+
133+
test_rows = [
134+
(
135+
"192.168.1.1",
136+
"10.0.0.0/8",
137+
"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
138+
Point(1.5, 2.5),
139+
Line(1, -1, 0),
140+
LineSegment((0, 0), (1, 1)),
141+
Box((2, 2), (0, 0)),
142+
Path((0, 0), (1, 1), (2, 0)),
143+
Polygon((0, 0), (1, 0), (1, 1), (0, 1)),
144+
Circle((0, 0), 5),
145+
BitString("10101010"),
146+
BitString("1101"),
147+
["192.168.1.1", "10.0.0.1"],
148+
[
149+
"550e8400-e29b-41d4-a716-446655440000",
150+
"f47ac10b-58cc-4372-a567-0e02b2c3d479",
151+
],
152+
),
153+
(
154+
"2001:db8::1",
155+
"2001:db8::/32",
156+
"123e4567-e89b-12d3-a456-426614174000",
157+
Point(-3.14, 2.71),
158+
Line(2, 3, -6),
159+
LineSegment((-1, -1), (1, 1)),
160+
Box((10, 10), (-10, -10)),
161+
Path((0, 0), (3, 0), (3, 3), (0, 3), is_closed=True),
162+
Polygon((-1, -1), (1, -1), (1, 1), (-1, 1)),
163+
Circle((5, 5), 2.5),
164+
BitString("11110000"),
165+
BitString("101010101010"),
166+
["::1", "fe80::1"],
167+
["aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"],
168+
),
169+
(
170+
"10.30.0.9/24",
171+
"172.16.0.0/12",
172+
"00000000-0000-0000-0000-000000000000",
173+
Point(0, 0),
174+
Line(0, 1, 0),
175+
LineSegment((5, 5), (10, 10)),
176+
Box((100, 100), (50, 50)),
177+
Path((0, 0), (5, 0), (5, 5)),
178+
Polygon((0, 0), (4, 0), (4, 3), (0, 3)),
179+
Circle((0, 0), 10),
180+
BitString("00000000"),
181+
BitString("1111111111111111"),
182+
["192.0.2.1", "198.51.100.1", "203.0.113.1"],
183+
[
184+
"12345678-1234-5678-1234-567812345678",
185+
"87654321-4321-8765-4321-876543218765",
186+
],
187+
),
188+
]
189+
190+
await connect.executemany(insert_sql, test_rows)
191+
print(f"Inserted {len(test_rows)} rows with special types")
192+
193+
await connect.close()
194+
84195
async def load_rows():
85196
"""N tables of 10001 rows each. each row is ~ 1024*20 bytes"""
86197
connect = await asyncpg.connect(CONNECTION_STRING)
@@ -93,6 +204,7 @@ async def load_rows():
93204

94205
await create_readonly_user()
95206
await load_rows()
207+
await create_special_types_table()
96208

97209

98210
async def remove():

0 commit comments

Comments
 (0)