Skip to content

Commit 7e4d1bc

Browse files
[DOP-21832] Move Connection type field to the root of the model (#169)
1 parent 3a325f9 commit 7e4d1bc

File tree

54 files changed

+741
-742
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

54 files changed

+741
-742
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
- Moved the `type` field from within `connection_data` to the root level of the `Connection` model.
2+
- Decoupled `auth_data.type` from the connection type and linked it to the authentication type instead.

docs/design/entities.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,15 @@ Example:
5252
"group_id": "1",
5353
"name": "Beautiful name",
5454
"description": "What a great connection !",
55+
"type": "postgres",
5556
"connection_data": {
56-
"type": "postgres",
5757
"host": "127.0.0.1",
5858
"port": 5432,
5959
"database_name": "postgres",
6060
"additional_params": {},
6161
},
6262
"auth_data": {
63-
"type": "postgres",
63+
"type": "basic",
6464
"user": "user_name",
6565
"password": "password",
6666
}

syncmaster/backend/api/v1/connections.py

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
from typing import get_args
55

66
from fastapi import APIRouter, Depends, Query, status
7+
from pydantic import TypeAdapter
78

89
from syncmaster.backend.services import UnitOfWork, get_user
9-
from syncmaster.db.models import Connection, Transfer, User
10+
from syncmaster.db.models import Connection, ConnectionType, Transfer, User
1011
from syncmaster.db.utils import Permission
1112
from syncmaster.errors.registration import get_error_responses
1213
from syncmaster.exceptions import ActionNotAllowedError
@@ -26,7 +27,6 @@
2627
ORACLE_TYPE,
2728
POSTGRES_TYPE,
2829
S3_TYPE,
29-
ConnectionType,
3030
)
3131
from syncmaster.schemas.v1.connections.connection import (
3232
ConnectionCopySchema,
@@ -80,13 +80,16 @@ async def read_connections(
8080
if pagination.items:
8181
credentials = await unit_of_work.credentials.read_bulk([item.id for item in pagination.items])
8282
items = [
83-
ReadConnectionSchema(
84-
id=item.id,
85-
group_id=item.group_id,
86-
name=item.name,
87-
description=item.description,
88-
auth_data=credentials.get(item.id, None),
89-
data=item.data,
83+
TypeAdapter(ReadConnectionSchema).validate_python(
84+
{
85+
"id": item.id,
86+
"group_id": item.group_id,
87+
"name": item.name,
88+
"description": item.description,
89+
"type": item.type,
90+
"data": item.data,
91+
"auth_data": credentials.get(item.id, None),
92+
},
9093
)
9194
for item in pagination.items
9295
]
@@ -126,6 +129,7 @@ async def create_connection(
126129
async with unit_of_work:
127130
connection = await unit_of_work.connection.create(
128131
name=connection_data.name,
132+
type=connection_data.type,
129133
description=connection_data.description,
130134
group_id=connection_data.group_id,
131135
data=connection_data.data.dict(),
@@ -137,13 +141,16 @@ async def create_connection(
137141
)
138142

139143
credentials = await unit_of_work.credentials.read(connection.id)
140-
return ReadConnectionSchema(
141-
id=connection.id,
142-
group_id=connection.group_id,
143-
name=connection.name,
144-
description=connection.description,
145-
data=connection.data,
146-
auth_data=credentials,
144+
return TypeAdapter(ReadConnectionSchema).validate_python(
145+
{
146+
"id": connection.id,
147+
"group_id": connection.group_id,
148+
"name": connection.name,
149+
"description": connection.description,
150+
"type": connection.type,
151+
"data": connection.data,
152+
"auth_data": credentials,
153+
},
147154
)
148155

149156

@@ -172,13 +179,16 @@ async def read_connection(
172179
except AuthDataNotFoundError:
173180
credentials = None
174181

175-
return ReadConnectionSchema(
176-
id=connection.id,
177-
group_id=connection.group_id,
178-
name=connection.name,
179-
description=connection.description,
180-
data=connection.data,
181-
auth_data=credentials,
182+
return TypeAdapter(ReadConnectionSchema).validate_python(
183+
{
184+
"id": connection.id,
185+
"group_id": connection.group_id,
186+
"name": connection.name,
187+
"description": connection.description,
188+
"type": connection.type,
189+
"data": connection.data,
190+
"auth_data": credentials,
191+
},
182192
)
183193

184194

@@ -202,15 +212,15 @@ async def update_connection(
202212

203213
async with unit_of_work:
204214
data = changes.data.dict(exclude={"auth_data"}) if changes.data else {}
205-
if data.get("type", None) is not None:
206-
source_connection: Connection = await unit_of_work.connection.read_by_id(connection_id=connection_id)
207-
if data["type"] != source_connection.data["type"]:
208-
linked_transfers: Sequence[Transfer] = await unit_of_work.transfer.list_by_connection_id(connection_id)
209-
if linked_transfers:
210-
raise ConnectionTypeUpdateError
215+
source_connection: Connection = await unit_of_work.connection.read_by_id(connection_id=connection_id)
216+
if changes.type != source_connection.type:
217+
linked_transfers: Sequence[Transfer] = await unit_of_work.transfer.list_by_connection_id(connection_id)
218+
if linked_transfers:
219+
raise ConnectionTypeUpdateError
211220
connection = await unit_of_work.connection.update(
212221
connection_id=connection_id,
213222
name=changes.name,
223+
type=changes.type,
214224
description=changes.description,
215225
data=data,
216226
)
@@ -222,13 +232,16 @@ async def update_connection(
222232
)
223233

224234
credentials = await unit_of_work.credentials.read(connection_id)
225-
return ReadConnectionSchema(
226-
id=connection.id,
227-
group_id=connection.group_id,
228-
name=connection.name,
229-
description=connection.description,
230-
data=connection.data,
231-
auth_data=credentials,
235+
return TypeAdapter(ReadConnectionSchema).validate_python(
236+
{
237+
"id": connection.id,
238+
"group_id": connection.group_id,
239+
"name": connection.name,
240+
"description": connection.description,
241+
"type": connection.type,
242+
"data": connection.data,
243+
"auth_data": credentials,
244+
},
232245
)
233246

234247

syncmaster/backend/api/v1/runs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from syncmaster.exceptions.base import ActionNotAllowedError
2020
from syncmaster.exceptions.run import CannotConnectToTaskQueueError
2121
from syncmaster.exceptions.transfer import TransferNotFoundError
22-
from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema
22+
from syncmaster.schemas.v1.connections.connection_base import ReadAuthDataSchema
2323
from syncmaster.schemas.v1.transfers.run import (
2424
CreateRunSchema,
2525
ReadRunSchema,

syncmaster/backend/api/v1/transfers.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from fastapi import APIRouter, Depends, Query, status
55

66
from syncmaster.backend.services import UnitOfWork, get_user
7-
from syncmaster.db.models import User
7+
from syncmaster.db.models import ConnectionType, User
88
from syncmaster.db.utils import Permission
99
from syncmaster.errors.registration import get_error_responses
1010
from syncmaster.exceptions.base import ActionNotAllowedError
@@ -16,7 +16,6 @@
1616
DifferentTypeConnectionsAndParamsError,
1717
TransferNotFoundError,
1818
)
19-
from syncmaster.schemas.v1.connection_types import ConnectionType
2019
from syncmaster.schemas.v1.status import (
2120
StatusCopyTransferResponseSchema,
2221
StatusResponseSchema,
@@ -103,16 +102,16 @@ async def create_transfer(
103102
):
104103
raise DifferentTransferAndConnectionsGroupsError
105104

106-
if target_connection.data["type"] != transfer_data.target_params.type:
105+
if target_connection.type != transfer_data.target_params.type:
107106
raise DifferentTypeConnectionsAndParamsError(
108-
connection_type=target_connection.data["type"],
107+
connection_type=target_connection.type,
109108
conn="target",
110109
params_type=transfer_data.target_params.type,
111110
)
112111

113-
if source_connection.data["type"] != transfer_data.source_params.type:
112+
if source_connection.type != transfer_data.source_params.type:
114113
raise DifferentTypeConnectionsAndParamsError(
115-
connection_type=source_connection.data["type"],
114+
connection_type=source_connection.type,
116115
conn="source",
117116
params_type=transfer_data.source_params.type,
118117
)
@@ -302,16 +301,16 @@ async def update_transfer(
302301
if queue.group_id != transfer.group_id:
303302
raise DifferentTransferAndQueueGroupError
304303

305-
if transfer_data.target_params and target_connection.data["type"] != transfer_data.target_params.type:
304+
if transfer_data.target_params and target_connection.type != transfer_data.target_params.type:
306305
raise DifferentTypeConnectionsAndParamsError(
307-
connection_type=target_connection.data["type"],
306+
connection_type=target_connection.type,
308307
conn="target",
309308
params_type=transfer_data.target_params.type,
310309
)
311310

312-
if transfer_data.source_params and source_connection.data["type"] != transfer_data.source_params.type:
311+
if transfer_data.source_params and source_connection.type != transfer_data.source_params.type:
313312
raise DifferentTypeConnectionsAndParamsError(
314-
connection_type=source_connection.data["type"],
313+
connection_type=source_connection.type,
315314
conn="source",
316315
params_type=transfer_data.source_params.type,
317316
)

syncmaster/db/migrations/versions/2023-11-23_0004_create_connection_table.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def upgrade():
3030
"connection",
3131
sa.Column("id", sa.BigInteger(), nullable=False),
3232
sa.Column("group_id", sa.BigInteger(), nullable=False),
33+
sa.Column("type", sa.String(length=23), nullable=False),
3334
sa.Column("name", sa.String(length=128), nullable=False),
3435
sa.Column("description", sa.String(length=512), nullable=False),
3536
sa.Column("data", sa.JSON(), nullable=False),

syncmaster/db/models/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from syncmaster.db.models.apscheduler_job import APSchedulerJob
44
from syncmaster.db.models.auth_data import AuthData
55
from syncmaster.db.models.base import Base
6-
from syncmaster.db.models.connection import Connection
6+
from syncmaster.db.models.connection import Connection, ConnectionType
77
from syncmaster.db.models.group import Group, GroupMemberRole, UserGroup
88
from syncmaster.db.models.queue import Queue
99
from syncmaster.db.models.run import Run, RunType, Status

syncmaster/db/models/connection.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,32 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from __future__ import annotations
44

5+
from enum import StrEnum
56
from typing import Any
67

7-
from sqlalchemy import JSON, Computed, Index, UniqueConstraint
8+
from sqlalchemy import JSON, Computed, Index, String, UniqueConstraint
89
from sqlalchemy.dialects.postgresql import TSVECTOR
910
from sqlalchemy.orm import Mapped, declared_attr, mapped_column, relationship
11+
from sqlalchemy_utils import ChoiceType
1012

1113
from syncmaster.db.mixins import ResourceMixin, TimestampMixin
1214
from syncmaster.db.models.base import Base
1315
from syncmaster.db.models.group import Group
1416

1517

18+
class ConnectionType(StrEnum):
19+
POSTGRES = "postgres"
20+
HIVE = "hive"
21+
ORACLE = "oracle"
22+
CLICKHOUSE = "clickhouse"
23+
MSSQL = "mssql"
24+
MYSQL = "mysql"
25+
S3 = "s3"
26+
HDFS = "hdfs"
27+
28+
1629
class Connection(Base, ResourceMixin, TimestampMixin):
30+
type: Mapped[ConnectionType] = mapped_column(ChoiceType(ConnectionType, impl=String(23)), nullable=False)
1731
data: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
1832

1933
group: Mapped[Group] = relationship("Group")

syncmaster/db/repositories/connection.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ async def paginate(
4040
stmt = self._construct_vector_search(stmt, combined_query)
4141

4242
if connection_type is not None:
43-
stmt = stmt.where(Connection.data.op("->>")("type").in_(connection_type))
43+
stmt = stmt.where(Connection.type.in_(connection_type))
4444

4545
return await self._paginate_scalar_result(
4646
query=stmt.order_by(Connection.name),
@@ -62,6 +62,7 @@ async def read_by_id(
6262
async def create(
6363
self,
6464
group_id: int,
65+
type: str,
6566
name: str,
6667
description: str,
6768
data: dict[str, Any],
@@ -70,6 +71,7 @@ async def create(
7071
insert(Connection)
7172
.values(
7273
group_id=group_id,
74+
type=type,
7375
name=name,
7476
description=description,
7577
data=data,
@@ -88,6 +90,7 @@ async def update(
8890
self,
8991
connection_id: int,
9092
name: str | None,
93+
type: str | None,
9194
description: str | None,
9295
data: dict[str, Any],
9396
) -> Connection:
@@ -98,6 +101,7 @@ async def update(
98101

99102
return await self._update(
100103
Connection.id == connection_id,
104+
type=type or connection.type,
101105
name=name or connection.name,
102106
description=description or connection.description,
103107
data=data,

syncmaster/db/repositories/transfer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ async def paginate(
7070
Transfer.source_connection_id == SourceConnection.id,
7171
)
7272
stmt = stmt.where(
73-
SourceConnection.data.op("->>")("type").in_(source_connection_type),
73+
SourceConnection.type.in_(source_connection_type),
7474
)
7575

7676
if target_connection_type is not None:
@@ -79,7 +79,7 @@ async def paginate(
7979
Transfer.target_connection_id == TargetConnection.id,
8080
)
8181
stmt = stmt.where(
82-
TargetConnection.data.op("->>")("type").in_(target_connection_type),
82+
TargetConnection.type.in_(target_connection_type),
8383
)
8484

8585
return await self._paginate_scalar_result(

0 commit comments

Comments
 (0)