Skip to content

Commit 57aab1f

Browse files
[DOP-22126] Add incremental strategy to API (#202)
1 parent 9d1e591 commit 57aab1f

File tree

10 files changed

+125
-82
lines changed

10 files changed

+125
-82
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add `increment_by` field to `strategy_params`

syncmaster/schemas/v1/connection_types.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,21 @@
1515
FTPS_TYPE = Literal["ftps"]
1616
WEBDAV_TYPE = Literal["webdav"]
1717
SAMBA_TYPE = Literal["samba"]
18+
19+
CONNECTION_TYPES = [
20+
"oracle",
21+
"postgres",
22+
"clickhouse",
23+
"hive",
24+
"mssql",
25+
"mysql",
26+
"s3",
27+
"hdfs",
28+
"sftp",
29+
"ftp",
30+
"ftps",
31+
"webdav",
32+
"samba",
33+
]
34+
FILE_CONNECTION_TYPES = ["s3", "hdfs", "sftp", "ftp", "ftps", "webdav", "samba"]
35+
DB_CONNECTION_TYPES = ["oracle", "postgres", "clickhouse", "hive", "mssql", "mysql"]

syncmaster/schemas/v1/transfers/__init__.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44

55
from typing import Annotated
66

7-
from pydantic import BaseModel, Field, field_validator, model_validator
7+
from pydantic import BaseModel, Field, model_validator
88

9+
from syncmaster.schemas.v1.connection_types import FILE_CONNECTION_TYPES
910
from syncmaster.schemas.v1.connections.connection import ReadConnectionSchema
1011
from syncmaster.schemas.v1.page import PageSchema
1112
from syncmaster.schemas.v1.transfers.db import (
@@ -246,6 +247,19 @@ def validate_scheduling(cls, values):
246247
raise ValueError("If transfer must be scheduled than set schedule param")
247248
return values
248249

250+
@model_validator(mode="after")
251+
def validate_increment_by(cls, values):
252+
if not isinstance(values.strategy_params, IncrementalStrategy):
253+
return values
254+
255+
source_type = values.source_params.type
256+
increment_by = values.strategy_params.increment_by
257+
258+
if source_type in FILE_CONNECTION_TYPES and increment_by != "modified_since":
259+
raise ValueError("Field 'increment_by' must be equal to 'modified_since' for file source types")
260+
261+
return values
262+
249263

250264
class UpdateTransferSchema(BaseModel):
251265
source_connection_id: int | None = None

syncmaster/schemas/v1/transfers/strategy.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ class FullStrategy(BaseModel):
1313

1414
class IncrementalStrategy(BaseModel):
1515
type: INCREMENTAL_TYPE
16+
increment_by: str

syncmaster/server/api/v1/connections.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33
from collections.abc import Sequence
4-
from typing import get_args
54

65
from fastapi import APIRouter, Depends, Query, status
76
from pydantic import TypeAdapter
@@ -17,16 +16,7 @@
1716
)
1817
from syncmaster.exceptions.credentials import AuthDataNotFoundError
1918
from syncmaster.exceptions.group import GroupNotFoundError
20-
from syncmaster.schemas.v1.connection_types import (
21-
CLICKHOUSE_TYPE,
22-
HDFS_TYPE,
23-
HIVE_TYPE,
24-
MSSQL_TYPE,
25-
MYSQL_TYPE,
26-
ORACLE_TYPE,
27-
POSTGRES_TYPE,
28-
S3_TYPE,
29-
)
19+
from syncmaster.schemas.v1.connection_types import CONNECTION_TYPES
3020
from syncmaster.schemas.v1.connections.connection import (
3121
ConnectionCopySchema,
3222
ConnectionPageSchema,
@@ -41,8 +31,6 @@
4131

4232
router = APIRouter(tags=["Connections"], responses=get_error_responses())
4333

44-
CONNECTION_TYPES = ORACLE_TYPE, POSTGRES_TYPE, CLICKHOUSE_TYPE, HIVE_TYPE, MSSQL_TYPE, MYSQL_TYPE, S3_TYPE, HDFS_TYPE
45-
4634

4735
@router.get("/connections")
4836
async def read_connections(
@@ -157,7 +145,7 @@ async def create_connection(
157145

158146
@router.get("/connections/known_types", dependencies=[Depends(get_user(is_active=True))])
159147
async def read_connection_types() -> list[str]:
160-
return [get_args(type_)[0] for type_ in CONNECTION_TYPES]
148+
return CONNECTION_TYPES
161149

162150

163151
@router.get("/connections/{connection_id}")

tests/test_unit/test_connections/test_read_connection_types.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
from typing import get_args
2-
31
import pytest
42
from httpx import AsyncClient
53

6-
from syncmaster.server.api.v1.connections import CONNECTION_TYPES
4+
from syncmaster.schemas.v1.connection_types import CONNECTION_TYPES
75
from tests.mocks import MockUser
86

97
pytestmark = [pytest.mark.asyncio, pytest.mark.server]
@@ -31,4 +29,4 @@ async def test_groupless_user_can_read_connection_types(client: AsyncClient, sim
3129
)
3230
# Assert
3331
assert result.status_code == 200
34-
assert set(result.json()) == {get_args(type)[0] for type in CONNECTION_TYPES}
32+
assert set(result.json()) == set(CONNECTION_TYPES)

tests/test_unit/test_transfers/test_create_transfer.py

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ async def test_developer_plus_can_create_transfer(
4848
"type": "csv",
4949
},
5050
},
51-
"strategy_params": {"type": "full"},
51+
"strategy_params": {"type": "incremental", "increment_by": "modified_since"},
5252
"transformations": [
5353
{
5454
"type": "dataframe_rows_filter",
@@ -325,6 +325,7 @@ async def test_superuser_can_create_transfer(
325325
}
326326

327327

328+
# TODO: refactor annotations & fixtures
328329
@pytest.mark.parametrize(
329330
argnames=["new_data", "error_json"],
330331
argvalues=(
@@ -394,19 +395,6 @@ async def test_superuser_can_create_transfer(
394395
"message": "Value error, If transfer must be scheduled than set schedule param",
395396
"code": "value_error",
396397
"context": {},
397-
"input": {
398-
"description": "",
399-
"group_id": 1,
400-
"is_scheduled": True,
401-
"name": "new test transfer",
402-
"queue_id": 1,
403-
"schedule": None,
404-
"source_connection_id": 1,
405-
"source_params": {"table_name": "source_table", "type": "postgres"},
406-
"strategy_params": {"type": "full"},
407-
"target_connection_id": 2,
408-
"target_params": {"table_name": "target_table", "type": "postgres"},
409-
},
410398
},
411399
],
412400
},
@@ -439,6 +427,37 @@ async def test_superuser_can_create_transfer(
439427
},
440428
},
441429
),
430+
(
431+
{
432+
"source_params": {
433+
"type": "ftp",
434+
"directory_path": "/source_path",
435+
"file_format": {
436+
"type": "csv",
437+
},
438+
},
439+
"strategy_params": {
440+
"type": "incremental",
441+
"increment_by": "unknown",
442+
},
443+
},
444+
{
445+
"error": {
446+
"code": "invalid_request",
447+
"message": "Invalid request",
448+
"details": [
449+
{
450+
"location": ["body"],
451+
"message": (
452+
"Value error, Field 'increment_by' must be equal to 'modified_since' for file source types"
453+
),
454+
"code": "value_error",
455+
"context": {},
456+
},
457+
],
458+
},
459+
},
460+
),
442461
(
443462
{
444463
"source_params": {
@@ -759,7 +778,9 @@ async def test_check_fields_validation_on_create_transfer(
759778
# Assert
760779
assert result.status_code == 422
761780

762-
if new_data == {"schedule": None}:
781+
if (new_data == {"schedule": None}) or (
782+
"strategy_params" in new_data and new_data["strategy_params"].get("increment_by") == "unknown"
783+
):
763784
error_json["error"]["details"][0]["input"] = transfer_data
764785

765786
assert result.json() == error_json

tests/test_unit/test_transfers/test_file_transfers/test_read_transfer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@
104104
"target_params": {
105105
"file_name_template": "{run_created_at}_{index}.{extension}",
106106
},
107+
"strategy_params": {
108+
"type": "incremental",
109+
"increment_by": "modified_since",
110+
},
107111
},
108112
],
109113
)

tests/test_unit/test_transfers/test_file_transfers/test_update_transfer.py

Lines changed: 34 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -145,54 +145,44 @@ async def test_developer_plus_can_update_s3_transfer(
145145
):
146146
# Arrange
147147
user = group_transfer.owner_group.get_member_of_role(role_developer_plus)
148-
transformations = [
149-
{
150-
"type": "dataframe_rows_filter",
151-
"filters": [
152-
{
153-
"type": "is_not_null",
154-
"field": "col2",
155-
},
156-
],
148+
updated_fields = {
149+
"source_params": {
150+
"type": "ftp",
151+
"directory_path": "/some/new/test/directory",
152+
"file_format": create_transfer_data["source_and_target_params"]["file_format"],
153+
"options": {"some": "option"},
157154
},
158-
]
155+
"target_params": {
156+
"type": "ftp",
157+
"directory_path": "/some/new/test/directory",
158+
"file_format": create_transfer_data["source_and_target_params"]["file_format"],
159+
"file_name_template": "{index}.{extension}",
160+
"options": {"some": "option"},
161+
},
162+
"strategy_params": {
163+
"type": "incremental",
164+
"increment_by": "modified_since",
165+
},
166+
"transformations": [
167+
{
168+
"type": "dataframe_rows_filter",
169+
"filters": [
170+
{
171+
"type": "is_not_null",
172+
"field": "col2",
173+
},
174+
],
175+
},
176+
],
177+
}
159178

160179
# Act
161180
result = await client.patch(
162181
f"v1/transfers/{group_transfer.id}",
163182
headers={"Authorization": f"Bearer {user.token}"},
164-
json={
165-
"source_params": {
166-
"type": "ftp",
167-
"directory_path": "/some/new/test/directory",
168-
"file_format": create_transfer_data["source_and_target_params"]["file_format"],
169-
"options": {"some": "option"},
170-
},
171-
"target_params": {
172-
"type": "ftp",
173-
"directory_path": "/some/new/test/directory",
174-
"file_format": create_transfer_data["source_and_target_params"]["file_format"],
175-
"file_name_template": "{index}.{extension}",
176-
"options": {"some": "option"},
177-
},
178-
"transformations": transformations,
179-
},
183+
json=updated_fields,
180184
)
181185

182-
# Pre-Assert
183-
source_params = group_transfer.source_params.copy()
184-
source_params.update(
185-
{
186-
"directory_path": "/some/new/test/directory",
187-
"file_format": create_transfer_data["source_and_target_params"]["file_format"],
188-
"options": {"some": "option"},
189-
},
190-
)
191-
target_params = {
192-
**source_params,
193-
"file_name_template": "{index}.{extension}",
194-
}
195-
196186
# Assert
197187
assert result.status_code == 200
198188
assert result.json() == {
@@ -204,9 +194,9 @@ async def test_developer_plus_can_update_s3_transfer(
204194
"is_scheduled": group_transfer.is_scheduled,
205195
"source_connection_id": group_transfer.source_connection_id,
206196
"target_connection_id": group_transfer.target_connection_id,
207-
"source_params": source_params,
208-
"target_params": target_params,
209-
"strategy_params": group_transfer.strategy_params,
210-
"transformations": transformations,
197+
"source_params": updated_fields["source_params"],
198+
"target_params": updated_fields["target_params"],
199+
"strategy_params": updated_fields["strategy_params"],
200+
"transformations": updated_fields["transformations"],
211201
"queue_id": group_transfer.transfer.queue_id,
212202
}

tests/test_unit/test_transfers/test_update_transfer.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,36 @@ async def test_developer_plus_can_update_transfer(
1414
):
1515
# Arrange
1616
user = group_transfer.owner_group.get_member_of_role(role_developer_plus)
17+
updated_fields = {
18+
"name": "New transfer name",
19+
"is_scheduled": False,
20+
"strategy_params": {
21+
"type": "incremental",
22+
"increment_by": "updated_at",
23+
},
24+
}
1725

1826
# Act
1927
result = await client.patch(
2028
f"v1/transfers/{group_transfer.id}",
2129
headers={"Authorization": f"Bearer {user.token}"},
22-
json={"name": "New transfer name", "is_scheduled": False},
30+
json=updated_fields,
2331
)
2432

2533
# Assert
2634
assert result.status_code == 200
2735
assert result.json() == {
2836
"id": group_transfer.id,
2937
"group_id": group_transfer.group_id,
30-
"name": "New transfer name",
38+
"name": updated_fields["name"],
3139
"description": group_transfer.description,
3240
"schedule": group_transfer.schedule,
33-
"is_scheduled": False,
41+
"is_scheduled": updated_fields["is_scheduled"],
3442
"source_connection_id": group_transfer.source_connection_id,
3543
"target_connection_id": group_transfer.target_connection_id,
3644
"source_params": group_transfer.source_params,
3745
"target_params": group_transfer.target_params,
38-
"strategy_params": group_transfer.strategy_params,
46+
"strategy_params": updated_fields["strategy_params"],
3947
"transformations": group_transfer.transformations,
4048
"queue_id": group_transfer.transfer.queue_id,
4149
}

0 commit comments

Comments
 (0)