Skip to content

Commit 2f49b03

Browse files
author
Ilyas Gasanov
committed
[DOP-22139] Reset HWM when changing strategy
1 parent 83b9588 commit 2f49b03

File tree

11 files changed

+245
-82
lines changed

11 files changed

+245
-82
lines changed

.env.docker

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
3939
SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200
4040

4141
# HWM Store
42+
SYNCMASTER__HWM_STORE__ENABLED=true
4243
SYNCMASTER__HWM_STORE__TYPE=horizon
4344
SYNCMASTER__HWM_STORE__URL=http://horizon:8000
4445
SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace

.env.local

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
3939
export SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200
4040

4141
# HWM Store
42+
export SYNCMASTER__HWM_STORE__ENABLED=true
4243
export SYNCMASTER__HWM_STORE__TYPE=horizon
4344
export SYNCMASTER__HWM_STORE__URL=http://localhost:8020
4445
export SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reset HWM when changing strategy from `incremental` to `full`

poetry.lock

Lines changed: 53 additions & 53 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ pyjwt = { version = "^2.10.1", optional = true }
6060
jinja2 = { version = "^3.1.4", optional = true }
6161
python-multipart = { version = ">=0.0.9,<0.0.21", optional = true }
6262
celery = { version = "^5.4.0", optional = true }
63-
onetl = { version = "^0.13.1", extras = ["all"], optional = true }
63+
onetl = { version = "^0.13.3", extras = ["all"], optional = true }
6464
pyyaml = { version = "*", optional = true }
6565
# due to not supporting MacOS 14.x https://www.psycopg.org/psycopg3/docs/news.html#psycopg-3-1-20
6666
psycopg = { version = ">=3.1.0,<3.2.6", extras = ["binary"], optional = true }
@@ -72,7 +72,7 @@ apscheduler = { version = "^3.10.4", optional = true }
7272
starlette-exporter = {version = "^0.23.0", optional = true}
7373
itsdangerous = {version = "*", optional = true}
7474
python-keycloak = {version = ">=4.7,<6.0", optional = true}
75-
horizon-hwm-store = {version = ">=1.1.1", optional = true }
75+
horizon-hwm-store = {version = ">=1.1.2", optional = true }
7676

