Skip to content

Commit 383bd9f

Browse files
authored
Generating change streams based on last modified files. (#250)
1 parent 5671d74 commit 383bd9f

File tree

6 files changed

+163
-38
lines changed

6 files changed

+163
-38
lines changed

examples/gdrive_text_embedding/main.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ def gdrive_text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope:
1111
"""
1212
credential_path = os.environ["GOOGLE_SERVICE_ACCOUNT_CREDENTIAL"]
1313
root_folder_ids = os.environ["GOOGLE_DRIVE_ROOT_FOLDER_IDS"].split(",")
14-
14+
1515
data_scope["documents"] = flow_builder.add_source(
1616
cocoindex.sources.GoogleDrive(
1717
service_account_credential_path=credential_path,
18-
root_folder_ids=root_folder_ids),
18+
root_folder_ids=root_folder_ids,
19+
recent_changes_poll_interval=datetime.timedelta(seconds=10)),
1920
refresh_options=cocoindex.SourceRefreshOptions(
2021
refresh_interval=datetime.timedelta(minutes=1)))
2122

python/cocoindex/sources.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""All builtin sources."""
22
from . import op
3+
import datetime
34

45
class LocalFile(op.SourceSpec):
56
"""Import data from local file system."""
@@ -26,3 +27,4 @@ class GoogleDrive(op.SourceSpec):
2627
service_account_credential_path: str
2728
root_folder_ids: list[str]
2829
binary: bool = False
30+
recent_changes_poll_interval: datetime.timedelta | None = None

src/execution/live_updater.rs

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -85,33 +85,35 @@ async fn update_source(
8585
let mut futs: Vec<BoxFuture<'_, Result<()>>> = Vec::new();
8686

8787
// Deal with change streams.
88-
if let (true, Some(change_stream)) = (options.live_mode, import_op.executor.change_stream()) {
89-
let pool = pool.clone();
90-
let source_update_stats = source_update_stats.clone();
91-
futs.push(
92-
async move {
93-
let mut change_stream = change_stream;
94-
while let Some(change) = change_stream.next().await {
95-
source_context
96-
.process_change(change, &pool, &source_update_stats)
97-
.map(tokio::spawn);
88+
if options.live_mode {
89+
if let Some(change_stream) = import_op.executor.change_stream().await? {
90+
let pool = pool.clone();
91+
let source_update_stats = source_update_stats.clone();
92+
futs.push(
93+
async move {
94+
let mut change_stream = change_stream;
95+
while let Some(change) = change_stream.next().await {
96+
source_context
97+
.process_change(change, &pool, &source_update_stats)
98+
.map(tokio::spawn);
99+
}
100+
Ok(())
98101
}
99-
Ok(())
100-
}
101-
.boxed(),
102-
);
103-
futs.push(
104-
async move {
105-
let mut interval = tokio::time::interval(REPORT_INTERVAL);
106-
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
107-
interval.tick().await;
108-
loop {
102+
.boxed(),
103+
);
104+
futs.push(
105+
async move {
106+
let mut interval = tokio::time::interval(REPORT_INTERVAL);
107+
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
109108
interval.tick().await;
110-
report_stats();
109+
loop {
110+
interval.tick().await;
111+
report_stats();
112+
}
111113
}
112-
}
113-
.boxed(),
114-
);
114+
.boxed(),
115+
);
116+
}
115117
}
116118

117119
// The main update loop.

src/ops/interface.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ pub trait SourceExecutor: Send + Sync {
7575
// Get the value for the given key.
7676
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>>;
7777

78-
fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange>> {
79-
None
78+
async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {
79+
Ok(None)
8080
}
8181
}
8282

src/ops/sources/google_drive.rs

Lines changed: 126 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
1-
use std::{
2-
collections::{HashMap, HashSet},
3-
sync::{Arc, LazyLock},
4-
};
5-
6-
use async_stream::try_stream;
1+
use chrono::Duration;
72
use google_drive3::{
83
api::{File, Scope},
94
yup_oauth2::{read_service_account_key, ServiceAccountAuthenticator},
@@ -12,7 +7,6 @@ use google_drive3::{
127
use http_body_util::BodyExt;
138
use hyper_rustls::HttpsConnector;
149
use hyper_util::client::legacy::connect::HttpConnector;
15-
use log::{trace, warn};
1610

1711
use crate::base::field_attrs;
1812
use crate::ops::sdk::*;
@@ -75,12 +69,14 @@ pub struct Spec {
7569
service_account_credential_path: String,
7670
binary: bool,
7771
root_folder_ids: Vec<String>,
72+
recent_changes_poll_interval: Option<std::time::Duration>,
7873
}
7974

8075
struct Executor {
8176
drive_hub: DriveHub<HttpsConnector<HttpConnector>>,
8277
binary: bool,
83-
root_folder_ids: Vec<Arc<str>>,
78+
root_folder_ids: IndexSet<Arc<str>>,
79+
recent_updates_poll_interval: Option<std::time::Duration>,
8480
}
8581

8682
impl Executor {
@@ -106,6 +102,7 @@ impl Executor {
106102
drive_hub,
107103
binary: spec.binary,
108104
root_folder_ids: spec.root_folder_ids.into_iter().map(Arc::from).collect(),
105+
recent_updates_poll_interval: spec.recent_changes_poll_interval,
109106
})
110107
}
111108
}
@@ -122,6 +119,7 @@ fn escape_string(s: &str) -> String {
122119
escaped
123120
}
124121

122+
const CUTOFF_TIME_BUFFER: Duration = Duration::seconds(1);
125123
impl Executor {
126124
fn visit_file(
127125
&self,
@@ -151,7 +149,6 @@ impl Executor {
151149
ordinal: file.modified_time.map(|t| t.try_into()).transpose()?,
152150
})
153151
} else {
154-
trace!("Skipping file with unsupported mime type: id={id}, mime_type={mime_type}, name={:?}", file.name);
155152
None
156153
};
157154
Ok(result)
@@ -175,9 +172,101 @@ impl Executor {
175172
list_call = list_call.page_token(next_page_token);
176173
}
177174
let (_, files) = list_call.doit().await?;
175+
*next_page_token = files.next_page_token;
178176
let file_iter = files.files.into_iter().flat_map(|file| file.into_iter());
179177
Ok(file_iter)
180178
}
179+
180+
fn make_cutoff_time(
181+
most_recent_modified_time: Option<DateTime<Utc>>,
182+
list_start_time: DateTime<Utc>,
183+
) -> DateTime<Utc> {
184+
let safe_upperbound = list_start_time - CUTOFF_TIME_BUFFER;
185+
most_recent_modified_time
186+
.map(|t| t.min(safe_upperbound))
187+
.unwrap_or(safe_upperbound)
188+
}
189+
190+
async fn get_recent_updates(
191+
&self,
192+
cutoff_time: &mut DateTime<Utc>,
193+
) -> Result<Vec<SourceChange>> {
194+
let mut page_size: i32 = 10;
195+
let mut next_page_token: Option<String> = None;
196+
let mut changes = Vec::new();
197+
let mut most_recent_modified_time = None;
198+
let start_time = Utc::now();
199+
'paginate: loop {
200+
let mut list_call = self
201+
.drive_hub
202+
.files()
203+
.list()
204+
.add_scope(Scope::Readonly)
205+
.param("fields", "files(id,modifiedTime,parents,trashed)")
206+
.order_by("modifiedTime desc")
207+
.page_size(page_size);
208+
if let Some(token) = next_page_token {
209+
list_call = list_call.page_token(token.as_str());
210+
}
211+
let (_, files) = list_call.doit().await?;
212+
for file in files.files.into_iter().flat_map(|files| files.into_iter()) {
213+
let modified_time = file.modified_time.unwrap_or_default();
214+
if most_recent_modified_time.is_none() {
215+
most_recent_modified_time = Some(modified_time);
216+
}
217+
if modified_time <= *cutoff_time {
218+
break 'paginate;
219+
}
220+
if self.is_file_covered(&file).await? {
221+
changes.push(SourceChange {
222+
ordinal: Some(modified_time.try_into()?),
223+
key: KeyValue::Str(Arc::from(
224+
file.id.ok_or_else(|| anyhow!("File has no id"))?,
225+
)),
226+
value: SourceValueChange::Upsert(None),
227+
});
228+
}
229+
}
230+
if let Some(token) = files.next_page_token {
231+
next_page_token = Some(token);
232+
} else {
233+
break;
234+
}
235+
// List more in a page since 2nd.
236+
page_size = 100;
237+
}
238+
*cutoff_time = Self::make_cutoff_time(most_recent_modified_time, start_time);
239+
Ok(changes)
240+
}
241+
242+
async fn is_file_covered(&self, file: &File) -> Result<bool> {
243+
if file.trashed == Some(true) {
244+
return Ok(false);
245+
}
246+
let mut next_file_id = Some(Cow::Borrowed(
247+
file.id.as_ref().ok_or_else(|| anyhow!("File has no id"))?,
248+
));
249+
while let Some(file_id) = next_file_id {
250+
if self.root_folder_ids.contains(file_id.as_str()) {
251+
return Ok(true);
252+
}
253+
let (_, file) = self
254+
.drive_hub
255+
.files()
256+
.get(&file_id)
257+
.add_scope(Scope::Readonly)
258+
.param("fields", "parents")
259+
.doit()
260+
.await?;
261+
next_file_id = file
262+
.parents
263+
.into_iter()
264+
.flat_map(|parents| parents.into_iter())
265+
.map(Cow::Owned)
266+
.next();
267+
}
268+
Ok(false)
269+
}
181270
}
182271

183272
trait ResultExt<T> {
@@ -311,6 +400,34 @@ impl SourceExecutor for Executor {
311400
};
312401
Ok(value)
313402
}
403+
404+
async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {
405+
let poll_interval = if let Some(poll_interval) = self.recent_updates_poll_interval {
406+
poll_interval
407+
} else {
408+
return Ok(None);
409+
};
410+
let mut cutoff_time = Utc::now() - CUTOFF_TIME_BUFFER;
411+
let mut interval = tokio::time::interval(poll_interval);
412+
interval.tick().await;
413+
let stream = stream! {
414+
loop {
415+
interval.tick().await;
416+
let changes = self.get_recent_updates(&mut cutoff_time).await;
417+
match changes {
418+
Ok(changes) => {
419+
for change in changes {
420+
yield change;
421+
}
422+
}
423+
Err(e) => {
424+
error!("Error getting recent updates: {e}");
425+
}
426+
}
427+
}
428+
};
429+
Ok(Some(stream.boxed()))
430+
}
314431
}
315432

316433
pub struct Factory;

src/prelude.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22

33
pub(crate) use anyhow::Result;
44
pub(crate) use async_trait::async_trait;
5+
pub(crate) use chrono::{DateTime, Utc};
56
pub(crate) use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
67
pub(crate) use futures::{FutureExt, StreamExt};
8+
pub(crate) use indexmap::{IndexMap, IndexSet};
79
pub(crate) use itertools::Itertools;
810
pub(crate) use serde::{Deserialize, Serialize};
911
pub(crate) use std::borrow::Cow;
10-
pub(crate) use std::collections::{BTreeMap, HashMap};
12+
pub(crate) use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
1113
pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak};
1214

1315
pub(crate) use crate::base::{schema, spec, value};
@@ -20,4 +22,5 @@ pub(crate) use crate::service::error::ApiError;
2022
pub(crate) use crate::{api_bail, api_error};
2123

2224
pub(crate) use anyhow::{anyhow, bail};
25+
pub(crate) use async_stream::{stream, try_stream};
2326
pub(crate) use log::{debug, error, info, trace, warn};

0 commit comments

Comments
 (0)