-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
113 lines (90 loc) · 3.51 KB
/
main.py
File metadata and controls
113 lines (90 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os
import json
import airbyte as ab
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from pydantic import BaseModel, Field
from datetime import datetime
import pandas as pd
# OAuth scopes requested by get_credentials(). token.json caches credentials
# obtained for these scopes, so delete token.json after changing this list to
# force a fresh consent flow.
# NOTE(review): ".../auth/drive" is the broad Drive scope; this script only
# reads files, so a read-only scope may suffice — confirm against the Airbyte
# Google Drive connector's requirements before narrowing.
SCOPES = ["https://www.googleapis.com/auth/drive"]
def get_credentials(token_path="token.json", client_secrets_path="credentials.json", scopes=None):
    """Return valid Google OAuth user credentials, prompting the user if needed.

    Cached credentials are loaded from *token_path*. Credentials that are
    already valid are returned as-is. Otherwise:

    * an expired token with a refresh token is refreshed; if Google rejects
      the refresh, fall back to the interactive flow;
    * in every other case, the installed-app browser flow is run using the
      OAuth client secrets in *client_secrets_path*.

    Newly obtained or refreshed credentials are written back to *token_path*.

    Args:
        token_path: Path of the cached authorized-user JSON file.
        client_secrets_path: Path of the OAuth client secrets JSON file.
        scopes: OAuth scopes to request; defaults to the module-level SCOPES.

    Returns:
        The valid ``google.oauth2.credentials.Credentials`` object.
    """
    # Local import: google-auth is already a dependency of this module (see
    # google.auth.transport.requests), so this adds no new package.
    from google.auth.exceptions import RefreshError

    if scopes is None:
        scopes = SCOPES

    creds = None
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, scopes)

    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
        except RefreshError:
            # e.g. the refresh token was revoked or has expired; a new consent
            # flow is the only way to recover, so don't crash here.
            creds = None
    else:
        creds = None

    if creds is None:
        flow = InstalledAppFlow.from_client_secrets_file(client_secrets_path, scopes)
        creds = flow.run_local_server(port=0)

    # Persist so the next run (and load_credentials_from_token) can reuse it.
    with open(token_path, "w", encoding="utf-8") as token:
        token.write(creds.to_json())
    return creds
def load_credentials_from_token(token_path="token.json"):
    """Build the Airbyte ``credentials`` config from a cached OAuth token file.

    Reads the authorized-user JSON (as written by ``get_credentials``) and
    copies out the fields the Airbyte Google Drive source needs for
    client-based OAuth.

    Args:
        token_path: Path of the token JSON file.

    Returns:
        A dict with ``"auth_type": "Client"`` plus ``client_id``,
        ``client_secret`` and ``refresh_token`` taken from the file.

    Raises:
        FileNotFoundError: If *token_path* does not exist.
        ValueError: If any of the three required fields is missing or empty,
            rather than passing ``None`` on to Airbyte and failing later.
    """
    try:
        with open(token_path, "r", encoding="utf-8") as token_file:
            data = json.load(token_file)
    except FileNotFoundError:
        raise FileNotFoundError(
            f"{token_path} not found. Run authentication first."
        ) from None

    required = ("client_id", "client_secret", "refresh_token")
    missing = [key for key in required if not data.get(key)]
    if missing:
        raise ValueError(
            f"{token_path} is missing required field(s): {', '.join(missing)}"
        )

    return {"auth_type": "Client", **{key: data[key] for key in required}}
# Google Drive folder read by default; override via sync_google_drive(folder_url=...).
DEFAULT_FOLDER_URL = "https://drive.google.com/drive/folders/10DrawuhFx85xmr8v8vVB6PPOTVWPu7nI"


def _build_source_config(folder_url, credentials):
    """Return the source-google-drive config: a CSV stream plus an unstructured-document stream."""
    return {
        "folder_url": folder_url,
        "credentials": credentials,
        "streams": [
            {
                "name": "Csv_data",
                "format": {"filetype": "csv"},
                "globs": ["**/*.csv"],
            },
            {
                # No "globs" key: the connector's default file pattern applies
                # (believed to match all files — confirm in connector docs).
                "name": "Unstructured_data",
                "format": {"filetype": "unstructured"},
            },
        ],
    }


def _print_read_result(read_result):
    """Print each stream's name, its dataset object, and its records as a DataFrame."""
    for stream_name, cached_dataset in read_result.items():
        print(f"Stream Name: {stream_name}")
        print(cached_dataset)
        print(cached_dataset.to_pandas().to_string())


def sync_google_drive(folder_url=DEFAULT_FOLDER_URL, streams="Unstructured_data"):
    """Read a Google Drive folder with the Airbyte Google Drive source and print it.

    Loads OAuth client credentials from token.json via
    ``load_credentials_from_token``. Configures two streams: "Csv_data"
    (``**/*.csv`` files) and "Unstructured_data" (unstructured document
    parsing). Then verifies the connection, reads only the selected stream(s),
    and prints every stream that was read.

    Args:
        folder_url: URL of the Drive folder to read.
        streams: Stream name (or list of names) passed to
            ``source.select_streams``. Defaults to "Unstructured_data" only,
            so "Csv_data" is configured but not read unless requested.

    A missing token file or an ``HttpError`` is printed instead of raised.
    """
    try:
        credentials = load_credentials_from_token()
        source = ab.get_source(
            "source-google-drive",
            install_if_missing=True,
            config=_build_source_config(folder_url, credentials),
        )
        source.check()
        source.select_streams(streams)
        read_result = source.read()
    # NOTE(review): nothing in this try calls googleapiclient directly; errors
    # raised by the Airbyte connector are PyAirbyte exceptions, which are not
    # caught here — confirm whether that is intended.
    except HttpError as error:
        print(f"An error occurred: {error}")
    except FileNotFoundError as e:
        print(e)
    else:
        _print_read_result(read_result)
def main():
    """Command-line entry point.

    Makes sure token.json holds usable OAuth credentials (running the browser
    consent flow if necessary), then runs the Airbyte Google Drive sync.
    """
    # Order matters: the sync reads the token.json that get_credentials
    # creates or refreshes; the returned Credentials object is not needed.
    for step in (get_credentials, sync_google_drive):
        step()


if __name__ == "__main__":
    main()