
Commit 21d77e9

Fix S3 and HDFS tests
1 parent 02c7557 · commit 21d77e9

File tree

4 files changed (+13 −13 lines)


syncmaster/worker/handlers/file/base.py

Lines changed: 3 additions & 4 deletions
@@ -54,14 +54,13 @@ class FileHandler(Handler):
     def _rename_files(self, tmp_path: str) -> None:
         files = self.file_connection.list_dir(tmp_path)
 
-        for index, file_name in enumerate(files):
+        for index, old_path in enumerate(files):
             extension = self._get_file_extension()
-            new_name = self._get_file_name(str(index), extension)
-            old_path = os.path.join(tmp_path, file_name)
+            new_name = self._get_file_name(index, extension)
             new_path = os.path.join(tmp_path, new_name)
             self.file_connection.rename_file(old_path, new_path)
 
-    def _get_file_name(self, index: str, extension: str) -> str:
+    def _get_file_name(self, index: int, extension: str) -> str:
         return self.transfer_dto.file_name_template.format(
             index=index,
             extension=extension,
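
Two things change in this handler: _get_file_name now takes the index as an int, since str.format renders integers itself, and the entries returned by list_dir are passed to rename_file directly instead of being re-joined onto tmp_path. Below is a minimal sketch of the resulting renaming flow; the {run_created_at}-{index}.{extension} template is an assumption inferred from the checks in verify_file_name_template further down, and plain strings stand in for whatever path-like entries list_dir actually returns.

    import os

    # Hypothetical template, inferred from the assertions in verify_file_name_template.
    FILE_NAME_TEMPLATE = "{run_created_at}-{index}.{extension}"

    def get_file_name(index: int, extension: str) -> str:
        # str.format renders the int directly, so the old str(index) call was redundant.
        return FILE_NAME_TEMPLATE.format(
            run_created_at="2025_01_01_00_00_00",  # placeholder: 6 "_"-separated parts
            index=index,
            extension=extension,
        )

    # Plain-string stand-ins for the path-like entries list_dir is assumed to return.
    files = ["/tmp/transfer/part-00000.csv", "/tmp/transfer/part-00001.csv"]
    tmp_path = "/tmp/transfer"

    for index, old_path in enumerate(files):
        new_path = os.path.join(tmp_path, get_file_name(index, "csv"))
        print(f"{old_path} -> {new_path}")
        # e.g. /tmp/transfer/part-00000.csv -> /tmp/transfer/2025_01_01_00_00_00-0.csv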

tests/test_integration/test_run_transfer/test_hdfs.py

Lines changed: 3 additions & 3 deletions
@@ -248,7 +248,7 @@ async def test_run_transfer_postgres_to_hdfs_with_full_strategy(
 
     await run_transfer_and_verify(client, group_owner, postgres_to_hdfs.id)
 
-    files = [os.fspath(file) for file in hdfs_file_connection.list_dir(target_path) if file.is_file()]
+    files = [file for file in hdfs_file_connection.list_dir(target_path) if file.is_file()]
     verify_file_name_template(files, expected_extension)
 
     spark.catalog.clearCache()
@@ -304,7 +304,7 @@ async def test_run_transfer_postgres_to_hdfs_with_incremental_strategy(
     fill_with_data(first_transfer_df)
     await run_transfer_and_verify(client, group_owner, postgres_to_hdfs.id)
 
-    files = [os.fspath(file) for file in hdfs_file_connection.list_dir(target_path) if file.is_file()]
+    files = [file for file in hdfs_file_connection.list_dir(target_path) if file.is_file()]
     verify_file_name_template(files, expected_extension)
 
     spark.catalog.clearCache()
@@ -323,7 +323,7 @@ async def test_run_transfer_postgres_to_hdfs_with_incremental_strategy(
     fill_with_data(second_transfer_df)
     await run_transfer_and_verify(client, group_owner, postgres_to_hdfs.id)
 
-    files = [os.fspath(file) for file in hdfs_file_connection.list_dir(target_path) if file.is_file()]
+    files = [file for file in hdfs_file_connection.list_dir(target_path) if file.is_file()]
     verify_file_name_template(files, expected_extension)
 
     spark.catalog.clearCache()
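
Dropping os.fspath here (and in the test_s3.py changes below) keeps the RemoteFile objects returned by list_dir, so verify_file_name_template can split only the base name, file.name, rather than a full path string. One plausible failure mode with full paths is sketched below; PurePosixPath is an assumed stand-in for onetl's path-like RemoteFile, and the directory name is hypothetical.

    import os
    from pathlib import PurePosixPath

    # Stand-in for a RemoteFile entry; PurePosixPath is an assumption here.
    remote_file = PurePosixPath("/target-dir/2025_01_01_00_00_00-0.csv")

    print(os.fspath(remote_file))  # /target-dir/2025_01_01_00_00_00-0.csv (full path)
    print(remote_file.name)        # 2025_01_01_00_00_00-0.csv (base name only)

    # Splitting the full path on "-" also catches the dash in "target-dir",
    # so a two-value unpack like the one in verify_file_name_template raises
    # ValueError; splitting the base name yields exactly the two expected parts.
    print(os.fspath(remote_file).split("-"))  # ['/target', 'dir/2025_01_01_00_00_00', '0.csv']
    print(remote_file.name.split("-"))        # ['2025_01_01_00_00_00', '0.csv']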

tests/test_integration/test_run_transfer/test_s3.py

Lines changed: 3 additions & 3 deletions
@@ -283,7 +283,7 @@ async def test_run_transfer_postgres_to_s3_with_full_strategy(
 
     await run_transfer_and_verify(client, group_owner, postgres_to_s3.id, target_auth="s3")
 
-    files = [os.fspath(file) for file in s3_file_connection.list_dir(target_path)]
+    files = [file for file in s3_file_connection.list_dir(target_path)]
     verify_file_name_template(files, expected_extension)
 
     reader = FileDFReader(
@@ -339,7 +339,7 @@ async def test_run_transfer_postgres_to_s3_with_incremental_strategy(
     fill_with_data(first_transfer_df)
     await run_transfer_and_verify(client, group_owner, postgres_to_s3.id, target_auth="s3")
 
-    files = [os.fspath(file) for file in s3_file_connection.list_dir(target_path)]
+    files = [file for file in s3_file_connection.list_dir(target_path)]
     verify_file_name_template(files, expected_extension)
 
     reader = FileDFReader(
@@ -357,7 +357,7 @@ async def test_run_transfer_postgres_to_s3_with_incremental_strategy(
     fill_with_data(second_transfer_df)
     await run_transfer_and_verify(client, group_owner, postgres_to_s3.id, target_auth="s3")
 
-    files = [os.fspath(file) for file in s3_file_connection.list_dir(target_path)]
+    files = [file for file in s3_file_connection.list_dir(target_path)]
     verify_file_name_template(files, expected_extension)
 
     df_with_increment = reader.run()

tests/utils.py

Lines changed: 4 additions & 3 deletions
@@ -12,6 +12,7 @@
 from httpx import AsyncClient
 from onetl.connection import FileConnection
 from onetl.file import FileDownloader, FileUploader
+from onetl.impl import RemoteFile
 from pyspark.sql import DataFrame
 from pyspark.sql.functions import (
     col,
@@ -240,9 +241,9 @@ def add_increment_to_files_and_upload(file_connection: FileConnection, remote_pa
     uploader.run()
 
 
-def verify_file_name_template(files: list, expected_extension: str) -> None:
-    for file_name in files:
-        run_created_at, index_and_extension = file_name.split("-")
+def verify_file_name_template(files: list[RemoteFile], expected_extension: str) -> None:
+    for file in files:
+        run_created_at, index_and_extension = file.name.split("-")
         assert len(run_created_at.split("_")) == 6, f"Got wrong {run_created_at=}"
         assert index_and_extension.split(".", 1)[1] == expected_extension
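
For reference, a self-contained usage sketch of the updated helper; the file names are hypothetical and PurePosixPath again stands in for RemoteFile (both assumptions).

    from pathlib import PurePosixPath

    # Copy of verify_file_name_template as of this commit, with the RemoteFile
    # annotation relaxed so any object exposing .name works for the demo.
    def verify_file_name_template(files: list, expected_extension: str) -> None:
        for file in files:
            run_created_at, index_and_extension = file.name.split("-")
            assert len(run_created_at.split("_")) == 6, f"Got wrong {run_created_at=}"
            assert index_and_extension.split(".", 1)[1] == expected_extension

    # Hypothetical names following the {run_created_at}-{index}.{extension} shape.
    files = [
        PurePosixPath("/target/2025_01_01_00_00_00-0.csv"),
        PurePosixPath("/target/2025_01_01_00_00_00-1.csv"),
    ]
    verify_file_name_template(files, "csv")  # passes silently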
