-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdrive_manager.py
More file actions
113 lines (95 loc) · 4.35 KB
/
drive_manager.py
File metadata and controls
113 lines (95 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# drive_manager.py
import io
from googleapiclient.http import MediaIoBaseDownload
from drive_auth import DriveAuthenticator # <--- USAMOS TU CLASE CORREGIDA
class DriveManager:
def __init__(self):
self.auth = DriveAuthenticator()
self.service = None
def authenticate(self):
"""Usa DriveAuthenticator para obtener el servicio."""
# Esto reutiliza la lógica de credenciales incrustadas de drive_auth.py
self.service = self.auth.get_service()
return self.service is not None
def list_folders(self, parent_id='root'):
"""Devuelve carpetas."""
if not self.service:
self.authenticate()
# CASO ESPECIAL: Buscar los Ordenadores
if parent_id == 'computers':
# ... (Lógica igual, solo necesitamos el servicio) ...
query = "mimeType = 'application/vnd.google-apps.folder' and trashed = false"
fields = "nextPageToken, files(id, name, parents, ownedByMe)"
all_folders = []
page_token = None
while True:
try:
results = self.service.files().list(
q=query, pageSize=1000, pageToken=page_token, fields=fields
).execute()
all_folders.extend(results.get('files', []))
page_token = results.get('nextPageToken')
if not page_token: break
except Exception as e:
print(f"⚠️ Error buscando ordenadores: {e}")
break
computer_roots = []
for f in all_folders:
if not f.get('parents') and f.get('name') != 'Sin Nombre' and not f.get('name', '').startswith('.'):
if f.get('ownedByMe', False):
computer_roots.append(f)
computer_roots.sort(key=lambda x: x.get('name', '').lower())
return computer_roots
else:
# Búsqueda normal
query = f"'{parent_id}' in parents and mimeType = 'application/vnd.google-apps.folder' and trashed = false"
fields = "files(id, name)"
results = self.service.files().list(q=query, pageSize=1000, orderBy="name", fields=fields).execute()
all_folders = results.get('files', [])
return [f for f in all_folders if not f.get('name', '').startswith('.')]
def list_images_recursively(self, folder_id):
"""Generador recursivo de imágenes."""
if not self.service:
self.authenticate()
page_token = None
while True:
query = f"'{folder_id}' in parents and mimeType contains 'image/' and trashed = false"
try:
results = self.service.files().list(
q=query, pageSize=1000, pageToken=page_token,
fields="nextPageToken, files(id, name, mimeType, thumbnailLink, webContentLink, createdTime, parents)"
).execute()
except Exception:
break
files = results.get('files', [])
if files:
yield files
page_token = results.get('nextPageToken')
if not page_token: break
# Subcarpetas
page_token = None
while True:
query = f"'{folder_id}' in parents and mimeType = 'application/vnd.google-apps.folder' and trashed = false"
try:
results = self.service.files().list(
q=query, pageSize=1000, pageToken=page_token, fields="nextPageToken, files(id, name)"
).execute()
except Exception:
break
subfolders = results.get('files', [])
if subfolders:
import time
time.sleep(0.1) # Freno de emergencia
for sub in subfolders:
yield from self.list_images_recursively(sub['id'])
page_token = results.get('nextPageToken')
if not page_token: break
def download_file(self, file_id, local_path):
if not self.service:
self.authenticate()
request = self.service.files().get_media(fileId=file_id)
with io.FileIO(local_path, 'wb') as fh:
downloader = MediaIoBaseDownload(fh, request)
done = False
while done is False:
status, done = downloader.next_chunk()