Skip to content

Commit 49b602c

Browse files
author
Ilyas Gasanov
committed
[DOP-22132] Implement increment for transfers with DB source
1 parent d0e3f27 commit 49b602c

File tree

21 files changed

+1698
-956
lines changed

21 files changed

+1698
-956
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implement increment for transfers with database sources

syncmaster/worker/controller.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
)
4343
from syncmaster.dto.transfers_strategy import Strategy
4444
from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
45+
from syncmaster.schemas.v1.connection_types import FILE_CONNECTION_TYPES
4546
from syncmaster.worker.handlers.base import Handler
4647
from syncmaster.worker.handlers.db.clickhouse import ClickhouseHandler
4748
from syncmaster.worker.handlers.db.hive import HiveHandler
@@ -242,11 +243,15 @@ def _perform_incremental_transfer(self) -> None:
242243
).force_create_namespace() as hwm_store:
243244

244245
with IncrementalStrategy():
246+
if self.source_handler.connection_dto.type in FILE_CONNECTION_TYPES:
247+
hwm_name_suffix = self.source_handler.transfer_dto.directory_path
248+
else:
249+
hwm_name_suffix = self.source_handler.transfer_dto.table_name
245250
hwm_name = "_".join(
246251
[
247252
str(self.source_handler.transfer_dto.id),
248253
self.source_handler.connection_dto.type,
249-
self.source_handler.transfer_dto.directory_path,
254+
hwm_name_suffix,
250255
],
251256
)
252257
hwm = hwm_store.get_hwm(hwm_name)

syncmaster/worker/handlers/db/base.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,21 @@ class DBHandler(Handler):
3636
}
3737

3838
def read(self) -> DataFrame:
39+
reader_params = {}
40+
if self.transfer_dto.strategy.type == "incremental":
41+
self.transfer_dto.strategy.increment_by = self._quote_field(self.transfer_dto.strategy.increment_by)
42+
hwm_name = f"{self.transfer_dto.id}_{self.connection_dto.type}_{self.transfer_dto.table_name}"
43+
reader_params["hwm"] = DBReader.AutoDetectHWM(
44+
name=hwm_name,
45+
expression=self.transfer_dto.strategy.increment_by,
46+
)
47+
3948
reader = DBReader(
4049
connection=self.connection,
4150
table=self.transfer_dto.table_name,
4251
where=self._get_rows_filter_expression(),
4352
columns=self._get_columns_filter_expressions(),
53+
**reader_params,
4454
)
4555
return reader.run()
4656

tests/test_integration/test_run_transfer/connection_fixtures/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123
full_strategy,
124124
incremental_strategy_by_file_modified_since,
125125
incremental_strategy_by_file_name,
126+
incremental_strategy_by_number_column,
126127
)
127128
from tests.test_integration.test_run_transfer.connection_fixtures.webdav_fixtures import (
128129
prepare_webdav,

tests/test_integration/test_run_transfer/connection_fixtures/filters_fixtures.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ def expected_dataframe_rows_filter():
5050

5151
@pytest.fixture
5252
def dataframe_columns_filter_transformations(source_type: str):
53-
string_types_per_source_type = {
54-
"postgres": "VARCHAR(10)",
55-
"oracle": "VARCHAR2(10)",
56-
"clickhouse": "VARCHAR(10)",
53+
string_type_per_source_type = {
54+
"postgres": "VARCHAR(30)",
55+
"oracle": "VARCHAR2(30)",
56+
"clickhouse": "VARCHAR(30)",
5757
"mysql": "CHAR",
58-
"mssql": "VARCHAR(10)",
59-
"hive": "VARCHAR(10)",
58+
"mssql": "VARCHAR(30)",
59+
"hive": "VARCHAR(30)",
6060
"s3": "STRING",
6161
"hdfs": "STRING",
6262
}
@@ -84,7 +84,7 @@ def dataframe_columns_filter_transformations(source_type: str):
8484
{
8585
"type": "cast",
8686
"field": "NUMBER",
87-
"as_type": string_types_per_source_type[source_type],
87+
"as_type": string_type_per_source_type[source_type],
8888
},
8989
{
9090
"type": "include",

tests/test_integration/test_run_transfer/connection_fixtures/s3_fixtures.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def s3_file_connection(s3_server):
8282
return s3_connection
8383

8484

85-
@pytest.fixture(scope="session")
85+
@pytest.fixture
8686
def s3_file_connection_with_path(request, s3_file_connection):
8787
connection = s3_file_connection
8888
source = PurePosixPath("/data")
@@ -99,7 +99,7 @@ def finalizer():
9999
return connection, source
100100

101101

102-
@pytest.fixture(scope="session")
102+
@pytest.fixture
103103
def s3_file_df_connection_with_path(s3_file_connection_with_path, s3_file_df_connection):
104104
_, root = s3_file_connection_with_path
105105
return s3_file_df_connection, root
@@ -123,7 +123,7 @@ def s3_file_df_connection(s3_file_connection, spark, s3_server):
123123
)
124124

125125

126-
@pytest.fixture(scope="session")
126+
@pytest.fixture
127127
def prepare_s3(
128128
resource_path: PosixPath,
129129
s3_file_connection: S3,

tests/test_integration/test_run_transfer/connection_fixtures/strategy_fixtures.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,11 @@ def incremental_strategy_by_file_name():
2222
"type": "incremental",
2323
"increment_by": "file_name",
2424
}
25+
26+
27+
@pytest.fixture
28+
def incremental_strategy_by_number_column():
29+
return {
30+
"type": "incremental",
31+
"increment_by": "NUMBER",
32+
}

0 commit comments

Comments
 (0)