Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions gdrive/drive_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

Shared utilities for Google Drive operations including permission checking.
"""
import asyncio
import re
from typing import List, Dict, Any, Optional
from typing import List, Dict, Any, Optional, Tuple


def check_public_link_permission(permissions: List[Dict[str, Any]]) -> bool:
Expand Down Expand Up @@ -107,4 +108,74 @@ def build_drive_list_params(
elif corpora:
list_params["corpora"] = corpora

return list_params
return list_params


SHORTCUT_MIME_TYPE = "application/vnd.google-apps.shortcut"
FOLDER_MIME_TYPE = "application/vnd.google-apps.folder"
BASE_SHORTCUT_FIELDS = "id, mimeType, parents, shortcutDetails(targetId, targetMimeType)"


async def resolve_drive_item(
service,
file_id: str,
*,
extra_fields: Optional[str] = None,
max_depth: int = 5,
) -> Tuple[str, Dict[str, Any]]:
"""
Resolve a Drive shortcut so downstream callers operate on the real item.

Returns the resolved file ID and its metadata. Raises if shortcut targets loop
or exceed max_depth to avoid infinite recursion.
"""
current_id = file_id
depth = 0
fields = BASE_SHORTCUT_FIELDS
if extra_fields:
fields = f"{fields}, {extra_fields}"

while True:
metadata = await asyncio.to_thread(
service.files()
.get(fileId=current_id, fields=fields, supportsAllDrives=True)
.execute
)
mime_type = metadata.get("mimeType")
if mime_type != SHORTCUT_MIME_TYPE:
return current_id, metadata

shortcut_details = metadata.get("shortcutDetails") or {}
Copy link

Copilot AI Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using or {} after .get() can mask None values in the dictionary. If shortcutDetails exists but is explicitly None, this will incorrectly return an empty dict. Use metadata.get('shortcutDetails', {}) instead to only provide a default when the key is missing.

Suggested change
shortcut_details = metadata.get("shortcutDetails") or {}
shortcut_details = metadata.get("shortcutDetails", {})

Copilot uses AI. Check for mistakes.
target_id = shortcut_details.get("targetId")
if not target_id:
raise Exception(f"Shortcut '{current_id}' is missing target details.")

depth += 1
if depth > max_depth:
raise Exception(
f"Shortcut resolution exceeded {max_depth} hops starting from '{file_id}'."
)
current_id = target_id


async def resolve_folder_id(
service,
folder_id: str,
*,
max_depth: int = 5,
) -> str:
"""
Resolve a folder ID that might be a shortcut and ensure the final target is a folder.
"""
resolved_id, metadata = await resolve_drive_item(
service,
folder_id,
extra_fields="mimeType",
max_depth=max_depth,
)
mime_type = metadata.get("mimeType")
if mime_type != FOLDER_MIME_TYPE:
raise Exception(
f"Resolved ID '{resolved_id}' (from '{folder_id}') is not a folder; mimeType={mime_type}."
)
Comment on lines +176 to +180
Copy link

Copilot AI Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mimeType field is already included in BASE_SHORTCUT_FIELDS, so the extra_fields='mimeType' parameter on line 173 is redundant. Remove this parameter to avoid unnecessary duplication.

Copilot uses AI. Check for mistakes.
return resolved_id
71 changes: 51 additions & 20 deletions gdrive/drive_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
from core.utils import extract_office_xml_text, handle_http_errors
from core.server import server
from core.config import get_transport_mode
from gdrive.drive_helpers import DRIVE_QUERY_PATTERNS, build_drive_list_params
from gdrive.drive_helpers import (
DRIVE_QUERY_PATTERNS,
build_drive_list_params,
resolve_drive_item,
resolve_folder_id,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -119,11 +124,12 @@ async def get_drive_file_content(
"""
logger.info(f"[get_drive_file_content] Invoked. File ID: '{file_id}'")

