
Commit f6e6c1b

Author: Ilyas Gasanov (committed)
[DOP-21832] Move Connection type field to the root of the model
1 parent 5178354 commit f6e6c1b


52 files changed: +656 / -674 lines
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+- Moved the `type` field from within `connection_data` to the root level of the `Connection` model.
+- Decoupled `auth_data.type` from the connection type and linked it to the authentication type instead.

docs/design/entities.rst

Lines changed: 2 additions & 2 deletions
@@ -52,15 +52,15 @@ Example:
     "group_id": "1",
     "name": "Beautiful name",
     "description": "What a great connection !",
+    "type": "postgres",
     "connection_data": {
-        "type": "postgres",
         "host": "127.0.0.1",
         "port": 5432,
         "database_name": "postgres",
         "additional_params": {},
     },
     "auth_data": {
-        "type": "postgres",
+        "type": "basic",
         "user": "user_name",
         "password": "password",
     }

syncmaster/backend/api/v1/connections.py

Lines changed: 48 additions & 34 deletions
@@ -4,6 +4,7 @@
 from typing import get_args

 from fastapi import APIRouter, Depends, Query, status
+from pydantic import TypeAdapter

 from syncmaster.backend.services import UnitOfWork, get_user
 from syncmaster.db.models import Connection, Transfer, User
@@ -80,13 +81,16 @@ async def read_connections(
     if pagination.items:
         credentials = await unit_of_work.credentials.read_bulk([item.id for item in pagination.items])
         items = [
-            ReadConnectionSchema(
-                id=item.id,
-                group_id=item.group_id,
-                name=item.name,
-                description=item.description,
-                auth_data=credentials.get(item.id, None),
-                data=item.data,
+            TypeAdapter(ReadConnectionSchema).validate_python(
+                {
+                    "id": item.id,
+                    "group_id": item.group_id,
+                    "name": item.name,
+                    "description": item.description,
+                    "type": item.type,
+                    "data": item.data,
+                    "auth_data": credentials.get(item.id, None),
+                },
             )
             for item in pagination.items
         ]
@@ -126,6 +130,7 @@ async def create_connection(
     async with unit_of_work:
         connection = await unit_of_work.connection.create(
             name=connection_data.name,
+            type=connection_data.type,
             description=connection_data.description,
             group_id=connection_data.group_id,
             data=connection_data.data.dict(),
@@ -137,13 +142,16 @@ async def create_connection(
         )

     credentials = await unit_of_work.credentials.read(connection.id)
-    return ReadConnectionSchema(
-        id=connection.id,
-        group_id=connection.group_id,
-        name=connection.name,
-        description=connection.description,
-        data=connection.data,
-        auth_data=credentials,
+    return TypeAdapter(ReadConnectionSchema).validate_python(
+        {
+            "id": connection.id,
+            "group_id": connection.group_id,
+            "name": connection.name,
+            "description": connection.description,
+            "type": connection.type,
+            "data": connection.data,
+            "auth_data": credentials,
+        },
     )


@@ -172,13 +180,16 @@ async def read_connection(
     except AuthDataNotFoundError:
         credentials = None

-    return ReadConnectionSchema(
-        id=connection.id,
-        group_id=connection.group_id,
-        name=connection.name,
-        description=connection.description,
-        data=connection.data,
-        auth_data=credentials,
+    return TypeAdapter(ReadConnectionSchema).validate_python(
+        {
+            "id": connection.id,
+            "group_id": connection.group_id,
+            "name": connection.name,
+            "description": connection.description,
+            "type": connection.type,
+            "data": connection.data,
+            "auth_data": credentials,
+        },
     )


@@ -202,15 +213,15 @@ async def update_connection(

     async with unit_of_work:
         data = changes.data.dict(exclude={"auth_data"}) if changes.data else {}
-        if data.get("type", None) is not None:
-            source_connection: Connection = await unit_of_work.connection.read_by_id(connection_id=connection_id)
-            if data["type"] != source_connection.data["type"]:
-                linked_transfers: Sequence[Transfer] = await unit_of_work.transfer.list_by_connection_id(connection_id)
-                if linked_transfers:
-                    raise ConnectionTypeUpdateError
+        source_connection: Connection = await unit_of_work.connection.read_by_id(connection_id=connection_id)
+        if changes.type != source_connection.type:
+            linked_transfers: Sequence[Transfer] = await unit_of_work.transfer.list_by_connection_id(connection_id)
+            if linked_transfers:
+                raise ConnectionTypeUpdateError
         connection = await unit_of_work.connection.update(
             connection_id=connection_id,
             name=changes.name,
+            type=changes.type,
             description=changes.description,
             data=data,
         )
@@ -222,13 +233,16 @@ async def update_connection(
         )

     credentials = await unit_of_work.credentials.read(connection_id)
-    return ReadConnectionSchema(
-        id=connection.id,
-        group_id=connection.group_id,
-        name=connection.name,
-        description=connection.description,
-        data=connection.data,
-        auth_data=credentials,
+    return TypeAdapter(ReadConnectionSchema).validate_python(
+        {
+            "id": connection.id,
+            "group_id": connection.group_id,
+            "name": connection.name,
+            "description": connection.description,
+            "type": connection.type,
+            "data": connection.data,
+            "auth_data": credentials,
+        },
     )

syncmaster/backend/api/v1/runs.py

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@
 from syncmaster.exceptions.base import ActionNotAllowedError
 from syncmaster.exceptions.run import CannotConnectToTaskQueueError
 from syncmaster.exceptions.transfer import TransferNotFoundError
-from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema
+from syncmaster.schemas.v1.connections.connection_base import ReadAuthDataSchema
 from syncmaster.schemas.v1.transfers.run import (
     CreateRunSchema,
     ReadRunSchema,

syncmaster/backend/api/v1/transfers.py

Lines changed: 8 additions & 8 deletions
@@ -103,16 +103,16 @@ async def create_transfer(
     ):
         raise DifferentTransferAndConnectionsGroupsError

-    if target_connection.data["type"] != transfer_data.target_params.type:
+    if target_connection.type != transfer_data.target_params.type:
         raise DifferentTypeConnectionsAndParamsError(
-            connection_type=target_connection.data["type"],
+            connection_type=target_connection.type,
             conn="target",
             params_type=transfer_data.target_params.type,
         )

-    if source_connection.data["type"] != transfer_data.source_params.type:
+    if source_connection.type != transfer_data.source_params.type:
         raise DifferentTypeConnectionsAndParamsError(
-            connection_type=source_connection.data["type"],
+            connection_type=source_connection.type,
             conn="source",
             params_type=transfer_data.source_params.type,
         )
@@ -302,16 +302,16 @@ async def update_transfer(
     if queue.group_id != transfer.group_id:
         raise DifferentTransferAndQueueGroupError

-    if transfer_data.target_params and target_connection.data["type"] != transfer_data.target_params.type:
+    if transfer_data.target_params and target_connection.type != transfer_data.target_params.type:
         raise DifferentTypeConnectionsAndParamsError(
-            connection_type=target_connection.data["type"],
+            connection_type=target_connection.type,
             conn="target",
             params_type=transfer_data.target_params.type,
         )

-    if transfer_data.source_params and source_connection.data["type"] != transfer_data.source_params.type:
+    if transfer_data.source_params and source_connection.type != transfer_data.source_params.type:
         raise DifferentTypeConnectionsAndParamsError(
-            connection_type=source_connection.data["type"],
+            connection_type=source_connection.type,
             conn="source",
             params_type=transfer_data.source_params.type,
         )

syncmaster/db/migrations/versions/2023-11-23_0004_create_connection_table.py

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ def upgrade():
         "connection",
         sa.Column("id", sa.BigInteger(), nullable=False),
         sa.Column("group_id", sa.BigInteger(), nullable=False),
+        sa.Column("type", sa.String(length=128), nullable=False),
        sa.Column("name", sa.String(length=128), nullable=False),
        sa.Column("description", sa.String(length=512), nullable=False),
        sa.Column("data", sa.JSON(), nullable=False),

syncmaster/db/models/connection.py

Lines changed: 2 additions & 2 deletions
@@ -4,7 +4,7 @@

 from typing import Any

-from sqlalchemy import JSON, Computed, Index, UniqueConstraint
+from sqlalchemy import JSON, Computed, Index, String, UniqueConstraint
 from sqlalchemy.dialects.postgresql import TSVECTOR
 from sqlalchemy.orm import Mapped, declared_attr, mapped_column, relationship

@@ -14,8 +14,8 @@


 class Connection(Base, ResourceMixin, DeletableMixin, TimestampMixin):
+    type: Mapped[str] = mapped_column(String(128), nullable=False)
     data: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
-
     group: Mapped[Group] = relationship("Group")

     search_vector: Mapped[str] = mapped_column(

syncmaster/db/repositories/connection.py

Lines changed: 5 additions & 1 deletion
@@ -41,7 +41,7 @@ async def paginate(
             stmt = self._construct_vector_search(stmt, combined_query)

         if connection_type is not None:
-            stmt = stmt.where(Connection.data.op("->>")("type").in_(connection_type))
+            stmt = stmt.where(Connection.type.in_(connection_type))

         return await self._paginate_scalar_result(
             query=stmt.order_by(Connection.name),
@@ -63,6 +63,7 @@ async def read_by_id(
     async def create(
         self,
         group_id: int,
+        type: str,
         name: str,
         description: str,
         data: dict[str, Any],
@@ -71,6 +72,7 @@ async def create(
             insert(Connection)
             .values(
                 group_id=group_id,
+                type=type,
                 name=name,
                 description=description,
                 data=data,
@@ -89,6 +91,7 @@ async def update(
         self,
         connection_id: int,
         name: str | None,
+        type: str | None,
         description: str | None,
         data: dict[str, Any],
     ) -> Connection:
@@ -100,6 +103,7 @@
         return await self._update(
             Connection.id == connection_id,
             Connection.is_deleted.is_(False),
+            type=type or connection.type,
             name=name or connection.name,
             description=description or connection.description,
             data=data,
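The pagination filter now compares an ordinary column instead of extracting a key from the JSON payload. A minimal standalone sketch of the two filters, using a simplified stand-in model rather than the repository classes:

# Hedged sketch (toy model, not project code) contrasting the old JSON-key
# filter with the new dedicated column.
from sqlalchemy import JSON, String, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Connection(Base):
    __tablename__ = "connection"

    id: Mapped[int] = mapped_column(primary_key=True)
    type: Mapped[str] = mapped_column(String(128), nullable=False)
    data: Mapped[dict] = mapped_column(JSON, nullable=False, default={})


# Old filter: extracts the key from the JSON payload on every row, roughly
# `connection.data ->> 'type' IN ('postgres', 'oracle')` on PostgreSQL.
old_stmt = select(Connection).where(
    Connection.data.op("->>")("type").in_(["postgres", "oracle"]),
)

# New filter: a plain, indexable column comparison,
# `connection.type IN ('postgres', 'oracle')`.
new_stmt = select(Connection).where(Connection.type.in_(["postgres", "oracle"]))
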

syncmaster/db/repositories/transfer.py

Lines changed: 2 additions & 2 deletions
@@ -70,7 +70,7 @@ async def paginate(
                 Transfer.source_connection_id == SourceConnection.id,
             )
             stmt = stmt.where(
-                SourceConnection.data.op("->>")("type").in_(source_connection_type),
+                SourceConnection.type.in_(source_connection_type),
             )

         if target_connection_type is not None:
@@ -79,7 +79,7 @@ async def paginate(
                 Transfer.target_connection_id == TargetConnection.id,
             )
             stmt = stmt.where(
-                TargetConnection.data.op("->>")("type").in_(target_connection_type),
+                TargetConnection.type.in_(target_connection_type),
             )

         return await self._paginate_scalar_result(
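SourceConnection and TargetConnection are presumably aliases of the Connection model joined onto Transfer, so the same table can be filtered twice in one query. A minimal sketch under that assumption, with simplified stand-in models and without the real repository's pagination:

# Hedged sketch: the aliased() usage and these toy models are assumptions,
# not the project's actual definitions.
from sqlalchemy import BigInteger, String, select
from sqlalchemy.orm import DeclarativeBase, Mapped, aliased, mapped_column


class Base(DeclarativeBase):
    pass


class Connection(Base):
    __tablename__ = "connection"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    type: Mapped[str] = mapped_column(String(128), nullable=False)


class Transfer(Base):
    __tablename__ = "transfer"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    source_connection_id: Mapped[int] = mapped_column(BigInteger)
    target_connection_id: Mapped[int] = mapped_column(BigInteger)


SourceConnection = aliased(Connection)
TargetConnection = aliased(Connection)

# Filter transfers by the type of their source and target connections,
# now via the dedicated column instead of data ->> 'type'.
stmt = (
    select(Transfer)
    .join(SourceConnection, Transfer.source_connection_id == SourceConnection.id)
    .join(TargetConnection, Transfer.target_connection_id == TargetConnection.id)
    .where(SourceConnection.type.in_(["postgres"]))
    .where(TargetConnection.type.in_(["oracle"]))
)
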

syncmaster/scheduler/transfer_job_manager.py

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@
 from syncmaster.scheduler.celery import app as celery
 from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
 from syncmaster.scheduler.utils import get_async_session
-from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema
+from syncmaster.schemas.v1.connections.connection_base import ReadAuthDataSchema


 class TransferJobManager:
