Skip to content

Commit c9b70a5

Browse files
author
Ilyas Gasanov
committed
[DOP-22139] Reset HWM when changing strategy
1 parent 83b9588 commit c9b70a5

File tree

11 files changed

+244
-82
lines changed

11 files changed

+244
-82
lines changed

.env.docker

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
3939
SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200
4040

4141
# HWM Store
42+
SYNCMASTER__HWM_STORE__ENABLED=true
4243
SYNCMASTER__HWM_STORE__TYPE=horizon
4344
SYNCMASTER__HWM_STORE__URL=http://horizon:8000
4445
SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace

.env.local

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
3939
export SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200
4040

4141
# HWM Store
42+
export SYNCMASTER__HWM_STORE__ENABLED=true
4243
export SYNCMASTER__HWM_STORE__TYPE=horizon
4344
export SYNCMASTER__HWM_STORE__URL=http://localhost:8020
4445
export SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reset HWM when changing strategy from `incremental` to `full`

poetry.lock

Lines changed: 53 additions & 53 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ pyjwt = { version = "^2.10.1", optional = true }
6060
jinja2 = { version = "^3.1.4", optional = true }
6161
python-multipart = { version = ">=0.0.9,<0.0.21", optional = true }
6262
celery = { version = "^5.4.0", optional = true }
63-
onetl = { version = "^0.13.1", extras = ["all"], optional = true }
63+
onetl = { version = "^0.13.3", extras = ["all"], optional = true }
6464
pyyaml = { version = "*", optional = true }
6565
# due to not supporting MacOS 14.x https://www.psycopg.org/psycopg3/docs/news.html#psycopg-3-1-20
6666
psycopg = { version = ">=3.1.0,<3.2.6", extras = ["binary"], optional = true }
@@ -72,7 +72,7 @@ apscheduler = { version = "^3.10.4", optional = true }
7272
starlette-exporter = {version = "^0.23.0", optional = true}
7373
itsdangerous = {version = "*", optional = true}
7474
python-keycloak = {version = ">=4.7,<6.0", optional = true}
75-
horizon-hwm-store = {version = ">=1.1.1", optional = true }
75+
horizon-hwm-store = {version = ">=1.1.2", optional = true }
7676

7777
[tool.poetry.extras]
7878
server = [

syncmaster/worker/controller.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ def perform_transfer(self) -> None:
198198
if self.source_handler.transfer_dto.strategy.type == "incremental":
199199
return self._perform_incremental_transfer()
200200

201+
if self.source_handler.transfer_dto.strategy.type == "full" and self.settings.hwm_store.enabled:
202+
self._reset_transfer_hwm()
203+
201204
df = self.source_handler.read()
202205
self.target_handler.write(df)
203206
finally:
@@ -235,29 +238,43 @@ def get_handler(
235238
temp_dir=temp_dir,
236239
)
237240

238-
def _perform_incremental_transfer(self) -> None:
239-
with HorizonHWMStore(
241+
def _get_hwm_store(self) -> HorizonHWMStore:
242+
return HorizonHWMStore(
240243
api_url=self.settings.hwm_store.url,
241244
auth=LoginPassword(login=self.settings.hwm_store.user, password=self.settings.hwm_store.password),
242245
namespace=self.settings.hwm_store.namespace,
243-
).force_create_namespace() as hwm_store:
246+
)
244247

248+
def _perform_incremental_transfer(self) -> None:
249+
with self._get_hwm_store().force_create_namespace() as hwm_store:
245250
with IncrementalStrategy():
246-
if self.source_handler.connection_dto.type in FILE_CONNECTION_TYPES:
247-
hwm_name_suffix = self.source_handler.transfer_dto.directory_path
248-
else:
249-
hwm_name_suffix = self.source_handler.transfer_dto.table_name
250-
hwm_name = "_".join(
251-
[
252-
str(self.source_handler.transfer_dto.id),
253-
self.source_handler.connection_dto.type,
254-
hwm_name_suffix,
255-
],
256-
)
251+
hwm_name = self._get_transfer_hwm_name()
257252
hwm = hwm_store.get_hwm(hwm_name)
258253

259254
self.source_handler.hwm = hwm
260255
self.target_handler.hwm = hwm
261256

262257
df = self.source_handler.read()
263258
self.target_handler.write(df)
259+
260+
def _get_transfer_hwm_name(self) -> str:
261+
if self.source_handler.connection_dto.type in FILE_CONNECTION_TYPES:
262+
hwm_name_suffix = self.source_handler.transfer_dto.directory_path
263+
else:
264+
hwm_name_suffix = self.source_handler.transfer_dto.table_name
265+
hwm_name = "_".join(
266+
[
267+
str(self.source_handler.transfer_dto.id),
268+
self.source_handler.connection_dto.type,
269+
hwm_name_suffix,
270+
],
271+
)
272+
return hwm_name
273+
274+
def _reset_transfer_hwm(self) -> None:
275+
with self._get_hwm_store().force_create_namespace() as hwm_store:
276+
hwm_name = self._get_transfer_hwm_name()
277+
hwm = hwm_store.get_hwm(hwm_name)
278+
if hwm and hwm.value:
279+
hwm.reset()
280+
hwm_store.set_hwm(hwm)

syncmaster/worker/settings/hwm_store.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from typing import Literal
44

5-
from pydantic import BaseModel, Field
5+
from pydantic import BaseModel, Field, model_validator
66

77

88
class HWMStoreSettings(BaseModel):
@@ -19,18 +19,37 @@ class HWMStoreSettings(BaseModel):
1919
SYNCMASTER__HWM_STORE__URL=http://horizon:8000
2020
"""
2121

22-
type: Literal["horizon"] = Field(
23-
description=("HWM Store type"),
22+
enabled: bool = Field(
23+
default=False,
24+
description="Enable or disable HWM Store",
2425
)
25-
url: str = Field(
26-
description=("HWM Store URL"),
26+
type: Literal["horizon"] | None = Field(
27+
default=None,
28+
description="HWM Store type",
2729
)
28-
user: str = Field(
29-
description=("HWM Store user"),
30+
url: str | None = Field(
31+
default=None,
32+
description="HWM Store URL",
3033
)
31-
password: str = Field(
32-
description=("HWM Store password"),
34+
user: str | None = Field(
35+
default=None,
36+
description="HWM Store user",
3337
)
34-
namespace: str = Field(
35-
description=("HWM Store namespace"),
38+
password: str | None = Field(
39+
default=None,
40+
description="HWM Store password",
3641
)
42+
namespace: str | None = Field(
43+
default=None,
44+
description="HWM Store namespace",
45+
)
46+
47+
@model_validator(mode="after")
48+
def check_required_fields_if_enabled(self):
49+
if self.enabled:
50+
missing_fields = [
51+
field for field in ["type", "url", "user", "password", "namespace"] if getattr(self, field) is None
52+
]
53+
if missing_fields:
54+
raise ValueError(f"All fields must be set with enabled HWMStore. Missing {', '.join(missing_fields)}")
55+
return self

tests/test_integration/test_run_transfer/connection_fixtures/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@
124124
incremental_strategy_by_file_modified_since,
125125
incremental_strategy_by_file_name,
126126
incremental_strategy_by_number_column,
127+
update_transfer_strategy,
127128
)
128129
from tests.test_integration.test_run_transfer.connection_fixtures.webdav_fixtures import (
129130
prepare_webdav,

tests/test_integration/test_run_transfer/connection_fixtures/strategy_fixtures.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
import pytest
2+
import pytest_asyncio
3+
from sqlalchemy.ext.asyncio import AsyncSession
4+
5+
from syncmaster.db.models import Transfer
26

37

48
@pytest.fixture
@@ -30,3 +34,14 @@ def incremental_strategy_by_number_column():
3034
"type": "incremental",
3135
"increment_by": "NUMBER",
3236
}
37+
38+
39+
@pytest_asyncio.fixture
40+
async def update_transfer_strategy(session: AsyncSession, request: pytest.FixtureRequest):
41+
async def _update_transfer_strategy(transfer: Transfer, strategy_fixture_name: str) -> None:
42+
strategy = request.getfixturevalue(strategy_fixture_name)
43+
transfer.strategy_params = strategy
44+
await session.commit()
45+
return transfer
46+
47+
return _update_transfer_strategy

tests/test_integration/test_run_transfer/test_clickhouse.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,56 @@ async def test_run_transfer_postgres_to_clickhouse_with_full_strategy(
135135
assert df.sort("ID").collect() == init_df.sort("ID").collect()
136136

137137

138+
@pytest.mark.parametrize(
139+
"strategy, transformations",
140+
[
141+
(
142+
lf("incremental_strategy_by_number_column"),
143+
[],
144+
),
145+
],
146+
)
147+
async def test_run_transfer_postgres_to_clickhouse_with_different_strategies(
148+
client: AsyncClient,
149+
group_owner: MockUser,
150+
prepare_postgres,
151+
prepare_clickhouse,
152+
init_df: DataFrame,
153+
postgres_to_clickhouse: Transfer,
154+
update_transfer_strategy,
155+
strategy,
156+
transformations,
157+
):
158+
"""
159+
Run three transfers:
160+
161+
1. Incremental transfer
162+
2. Full transfer with HWM reset
163+
3. Incremental transfer to verify that results match the snapshot.
164+
"""
165+
_, fill_with_data = prepare_postgres
166+
clickhouse, _ = prepare_clickhouse
167+
168+
first_transfer_df, second_transfer_df = split_df(df=init_df, ratio=0.6, keep_sorted_by="number")
169+
fill_with_data(first_transfer_df)
170+
await run_transfer_and_verify(client, group_owner, postgres_to_clickhouse.id)
171+
172+
fill_with_data(second_transfer_df)
173+
await update_transfer_strategy(postgres_to_clickhouse, "full_strategy")
174+
await run_transfer_and_verify(client, group_owner, postgres_to_clickhouse.id)
175+
176+
await update_transfer_strategy(postgres_to_clickhouse, "incremental_strategy_by_number_column")
177+
await run_transfer_and_verify(client, group_owner, postgres_to_clickhouse.id)
178+
179+
reader = DBReader(
180+
connection=clickhouse,
181+
table=f"{clickhouse.user}.target_table",
182+
)
183+
df = reader.run()
184+
df, init_df = cast_dataframe_types(df, init_df)
185+
assert df.sort("ID").collect() == init_df.sort("ID").collect()
186+
187+
138188
@pytest.mark.parametrize(
139189
"strategy, transformations",
140190
[
@@ -239,11 +289,11 @@ async def test_run_transfer_clickhouse_to_postgres_with_full_strategy(
239289
prepare_clickhouse,
240290
prepare_postgres,
241291
init_df: DataFrame,
292+
clickhouse_to_postgres: Transfer,
242293
source_type,
243294
strategy,
244295
transformations,
245296
expected_filter,
246-
clickhouse_to_postgres: Transfer,
247297
):
248298
_, fill_with_data = prepare_clickhouse
249299
fill_with_data(init_df)
@@ -315,9 +365,9 @@ async def test_run_transfer_clickhouse_to_postgres_with_incremental_strategy(
315365
prepare_clickhouse,
316366
prepare_postgres,
317367
init_df: DataFrame,
368+
clickhouse_to_postgres: Transfer,
318369
strategy,
319370
transformations,
320-
clickhouse_to_postgres: Transfer,
321371
):
322372
_, fill_with_data = prepare_clickhouse
323373
postgres, _ = prepare_postgres

0 commit comments

Comments
 (0)