11use std:: sync:: Arc ;
22
3+ use futures:: future:: try_join;
34use google_drive3:: {
45 api:: Scope ,
56 yup_oauth2:: { read_service_account_key, ServiceAccountAuthenticator } ,
67 DriveHub ,
78} ;
9+ use http_body_util:: BodyExt ;
810use hyper_rustls:: HttpsConnector ;
911use hyper_util:: client:: legacy:: connect:: HttpConnector ;
12+ use log:: debug;
1013
1114use crate :: ops:: sdk:: * ;
1215
@@ -37,7 +40,7 @@ impl Executor {
3740 hyper_rustls:: HttpsConnectorBuilder :: new ( )
3841 . with_provider_and_native_roots ( rustls:: crypto:: ring:: default_provider ( ) ) ?
3942 . https_only ( )
40- . enable_http1 ( )
43+ . enable_http2 ( )
4144 . build ( ) ,
4245 ) ;
4346 let drive_hub = DriveHub :: new ( client, auth) ;
@@ -72,16 +75,17 @@ impl SourceExecutor for Executor {
7275 . drive_hub
7376 . files ( )
7477 . list ( )
75- . q ( & query )
76- . add_scope ( Scope :: Readonly ) ;
78+ . add_scope ( Scope :: Readonly )
79+ . q ( & query ) ;
7780 if let Some ( next_page_token) = & next_page_token {
7881 list_call = list_call. page_token ( next_page_token) ;
7982 }
80- let ( resp , files) = list_call. doit ( ) . await ?;
83+ let ( _ , files) = list_call. doit ( ) . await ?;
8184 if let Some ( files) = files. files {
8285 for file in files {
83- if let Some ( name) = file. name {
84- result. push ( KeyValue :: Str ( Arc :: from ( name) ) ) ;
86+ debug ! ( "file: {:?}" , file) ;
87+ if let Some ( id) = file. id {
88+ result. push ( KeyValue :: Str ( Arc :: from ( id) ) ) ;
8589 }
8690 }
8791 }
@@ -94,7 +98,45 @@ impl SourceExecutor for Executor {
9498 }
9599
96100 async fn get_value ( & self , key : & KeyValue ) -> Result < Option < FieldValues > > {
97- unimplemented ! ( )
101+ let file_id = key. str_value ( ) ?;
102+
103+ let filename = async {
104+ let ( _, file) = self
105+ . drive_hub
106+ . files ( )
107+ . get ( file_id)
108+ . add_scope ( Scope :: Readonly )
109+ . doit ( )
110+ . await ?;
111+ anyhow:: Ok ( file. name . unwrap_or_default ( ) )
112+ } ;
113+ let body = async {
114+ let ( resp, _) = self
115+ . drive_hub
116+ . files ( )
117+ . get ( file_id)
118+ . add_scope ( Scope :: Readonly )
119+ . param ( "alt" , "media" )
120+ . doit ( )
121+ . await ?;
122+ let content = resp. into_body ( ) . collect ( ) . await ?;
123+ anyhow:: Ok ( content)
124+ } ;
125+ let ( filename, content) = try_join ( filename, body) . await ?;
126+
127+ let mut fields = Vec :: with_capacity ( 2 ) ;
128+ fields. push ( filename. into ( ) ) ;
129+ if self . binary {
130+ fields. push ( content. to_bytes ( ) . to_vec ( ) . into ( ) ) ;
131+ } else {
132+ fields. push (
133+ String :: from_utf8_lossy ( & content. to_bytes ( ) )
134+ . to_string ( )
135+ . into ( ) ,
136+ ) ;
137+ }
138+
139+ Ok ( Some ( FieldValues { fields } ) )
98140 }
99141}
100142
@@ -116,6 +158,7 @@ impl SourceFactoryBase for Factory {
116158 Ok ( make_output_type ( CollectionSchema :: new (
117159 CollectionKind :: Table ,
118160 vec ! [
161+ FieldSchema :: new( "file_id" , make_output_type( BasicValueType :: Str ) ) ,
119162 FieldSchema :: new( "filename" , make_output_type( BasicValueType :: Str ) ) ,
120163 FieldSchema :: new(
121164 "content" ,
0 commit comments