Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions docs/modules/components/pages/processors/google_drive_download.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ google_drive_download:
credentials_json: "" # No default (optional)
file_id: "" # No default (required)
mime_type: "" # No default (required)
shared_drives: false
```

--
Expand All @@ -59,6 +60,7 @@ google_drive_download:
application/vnd.google-apps.presentation: application/pdf
application/vnd.google-apps.script: application/vnd.google-apps.script+json
application/vnd.google-apps.spreadsheet: text/csv
shared_drives: false
```

--
Expand All @@ -79,6 +81,29 @@ Otherwise if using a service account, you can create a JSON key for the service
In order for a service account to access files in Google Drive either files need to be explicitly shared with the service account email, otherwise https://support.google.com/a/answer/162106[^domain wide delegation] can be used to share all files within a Google Workspace.


== Examples

[tabs]
======
Download files from Google Drive::
+
--

This examples downloads all the files from Google Drive

```yaml
pipeline:
processors:
- google_drive_search:
query: "name = 'Test Doc'"
- google_drive_download:
file_id: "${!this.id}"
mime_type: "${!this.mimeType}"
```

--
======

== Fields

=== `credentials_json`
Expand Down Expand Up @@ -137,27 +162,13 @@ export_mime_types:
application/vnd.google-apps.spreadsheet: application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
```

== Examples
=== `shared_drives`

[tabs]
======
Download files from Google Drive::
+
--
Whether or not to include shared drives.

This examples downloads all the files from Google Drive

```yaml
pipeline:
processors:
- google_drive_search:
query: "name = 'Test Doc'"
- google_drive_download:
file_id: "${!this.id}"
mime_type: "${!this.mimeType}"
```
*Type*: `bool`

--
======
*Default*: `false`


10 changes: 10 additions & 0 deletions docs/modules/components/pages/processors/google_drive_search.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ google_drive_search:
- labelInfo
include_label_ids: ""
max_results: 64
shared_drives: false
```

This processor searches for files in Google Drive using the provided query.
Expand Down Expand Up @@ -141,4 +142,13 @@ The maximum number of results to return.

*Default*: `64`

=== `shared_drives`

Whether or not to include shared drives in the result.


*Type*: `bool`

*Default*: `false`


41 changes: 28 additions & 13 deletions internal/impl/google/drive_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import (
)

const (
driveDownloadFieldFileID = "file_id"
driveDownloadFieldMimeType = "mime_type"
driveDownloadFieldExportMimeTypes = "export_mime_types"
driveDownloadFieldFileID = "file_id"
driveDownloadFieldMimeType = "mime_type"
driveDownloadFieldExportMimeTypes = "export_mime_types"
driveDownloadFieldSupportSharedDrives = "shared_drives"
)

