Skip to content

Commit 86d58e2

Browse files
committed
revert all changes for files under ops
1 parent 914efeb commit 86d58e2

File tree

3 files changed

+53
-60
lines changed

3 files changed

+53
-60
lines changed

src/ops/sources/amazon_s3.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::fields_value;
2-
use async_stream::{stream, try_stream};
2+
use async_stream::try_stream;
33
use aws_config::BehaviorVersion;
44
use aws_sdk_s3::Client;
55
use globset::{Glob, GlobSet, GlobSetBuilder};

src/ops/sources/google_drive.rs

Lines changed: 48 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ impl Executor {
115115
file: File,
116116
new_folder_ids: &mut Vec<Arc<str>>,
117117
seen_ids: &mut HashSet<Arc<str>>,
118-
_options: &SourceExecutorListOptions,
119118
) -> Result<Option<PartialSourceRowMetadata>> {
120119
if file.trashed == Some(true) {
121120
return Ok(None);
@@ -306,7 +305,7 @@ impl SourceExecutor for Executor {
306305
.list_files(&folder_id, &fields, &mut next_page_token)
307306
.await?;
308307
for file in files {
309-
curr_rows.extend(self.visit_file(file, &mut new_folder_ids, &mut seen_ids, options)?);
308+
curr_rows.extend(self.visit_file(file, &mut new_folder_ids, &mut seen_ids)?);
310309
}
311310
if !curr_rows.is_empty() {
312311
yield curr_rows;
@@ -354,60 +353,54 @@ impl SourceExecutor for Executor {
354353
} else {
355354
None
356355
};
357-
358-
let value = if options.include_value {
359-
let type_n_body = if let Some(export_mime_type) = file
360-
.mime_type
361-
.as_ref()
362-
.and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str()))
363-
{
364-
let target_mime_type = if self.binary {
365-
export_mime_type.binary
366-
} else {
367-
export_mime_type.text
368-
};
369-
self.drive_hub
370-
.files()
371-
.export(file_id, target_mime_type)
372-
.add_scope(Scope::Readonly)
373-
.doit()
374-
.await
375-
.or_not_found()?
376-
.map(|content| (Some(target_mime_type.to_string()), content.into_body()))
356+
let type_n_body = if let Some(export_mime_type) = file
357+
.mime_type
358+
.as_ref()
359+
.and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str()))
360+
{
361+
let target_mime_type = if self.binary {
362+
export_mime_type.binary
377363
} else {
378-
self.drive_hub
379-
.files()
380-
.get(file_id)
381-
.add_scope(Scope::Readonly)
382-
.param("alt", "media")
383-
.doit()
384-
.await
385-
.or_not_found()?
386-
.map(|(resp, _)| (file.mime_type, resp.into_body()))
364+
export_mime_type.text
387365
};
388-
389-
match type_n_body {
390-
Some((mime_type, resp_body)) => {
391-
let content = resp_body.collect().await?;
392-
let content_bytes = content.to_bytes();
393-
394-
let fields = vec![
395-
file.name.unwrap_or_default().into(),
396-
mime_type.into(),
397-
if self.binary {
398-
content_bytes.to_vec().into()
399-
} else {
400-
String::from_utf8_lossy(&content_bytes).to_string().into()
401-
},
402-
];
403-
Some(SourceValue::Existence(FieldValues { fields }))
404-
}
405-
None => Some(SourceValue::NonExistence),
406-
}
366+
self.drive_hub
367+
.files()
368+
.export(file_id, target_mime_type)
369+
.add_scope(Scope::Readonly)
370+
.doit()
371+
.await
372+
.or_not_found()?
373+
.map(|content| (Some(target_mime_type.to_string()), content.into_body()))
407374
} else {
408-
None
375+
self.drive_hub
376+
.files()
377+
.get(file_id)
378+
.add_scope(Scope::Readonly)
379+
.param("alt", "media")
380+
.doit()
381+
.await
382+
.or_not_found()?
383+
.map(|(resp, _)| (file.mime_type, resp.into_body()))
409384
};
385+
let value = match type_n_body {
386+
Some((mime_type, resp_body)) => {
387+
let content = resp_body.collect().await?;
410388

389+
let fields = vec![
390+
file.name.unwrap_or_default().into(),
391+
mime_type.into(),
392+
if self.binary {
393+
content.to_bytes().to_vec().into()
394+
} else {
395+
String::from_utf8_lossy(&content.to_bytes())
396+
.to_string()
397+
.into()
398+
},
399+
];
400+
Some(SourceValue::Existence(FieldValues { fields }))
401+
}
402+
None => None,
403+
};
411404
Ok(PartialSourceRowData { value, ordinal })
412405
}
413406

@@ -449,6 +442,10 @@ impl SourceFactoryBase for Factory {
449442
) -> Result<EnrichedValueType> {
450443
let mut struct_schema = StructSchema::default();
451444
let mut schema_builder = StructSchemaBuilder::new(&mut struct_schema);
445+
schema_builder.add_field(FieldSchema::new(
446+
"file_id",
447+
make_output_type(BasicValueType::Str),
448+
));
452449
let filename_field = schema_builder.add_field(FieldSchema::new(
453450
"filename",
454451
make_output_type(BasicValueType::Str),

src/ops/sources/local_file.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use std::path::Path;
66
use std::{path::PathBuf, sync::Arc};
77

88
use crate::base::field_attrs;
9-
109
use crate::{fields_value, ops::sdk::*};
1110

1211
#[derive(Debug, Deserialize)]
@@ -69,7 +68,6 @@ impl SourceExecutor for Executor {
6968
} else {
7069
None
7170
};
72-
7371
if let Some(relative_path) = relative_path.to_str() {
7472
yield vec![PartialSourceRowMetadata {
7573
key: KeyValue::Str(relative_path.into()),
@@ -103,26 +101,24 @@ impl SourceExecutor for Executor {
103101
} else {
104102
None
105103
};
106-
107104
let value = if options.include_value {
108-
match std::fs::read(&path) {
105+
match std::fs::read(path) {
109106
Ok(content) => {
110-
let content_value = if self.binary {
107+
let content = if self.binary {
111108
fields_value!(content)
112109
} else {
113110
fields_value!(String::from_utf8_lossy(&content).to_string())
114111
};
115-
Some(SourceValue::Existence(content_value))
112+
Some(SourceValue::Existence(content))
116113
}
117114
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
118115
Some(SourceValue::NonExistence)
119116
}
120-
Err(e) => return Err(e.into()),
117+
Err(e) => Err(e)?,
121118
}
122119
} else {
123120
None
124121
};
125-
126122
Ok(PartialSourceRowData { value, ordinal })
127123
}
128124
}

0 commit comments

Comments
 (0)