Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions collab/src/folder/folder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,17 +748,17 @@ impl FolderBody {
accumulated_views: &mut Vec<View>,
uid: i64,
) {
if !visited.insert(view_id.to_string()) {
return;
}
match self.views.get_view_with_txn(txn, view_id, uid) {
None => (),
Some(parent_view) => {
let mut stack = vec![*view_id];
while let Some(current_id) = stack.pop() {
if !visited.insert(current_id.to_string()) {
continue;
}
if let Some(parent_view) = self.views.get_view_with_txn(txn, &current_id, uid) {
accumulated_views.push(parent_view.as_ref().clone());
parent_view.children.items.iter().for_each(|child| {
self.get_view_recursively_with_txn(txn, &child.id, visited, accumulated_views, uid)
})
},
for child in parent_view.children.items.iter().rev() {
stack.push(child.id);
}
}
}
}

Expand Down
50 changes: 46 additions & 4 deletions collab/src/folder/hierarchy_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,22 @@ pub struct FlattedViews;

impl FlattedViews {
pub fn flatten_views(views: Vec<ParentChildViews>) -> Vec<View> {
let mut result = vec![];
for view in views {
result.push(view.view);
result.append(&mut Self::flatten_views(view.children));
let mut result = Vec::new();
let mut stack: Vec<ParentChildViews> = Vec::new();

for view in views.into_iter().rev() {
stack.push(view);
}

while let Some(mut current) = stack.pop() {
let mut children = Vec::new();
std::mem::swap(&mut children, &mut current.children);
for child in children.into_iter().rev() {
stack.push(child);
}
result.push(current.view);
}

result
}
}
Expand Down Expand Up @@ -389,6 +400,37 @@ mod tests {
assert_eq!(views.len(), 3);
}

#[tokio::test]
async fn flatten_views_handles_deep_hierarchy() {
const DEPTH: usize = 2048;
let workspace_id: ViewId =
uuid::Uuid::parse_str("00000000-0000-0000-0000-0000000000aa").unwrap();
let mut builder = NestedViewBuilder::new(workspace_id, 1);

builder
.with_view_builder(|view_builder| async move {
let mut current = view_builder.with_name("root");
for depth in 0..DEPTH {
current = current
.with_child_view_builder(|child| async move {
child.with_name(&format!("node_{depth}")).build()
})
.await;
}
current.build()
})
.await;

let workspace_views = builder.build();
let flattened = FlattedViews::flatten_views(workspace_views.into_inner());
assert_eq!(flattened.len(), DEPTH + 1);
assert_eq!(flattened.first().unwrap().name, "root");
assert_eq!(
flattened.last().unwrap().name,
format!("node_{}", DEPTH - 1)
);
}

#[tokio::test]
async fn create_view_with_children_test() {
let workspace_id: ViewId =
Expand Down
248 changes: 88 additions & 160 deletions collab/src/importer/notion/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::document::importer::define::URL_FIELD;
use crate::document::importer::md_importer::{MDImporter, create_image_block};
use crate::entity::CollabType;
use crate::entity::uuid_validation::DatabaseId;
use futures::stream::{self, StreamExt};
use futures::stream;

use crate::core::collab::default_client_id;
use crate::database::database_trait::NoPersistenceDatabaseCollabService;
Expand Down Expand Up @@ -214,75 +214,40 @@ impl NotionPage {
{
let mut document_resources = HashSet::new();
if let Some(page_id) = document.get_page_id() {
// Start the recursive processing with the root block (page_id)
self
.process_block_and_children(
parent_path,
document,
&page_id,
resources,
&file_url_builder,
&mut document_resources,
)
.await;
}

document_resources.into_iter().collect()
}

#[async_recursion::async_recursion(?Send)]
async fn process_block_and_children<'a, B, O>(
&'a self,
parent_path: &Path,
document: &mut Document,
block_id: &str,
resources: &[PathBuf],
file_url_builder: &B,
document_resources: &mut HashSet<PathBuf>,
) where
B: Fn(&'a str, PathBuf) -> O + Send + Sync + 'a,
O: Future<Output = Option<String>> + Send + 'a,
{
// Process the current block
if let Some((block_type, mut block_data)) = document.get_block_data(block_id) {
if matches!(block_type, BlockType::Image) {
if let Some(image_url) = block_data
.get(URL_FIELD)
.and_then(|v| v.as_str())
.and_then(|s| percent_decode_str(s).decode_utf8().ok())
{
let full_image_url = parent_path.join(image_url.to_string());
let pos = resources.iter().position(|r| r == &full_image_url);
if let Some(pos) = pos {
if let Some(url) = file_url_builder(&self.view_id, full_image_url).await {
document_resources.insert(resources[pos].clone());
block_data.insert(URL_FIELD.to_string(), json!(url));
if let Err(err) = document.update_block(block_id, block_data) {
error!(
"Failed to update block when trying to replace image. error:{:?}",
err
);
let mut stack = vec![page_id];
while let Some(block_id) = stack.pop() {
if let Some((block_type, mut block_data)) = document.get_block_data(&block_id) {
if matches!(block_type, BlockType::Image) {
if let Some(image_url) = block_data
.get(URL_FIELD)
.and_then(|v| v.as_str())
.and_then(|s| percent_decode_str(s).decode_utf8().ok())
{
let full_image_url = parent_path.join(image_url.to_string());
if let Some(pos) = resources.iter().position(|r| r == &full_image_url) {
if let Some(url) = file_url_builder(&self.view_id, full_image_url).await {
document_resources.insert(resources[pos].clone());
block_data.insert(URL_FIELD.to_string(), json!(url));
if let Err(err) = document.update_block(&block_id, block_data) {
error!(
"Failed to update block when trying to replace image. error:{:?}",
err
);
}
}
}
}
}
}

let block_children_ids = document.get_block_children_ids(&block_id);
for child_id in block_children_ids.into_iter().rev() {
stack.push(child_id);
}
}
}

// Recursively process each child block
let block_children_ids = document.get_block_children_ids(block_id);
for child_id in block_children_ids.iter() {
self
.process_block_and_children(
parent_path,
document,
child_id,
resources,
file_url_builder,
document_resources,
)
.await;
}
document_resources.into_iter().collect()
}

async fn replace_link_views<'a, 'b, B, O>(
Expand All @@ -300,94 +265,53 @@ impl NotionPage {
{
let mut delta_resources = HashSet::new();
if let Some(first_page_id) = document.get_page_id() {
// Start the recursive processing with the first page's root block
self
.process_link_views_recursive(
parent_path,
document,
&first_page_id,
resources,
&external_link_views,
file_url_builder,
&mut delta_resources,
)
.await;
}
let mut stack = vec![first_page_id];
while let Some(block_id) = stack.pop() {
if let Some((block_type, deltas)) = document.get_block_delta(&block_id) {
let block_deltas_result = self
.process_block_deltas(
parent_path,
document,
&block_id,
block_type,
deltas,
&external_link_views,
resources,
file_url_builder,
)
.await;

delta_resources.into_iter().collect()
}
delta_resources.extend(block_deltas_result.delta_resources);

#[async_recursion::async_recursion]
#[allow(clippy::too_many_arguments)]
async fn process_link_views_recursive<'a, 'b, B, O>(
&'b self,
parent_path: &Path,
document: &mut Document,
block_id: &str,
resources: &[PathBuf],
external_link_views: &HashMap<String, NotionPage>,
file_url_builder: &'a B,
delta_resources: &mut HashSet<PathBuf>,
) where
B: Fn(&'a str, PathBuf) -> O + Send + Sync + 'a,
O: Future<Output = Option<String>> + Send + 'a,
'b: 'a,
{
if let Some((block_type, deltas)) = document.get_block_delta(block_id) {
let block_deltas_result = self
.process_block_deltas(
parent_path,
document,
block_id,
block_type,
deltas,
external_link_views,
resources,
file_url_builder,
)
.await;

// Collect resources from this block
delta_resources.extend(block_deltas_result.delta_resources);

// Update document deltas if new ones are created
if let Some(new_deltas) = block_deltas_result.new_deltas {
if let Err(err) = document.set_block_delta(block_id, new_deltas) {
error!(
"Failed to set block delta when trying to replace ref link. error: {:?}",
err
);
if let Some(new_deltas) = block_deltas_result.new_deltas {
if let Err(err) = document.set_block_delta(&block_id, new_deltas) {
error!(
"Failed to set block delta when trying to replace ref link. error: {:?}",
err
);
}
}

for image_url in block_deltas_result.new_delta_image_blocks {
let new_block_id = crate::document::document_data::generate_id();
let image_block = create_image_block(&new_block_id, image_url, &block_id);
if let Err(err) = document.insert_block(image_block, Some(block_id.clone())) {
error!(
"Failed to insert image block when trying to replace delta link. error: {:?}",
err
);
}
}
}
}

// Insert new image blocks if any are created
for image_url in block_deltas_result.new_delta_image_blocks {
let new_block_id = crate::document::document_data::generate_id();
let image_block = create_image_block(&new_block_id, image_url, block_id);
if let Err(err) = document.insert_block(image_block, Some(block_id.to_string())) {
error!(
"Failed to insert image block when trying to replace delta link. error: {:?}",
err
);
let block_children_ids = document.get_block_children_ids(&block_id);
for child_id in block_children_ids.into_iter().rev() {
stack.push(child_id);
}
}
}

// Recursively process each child block
let block_children_ids = document.get_block_children_ids(block_id);
for child_id in block_children_ids.iter() {
self
.process_link_views_recursive(
parent_path,
document,
child_id,
resources,
external_link_views,
file_url_builder,
delta_resources,
)
.await;
}
delta_resources.into_iter().collect()
}

/// Process the deltas for a block, looking for links to replace
Expand Down Expand Up @@ -690,23 +614,27 @@ impl NotionPage {
pub async fn build_imported_collab_recursively<'a>(
notion_page: NotionPage,
) -> ImportedCollabInfoStream<'a> {
let imported_collab_info = notion_page.build_imported_collab().await;
let initial_stream: ImportedCollabInfoStream = match imported_collab_info {
Ok(Some(info)) => Box::pin(stream::once(async { info })),
Ok(None) => Box::pin(stream::empty()),
Err(_) => Box::pin(stream::empty()),
};

let child_streams = notion_page
.children
.into_iter()
.map(|child| async move { build_imported_collab_recursively(child).await });

let child_stream = stream::iter(child_streams)
.then(|stream_future| stream_future)
.flatten();

Box::pin(initial_stream.chain(child_stream))
let stack = vec![notion_page];
let stream = stream::unfold(stack, |mut stack| async move {
while let Some(page) = stack.pop() {
let result = page.build_imported_collab().await;
let children = page.children;
for child in children.into_iter().rev() {
stack.push(child);
}
match result {
Ok(Some(info)) => return Some((info, stack)),
Ok(None) => continue,
Err(err) => {
error!("Failed to build imported collab: {}", err);
continue;
},
}
}
None
});

Box::pin(stream)
}

pub struct ProcessBlockDeltaResult {
Expand Down
Loading