Skip to content

Commit 88b0359

Browse files
author
Ilyas Gasanov
committed
[DOP-22139] Reset HWM when changing strategy
1 parent 83b9588 commit 88b0359

File tree

11 files changed

+208
-73
lines changed

11 files changed

+208
-73
lines changed

.env.docker

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
3939
SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200
4040

4141
# HWM Store
42+
SYNCMASTER__HWM_STORE__ENABLED=true
4243
SYNCMASTER__HWM_STORE__TYPE=horizon
4344
SYNCMASTER__HWM_STORE__URL=http://horizon:8000
4445
SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace

.env.local

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
3939
export SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200
4040

4141
# HWM Store
42+
export SYNCMASTER__HWM_STORE__ENABLED=true
4243
export SYNCMASTER__HWM_STORE__TYPE=horizon
4344
export SYNCMASTER__HWM_STORE__URL=http://localhost:8020
4445
export SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reset HWM when changing strategy from `incremental` to `full`

poetry.lock

Lines changed: 53 additions & 53 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ pyjwt = { version = "^2.10.1", optional = true }
6060
jinja2 = { version = "^3.1.4", optional = true }
6161
python-multipart = { version = ">=0.0.9,<0.0.21", optional = true }
6262
celery = { version = "^5.4.0", optional = true }
63-
onetl = { version = "^0.13.1", extras = ["all"], optional = true }
63+
onetl = { version = "^0.13.3", extras = ["all"], optional = true }
6464
pyyaml = { version = "*", optional = true }
6565
# due to not supporting MacOS 14.x https://www.psycopg.org/psycopg3/docs/news.html#psycopg-3-1-20
6666
psycopg = { version = ">=3.1.0,<3.2.6", extras = ["binary"], optional = true }
@@ -72,7 +72,7 @@ apscheduler = { version = "^3.10.4", optional = true }
7272
starlette-exporter = {version = "^0.23.0", optional = true}
7373
itsdangerous = {version = "*", optional = true}
7474
python-keycloak = {version = ">=4.7,<6.0", optional = true}
75-
horizon-hwm-store = {version = ">=1.1.1", optional = true }
75+
horizon-hwm-store = {version = ">=1.1.2", optional = true }
7676

7777
[tool.poetry.extras]
7878
server = [

syncmaster/worker/controller.py

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ def perform_transfer(self) -> None:
198198
if self.source_handler.transfer_dto.strategy.type == "incremental":
199199
return self._perform_incremental_transfer()
200200

201+
if self.source_handler.transfer_dto.strategy.type == "full" and self.settings.hwm_store.enabled:
202+
self._reset_transfer_hwm()
203+
201204
df = self.source_handler.read()
202205
self.target_handler.write(df)
203206
finally:
@@ -243,21 +246,38 @@ def _perform_incremental_transfer(self) -> None:
243246
).force_create_namespace() as hwm_store:
244247

245248
with IncrementalStrategy():
246-
if self.source_handler.connection_dto.type in FILE_CONNECTION_TYPES:
247-
hwm_name_suffix = self.source_handler.transfer_dto.directory_path
248-
else:
249-
hwm_name_suffix = self.source_handler.transfer_dto.table_name
250-
hwm_name = "_".join(
251-
[
252-
str(self.source_handler.transfer_dto.id),
253-
self.source_handler.connection_dto.type,
254-
hwm_name_suffix,
255-
],
256-
)
249+
hwm_name = self._get_transfer_hwm_name()
257250
hwm = hwm_store.get_hwm(hwm_name)
258251

259252
self.source_handler.hwm = hwm
260253
self.target_handler.hwm = hwm
261254

262255
df = self.source_handler.read()
263256
self.target_handler.write(df)
257+
258+
def _get_transfer_hwm_name(self) -> str:
259+
if self.source_handler.connection_dto.type in FILE_CONNECTION_TYPES:
260+
hwm_name_suffix = self.source_handler.transfer_dto.directory_path
261+
else:
262+
hwm_name_suffix = self.source_handler.transfer_dto.table_name
263+
hwm_name = "_".join(
264+
[
265+
str(self.source_handler.transfer_dto.id),
266+
self.source_handler.connection_dto.type,
267+
hwm_name_suffix,
268+
],
269+
)
270+
return hwm_name
271+
272+
def _reset_transfer_hwm(self) -> None:
273+
with HorizonHWMStore(
274+
api_url=self.settings.hwm_store.url,
275+
auth=LoginPassword(login=self.settings.hwm_store.user, password=self.settings.hwm_store.password),
276+
namespace=self.settings.hwm_store.namespace,
277+
).force_create_namespace() as hwm_store:
278+
279+
hwm_name = self._get_transfer_hwm_name()
280+
hwm = hwm_store.get_hwm(hwm_name)
281+
if hwm:
282+
hwm.reset()
283+
hwm_store.set_hwm(hwm)

