Skip to content

Commit afc4cac

Browse files
authored
Source API simplification: get_value() no longer returns ordinal (#232)
1 parent 58ad711 commit afc4cac

File tree

5 files changed

+63
-100
lines changed

5 files changed

+63
-100
lines changed

src/execution/row_indexer.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -463,14 +463,10 @@ pub async fn evaluate_source_entry_with_memory(
463463
None
464464
};
465465
let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
466-
let source_data = match source_op.executor.get_value(key).await? {
466+
let source_value = match source_op.executor.get_value(key).await? {
467467
Some(d) => d,
468468
None => return Ok(None),
469469
};
470-
let source_value = match source_data.value.await? {
471-
Some(value) => value,
472-
None => return Ok(None),
473-
};
474470
let output = evaluate_source_entry(plan, source_op, schema, key, source_value, &memory).await?;
475471
Ok(Some(output))
476472
}

src/execution/source_indexer.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,13 @@ impl SourceContext {
9494
let source_op = &plan.source_ops[self.source_idx];
9595
let source_value = if source_version.kind == row_indexer::SourceVersionKind::Deleted {
9696
None
97-
} else if let Some(d) = source_op.executor.get_value(&key).await? {
98-
d.value.await?
9997
} else {
10098
// Even if the source version kind is not Deleted, the source value might be gone on polling.
10199
// In this case, we still use the current source version even if it's already stale - actually this version skew
102100
// also happens for update cases and there's no way to keep them always in sync for many sources.
103101
//
104102
// We only need source version <= actual version for value.
105-
None
103+
source_op.executor.get_value(&key).await?
106104
};
107105
let schema = &self.flow.data_schema;
108106
let result = row_indexer::update_source_row(

src/ops/interface.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,6 @@ impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {
4040
}
4141
}
4242

43-
pub struct SourceData<'a> {
44-
/// Value that must increase monotonically after change. E.g. can be from the update time.
45-
pub ordinal: Option<Ordinal>,
46-
/// None means the item is gone when polling.
47-
pub value: BoxFuture<'a, Result<Option<FieldValues>>>,
48-
}
49-
5043
pub struct SourceRowMetadata {
5144
pub key: KeyValue,
5245
/// None means the ordinal is unavailable.
@@ -75,7 +68,7 @@ pub trait SourceExecutor: Send + Sync {
7568
) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>>;
7669

7770
// Get the value for the given key.
78-
async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>>;
71+
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>>;
7972

8073
fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange<'a>>> {
8174
None

src/ops/sources/google_drive.rs

Lines changed: 48 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -246,80 +246,70 @@ impl SourceExecutor for Executor {
246246
.boxed()
247247
}
248248

249-
async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>> {
249+
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
250250
let file_id = key.str_value()?;
251-
252251
let resp = self
253252
.drive_hub
254253
.files()
255254
.get(file_id)
256255
.add_scope(Scope::Readonly)
257-
.param("fields", "id,name,mimeType,trashed,modifiedTime")
256+
.param("fields", "id,name,mimeType,trashed")
258257
.doit()
259258
.await
260259
.or_not_found()?;
261260
let file = match resp {
262261
Some((_, file)) if file.trashed != Some(true) => file,
263262
_ => return Ok(None),
264263
};
265-
266-
let modified_time = file.modified_time;
267-
let value = async move {
268-
let type_n_body = if let Some(export_mime_type) = file
269-
.mime_type
270-
.as_ref()
271-
.and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str()))
272-
{
273-
let target_mime_type = if self.binary {
274-
export_mime_type.binary
275-
} else {
276-
export_mime_type.text
277-
};
278-
self.drive_hub
279-
.files()
280-
.export(file_id, target_mime_type)
281-
.add_scope(Scope::Readonly)
282-
.doit()
283-
.await
284-
.or_not_found()?
285-
.map(|content| (Some(target_mime_type.to_string()), content.into_body()))
264+
let type_n_body = if let Some(export_mime_type) = file
265+
.mime_type
266+
.as_ref()
267+
.and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str()))
268+
{
269+
let target_mime_type = if self.binary {
270+
export_mime_type.binary
286271
} else {
287-
self.drive_hub
288-
.files()
289-
.get(file_id)
290-
.add_scope(Scope::Readonly)
291-
.param("alt", "media")
292-
.doit()
293-
.await
294-
.or_not_found()?
295-
.map(|(resp, _)| (file.mime_type, resp.into_body()))
272+
export_mime_type.text
296273
};
297-
let value = match type_n_body {
298-
Some((mime_type, resp_body)) => {
299-
let content = resp_body.collect().await?;
274+
self.drive_hub
275+
.files()
276+
.export(file_id, target_mime_type)
277+
.add_scope(Scope::Readonly)
278+
.doit()
279+
.await
280+
.or_not_found()?
281+
.map(|content| (Some(target_mime_type.to_string()), content.into_body()))
282+
} else {
283+
self.drive_hub
284+
.files()
285+
.get(file_id)
286+
.add_scope(Scope::Readonly)
287+
.param("alt", "media")
288+
.doit()
289+
.await
290+
.or_not_found()?
291+
.map(|(resp, _)| (file.mime_type, resp.into_body()))
292+
};
293+
let value = match type_n_body {
294+
Some((mime_type, resp_body)) => {
295+
let content = resp_body.collect().await?;
300296

301-
let fields = vec![
302-
file.name.unwrap_or_default().into(),
303-
mime_type.into(),
304-
if self.binary {
305-
content.to_bytes().to_vec().into()
306-
} else {
307-
String::from_utf8_lossy(&content.to_bytes())
308-
.to_string()
309-
.into()
310-
},
311-
];
312-
Some(FieldValues { fields })
313-
}
314-
None => None,
315-
};
316-
Ok(value)
317-
}
318-
.boxed();
319-
Ok(Some(SourceData {
320-
ordinal: modified_time.map(|t| t.try_into()).transpose()?,
321-
value,
322-
}))
297+
let fields = vec![
298+
file.name.unwrap_or_default().into(),
299+
mime_type.into(),
300+
if self.binary {
301+
content.to_bytes().to_vec().into()
302+
} else {
303+
String::from_utf8_lossy(&content.to_bytes())
304+
.to_string()
305+
.into()
306+
},
307+
];
308+
Some(FieldValues { fields })
309+
}
310+
None => None,
311+
};
312+
Ok(value)
323313
}
324314
}
325315