func init() {
Expand Down Expand Up @@ -73,6 +74,9 @@ Can download a file from Google Drive based on a file ID.
"application/vnd.google-apps.drawing": "image/svg+xml",
}).
Advanced(),
service.NewBoolField(driveDownloadFieldSupportSharedDrives).
Description("Whether or not to include shared drives.").
Default(false),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@birdayz suggested adding it.
We are preparing pipelines for CKO ADP Workshop and using our RP shared docs would be very helpful for that. Hence to achieve it we need to turn on that function to search/download files including SharedDrives

).
Example("Download files from Google Drive", "This examples downloads all the files from Google Drive", `
pipeline:
Expand All @@ -90,6 +94,7 @@ type googleDriveDownloadProcessor struct {
fileID *service.InterpolatedString
mimeType *service.InterpolatedString
exportMimeTypes map[string]string
sharedDrives bool
}

func newGoogleDriveDownloadProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
Expand All @@ -114,6 +119,11 @@ func newGoogleDriveDownloadProcessor(conf *service.ParsedConfig, mgr *service.Re
return nil, err
}

sharedDrives, err := conf.FieldBool(driveDownloadFieldSupportSharedDrives)
if err != nil {
return nil, err
}

for mimeType, exportMimeType := range mimeTypes {
formats, ok := googleMimeToFormat[mimeType]
if !ok {
Expand All @@ -132,14 +142,11 @@ func newGoogleDriveDownloadProcessor(conf *service.ParsedConfig, mgr *service.Re
fileID: fileID,
mimeType: mimeType,
exportMimeTypes: mimeTypes,
sharedDrives: sharedDrives,
}, nil
}

func (g *googleDriveDownloadProcessor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
client, err := g.getDriveService(ctx)
if err != nil {
return nil, err
}
id, err := g.fileID.TryString(msg)
if err != nil {
return nil, fmt.Errorf("failed to interpolate file_id: %v", err)
Expand All @@ -151,9 +158,9 @@ func (g *googleDriveDownloadProcessor) Process(ctx context.Context, msg *service
exportMimeType, ok := g.exportMimeTypes[mimeType]
var b []byte
if ok {
b, err = exportFile(ctx, client, id, exportMimeType)
b, err = g.exportFile(ctx, id, exportMimeType)
} else {
b, err = downloadFile(ctx, client, id)
b, err = g.downloadFile(ctx, id)
}
if err != nil {
return nil, fmt.Errorf("failed to download file %v: %v", id, err)
Expand All @@ -163,17 +170,25 @@ func (g *googleDriveDownloadProcessor) Process(ctx context.Context, msg *service
return service.MessageBatch{msg}, nil
}

func downloadFile(ctx context.Context, srv *drive.Service, fileID string) ([]byte, error) {
resp, err := srv.Files.Get(fileID).Context(ctx).Download()
func (g *googleDriveDownloadProcessor) downloadFile(ctx context.Context, fileID string) ([]byte, error) {
client, err := g.getDriveService(ctx)
if err != nil {
return nil, err
}
resp, err := client.Files.Get(fileID).SupportsAllDrives(g.sharedDrives).Context(ctx).Download()
if err != nil {
return nil, fmt.Errorf("unable to download file: %v", err)
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}

func exportFile(ctx context.Context, srv *drive.Service, fileID, mimeType string) ([]byte, error) {
resp, err := srv.Files.Export(fileID, mimeType).Context(ctx).Download()
func (g *googleDriveDownloadProcessor) exportFile(ctx context.Context, fileID, mimeType string) ([]byte, error) {
client, err := g.getDriveService(ctx)
if err != nil {
return nil, err
}
resp, err := client.Files.Export(fileID, mimeType).Context(ctx).Download()
if err != nil {
return nil, fmt.Errorf("unable to download file: %v", err)
}
Expand Down
34 changes: 26 additions & 8 deletions internal/impl/google/drive_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
)

const (
driveSearchFieldQuery = "query"
driveSearchFieldProjection = "projection"
driveSearchFieldLabels = "include_label_ids"
driveSearchFieldMaxResults = "max_results"
driveSearchFieldQuery = "query"
driveSearchFieldProjection = "projection"
driveSearchFieldLabels = "include_label_ids"
driveSearchFieldMaxResults = "max_results"
driveSearchFieldSupportSharedDrives = "shared_drives"
)

func init() {
Expand Down Expand Up @@ -62,6 +63,9 @@ Search results are emitted as message batch, where each message is a https://dev
service.NewIntField(driveSearchFieldMaxResults).
Description("The maximum number of results to return.").
Default(64),
service.NewBoolField(driveSearchFieldSupportSharedDrives).
Description("Whether or not to include shared drives in the result.").
Default(false),
).
Example("Search & download files from Google Drive", "This examples downloads all the files from Google Drive that are returned in the query", `
input:
Expand All @@ -83,10 +87,11 @@ output:

type googleDriveSearchProcessor struct {
*baseProcessor[drive.Service]
query *service.InterpolatedString
labels *service.InterpolatedString
fields []string
maxResults int
query *service.InterpolatedString
labels *service.InterpolatedString
fields []string
maxResults int
sharedDrives bool
}

// newGoogleDriveSearchProcessor creates a new instance of googleDriveSearchProcessor.
Expand Down Expand Up @@ -117,12 +122,18 @@ func newGoogleDriveSearchProcessor(conf *service.ParsedConfig, mgr *service.Reso
return nil, err
}

sharedDrives, err := conf.FieldBool(driveSearchFieldSupportSharedDrives)
if err != nil {
return nil, err
}

return &googleDriveSearchProcessor{
baseProcessor: base,
query: query,
labels: labels,
fields: fields,
maxResults: maxResults,
sharedDrives: sharedDrives,
}, nil
}

Expand All @@ -149,6 +160,13 @@ func (g *googleDriveSearchProcessor) Process(ctx context.Context, msg *service.M
if l != "" {
call = call.IncludeLabels(l)
}
if g.sharedDrives {
// all of those flags are needed to look into shared drives
call.
SupportsAllDrives(g.sharedDrives). // Flag 1: Tells API you know about Shared Drives
IncludeItemsFromAllDrives(g.sharedDrives). // Flag 2: Tells API to actually look in them
Corpora("allDrives") // Flag 3: Look everywhere the SA has access
}
var files []*drive.File
err = call.Pages(ctx, func(page *drive.FileList) error {
files = append(files, page.Files...)
Expand Down