Skip to content

Commit b19eb4f

Browse files
author
Ilyas Gasanov
committed
[DOP-22132] Implement increment for transfers with DB source
1 parent d0e3f27 commit b19eb4f

File tree

20 files changed

+1775
-957
lines changed

20 files changed

+1775
-957
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implement increment for transfers with database sources

syncmaster/worker/controller.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
)
4343
from syncmaster.dto.transfers_strategy import Strategy
4444
from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
45+
from syncmaster.schemas.v1.connection_types import FILE_CONNECTION_TYPES
4546
from syncmaster.worker.handlers.base import Handler
4647
from syncmaster.worker.handlers.db.clickhouse import ClickhouseHandler
4748
from syncmaster.worker.handlers.db.hive import HiveHandler
@@ -242,11 +243,15 @@ def _perform_incremental_transfer(self) -> None:
242243
).force_create_namespace() as hwm_store:
243244

244245
with IncrementalStrategy():
246+
if self.source_handler.connection_dto.type in FILE_CONNECTION_TYPES:
247+
hwm_name_suffix = self.source_handler.transfer_dto.directory_path
248+
else:
249+
hwm_name_suffix = self.source_handler.transfer_dto.table_name
245250
hwm_name = "_".join(
246251
[
247252
str(self.source_handler.transfer_dto.id),
248253
self.source_handler.connection_dto.type,
249-
self.source_handler.transfer_dto.directory_path,
254+
hwm_name_suffix,
250255
],
251256
)
252257
hwm = hwm_store.get_hwm(hwm_name)

syncmaster/worker/handlers/db/base.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,21 @@ class DBHandler(Handler):
3636
}
3737

3838
def read(self) -> DataFrame:
39+
reader_params = {}
40+
if self.transfer_dto.strategy.type == "incremental":
41+
self.transfer_dto.strategy.increment_by = self._quote_field(self.transfer_dto.strategy.increment_by)
42+
hwm_name = f"{self.transfer_dto.id}_{self.connection_dto.type}_{self.transfer_dto.table_name}"
43+
reader_params["hwm"] = DBReader.AutoDetectHWM(
44+
name=hwm_name,
45+
expression=self.transfer_dto.strategy.increment_by,
46+
)
47+
3948
reader = DBReader(
4049
connection=self.connection,
4150
table=self.transfer_dto.table_name,
4251
where=self._get_rows_filter_expression(),
4352
columns=self._get_columns_filter_expressions(),
53+
**reader_params,
4454
)
4555
return reader.run()
4656

tests/test_integration/test_run_transfer/connection_fixtures/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123
full_strategy,
124124
incremental_strategy_by_file_modified_since,
125125
incremental_strategy_by_file_name,
126+
incremental_strategy_by_number_column,
126127
)
127128
from tests.test_integration.test_run_transfer.connection_fixtures.webdav_fixtures import (
128129
prepare_webdav,

tests/test_integration/test_run_transfer/connection_fixtures/filters_fixtures.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,26 @@ def expected_dataframe_rows_filter():
5050

5151
@pytest.fixture
5252
def dataframe_columns_filter_transformations(source_type: str):
53-
string_types_per_source_type = {
54-
"postgres": "VARCHAR(10)",
55-
"oracle": "VARCHAR2(10)",
56-
"clickhouse": "VARCHAR(10)",
53+
string_type_per_source_type = {
54+
"postgres": "VARCHAR(30)",
55+
"oracle": "VARCHAR2(30)",
56+
"clickhouse": "VARCHAR(30)",
5757
"mysql": "CHAR",
58-
"mssql": "VARCHAR(10)",
59-
"hive": "VARCHAR(10)",
58+
"mssql": "VARCHAR(30)",
59+
"hive": "VARCHAR(30)",
6060
"s3": "STRING",
6161
"hdfs": "STRING",
6262
}
63+
int_type_per_source_type = {
64+
"postgres": "INTEGER",
65+
"oracle": "NUMBER",
66+
"clickhouse": "Int32",
67+
"mysql": "INT",
68+
"mssql": "INT",
69+
"hive": "INT",
70+
"s3": "INT",
71+
"hdfs": "INT",
72+
}
6373
return [
6474
{
6575
"type": "dataframe_columns_filter",
@@ -74,7 +84,7 @@ def dataframe_columns_filter_transformations(source_type: str):
7484
},
7585
{
7686
"type": "include",
77-
"field": "PHONE_NUMBER",
87+
"field": "NUMBER",
7888
},
7989
{
8090
"type": "rename",
@@ -83,8 +93,8 @@ def dataframe_columns_filter_transformations(source_type: str):
8393
},
8494
{
8595
"type": "cast",
86-
"field": "NUMBER",
87-
"as_type": string_types_per_source_type[source_type],
96+
"field": "ACCOUNT_BALANCE",
97+
"as_type": int_type_per_source_type[source_type],
8898
},
8999
{
90100
"type": "include",
@@ -100,9 +110,9 @@ def expected_dataframe_columns_filter():
100110
return lambda df, source_type: df.selectExpr(
101111
"ID",
102112
"REGION",
103-
"PHONE_NUMBER",
113+
"NUMBER",
104114
"REGION AS REGION2",
105-
"CAST(NUMBER AS STRING) AS NUMBER",
115+
"CAST(ACCOUNT_BALANCE AS INT) AS ACCOUNT_BALANCE",
106116
"REGISTERED_AT",
107117
)
108118

tests/test_integration/test_run_transfer/connection_fixtures/s3_fixtures.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def s3_file_connection(s3_server):
8282
return s3_connection
8383

8484

85-
@pytest.fixture(scope="session")
85+
@pytest.fixture
8686
def s3_file_connection_with_path(request, s3_file_connection):
8787
connection = s3_file_connection
8888
source = PurePosixPath("/data")
@@ -99,7 +99,7 @@ def finalizer():
9999
return connection, source
100100

101101

102-
@pytest.fixture(scope="session")
102+
@pytest.fixture
103103
def s3_file_df_connection_with_path(s3_file_connection_with_path, s3_file_df_connection):
104104
_, root = s3_file_connection_with_path
105105
return s3_file_df_connection, root
@@ -123,7 +123,7 @@ def s3_file_df_connection(s3_file_connection, spark, s3_server):
123123
)
124124

125125

126-
@pytest.fixture(scope="session")
126+
@pytest.fixture
127127
def prepare_s3(
128128
resource_path: PosixPath,
129129
s3_file_connection: S3,

tests/test_integration/test_run_transfer/connection_fixtures/strategy_fixtures.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,11 @@ def incremental_strategy_by_file_name():
2222
"type": "incremental",
2323
"increment_by": "file_name",
2424
}
25+
26+
27+
@pytest.fixture
28+
def incremental_strategy_by_number_column():
29+
return {
30+
"type": "incremental",
31+
"increment_by": "NUMBER",
32+
}

0 commit comments

Comments
 (0)