src/ops/sources/local_file.rs

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -84,38 +84,24 @@ impl SourceExecutor for Executor {
8484
.boxed()
8585
}
8686

87-
async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>> {
87+
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
8888
if !self.is_file_included(key.str_value()?.as_ref()) {
8989
return Ok(None);
9090
}
9191
let path = self.root_path.join(key.str_value()?.as_ref());
92-
let modified_time = match std::fs::metadata(&path) {
93-
Ok(metadata) => metadata.modified()?,
94-
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
95-
return Ok(None);
92+
let value = match std::fs::read(path) {
93+
Ok(content) => {
94+
let content = if self.binary {
95+
fields_value!(content)
96+
} else {
97+
fields_value!(String::from_utf8_lossy(&content).to_string())
98+
};
99+
Some(content)
96100
}
97-
Err(e) => return Err(e.into()),
101+
Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
102+
Err(e) => Err(e)?,
98103
};
99-
let value = async move {
100-
let value = match std::fs::read(path) {
101-
Ok(content) => {
102-
let content = if self.binary {
103-
fields_value!(content)
104-
} else {
105-
fields_value!(String::from_utf8_lossy(&content).to_string())
106-
};
107-
Some(content)
108-
}
109-
Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
110-
Err(e) => Err(e)?,
111-
};
112-
Ok(value)
113-
}
114-
.boxed();
115-
Ok(Some(SourceData {
116-
ordinal: Some(modified_time.try_into()?),
117-
value,
118-
}))
104+
Ok(value)
119105
}
120106
}
121107

0 commit comments

Comments
 (0)