Skip to content

Commit f8be9f6

Browse files
authored
feat(ingest/s3): type aware directory sorting (#8089)
1 parent 1185c68 commit f8be9f6

File tree

2 files changed

+81
-2
lines changed

2 files changed

+81
-2
lines changed

metadata-ingestion/src/datahub/ingestion/source/s3/source.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import dataclasses
2+
import functools
23
import logging
34
import os
45
import pathlib
@@ -182,6 +183,29 @@ def get_column_type(
182183
# )
183184

184185

186+
def partitioned_folder_comparator(folder1: str, folder2: str) -> int:
187+
# Try to convert to number and compare if the folder name is a number
188+
try:
189+
# Stripping = from the folder names as it most probably partition name part like year=2021
190+
if "=" in folder1 and "=" in folder2:
191+
if folder1.split("=", 1)[0] == folder2.split("=", 1)[0]:
192+
folder1 = folder1.split("=", 1)[1]
193+
folder2 = folder2.split("=", 1)[1]
194+
195+
num_folder1 = int(folder1)
196+
num_folder2 = int(folder2)
197+
if num_folder1 == num_folder2:
198+
return 0
199+
else:
200+
return 1 if num_folder1 > num_folder2 else -1
201+
except Exception:
202+
# If folder name is not a number then do string comparison
203+
if folder1 == folder2:
204+
return 0
205+
else:
206+
return 1 if folder1 > folder2 else -1
207+
208+
185209
@dataclasses.dataclass
186210
class TableData:
187211
display_name: str
@@ -700,7 +724,12 @@ def get_dir_to_process(self, bucket_name: str, folder: str) -> str:
700724
)
701725
iterator = peekable(iterator)
702726
if iterator:
703-
sorted_dirs = sorted(iterator, reverse=True)
727+
sorted_dirs = sorted(
728+
iterator,
729+
key=functools.cmp_to_key(partitioned_folder_comparator),
730+
reverse=True,
731+
)
732+
704733
return self.get_dir_to_process(
705734
bucket_name=bucket_name, folder=sorted_dirs[0] + "/"
706735
)
@@ -786,7 +815,8 @@ def local_browser(self, path_spec: PathSpec) -> Iterable[Tuple[str, datetime, in
786815
else:
787816
logger.debug(f"Scanning files under local folder: {prefix}")
788817
for root, dirs, files in os.walk(prefix):
789-
dirs.sort()
818+
dirs.sort(key=functools.cmp_to_key(partitioned_folder_comparator))
819+
790820
for file in sorted(files):
791821
full_path = os.path.join(root, file)
792822
yield full_path, datetime.utcfromtimestamp(
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from datahub.ingestion.source.s3.source import partitioned_folder_comparator
2+
3+
4+
def test_partition_comparator_numeric_folder_name():
5+
folder1 = "3"
6+
folder2 = "12"
7+
assert partitioned_folder_comparator(folder1, folder2) == -1
8+
9+
10+
def test_partition_comparator_numeric_folder_name2():
11+
folder1 = "12"
12+
folder2 = "3"
13+
assert partitioned_folder_comparator(folder1, folder2) == 1
14+
15+
16+
def test_partition_comparator_string_folder():
17+
folder1 = "myfolderB"
18+
folder2 = "myFolderA"
19+
assert partitioned_folder_comparator(folder1, folder2) == 1
20+
21+
22+
def test_partition_comparator_string_same_folder():
23+
folder1 = "myFolderA"
24+
folder2 = "myFolderA"
25+
assert partitioned_folder_comparator(folder1, folder2) == 0
26+
27+
28+
def test_partition_comparator_with_numeric_partition():
29+
folder1 = "year=3"
30+
folder2 = "year=12"
31+
assert partitioned_folder_comparator(folder1, folder2) == -1
32+
33+
34+
def test_partition_comparator_with_padded_numeric_partition():
35+
folder1 = "year=03"
36+
folder2 = "year=12"
37+
assert partitioned_folder_comparator(folder1, folder2) == -1
38+
39+
40+
def test_partition_comparator_with_equal_sign_in_name():
41+
folder1 = "month=12"
42+
folder2 = "year=0"
43+
assert partitioned_folder_comparator(folder1, folder2) == -1
44+
45+
46+
def test_partition_comparator_with_string_partition():
47+
folder1 = "year=year2020"
48+
folder2 = "year=year2021"
49+
assert partitioned_folder_comparator(folder1, folder2) == -1

0 commit comments

Comments
 (0)