Skip to content

Commit 7c20a8e

Browse files
author
longshan.lu
committed
feat(postgres): enhance Postgres transport with IpInet and pgvector support
- Updated imports to use the `IpInet` type from the `cidr` crate in Postgres modules. - Enhanced type conversion implementations for `IpInet`, `Vector`, `HalfVector`, `Bit`, and `SparseVector` in Postgres transport. - Updated `Cargo.lock` to include the `cidr` and `pgvector` dependencies. - Improved conversion mappings for Postgres transport to support new types.
1 parent c5cfe1a commit 7c20a8e

File tree

7 files changed

+194
-32
lines changed

7 files changed

+194
-32
lines changed

.devcontainer/Dockerfile

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
FROM mcr.microsoft.com/devcontainers/rust:1
2+
3+
# Install system dependencies
4+
RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
5+
&& apt-get -y install --no-install-recommends \
6+
sqlite3 \
7+
pkg-config \
8+
libclang-dev \
9+
postgresql-client \
10+
default-mysql-client \
11+
&& apt-get clean -y \
12+
&& rm -rf /var/lib/apt/lists/*
13+
14+
# Install Python and Poetry
15+
ENV POETRY_HOME=/opt/poetry
16+
ENV POETRY_VERSION=2.1.3
17+
ENV PATH="/opt/poetry/bin:$PATH"
18+
RUN curl -sSL https://install.python-poetry.org | python3 - \
19+
&& poetry config virtualenvs.create false
20+
21+
# Install Rust components
22+
RUN rustup component add rustfmt clippy \
23+
&& cargo install cargo-watch \
24+
&& cargo install just
25+
26+
# Set environment variables
27+
ENV PATH="/home/vscode/.local/bin:${PATH}"
28+
ENV PYTHONPATH="/workspaces/connector-x:${PYTHONPATH}"

.devcontainer/devcontainer.json

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"name": "ConnectorX Development",
3+
"dockerComposeFile": [
4+
"docker-compose.yml"
5+
],
6+
"service": "connectorx",
7+
"workspaceFolder": "/workspaces/${localWorkspaceFolderBasename}",
8+
"customizations": {
9+
"vscode": {
10+
"extensions": [
11+
"rust-lang.rust-analyzer"
12+
],
13+
"settings": {
14+
"rust-analyzer.checkOnSave.command": "clippy"
15+
}
16+
}
17+
},
18+
"features": {
19+
"ghcr.io/devcontainers/features/rust:1": "latest"
20+
}
21+
}

.devcontainer/docker-compose.yml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
services:
2+
connectorx:
3+
build:
4+
context: ..
5+
dockerfile: .devcontainer/Dockerfile
6+
volumes:
7+
- ..:/workspaces/connectorx:cached
8+
command: sleep infinity
9+
depends_on:
10+
- postgres
11+
- mysql
12+
networks:
13+
- connectorx-network
14+
15+
postgres:
16+
image: pgvector/pgvector:pg17
17+
environment:
18+
POSTGRES_USER: postgres
19+
POSTGRES_PASSWORD: postgres
20+
POSTGRES_DB: connectorx
21+
ports:
22+
- "5433:5432"
23+
volumes:
24+
- postgres_data:/var/lib/postgresql/data
25+
networks:
26+
- connectorx-network
27+
28+
mysql:
29+
image: ghcr.io/wangxiaoying/mysql:latest
30+
environment:
31+
MYSQL_DATABASE: mysql
32+
MYSQL_ROOT_PASSWORD: mysql
33+
LANG: C.UTF-8
34+
ports:
35+
- "3306:3306"
36+
volumes:
37+
- mysql_data:/var/lib/mysql
38+
networks:
39+
- connectorx-network
40+
41+
networks:
42+
connectorx-network:
43+
driver: bridge
44+
45+
volumes:
46+
postgres_data:
47+
mysql_data:
48+

connectorx-python/Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

connectorx-python/src/pandas/transports/postgres.rs

Lines changed: 74 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::errors::ConnectorXPythonError;
22
use crate::pandas::{destination::PandasDestination, typesystem::PandasTypeSystem};
33
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
4+
use connectorx::sources::postgres::{Bit, HalfVector, IpInet, SparseVector, Vector};
45
use connectorx::{
56
impl_transport,
67
sources::postgres::{
@@ -28,35 +29,40 @@ macro_rules! impl_postgres_transport {
2829
systems = PostgresTypeSystem => PandasTypeSystem,
2930
route = PostgresSource<$proto, $tls> => PandasDestination<'tp>,
3031
mappings = {
31-
{ Float4[f32] => F64[f64] | conversion auto }
32-
{ Float8[f64] => F64[f64] | conversion auto }
33-
{ Numeric[Decimal] => F64[f64] | conversion option }
34-
{ Int2[i16] => I64[i64] | conversion auto }
35-
{ Int4[i32] => I64[i64] | conversion auto }
36-
{ Int8[i64] => I64[i64] | conversion auto }
37-
{ BoolArray[Vec<Option<bool>>] => BoolArray[Vec<bool>] | conversion option }
38-
{ Int2Array[Vec<Option<i16>>] => I64Array[Vec<i64>] | conversion option }
39-
{ Int4Array[Vec<Option<i32>>] => I64Array[Vec<i64>] | conversion option }
40-
{ Int8Array[Vec<Option<i64>>] => I64Array[Vec<i64>] | conversion option }
41-
{ Float4Array[Vec<Option<f32>>] => F64Array[Vec<f64>] | conversion option }
42-
{ Float8Array[Vec<Option<f64>>] => F64Array[Vec<f64>] | conversion option }
43-
{ NumericArray[Vec<Option<Decimal>>] => F64Array[Vec<f64>] | conversion option }
44-
{ Bool[bool] => Bool[bool] | conversion auto }
45-
{ Char[i8] => Char[char] | conversion option }
46-
{ Text[&'r str] => Str[&'r str] | conversion auto }
47-
{ BpChar[&'r str] => Str[&'r str] | conversion none }
48-
{ VarChar[&'r str] => Str[&'r str] | conversion none }
49-
{ Name[&'r str] => Str[&'r str] | conversion none }
50-
{ Timestamp[NaiveDateTime] => DateTime[DateTime<Utc>] | conversion option }
51-
{ TimestampTz[DateTime<Utc>] => DateTime[DateTime<Utc>] | conversion auto }
52-
{ Date[NaiveDate] => DateTime[DateTime<Utc>] | conversion option }
53-
{ UUID[Uuid] => String[String] | conversion option }
54-
{ JSON[Value] => String[String] | conversion option }
55-
{ JSONB[Value] => String[String] | conversion none }
56-
{ Time[NaiveTime] => String[String] | conversion option }
57-
{ ByteA[Vec<u8>] => Bytes[Vec<u8>] | conversion auto }
58-
{ Enum[&'r str] => Str[&'r str] | conversion none }
59-
{ HSTORE[HashMap<String, Option<String>>] => String[String] | conversion option }
32+
{ Float4[f32] => F64[f64] | conversion auto }
33+
{ Float8[f64] => F64[f64] | conversion auto }
34+
{ Numeric[Decimal] => F64[f64] | conversion option }
35+
{ Int2[i16] => I64[i64] | conversion auto }
36+
{ Int4[i32] => I64[i64] | conversion auto }
37+
{ Int8[i64] => I64[i64] | conversion auto }
38+
{ BoolArray[Vec<Option<bool>>] => BoolArray[Vec<bool>] | conversion option }
39+
{ Int2Array[Vec<Option<i16>>] => I64Array[Vec<i64>] | conversion option }
40+
{ Int4Array[Vec<Option<i32>>] => I64Array[Vec<i64>] | conversion option }
41+
{ Int8Array[Vec<Option<i64>>] => I64Array[Vec<i64>] | conversion option }
42+
{ Float4Array[Vec<Option<f32>>] => F64Array[Vec<f64>] | conversion option }
43+
{ Float8Array[Vec<Option<f64>>] => F64Array[Vec<f64>] | conversion option }
44+
{ NumericArray[Vec<Option<Decimal>>] => F64Array[Vec<f64>] | conversion option }
45+
{ Vector[Vector] => F64Array[Vec<f64>] | conversion option }
46+
{ HalfVec[HalfVector] => F64Array[Vec<f64>] | conversion option }
47+
{ Bit[Bit] => Bytes[Vec<u8>] | conversion option }
48+
{ SparseVec[SparseVector] => F64Array[Vec<f64>] | conversion option }
49+
{ Bool[bool] => Bool[bool] | conversion auto }
50+
{ Char[i8] => Char[char] | conversion option }
51+
{ Text[&'r str] => Str[&'r str] | conversion auto }
52+
{ BpChar[&'r str] => Str[&'r str] | conversion none }
53+
{ VarChar[&'r str] => Str[&'r str] | conversion none }
54+
{ Name[&'r str] => Str[&'r str] | conversion none }
55+
{ Timestamp[NaiveDateTime] => DateTime[DateTime<Utc>] | conversion option }
56+
{ TimestampTz[DateTime<Utc>] => DateTime[DateTime<Utc>] | conversion auto }
57+
{ Date[NaiveDate] => DateTime[DateTime<Utc>] | conversion option }
58+
{ UUID[Uuid] => String[String] | conversion option }
59+
{ JSON[Value] => String[String] | conversion option }
60+
{ JSONB[Value] => String[String] | conversion none }
61+
{ Inet[IpInet] => String[String] | conversion none }
62+
{ Time[NaiveTime] => String[String] | conversion option }
63+
{ ByteA[Vec<u8>] => Bytes[Vec<u8>] | conversion auto }
64+
{ Enum[&'r str] => Str[&'r str] | conversion none }
65+
{ HSTORE[HashMap<String, Option<String>>] => String[String] | conversion option }
6066
}
6167
);
6268
}
@@ -71,6 +77,45 @@ impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
7177
impl_postgres_transport!(SimpleProtocol, NoTls);
7278
impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);
7379

80+
81+
impl<'py, P, C> TypeConversion<Vector, Vec<f64>> for PostgresPandasTransport<'py, P, C> {
82+
fn convert(val: Vector) -> Vec<f64> {
83+
val.to_vec().into_iter().map(|v| v as f64).collect()
84+
}
85+
}
86+
87+
impl<'py, P, C> TypeConversion<HalfVector, Vec<f64>> for PostgresPandasTransport<'py, P, C> {
88+
fn convert(val: HalfVector) -> Vec<f64> {
89+
val.to_vec().into_iter().map(|v| v.to_f64()).collect()
90+
}
91+
}
92+
93+
impl<'py, P, C> TypeConversion<Bit, Vec<u8>> for PostgresPandasTransport<'py, P, C> {
94+
fn convert(val: Bit) -> Vec<u8> {
95+
val.as_bytes().to_vec()
96+
}
97+
}
98+
99+
impl<'py, P, C> TypeConversion<SparseVector, Vec<f64>> for PostgresPandasTransport<'py, P, C> {
100+
fn convert(val: SparseVector) -> Vec<f64> {
101+
val.to_vec().into_iter().map(|v| v as f64).collect()
102+
}
103+
}
104+
105+
impl<'py, P, C> TypeConversion<IpInet, String> for PostgresPandasTransport<'py, P, C> {
106+
fn convert(val: IpInet) -> String {
107+
val.to_string()
108+
}
109+
}
110+
111+
impl<'py, P, C> TypeConversion<Option<IpInet>, Option<String>>
112+
for PostgresPandasTransport<'py, P, C>
113+
{
114+
fn convert(val: Option<IpInet>) -> Option<String> {
115+
val.map(|val| val.to_string())
116+
}
117+
}
118+
74119
impl<'py, P, C> TypeConversion<HashMap<String, Option<String>>, String>
75120
for PostgresPandasTransport<'py, P, C>
76121
{

connectorx/src/sources/postgres/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ mod errors;
55
mod typesystem;
66

77
pub use self::errors::PostgresSourceError;
8-
use cidr_02::IpInet;
8+
pub use cidr_02::IpInet;
99
pub use connection::rewrite_tls_args;
10-
use pgvector::{Bit, HalfVector, SparseVector, Vector};
10+
pub use pgvector::{Bit, HalfVector, SparseVector, Vector};
1111
pub use typesystem::{PostgresTypePairs, PostgresTypeSystem};
1212

1313
use crate::constants::DB_BUFFER_SIZE;

connectorx/src/sources/postgres/typesystem.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
2-
use cidr_02::IpInet;
2+
use crate::sources::postgres::IpInet;
33
use postgres::types::Type;
44
use rust_decimal::Decimal;
55
use serde_json::Value;

0 commit comments

Comments
 (0)