-
Notifications
You must be signed in to change notification settings - Fork 264
Issue #269 - supporting shortcuts #272
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
55dfba3
cbeabb3
d470fbd
7936097
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,8 +3,9 @@ | |
|
|
||
| Shared utilities for Google Drive operations including permission checking. | ||
| """ | ||
| import asyncio | ||
| import re | ||
| from typing import List, Dict, Any, Optional | ||
| from typing import List, Dict, Any, Optional, Tuple | ||
|
|
||
|
|
||
| def check_public_link_permission(permissions: List[Dict[str, Any]]) -> bool: | ||
|
|
@@ -107,4 +108,74 @@ def build_drive_list_params( | |
| elif corpora: | ||
| list_params["corpora"] = corpora | ||
|
|
||
| return list_params | ||
| return list_params | ||
|
|
||
|
|
||
| SHORTCUT_MIME_TYPE = "application/vnd.google-apps.shortcut" | ||
| FOLDER_MIME_TYPE = "application/vnd.google-apps.folder" | ||
| BASE_SHORTCUT_FIELDS = "id, mimeType, parents, shortcutDetails(targetId, targetMimeType)" | ||
|
|
||
|
|
||
| async def resolve_drive_item( | ||
| service, | ||
| file_id: str, | ||
| *, | ||
| extra_fields: Optional[str] = None, | ||
| max_depth: int = 5, | ||
| ) -> Tuple[str, Dict[str, Any]]: | ||
| """ | ||
| Resolve a Drive shortcut so downstream callers operate on the real item. | ||
|
|
||
| Returns the resolved file ID and its metadata. Raises if shortcut targets loop | ||
| or exceed max_depth to avoid infinite recursion. | ||
| """ | ||
| current_id = file_id | ||
| depth = 0 | ||
| fields = BASE_SHORTCUT_FIELDS | ||
| if extra_fields: | ||
| fields = f"{fields}, {extra_fields}" | ||
|
|
||
| while True: | ||
| metadata = await asyncio.to_thread( | ||
| service.files() | ||
| .get(fileId=current_id, fields=fields, supportsAllDrives=True) | ||
| .execute | ||
| ) | ||
| mime_type = metadata.get("mimeType") | ||
| if mime_type != SHORTCUT_MIME_TYPE: | ||
| return current_id, metadata | ||
|
|
||
| shortcut_details = metadata.get("shortcutDetails") or {} | ||
| target_id = shortcut_details.get("targetId") | ||
| if not target_id: | ||
| raise Exception(f"Shortcut '{current_id}' is missing target details.") | ||
|
|
||
| depth += 1 | ||
| if depth > max_depth: | ||
| raise Exception( | ||
| f"Shortcut resolution exceeded {max_depth} hops starting from '{file_id}'." | ||
| ) | ||
| current_id = target_id | ||
|
|
||
|
|
||
| async def resolve_folder_id( | ||
| service, | ||
| folder_id: str, | ||
| *, | ||
| max_depth: int = 5, | ||
| ) -> str: | ||
| """ | ||
| Resolve a folder ID that might be a shortcut and ensure the final target is a folder. | ||
| """ | ||
| resolved_id, metadata = await resolve_drive_item( | ||
| service, | ||
| folder_id, | ||
| extra_fields="mimeType", | ||
| max_depth=max_depth, | ||
| ) | ||
| mime_type = metadata.get("mimeType") | ||
| if mime_type != FOLDER_MIME_TYPE: | ||
| raise Exception( | ||
| f"Resolved ID '{resolved_id}' (from '{folder_id}') is not a folder; mimeType={mime_type}." | ||
| ) | ||
|
Comment on lines
+176
to
+180
|
||
| return resolved_id | ||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -20,7 +20,12 @@ | |||
| from core.utils import extract_office_xml_text, handle_http_errors | ||||
| from core.server import server | ||||
| from core.config import get_transport_mode | ||||
| from gdrive.drive_helpers import DRIVE_QUERY_PATTERNS, build_drive_list_params | ||||
| from gdrive.drive_helpers import ( | ||||
| DRIVE_QUERY_PATTERNS, | ||||
| build_drive_list_params, | ||||
| resolve_drive_item, | ||||
| resolve_folder_id, | ||||
| ) | ||||
|
|
||||
| logger = logging.getLogger(__name__) | ||||
|
|
||||
|
|
@@ -119,11 +124,12 @@ async def get_drive_file_content( | |||
| """ | ||||
| logger.info(f"[get_drive_file_content] Invoked. File ID: '{file_id}'") | ||||
|
|
||||
| file_metadata = await asyncio.to_thread( | ||||
| service.files().get( | ||||
| fileId=file_id, fields="id, name, mimeType, webViewLink", supportsAllDrives=True | ||||
| ).execute | ||||
| resolved_file_id, file_metadata = await resolve_drive_item( | ||||
| service, | ||||
| file_id, | ||||
| extra_fields="name, webViewLink", | ||||
| ) | ||||
| file_id = resolved_file_id | ||||
| mime_type = file_metadata.get("mimeType", "") | ||||
| file_name = file_metadata.get("name", "Unknown File") | ||||
| export_mime_type = { | ||||
|
|
@@ -214,7 +220,8 @@ async def list_drive_items( | |||
| """ | ||||
| logger.info(f"[list_drive_items] Invoked. Email: '{user_google_email}', Folder ID: '{folder_id}'") | ||||
|
|
||||
| final_query = f"'{folder_id}' in parents and trashed=false" | ||||
| resolved_folder_id = await resolve_folder_id(service, folder_id) | ||||
| final_query = f"'{resolved_folder_id}' in parents and trashed=false" | ||||
|
|
||||
| list_params = build_drive_list_params( | ||||
| query=final_query, | ||||
|
|
@@ -273,10 +280,11 @@ async def create_drive_file( | |||
| raise Exception("You must provide either 'content' or 'fileUrl'.") | ||||
|
|
||||
| file_data = None | ||||
| resolved_folder_id = await resolve_folder_id(service, folder_id) | ||||
|
|
||||
| file_metadata = { | ||||
| 'name': file_name, | ||||
| 'parents': [folder_id], | ||||
| 'parents': [resolved_folder_id], | ||||
| 'mimeType': mime_type | ||||
| } | ||||
|
|
||||
|
|
@@ -449,6 +457,9 @@ async def get_drive_file_permissions( | |||
| str: Detailed file metadata including sharing status and URLs. | ||||
| """ | ||||
| logger.info(f"[get_drive_file_permissions] Checking file {file_id} for {user_google_email}") | ||||
|
|
||||
| resolved_file_id, _ = await resolve_drive_item(service, file_id) | ||||
| file_id = resolved_file_id | ||||
|
|
||||
|
||||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using
or {}after.get()can maskNonevalues in the dictionary. IfshortcutDetailsexists but is explicitlyNone, this will incorrectly return an empty dict. Usemetadata.get('shortcutDetails', {})instead to only provide a default when the key is missing.