Skip to content

Commit bd9be8d

Browse files
committed
drive: Enhance the Changes API with the Drive Activity API
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
1 parent 301dc58 commit bd9be8d

File tree

5 files changed

+469
-127
lines changed

5 files changed

+469
-127
lines changed

backend/drive/drive.go

Lines changed: 45 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"golang.org/x/oauth2/google"
5050
drive_v2 "google.golang.org/api/drive/v2"
5151
drive "google.golang.org/api/drive/v3"
52+
"google.golang.org/api/driveactivity/v2"
5253
"google.golang.org/api/googleapi"
5354
"google.golang.org/api/option"
5455
)
@@ -194,6 +195,8 @@ func driveScopes(scopesString string) (scopes []string) {
194195
scope = strings.TrimSpace(scope)
195196
scopes = append(scopes, scopePrefix+scope)
196197
}
198+
// The minimal read-only scope for activities should be sufficient
199+
scopes = append(scopes, scopePrefix+"drive.activity.readonly")
197200
return scopes
198201
}
199202

@@ -822,21 +825,22 @@ type Options struct {
822825

823826
// Fs represents a remote drive server
824827
type Fs struct {
825-
name string // name of this remote
826-
root string // the path we are working on
827-
opt Options // parsed options
828-
ci *fs.ConfigInfo // global config
829-
features *fs.Features // optional features
830-
svc *drive.Service // the connection to the drive server
831-
v2Svc *drive_v2.Service // used to create download links for the v2 api
832-
client *http.Client // authorized client
833-
rootFolderID string // the id of the root folder
834-
dirCache *dircache.DirCache // Map of directory path to directory id
835-
lastQuery string // Last query string to check in unit tests
836-
pacer *fs.Pacer // To pace the API calls
837-
exportExtensions []string // preferred extensions to download docs
838-
importMimeTypes []string // MIME types to convert to docs
839-
isTeamDrive bool // true if this is a team drive
828+
name string // name of this remote
829+
root string // the path we are working on
830+
opt Options // parsed options
831+
ci *fs.ConfigInfo // global config
832+
features *fs.Features // optional features
833+
svc *drive.Service // the connection to the drive server
834+
v2Svc *drive_v2.Service // used to create download links for the v2 api
835+
activitySvc *driveactivity.Service // used for fetching the drive activities
836+
client *http.Client // authorized client
837+
rootFolderID string // the id of the root folder
838+
dirCache *dircache.DirCache // Map of directory path to directory id
839+
lastQuery string // Last query string to check in unit tests
840+
pacer *fs.Pacer // To pace the API calls
841+
exportExtensions []string // preferred extensions to download docs
842+
importMimeTypes []string // MIME types to convert to docs
843+
isTeamDrive bool // true if this is a team drive
840844
m configmap.Mapper
841845
grouping int32 // number of IDs to search at once in ListR - read with atomic
842846
listRmu *sync.Mutex // protects listRempties
@@ -919,8 +923,8 @@ func (f *Fs) shouldRetry(ctx context.Context, err error) (bool, error) {
919923
}
920924
switch gerr := err.(type) {
921925
case *googleapi.Error:
922-
if gerr.Code >= 500 && gerr.Code < 600 {
923-
// All 5xx errors should be retried
926+
if gerr.Code == 429 || (gerr.Code >= 500 && gerr.Code < 600) {
927+
// All 429 or 5xx errors should be retried
924928
return true, err
925929
}
926930
if len(gerr.Errors) > 0 {
@@ -1414,6 +1418,11 @@ func newFs(ctx context.Context, name, path string, m configmap.Mapper) (*Fs, err
14141418
return nil, fmt.Errorf("couldn't create Drive client: %w", err)
14151419
}
14161420

1421+
f.activitySvc, err = driveactivity.NewService(context.Background(), option.WithHTTPClient(f.client))
1422+
if err != nil {
1423+
return nil, fmt.Errorf("couldn't create Drive Activity client: %w", err)
1424+
}
1425+
14171426
if f.opt.V2DownloadMinSize >= 0 {
14181427
f.v2Svc, err = drive_v2.NewService(context.Background(), option.WithHTTPClient(f.client))
14191428
if err != nil {
@@ -3153,10 +3162,12 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT
31533162
// get the StartPageToken early so all changes from now on get processed
31543163
startPageToken, err := f.changeNotifyStartPageToken(ctx)
31553164
if err != nil {
3156-
fs.Infof(f, "Failed to get StartPageToken: %s", err)
3165+
fs.Infof(f, "Failed to get changeNotify StartPageToken: %v", err)
31573166
}
3158-
var ticker *time.Ticker
3159-
var tickerC <-chan time.Time
3167+
var (
3168+
ticker *time.Ticker
3169+
tickerC <-chan time.Time
3170+
)
31603171
for {
31613172
select {
31623173
case pollInterval, ok := <-pollIntervalChan:
@@ -3178,123 +3189,21 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT
31783189
if startPageToken == "" {
31793190
startPageToken, err = f.changeNotifyStartPageToken(ctx)
31803191
if err != nil {
3181-
fs.Infof(f, "Failed to get StartPageToken: %s", err)
3192+
fs.Infof(f, "Failed to get changeNotify StartPageToken: %v", err)
31823193
continue
31833194
}
31843195
}
31853196
fs.Debugf(f, "Checking for changes on remote")
31863197
startPageToken, err = f.changeNotifyRunner(ctx, notifyFunc, startPageToken)
31873198
if err != nil {
3188-
fs.Infof(f, "Change notify listener failure: %s", err)
3199+
fs.Errorf(f, "Change notify listener failure: %v", err)
31893200
}
3201+
// f.changeActivityRunner(ctx, fromTime, toTime, notifyFunc)
31903202
}
31913203
}
31923204
}()
31933205
}
31943206

3195-
func (f *Fs) changeNotifyStartPageToken(ctx context.Context) (pageToken string, err error) {
3196-
var startPageToken *drive.StartPageToken
3197-
err = f.pacer.Call(func() (bool, error) {
3198-
changes := f.svc.Changes.GetStartPageToken().SupportsAllDrives(true)
3199-
if f.isTeamDrive {
3200-
changes.DriveId(f.opt.TeamDriveID)
3201-
}
3202-
startPageToken, err = changes.Context(ctx).Do()
3203-
return f.shouldRetry(ctx, err)
3204-
})
3205-
if err != nil {
3206-
return
3207-
}
3208-
return startPageToken.StartPageToken, nil
3209-
}
3210-
3211-
func (f *Fs) changeNotifyRunner(ctx context.Context, notifyFunc func(string, fs.EntryType), startPageToken string) (newStartPageToken string, err error) {
3212-
pageToken := startPageToken
3213-
for {
3214-
var changeList *drive.ChangeList
3215-
3216-
err = f.pacer.Call(func() (bool, error) {
3217-
changesCall := f.svc.Changes.List(pageToken).
3218-
Fields("nextPageToken,newStartPageToken,changes(fileId,file(name,parents,mimeType))")
3219-
if f.opt.ListChunk > 0 {
3220-
changesCall.PageSize(f.opt.ListChunk)
3221-
}
3222-
changesCall.SupportsAllDrives(true)
3223-
changesCall.IncludeItemsFromAllDrives(true)
3224-
if f.isTeamDrive {
3225-
changesCall.DriveId(f.opt.TeamDriveID)
3226-
}
3227-
// If using appDataFolder then need to add Spaces
3228-
if f.rootFolderID == "appDataFolder" {
3229-
changesCall.Spaces("appDataFolder")
3230-
}
3231-
changesCall.RestrictToMyDrive(!f.opt.SharedWithMe)
3232-
changeList, err = changesCall.Context(ctx).Do()
3233-
return f.shouldRetry(ctx, err)
3234-
})
3235-
if err != nil {
3236-
return
3237-
}
3238-
3239-
type entryType struct {
3240-
path string
3241-
entryType fs.EntryType
3242-
}
3243-
var pathsToClear []entryType
3244-
for _, change := range changeList.Changes {
3245-
// find the previous path
3246-
if path, ok := f.dirCache.GetInv(change.FileId); ok {
3247-
if change.File != nil && change.File.MimeType != driveFolderType {
3248-
pathsToClear = append(pathsToClear, entryType{path: path, entryType: fs.EntryObject})
3249-
} else {
3250-
pathsToClear = append(pathsToClear, entryType{path: path, entryType: fs.EntryDirectory})
3251-
}
3252-
}
3253-
3254-
// find the new path
3255-
if change.File != nil {
3256-
change.File.Name = f.opt.Enc.ToStandardName(change.File.Name)
3257-
changeType := fs.EntryDirectory
3258-
if change.File.MimeType != driveFolderType {
3259-
changeType = fs.EntryObject
3260-
}
3261-
3262-
// translate the parent dir of this object
3263-
if len(change.File.Parents) > 0 {
3264-
for _, parent := range change.File.Parents {
3265-
if parentPath, ok := f.dirCache.GetInv(parent); ok {
3266-
// and append the drive file name to compute the full file name
3267-
newPath := path.Join(parentPath, change.File.Name)
3268-
// this will now clear the actual file too
3269-
pathsToClear = append(pathsToClear, entryType{path: newPath, entryType: changeType})
3270-
}
3271-
}
3272-
} else { // a true root object that is changed
3273-
pathsToClear = append(pathsToClear, entryType{path: change.File.Name, entryType: changeType})
3274-
}
3275-
}
3276-
}
3277-
3278-
visitedPaths := make(map[string]struct{})
3279-
for _, entry := range pathsToClear {
3280-
if _, ok := visitedPaths[entry.path]; ok {
3281-
continue
3282-
}
3283-
visitedPaths[entry.path] = struct{}{}
3284-
notifyFunc(entry.path, entry.entryType)
3285-
}
3286-
3287-
switch {
3288-
case changeList.NewStartPageToken != "":
3289-
return changeList.NewStartPageToken, nil
3290-
case changeList.NextPageToken != "":
3291-
pageToken = changeList.NextPageToken
3292-
default:
3293-
return
3294-
}
3295-
}
3296-
}
3297-
32983207
// DirCacheFlush resets the directory cache - used in testing as an
32993208
// optional interface
33003209
func (f *Fs) DirCacheFlush() {
@@ -3327,21 +3236,26 @@ func (f *Fs) changeServiceAccountFile(ctx context.Context, file string) (err err
33273236
if file == f.opt.ServiceAccountFile {
33283237
return nil
33293238
}
3239+
33303240
oldSvc := f.svc
3241+
oldActivitySvc := f.activitySvc
33313242
oldv2Svc := f.v2Svc
33323243
oldOAuthClient := f.client
33333244
oldFile := f.opt.ServiceAccountFile
33343245
oldCredentials := f.opt.ServiceAccountCredentials
3246+
33353247
defer func() {
3336-
// Undo all the changes instead of doing selective undo's
3248+
// Undo all the changes instead of doing selective undo
33373249
if err != nil {
33383250
f.svc = oldSvc
3251+
f.activitySvc = oldActivitySvc
33393252
f.v2Svc = oldv2Svc
33403253
f.client = oldOAuthClient
33413254
f.opt.ServiceAccountFile = oldFile
33423255
f.opt.ServiceAccountCredentials = oldCredentials
33433256
}
33443257
}()
3258+
33453259
f.opt.ServiceAccountFile = file
33463260
f.opt.ServiceAccountCredentials = ""
33473261
oAuthClient, err := createOAuthClient(ctx, &f.opt, f.name, f.m)
@@ -3353,12 +3267,17 @@ func (f *Fs) changeServiceAccountFile(ctx context.Context, file string) (err err
33533267
if err != nil {
33543268
return fmt.Errorf("couldn't create Drive client: %w", err)
33553269
}
3270+
f.activitySvc, err = driveactivity.NewService(context.Background(), option.WithHTTPClient(f.client))
3271+
if err != nil {
3272+
return fmt.Errorf("couldn't create Drive Activity client: %w", err)
3273+
}
33563274
if f.opt.V2DownloadMinSize >= 0 {
33573275
f.v2Svc, err = drive_v2.NewService(context.Background(), option.WithHTTPClient(f.client))
33583276
if err != nil {
33593277
return fmt.Errorf("couldn't create Drive v2 client: %w", err)
33603278
}
33613279
}
3280+
33623281
return nil
33633282
}
33643283

0 commit comments

Comments
 (0)