7777
[tool.poetry.extras]
7878
server = [

syncmaster/worker/controller.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from tempfile import TemporaryDirectory
55
from typing import Any
66

7+
from etl_entities.hwm_store import BaseHWMStore
78
from horizon.client.auth import LoginPassword
89
from horizon_hwm_store import HorizonHWMStore
910
from onetl.strategy import IncrementalStrategy
@@ -198,6 +199,9 @@ def perform_transfer(self) -> None:
198199
if self.source_handler.transfer_dto.strategy.type == "incremental":
199200
return self._perform_incremental_transfer()
200201

202+
if self.source_handler.transfer_dto.strategy.type == "full" and self.settings.hwm_store.enabled:
203+
self._reset_transfer_hwm()
204+
201205
df = self.source_handler.read()
202206
self.target_handler.write(df)
203207
finally:
@@ -235,29 +239,43 @@ def get_handler(
235239
temp_dir=temp_dir,
236240
)
237241

238-
def _perform_incremental_transfer(self) -> None:
239-
with HorizonHWMStore(
242+
def _get_hwm_store(self) -> BaseHWMStore:
243+
return HorizonHWMStore(
240244
api_url=self.settings.hwm_store.url,
241245
auth=LoginPassword(login=self.settings.hwm_store.user, password=self.settings.hwm_store.password),
242246
namespace=self.settings.hwm_store.namespace,
243-
).force_create_namespace() as hwm_store:
247+
).force_create_namespace()
244248

249+
def _perform_incremental_transfer(self) -> None:
250+
with self._get_hwm_store() as hwm_store:
245251
with IncrementalStrategy():
246-
if self.source_handler.connection_dto.type in FILE_CONNECTION_TYPES:
247-
hwm_name_suffix = self.source_handler.transfer_dto.directory_path
248-
else:
249-
hwm_name_suffix = self.source_handler.transfer_dto.table_name
250-
hwm_name = "_".join(
251-
[
252-
str(self.source_handler.transfer_dto.id),
253-
self.source_handler.connection_dto.type,
254-
hwm_name_suffix,
255-
],
256-
)
252+
hwm_name = self._get_transfer_hwm_name()
257253
hwm = hwm_store.get_hwm(hwm_name)
258254

259255
self.source_handler.hwm = hwm
260256
self.target_handler.hwm = hwm
261257

262258
df = self.source_handler.read()
263259
self.target_handler.write(df)
260+
261+
def _get_transfer_hwm_name(self) -> str:
    """Build the name under which this transfer's HWM is stored.

    The name has the form ``<transfer id>_<source connection type>_<suffix>``.
    The suffix is the source directory path when the source connection type
    is one of ``FILE_CONNECTION_TYPES``, and the source table name otherwise.
    """
    source = self.source_handler
    connection_type = source.connection_dto.type
    transfer = source.transfer_dto

    # File sources are identified by directory, all other sources by table.
    is_file_source = connection_type in FILE_CONNECTION_TYPES
    suffix = transfer.directory_path if is_file_source else transfer.table_name

    return "_".join((str(transfer.id), connection_type, suffix))
274+
275+
def _reset_transfer_hwm(self) -> None:
    """Reset the HWM stored for this transfer, if one exists with a value.

    Opens the store returned by ``_get_hwm_store()``, looks the HWM up by the
    name from ``_get_transfer_hwm_name()``, and when it is present with a
    truthy value, calls ``reset()`` on it and writes it back via ``set_hwm``.
    """
    with self._get_hwm_store() as store:
        name = self._get_transfer_hwm_name()
        stored = store.get_hwm(name)

        # Nothing to do when the HWM is missing or carries no value.
        # NOTE(review): this is a truthiness check, so falsy values such as 0
        # are also treated as "nothing to reset" - confirm that is intended.
        if not stored or not stored.value:
            return

        stored.reset()
        store.set_hwm(stored)

syncmaster/worker/settings/hwm_store.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from typing import Literal
44

5-
from pydantic import BaseModel, Field
5+
from pydantic import BaseModel, Field, model_validator
66

77

88
class HWMStoreSettings(BaseModel):
@@ -19,18 +19,37 @@ class HWMStoreSettings(BaseModel):
1919
SYNCMASTER__HWM_STORE__URL=http://horizon:8000
2020
"""
2121

22-
type: Literal["horizon"] = Field(
23-
description=("HWM Store type"),
22+
enabled: bool = Field(
23+
default=False,
24+
description="Enable or disable HWM Store",
2425
)
25-
url: str = Field(
26-
description=("HWM Store URL"),
26+
type: Literal["horizon"] | None = Field(
27+
default=None,
28+
description="HWM Store type",
2729
)
28-
user: str = Field(
29-
description=("HWM Store user"),
30+
url: str | None = Field(
31+
default=None,
32+
description="HWM Store URL",
3033
)
31-
password: str = Field(
32-
description=("HWM Store password"),
34+
user: str | None = Field(
35+
default=None,
36+
description="HWM Store user",
3337
)
34-
namespace: str = Field(
35-
description=("HWM Store namespace"),
38+
password: str | None = Field(
39+
default=None,
40+
description="HWM Store password",
3641
)
42+
namespace: str | None = Field(
43+
default=None,
44+
description="HWM Store namespace",
45+
)
46+
47+
@model_validator(mode="after")
def check_required_fields_if_enabled(self):
    """Require every connection field to be set when the HWM Store is enabled.

    When ``enabled`` is false the model is returned as-is. Otherwise each of
    ``type``, ``url``, ``user``, ``password`` and ``namespace`` must be non-None.

    Raises:
        ValueError: if the store is enabled and at least one of those fields
            is ``None``; the message lists the missing field names.
    """
    if not self.enabled:
        return self

    required = ("type", "url", "user", "password", "namespace")
    missing_fields = [name for name in required if getattr(self, name) is None]
    if missing_fields:
        raise ValueError(f"All fields must be set with enabled HWMStore. Missing {', '.join(missing_fields)}")

    return self

tests/test_integration/test_run_transfer/connection_fixtures/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@
124124
incremental_strategy_by_file_modified_since,
125125
incremental_strategy_by_file_name,
126126
incremental_strategy_by_number_column,
127+
update_transfer_strategy,
127128
)
128129
from tests.test_integration.test_run_transfer.connection_fixtures.webdav_fixtures import (
129130
prepare_webdav,

tests/test_integration/test_run_transfer/connection_fixtures/strategy_fixtures.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
import pytest
2+
import pytest_asyncio
3+
from sqlalchemy.ext.asyncio import AsyncSession
4+
5+
from syncmaster.db.models import Transfer
26

37

48
@pytest.fixture
@@ -30,3 +34,14 @@ def incremental_strategy_by_number_column():
3034
"type": "incremental",
3135
"increment_by": "NUMBER",
3236
}
37+
38+
39+
@pytest_asyncio.fixture
async def update_transfer_strategy(session: AsyncSession, request: pytest.FixtureRequest):
    """Provide a coroutine that switches a transfer to another strategy.

    The returned coroutine takes a ``Transfer`` and the *name* of a strategy
    fixture, resolves that fixture through ``request.getfixturevalue``,
    assigns its value to ``transfer.strategy_params`` and commits ``session``.
    """

    async def _update_transfer_strategy(transfer: Transfer, strategy_fixture_name: str) -> Transfer:
        # Resolve the strategy lazily by name so a test can pick any strategy
        # fixture without requesting all of them in its signature.
        strategy = request.getfixturevalue(strategy_fixture_name)
        transfer.strategy_params = strategy
        # NOTE(review): presumably `transfer` is attached to `session`, so the
        # commit persists the new strategy - confirm against the fixtures.
        await session.commit()
        # The transfer is handed back to the caller, so the return annotation
        # is `Transfer` (it was previously declared as `None`).
        return transfer

    return _update_transfer_strategy

tests/test_integration/test_run_transfer/test_clickhouse.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,56 @@ async def test_run_transfer_postgres_to_clickhouse_with_full_strategy(
135135
assert df.sort("ID").collect() == init_df.sort("ID").collect()
136136

137137

138+
@pytest.mark.parametrize(
    "strategy, transformations",
    [(lf("incremental_strategy_by_number_column"), [])],
)
async def test_run_transfer_postgres_to_clickhouse_with_different_strategies(
    client: AsyncClient,
    group_owner: MockUser,
    prepare_postgres,
    prepare_clickhouse,
    init_df: DataFrame,
    postgres_to_clickhouse: Transfer,
    update_transfer_strategy,
    strategy,
    transformations,
):
    """
    Switch one transfer between strategies and check the final target content.

    Three runs are performed on the same transfer:

    1. Incremental run over the first part of the data.
    2. Full run after the rest of the data is added.
    3. Incremental run again.

    Afterwards the ClickHouse target table must contain exactly ``init_df``.
    """
    _, fill_with_data = prepare_postgres
    clickhouse, _ = prepare_clickhouse

    first_chunk, second_chunk = split_df(df=init_df, ratio=0.6, keep_sorted_by="number")

    # Run 1: incremental, only the first chunk exists in the source.
    fill_with_data(first_chunk)
    await run_transfer_and_verify(client, group_owner, postgres_to_clickhouse.id)

    # Run 2: full strategy, after the remaining rows are added to the source.
    fill_with_data(second_chunk)
    await update_transfer_strategy(postgres_to_clickhouse, "full_strategy")
    await run_transfer_and_verify(client, group_owner, postgres_to_clickhouse.id)

    # Run 3: back to incremental on the same transfer.
    await update_transfer_strategy(postgres_to_clickhouse, "incremental_strategy_by_number_column")
    await run_transfer_and_verify(client, group_owner, postgres_to_clickhouse.id)

    target_df = DBReader(
        connection=clickhouse,
        table=f"{clickhouse.user}.target_table",
    ).run()
    target_df, expected_df = cast_dataframe_types(target_df, init_df)

    actual_rows = target_df.sort("ID").collect()
    expected_rows = expected_df.sort("ID").collect()
    assert actual_rows == expected_rows
186+
187+
138188
@pytest.mark.parametrize(
139189
"strategy, transformations",
140190
[
@@ -239,11 +289,11 @@ async def test_run_transfer_clickhouse_to_postgres_with_full_strategy(
239289
prepare_clickhouse,
240290
prepare_postgres,
241291
init_df: DataFrame,
292+
clickhouse_to_postgres: Transfer,
242293
source_type,
243294
strategy,
244295
transformations,
245296
expected_filter,
246-
clickhouse_to_postgres: Transfer,
247297
):
248298
_, fill_with_data = prepare_clickhouse
249299
fill_with_data(init_df)
@@ -315,9 +365,9 @@ async def test_run_transfer_clickhouse_to_postgres_with_incremental_strategy(
315365
prepare_clickhouse,
316366
prepare_postgres,
317367
init_df: DataFrame,
368+
clickhouse_to_postgres: Transfer,
318369
strategy,
319370
transformations,
320-
clickhouse_to_postgres: Transfer,
321371
):
322372
_, fill_with_data = prepare_clickhouse
323373
postgres, _ = prepare_postgres

0 commit comments

Comments
 (0)