-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
127 lines (109 loc) · 5.01 KB
/
auth.py
File metadata and controls
127 lines (109 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
Authentication module for Google Drive API.
Supports both service account and OAuth client authentication.
"""
import os
import json
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
import logging
# Set up logging
logger = logging.getLogger(__name__)
# Define the scopes required for Google Drive API
SCOPES = ['https://www.googleapis.com/auth/drive']
class DriveAuthenticator:
"""
Class to handle authentication with Google Drive API.
Supports both service account and OAuth client authentication.
"""
def __init__(self, credentials_type='service_account',
service_account_file='service_account.json',
client_secrets_file='google_desktop_client1.json',
token_file='token.json'):
"""
Initialize the authenticator with the specified credentials type.
Args:
credentials_type (str): Type of credentials to use ('service_account' or 'desktop_client')
service_account_file (str): Path to the service account JSON file
client_secrets_file (str): Path to the client secrets JSON file
token_file (str): Path to the token file for OAuth client
"""
self.credentials_type = credentials_type
self.service_account_file = service_account_file
self.client_secrets_file = client_secrets_file
self.token_file = token_file
self.credentials = None
self.service = None
def authenticate(self):
"""
Authenticate with Google Drive API using the specified credentials type.
Returns:
googleapiclient.discovery.Resource: Google Drive API service
"""
if self.credentials_type == 'service_account':
return self._authenticate_service_account()
elif self.credentials_type == 'desktop_client':
return self._authenticate_desktop_client()
else:
raise ValueError(f"Unsupported credentials type: {self.credentials_type}")
def _authenticate_service_account(self):
"""
Authenticate with Google Drive API using a service account.
Returns:
googleapiclient.discovery.Resource: Google Drive API service
"""
try:
logger.info(f"Authenticating with service account: {self.service_account_file}")
self.credentials = service_account.Credentials.from_service_account_file(
self.service_account_file, scopes=SCOPES)
self.service = build('drive', 'v3', credentials=self.credentials)
logger.info("Service account authentication successful")
return self.service
except Exception as e:
logger.error(f"Service account authentication failed: {str(e)}")
raise
def _authenticate_desktop_client(self):
"""
Authenticate with Google Drive API using an OAuth client.
Returns:
googleapiclient.discovery.Resource: Google Drive API service
"""
try:
logger.info(f"Authenticating with desktop client: {self.client_secrets_file}")
self.credentials = None
# Check if token file exists and is valid
if os.path.exists(self.token_file):
with open(self.token_file, 'r') as token:
self.credentials = Credentials.from_authorized_user_info(
json.load(token), SCOPES)
# If credentials don't exist or are invalid, get new ones
if not self.credentials or not self.credentials.valid:
if self.credentials and self.credentials.expired and self.credentials.refresh_token:
logger.info("Refreshing expired token")
self.credentials.refresh(Request())
else:
logger.info("Getting new token")
flow = InstalledAppFlow.from_client_secrets_file(
self.client_secrets_file, SCOPES)
self.credentials = flow.run_local_server(port=0)
# Save the credentials for the next run
with open(self.token_file, 'w') as token:
token.write(self.credentials.to_json())
self.service = build('drive', 'v3', credentials=self.credentials)
logger.info("Desktop client authentication successful")
return self.service
except Exception as e:
logger.error(f"Desktop client authentication failed: {str(e)}")
raise
def get_service(self):
"""
Get the Google Drive API service.
Returns:
googleapiclient.discovery.Resource: Google Drive API service
"""
if self.service is None:
self.authenticate()
return self.service