Skip to content

Commit a906e71

Browse files
Copilotkaranh37
andcommitted
Add foldersOnly support to GCS connector for folder-only ingestion
Co-authored-by: karanh37 <33024356+karanh37@users.noreply.github.com>
1 parent dda4112 commit a906e71

File tree

4 files changed

+387
-23
lines changed

4 files changed

+387
-23
lines changed

ingestion/src/metadata/ingestion/source/storage/gcs/metadata.py

Lines changed: 138 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
# See the License for the specific language governing permissions and
1010
# limitations under the License.
1111
"""GCS object store extraction metadata"""
12+
1213
import json
1314
import secrets
1415
import traceback
@@ -261,9 +262,11 @@ def _generate_container_details(
261262
return GCSContainerDetails(
262263
name=metadata_entry.dataPath.strip(KEY_SEPARATOR),
263264
prefix=prefix,
264-
creation_date=bucket_response.creation_date.isoformat()
265-
if bucket_response.creation_date
266-
else None,
265+
creation_date=(
266+
bucket_response.creation_date.isoformat()
267+
if bucket_response.creation_date
268+
else None
269+
),
267270
number_of_objects=self._fetch_metric(
268271
bucket=bucket_response, metric=GCSMetric.NUMBER_OF_OBJECTS
269272
),
@@ -284,6 +287,53 @@ def _generate_container_details(
284287
)
285288
return None
286289

290+
def _generate_structured_containers_by_depth_folders_only(
291+
self,
292+
bucket_response: GCSBucketResponse,
293+
metadata_entry: MetadataEntry,
294+
parent: Optional[EntityReference] = None,
295+
) -> Iterable[GCSContainerDetails]:
296+
"""
297+
Generate folder containers at specified depth without including files.
298+
"""
299+
try:
300+
prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry)
301+
if prefix:
302+
client = self.gcs_clients.storage_client.clients[
303+
bucket_response.project_id
304+
]
305+
response = client.list_blobs(
306+
bucket_response.name,
307+
prefix=prefix,
308+
max_results=1000,
309+
)
310+
total_depth = metadata_entry.depth + len(prefix[:-1].split("/"))
311+
candidate_keys = {
312+
"/".join(entry.name.split("/")[:total_depth]) + "/"
313+
for entry in response
314+
if entry and entry.name and len(entry.name.split("/")) > total_depth
315+
}
316+
for key in candidate_keys:
317+
prefix_path = key.strip(KEY_SEPARATOR)
318+
yield GCSContainerDetails(
319+
name=prefix_path.split(KEY_SEPARATOR)[-1],
320+
prefix=KEY_SEPARATOR + prefix_path,
321+
file_formats=[],
322+
data_model=None,
323+
parent=parent,
324+
fullPath=self._get_full_path(bucket_response.name, prefix_path),
325+
sourceUrl=self._get_object_source_url(
326+
bucket=bucket_response,
327+
prefix=prefix_path,
328+
is_file=False,
329+
),
330+
)
331+
except Exception as err:
332+
logger.warning(
333+
f"Error while generating folder-only containers by depth for {metadata_entry.dataPath} - {err}"
334+
)
335+
logger.debug(traceback.format_exc())
336+
287337
def _generate_structured_containers_by_depth(
288338
self,
289339
bucket_response: GCSBucketResponse,
@@ -311,12 +361,12 @@ def _generate_structured_containers_by_depth(
311361
for key in candidate_keys:
312362
metadata_entry_copy = deepcopy(metadata_entry)
313363
metadata_entry_copy.dataPath = key.strip(KEY_SEPARATOR)
314-
structured_container: Optional[
315-
GCSContainerDetails
316-
] = self._generate_container_details(
317-
bucket_response=bucket_response,
318-
metadata_entry=metadata_entry_copy,
319-
parent=parent,
364+
structured_container: Optional[GCSContainerDetails] = (
365+
self._generate_container_details(
366+
bucket_response=bucket_response,
367+
metadata_entry=metadata_entry_copy,
368+
parent=parent,
369+
)
320370
)
321371
if structured_container:
322372
yield structured_container
@@ -337,13 +387,33 @@ def _generate_structured_containers(
337387
f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
338388
f"and generating structured container"
339389
)
340-
if metadata_entry.depth == 0:
341-
structured_container: Optional[
342-
GCSContainerDetails
343-
] = self._generate_container_details(
344-
bucket_response=bucket_response,
345-
metadata_entry=metadata_entry,
346-
parent=parent,
390+
if metadata_entry.foldersOnly:
391+
if metadata_entry.depth > 0:
392+
logger.info(
393+
f"Extracting folder hierarchy only from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
394+
f"with depth {metadata_entry.depth}"
395+
)
396+
yield from self._generate_structured_containers_by_depth_folders_only(
397+
bucket_response=bucket_response,
398+
metadata_entry=metadata_entry,
399+
parent=parent,
400+
)
401+
else:
402+
logger.info(
403+
f"Extracting folder hierarchy only from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
404+
)
405+
yield from self._yield_folder_hierarchy(
406+
bucket_response=bucket_response,
407+
metadata_entry=metadata_entry,
408+
parent=parent,
409+
)
410+
elif metadata_entry.depth == 0:
411+
structured_container: Optional[GCSContainerDetails] = (
412+
self._generate_container_details(
413+
bucket_response=bucket_response,
414+
metadata_entry=metadata_entry,
415+
parent=parent,
416+
)
347417
)
348418
if structured_container:
349419
yield structured_container
@@ -433,9 +503,11 @@ def _generate_unstructured_container(
433503
return GCSContainerDetails(
434504
name=bucket_response.name,
435505
prefix=KEY_SEPARATOR,
436-
creation_date=bucket_response.creation_date.isoformat()
437-
if bucket_response.creation_date
438-
else None,
506+
creation_date=(
507+
bucket_response.creation_date.isoformat()
508+
if bucket_response.creation_date
509+
else None
510+
),
439511
number_of_objects=self._fetch_metric(
440512
bucket=bucket_response, metric=GCSMetric.NUMBER_OF_OBJECTS
441513
),
@@ -586,6 +658,38 @@ def _yield_parents_of_unstructured_container(
586658
)
587659
sub_parent = EntityReference(id=container_entity.id.root, type="container")
588660

661+
def _yield_folder_hierarchy(
662+
self,
663+
bucket_response: GCSBucketResponse,
664+
metadata_entry: MetadataEntry,
665+
parent: Optional[EntityReference] = None,
666+
):
667+
"""
668+
Yield only the folder hierarchy without listing individual files.
669+
This is used when foldersOnly=True to visualize storage structure.
670+
"""
671+
bucket_name = bucket_response.name
672+
client = self.gcs_clients.storage_client.clients[bucket_response.project_id]
673+
response = client.list_blobs(
674+
bucket_name,
675+
prefix=metadata_entry.dataPath,
676+
max_results=1000,
677+
)
678+
679+
folder_paths = set()
680+
for entry in response:
681+
if entry and entry.name:
682+
path_parts = entry.name.strip(KEY_SEPARATOR).split(KEY_SEPARATOR)
683+
for i in range(1, len(path_parts)):
684+
folder_path = KEY_SEPARATOR.join(path_parts[:i])
685+
folder_paths.add(folder_path)
686+
687+
for folder_path in sorted(folder_paths):
688+
list_of_parent = folder_path.split(KEY_SEPARATOR)
689+
yield from self._yield_parents_of_unstructured_container(
690+
bucket_name, bucket_response.project_id, list_of_parent, parent
691+
)
692+
589693
def _yield_nested_unstructured_containers(
590694
self,
591695
bucket_response: GCSBucketResponse,
@@ -634,9 +738,11 @@ def _yield_nested_unstructured_containers(
634738
size = self.get_size(bucket_name, bucket_response.project_id, key)
635739
yield GCSContainerDetails(
636740
name=list_of_parent[-1],
637-
prefix=KEY_SEPARATOR + parent_relative_path
638-
if parent_relative_path
639-
else KEY_SEPARATOR,
741+
prefix=(
742+
KEY_SEPARATOR + parent_relative_path
743+
if parent_relative_path
744+
else KEY_SEPARATOR
745+
),
640746
file_formats=[],
641747
size=size,
642748
container_fqn=container_fqn,
@@ -660,7 +766,16 @@ def _generate_unstructured_containers(
660766
for metadata_entry in entries:
661767
if metadata_entry.structureFormat:
662768
continue
663-
if metadata_entry.unstructuredFormats:
769+
if metadata_entry.foldersOnly:
770+
logger.info(
771+
f"Extracting folder hierarchy only from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
772+
)
773+
yield from self._yield_folder_hierarchy(
774+
bucket_response=bucket_response,
775+
metadata_entry=metadata_entry,
776+
parent=parent,
777+
)
778+
elif metadata_entry.unstructuredFormats:
664779
yield from self._yield_nested_unstructured_containers(
665780
bucket_response=bucket_response,
666781
metadata_entry=metadata_entry,

0 commit comments

Comments
 (0)