file_metadata = await asyncio.to_thread(
service.files().get(
fileId=file_id, fields="id, name, mimeType, webViewLink", supportsAllDrives=True
).execute
resolved_file_id, file_metadata = await resolve_drive_item(
service,
file_id,
extra_fields="name, webViewLink",
)
file_id = resolved_file_id
mime_type = file_metadata.get("mimeType", "")
file_name = file_metadata.get("name", "Unknown File")
export_mime_type = {
Expand All @@ -133,9 +139,9 @@ async def get_drive_file_content(
}.get(mime_type)

request_obj = (
service.files().export_media(fileId=file_id, mimeType=export_mime_type, supportsAllDrives=True)
service.files().export_media(fileId=file_id, mimeType=export_mime_type)
if export_mime_type
else service.files().get_media(fileId=file_id, supportsAllDrives=True)
else service.files().get_media(fileId=file_id)
)
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request_obj)
Expand Down Expand Up @@ -214,7 +220,8 @@ async def list_drive_items(
"""
logger.info(f"[list_drive_items] Invoked. Email: '{user_google_email}', Folder ID: '{folder_id}'")

final_query = f"'{folder_id}' in parents and trashed=false"
resolved_folder_id = await resolve_folder_id(service, folder_id)
final_query = f"'{resolved_folder_id}' in parents and trashed=false"

list_params = build_drive_list_params(
query=final_query,
Expand Down Expand Up @@ -273,10 +280,11 @@ async def create_drive_file(
raise Exception("You must provide either 'content' or 'fileUrl'.")

file_data = None
resolved_folder_id = await resolve_folder_id(service, folder_id)

file_metadata = {
'name': file_name,
'parents': [folder_id],
'parents': [resolved_folder_id],
'mimeType': mime_type
}

Expand Down Expand Up @@ -449,6 +457,9 @@ async def get_drive_file_permissions(
str: Detailed file metadata including sharing status and URLs.
"""
logger.info(f"[get_drive_file_permissions] Checking file {file_id} for {user_google_email}")

resolved_file_id, _ = await resolve_drive_item(service, file_id)
file_id = resolved_file_id

Copy link

Copilot AI Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trailing whitespace on line 463. Remove the extra spaces after the statement for code cleanliness.

Suggested change

Copilot uses AI. Check for mistakes.
try:
# Get comprehensive file metadata including permissions
Expand Down Expand Up @@ -589,6 +600,8 @@ async def check_drive_file_public_access(

# Check permissions for the first file
file_id = files[0]['id']
resolved_file_id, _ = await resolve_drive_item(service, file_id)
file_id = resolved_file_id

# Get detailed permissions
file_metadata = await asyncio.to_thread(
Expand Down Expand Up @@ -674,14 +687,16 @@ async def update_drive_file(
"""
logger.info(f"[update_drive_file] Updating file {file_id} for {user_google_email}")

# First, get current file info for reference
current_file = await asyncio.to_thread(
service.files().get(
fileId=file_id,
fields="id, name, description, mimeType, parents, starred, trashed, webViewLink",
supportsAllDrives=True
).execute
current_file_fields = (
"name, description, mimeType, parents, starred, trashed, webViewLink, "
"writersCanShare, copyRequiresWriterPermission, properties"
)
resolved_file_id, current_file = await resolve_drive_item(
service,
file_id,
extra_fields=current_file_fields,
)
file_id = resolved_file_id

# Build the update body with only specified fields
update_body = {}
Expand All @@ -702,17 +717,33 @@ async def update_drive_file(
if properties is not None:
update_body['properties'] = properties

async def _resolve_parent_arguments(parent_arg: Optional[str]) -> Optional[str]:
if not parent_arg:
return None
parent_ids = [part.strip() for part in parent_arg.split(",") if part.strip()]
if not parent_ids:
return None

resolved_ids = []
for parent in parent_ids:
resolved_parent = await resolve_folder_id(service, parent)
resolved_ids.append(resolved_parent)
return ",".join(resolved_ids)

resolved_add_parents = await _resolve_parent_arguments(add_parents)
resolved_remove_parents = await _resolve_parent_arguments(remove_parents)

# Build query parameters for parent changes
query_params = {
'fileId': file_id,
'supportsAllDrives': True,
'fields': 'id, name, description, mimeType, parents, starred, trashed, webViewLink, writersCanShare, copyRequiresWriterPermission, properties'
}

if add_parents:
query_params['addParents'] = add_parents
if remove_parents:
query_params['removeParents'] = remove_parents
if resolved_add_parents:
query_params['addParents'] = resolved_add_parents
if resolved_remove_parents:
query_params['removeParents'] = resolved_remove_parents

# Only include body if there are updates
if update_body:
Expand Down