syncmaster/worker/settings/hwm_store.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,21 @@ class HWMStoreSettings(BaseModel):
1919
SYNCMASTER__HWM_STORE__URL=http://horizon:8000
2020
"""
2121

22+
enabled: bool = Field(
23+
description="Enable or disable HWM Store",
24+
)
2225
type: Literal["horizon"] = Field(
23-
description=("HWM Store type"),
26+
description="HWM Store type",
2427
)
2528
url: str = Field(
26-
description=("HWM Store URL"),
29+
description="HWM Store URL",
2730
)
2831
user: str = Field(
29-
description=("HWM Store user"),
32+
description="HWM Store user",
3033
)
3134
password: str = Field(
32-
description=("HWM Store password"),
35+
description="HWM Store password",
3336
)
3437
namespace: str = Field(
35-
description=("HWM Store namespace"),
38+
description="HWM Store namespace",
3639
)

tests/test_integration/test_run_transfer/connection_fixtures/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@
124124
incremental_strategy_by_file_modified_since,
125125
incremental_strategy_by_file_name,
126126
incremental_strategy_by_number_column,
127+
update_transfer_strategy,
127128
)
128129
from tests.test_integration.test_run_transfer.connection_fixtures.webdav_fixtures import (
129130
prepare_webdav,

tests/test_integration/test_run_transfer/connection_fixtures/strategy_fixtures.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
import pytest
2+
import pytest_asyncio
3+
from sqlalchemy.ext.asyncio import AsyncSession
4+
5+
from syncmaster.db.models import Transfer
26

37

48
@pytest.fixture
@@ -30,3 +34,14 @@ def incremental_strategy_by_number_column():
3034
"type": "incremental",
3135
"increment_by": "NUMBER",
3236
}
37+
38+
39+
@pytest_asyncio.fixture
40+
async def update_transfer_strategy(session: AsyncSession, request: pytest.FixtureRequest):
41+
async def _update_transfer_strategy(transfer: Transfer, strategy_fixture_name: str) -> None:
42+
strategy = request.getfixturevalue(strategy_fixture_name)
43+
transfer.strategy_params = strategy
44+
await session.commit()
45+
return transfer
46+
47+
return _update_transfer_strategy

tests/test_integration/test_run_transfer/test_clickhouse.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,49 @@ async def test_run_transfer_postgres_to_clickhouse_with_full_strategy(
135135
assert df.sort("ID").collect() == init_df.sort("ID").collect()
136136

137137

138+
@pytest.mark.parametrize(
139+
"strategy, transformations",
140+
[
141+
(
142+
lf("incremental_strategy_by_number_column"),
143+
[],
144+
),
145+
],
146+
)
147+
async def test_run_transfer_postgres_to_clickhouse_with_different_strategies(
148+
client: AsyncClient,
149+
group_owner: MockUser,
150+
prepare_postgres,
151+
prepare_clickhouse,
152+
init_df: DataFrame,
153+
postgres_to_clickhouse: Transfer,
154+
update_transfer_strategy,
155+
strategy,
156+
transformations,
157+
):
158+
_, fill_with_data = prepare_postgres
159+
clickhouse, _ = prepare_clickhouse
160+
161+
first_transfer_df, second_transfer_df = split_df(df=init_df, ratio=0.6, keep_sorted_by="number")
162+
fill_with_data(first_transfer_df)
163+
await run_transfer_and_verify(client, group_owner, postgres_to_clickhouse.id)
164+
165+
fill_with_data(second_transfer_df)
166+
await update_transfer_strategy(postgres_to_clickhouse, "full_strategy")
167+
await run_transfer_and_verify(client, group_owner, postgres_to_clickhouse.id)
168+
169+
await update_transfer_strategy(postgres_to_clickhouse, "incremental_strategy_by_number_column")
170+
await run_transfer_and_verify(client, group_owner, postgres_to_clickhouse.id)
171+
172+
reader = DBReader(
173+
connection=clickhouse,
174+
table=f"{clickhouse.user}.target_table",
175+
)
176+
df = reader.run()
177+
df, init_df = cast_dataframe_types(df, init_df)
178+
assert df.sort("ID").collect() == init_df.sort("ID").collect()
179+
180+
138181
@pytest.mark.parametrize(
139182
"strategy, transformations",
140183
[
@@ -239,11 +282,11 @@ async def test_run_transfer_clickhouse_to_postgres_with_full_strategy(
239282
prepare_clickhouse,
240283
prepare_postgres,
241284
init_df: DataFrame,
285+
clickhouse_to_postgres: Transfer,
242286
source_type,
243287
strategy,
244288
transformations,
245289
expected_filter,
246-
clickhouse_to_postgres: Transfer,
247290
):
248291
_, fill_with_data = prepare_clickhouse
249292
fill_with_data(init_df)
@@ -315,9 +358,9 @@ async def test_run_transfer_clickhouse_to_postgres_with_incremental_strategy(
315358
prepare_clickhouse,
316359
prepare_postgres,
317360
init_df: DataFrame,
361+
clickhouse_to_postgres: Transfer,
318362
strategy,
319363
transformations,
320-
clickhouse_to_postgres: Transfer,
321364
):
322365
_, fill_with_data = prepare_clickhouse
323366
postgres, _ = prepare_postgres

0 commit comments

Comments
 (0)