diff --git a/Cargo.lock b/Cargo.lock index f8919d00..8dd0e37b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1317,6 +1317,7 @@ dependencies = [ "blake2", "bytes", "chrono", + "cocoindex-utils", "config", "const_format", "derivative", @@ -1401,6 +1402,35 @@ dependencies = [ "yup-oauth2 12.1.0", ] +[[package]] +name = "cocoindex-utils" +version = "999.0.0" +dependencies = [ + "anyhow", + "async-openai", + "async-trait", + "base64 0.22.1", + "blake2", + "encoding_rs", + "futures", + "hex", + "indenter", + "indexmap 2.10.0", + "itertools 0.14.0", + "log", + "neo4rs", + "rand 0.9.2", + "regex", + "reqwest", + "serde", + "serde_json", + "serde_path_to_error", + "sqlx", + "tokio", + "tokio-util", + "yaml-rust2", +] + [[package]] name = "colorchoice" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index d0f64928..a758ba33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["rust/cocoindex"] +members = ["rust/cocoindex", "rust/utils"] resolver = "2" [workspace.package] diff --git a/rust/cocoindex/Cargo.toml b/rust/cocoindex/Cargo.toml index 0db5ad56..326eb75f 100644 --- a/rust/cocoindex/Cargo.toml +++ b/rust/cocoindex/Cargo.toml @@ -14,6 +14,8 @@ default = ["legacy-states-v0"] legacy-states-v0 = [] [dependencies] +cocoindex-utils = { path = "../utils" } + pyo3 = { workspace = true } pythonize = { workspace = true } pyo3-async-runtimes = { workspace = true } diff --git a/rust/cocoindex/src/base/json_schema.rs b/rust/cocoindex/src/base/json_schema.rs index c7a9756c..e001e9df 100644 --- a/rust/cocoindex/src/base/json_schema.rs +++ b/rust/cocoindex/src/base/json_schema.rs @@ -1,11 +1,11 @@ use crate::prelude::*; -use crate::utils::immutable::RefList; use schemars::schema::{ ArrayValidation, InstanceType, ObjectValidation, Schema, SchemaObject, SingleOrVec, SubschemaValidation, }; use std::fmt::Write; +use utils::immutable::RefList; pub struct ToJsonSchemaOptions { /// If true, mark all fields as required. diff --git a/rust/cocoindex/src/builder/analyzer.rs b/rust/cocoindex/src/builder/analyzer.rs index ad16331e..c3662d7a 100644 --- a/rust/cocoindex/src/builder/analyzer.rs +++ b/rust/cocoindex/src/builder/analyzer.rs @@ -6,13 +6,13 @@ use crate::prelude::*; use super::plan::*; use crate::lib_context::get_auth_registry; -use crate::utils::fingerprint::Fingerprinter; use crate::{ base::{schema::*, spec::*}, ops::interface::*, }; use futures::future::{BoxFuture, try_join3}; use futures::{FutureExt, future::try_join_all}; +use utils::fingerprint::Fingerprinter; #[derive(Debug)] pub(super) enum ValueTypeBuilder { diff --git a/rust/cocoindex/src/builder/plan.rs b/rust/cocoindex/src/builder/plan.rs index 31ed60ce..33c0989c 100644 --- a/rust/cocoindex/src/builder/plan.rs +++ b/rust/cocoindex/src/builder/plan.rs @@ -3,7 +3,7 @@ use crate::base::spec::FieldName; use crate::prelude::*; use crate::ops::interface::*; -use crate::utils::fingerprint::{Fingerprint, Fingerprinter}; +use utils::fingerprint::{Fingerprint, Fingerprinter}; #[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct AnalyzedLocalFieldReference { diff --git a/rust/cocoindex/src/execution/db_tracking.rs b/rust/cocoindex/src/execution/db_tracking.rs index a5faa058..d68280ee 100644 --- a/rust/cocoindex/src/execution/db_tracking.rs +++ b/rust/cocoindex/src/execution/db_tracking.rs @@ -1,12 +1,12 @@ use crate::prelude::*; use super::{db_tracking_setup::TrackingTableSetupState, memoization::StoredMemoizationInfo}; -use crate::utils::{db::WriteAction, fingerprint::Fingerprint}; use futures::Stream; use serde::de::{self, Deserializer, SeqAccess, Visitor}; use serde::ser::SerializeSeq; use sqlx::PgPool; use std::fmt; +use utils::{db::WriteAction, fingerprint::Fingerprint}; //////////////////////////////////////////////////////////// // Access for the row tracking table diff --git a/rust/cocoindex/src/execution/dumper.rs b/rust/cocoindex/src/execution/dumper.rs index 8903d4f8..a4549023 100644 --- a/rust/cocoindex/src/execution/dumper.rs +++ b/rust/cocoindex/src/execution/dumper.rs @@ -13,7 +13,7 @@ use super::row_indexer; use crate::base::{schema, value}; use crate::builder::plan::{AnalyzedImportOp, ExecutionPlan}; use crate::ops::interface::SourceExecutorReadOptions; -use crate::utils::yaml_ser::YamlSerializer; +use utils::yaml_ser::YamlSerializer; #[derive(Debug, Clone, Deserialize)] pub struct EvaluateAndDumpOptions { diff --git a/rust/cocoindex/src/execution/evaluator.rs b/rust/cocoindex/src/execution/evaluator.rs index 956cbdde..aab987f0 100644 --- a/rust/cocoindex/src/execution/evaluator.rs +++ b/rust/cocoindex/src/execution/evaluator.rs @@ -4,12 +4,10 @@ use anyhow::{Context, Ok}; use futures::future::try_join_all; use crate::base::value::EstimatedByteSize; +use crate::base::{schema, value}; use crate::builder::{AnalyzedTransientFlow, plan::*}; use crate::py::AnyhowIntoPyResult; -use crate::{ - base::{schema, value}, - utils::immutable::RefList, -}; +use utils::immutable::RefList; use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell}; diff --git a/rust/cocoindex/src/execution/memoization.rs b/rust/cocoindex/src/execution/memoization.rs index 56774326..9c297099 100644 --- a/rust/cocoindex/src/execution/memoization.rs +++ b/rust/cocoindex/src/execution/memoization.rs @@ -10,8 +10,8 @@ use std::{ use crate::{ base::{schema, value}, service::error::{SharedError, SharedResultExtRef}, - utils::fingerprint::{Fingerprint, Fingerprinter}, }; +use cocoindex_utils::fingerprint::{Fingerprint, Fingerprinter}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StoredCacheEntry { diff --git a/rust/cocoindex/src/execution/row_indexer.rs b/rust/cocoindex/src/execution/row_indexer.rs index 9f61010c..09d6a4c4 100644 --- a/rust/cocoindex/src/execution/row_indexer.rs +++ b/rust/cocoindex/src/execution/row_indexer.rs @@ -18,8 +18,8 @@ use crate::builder::plan::*; use crate::ops::interface::{ ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorReadOptions, }; -use crate::utils::db::WriteAction; -use crate::utils::fingerprint::{Fingerprint, Fingerprinter}; +use utils::db::WriteAction; +use utils::fingerprint::{Fingerprint, Fingerprinter}; pub fn extract_primary_key_for_export( primary_key_def: &AnalyzedPrimaryKeyDef, @@ -947,7 +947,7 @@ mod tests { #[test] fn test_content_hash_computation() { use crate::base::value::{BasicValue, FieldValues, Value}; - use crate::utils::fingerprint::Fingerprinter; + use utils::fingerprint::Fingerprinter; // Test that content hash is computed correctly from source data let source_data1 = FieldValues { @@ -999,7 +999,7 @@ mod tests { // This test documents the exact requirements for GitHub Actions scenario // where file modification times change but content remains the same - use crate::utils::fingerprint::Fingerprinter; + use utils::fingerprint::Fingerprinter; // Simulate file content that remains the same across GitHub Actions checkout let file_content = "const hello = 'world';\nexport default hello;"; diff --git a/rust/cocoindex/src/execution/source_indexer.rs b/rust/cocoindex/src/execution/source_indexer.rs index 65ff5f04..5ea48e65 100644 --- a/rust/cocoindex/src/execution/source_indexer.rs +++ b/rust/cocoindex/src/execution/source_indexer.rs @@ -1,6 +1,5 @@ -use crate::{ - execution::row_indexer::ContentHashBasedCollapsingBaseline, prelude::*, utils::batching, -}; +use crate::{execution::row_indexer::ContentHashBasedCollapsingBaseline, prelude::*}; +use utils::batching; use futures::future::Ready; use sqlx::PgPool; diff --git a/rust/cocoindex/src/lib.rs b/rust/cocoindex/src/lib.rs index 9d916528..ca1c1ecf 100644 --- a/rust/cocoindex/src/lib.rs +++ b/rust/cocoindex/src/lib.rs @@ -10,4 +10,3 @@ mod server; mod service; mod settings; mod setup; -mod utils; diff --git a/rust/cocoindex/src/llm/openai.rs b/rust/cocoindex/src/llm/openai.rs index cf22c54d..67a23be3 100644 --- a/rust/cocoindex/src/llm/openai.rs +++ b/rust/cocoindex/src/llm/openai.rs @@ -5,7 +5,6 @@ use super::{LlmEmbeddingClient, LlmGenerationClient, detect_image_mime_type}; use async_openai::{ Client as OpenAIClient, config::OpenAIConfig, - error::OpenAIError, types::{ ChatCompletionRequestMessage, ChatCompletionRequestMessageContentPartImage, ChatCompletionRequestMessageContentPartText, ChatCompletionRequestSystemMessage, @@ -61,15 +60,6 @@ impl Client { } } -impl utils::retryable::IsRetryable for OpenAIError { - fn is_retryable(&self) -> bool { - match self { - OpenAIError::Reqwest(e) => e.is_retryable(), - _ => false, - } - } -} - fn create_llm_generation_request( request: &super::LlmGenerateRequest, ) -> Result { diff --git a/rust/cocoindex/src/ops/factory_bases.rs b/rust/cocoindex/src/ops/factory_bases.rs index 9be8ebab..491acd9b 100644 --- a/rust/cocoindex/src/ops/factory_bases.rs +++ b/rust/cocoindex/src/ops/factory_bases.rs @@ -385,8 +385,10 @@ pub trait BatchedFunctionExecutor: Send + Sync + Sized + 'static { fn batching_options(&self) -> batching::BatchingOptions; } +struct BatchedFunctionExecutorRunner(E); + #[async_trait] -impl batching::Runner for E { +impl batching::Runner for BatchedFunctionExecutorRunner { type Input = Vec; type Output = value::Value; @@ -394,12 +396,12 @@ impl batching::Runner for E { &self, inputs: Vec, ) -> Result> { - Ok(self.evaluate_batch(inputs).await?.into_iter()) + Ok(self.0.evaluate_batch(inputs).await?.into_iter()) } } struct BatchedFunctionExecutorWrapper { - batcher: batching::Batcher, + batcher: batching::Batcher>, enable_cache: bool, behavior_version: Option, } @@ -407,10 +409,15 @@ struct BatchedFunctionExecutorWrapper { impl BatchedFunctionExecutorWrapper { fn new(executor: E) -> Self { let batching_options = executor.batching_options(); + let enable_cache = executor.enable_cache(); + let behavior_version = executor.behavior_version(); Self { - enable_cache: executor.enable_cache(), - behavior_version: executor.behavior_version(), - batcher: batching::Batcher::new(executor, batching_options), + enable_cache, + behavior_version, + batcher: batching::Batcher::new( + BatchedFunctionExecutorRunner(executor), + batching_options, + ), } } } diff --git a/rust/cocoindex/src/ops/targets/neo4j.rs b/rust/cocoindex/src/ops/targets/neo4j.rs index b02993a1..ce992a81 100644 --- a/rust/cocoindex/src/ops/targets/neo4j.rs +++ b/rust/cocoindex/src/ops/targets/neo4j.rs @@ -51,16 +51,6 @@ impl GraphKey { } } -impl retryable::IsRetryable for neo4rs::Error { - fn is_retryable(&self) -> bool { - match self { - neo4rs::Error::ConnectionError => true, - neo4rs::Error::Neo4j(e) => e.kind() == neo4rs::Neo4jErrorKind::Transient, - _ => false, - } - } -} - #[derive(Default)] pub struct GraphPool { graphs: Mutex>>>>, diff --git a/rust/cocoindex/src/prelude.rs b/rust/cocoindex/src/prelude.rs index 70864813..f8354a98 100644 --- a/rust/cocoindex/src/prelude.rs +++ b/rust/cocoindex/src/prelude.rs @@ -26,8 +26,9 @@ pub(crate) use crate::ops::interface; pub(crate) use crate::service::error::{ApiError, invariance_violation}; pub(crate) use crate::setup; pub(crate) use crate::setup::AuthRegistry; -pub(crate) use crate::utils::{self, batching, concur_control, http, retryable}; pub(crate) use crate::{api_bail, api_error}; +pub(crate) use cocoindex_utils as utils; +pub(crate) use cocoindex_utils::{batching, concur_control, http, retryable}; pub(crate) use anyhow::{anyhow, bail}; pub(crate) use async_stream::{stream, try_stream}; diff --git a/rust/cocoindex/src/service/error.rs b/rust/cocoindex/src/service/error.rs index 420a643e..d397830d 100644 --- a/rust/cocoindex/src/service/error.rs +++ b/rust/cocoindex/src/service/error.rs @@ -11,6 +11,11 @@ use std::{ fmt::{Debug, Display}, }; +// Re-export error types from cocoindex-utils +pub use cocoindex_utils::error::{ + SharedError, SharedResultExt, SharedResultExtRef, invariance_violation, shared_ok, +}; + #[derive(Debug)] pub struct ApiError { pub err: anyhow::Error, @@ -71,136 +76,6 @@ impl From for PyErr { } } -pub struct ResidualErrorData { - message: String, - debug: String, -} - -#[derive(Clone)] -pub struct ResidualError(Arc); - -impl ResidualError { - pub fn new(err: &Err) -> Self { - Self(Arc::new(ResidualErrorData { - message: err.to_string(), - debug: err.to_string(), - })) - } -} - -impl Display for ResidualError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.0.message) - } -} - -impl Debug for ResidualError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.0.debug) - } -} - -impl Error for ResidualError {} - -enum SharedErrorState { - Anyhow(anyhow::Error), - ResidualErrorMessage(ResidualError), -} - -/// SharedError allows to be cloned. -/// The original `anyhow::Error` can be extracted once, and later it decays to `ResidualError` which preserves the message and debug information. -#[derive(Clone)] -pub struct SharedError(Arc>); - -impl SharedError { - pub fn new(err: anyhow::Error) -> Self { - Self(Arc::new(Mutex::new(SharedErrorState::Anyhow(err)))) - } - - fn extract_anyhow_error(&self) -> anyhow::Error { - let mut state = self.0.lock().unwrap(); - let mut_state = &mut *state; - - let residual_err = match mut_state { - SharedErrorState::ResidualErrorMessage(err) => { - return anyhow::Error::from(err.clone()); - } - SharedErrorState::Anyhow(err) => ResidualError::new(err), - }; - let orig_state = std::mem::replace( - mut_state, - SharedErrorState::ResidualErrorMessage(residual_err), - ); - let SharedErrorState::Anyhow(err) = orig_state else { - panic!("Expected anyhow error"); - }; - err - } -} -impl Debug for SharedError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let state = self.0.lock().unwrap(); - match &*state { - SharedErrorState::Anyhow(err) => Debug::fmt(err, f), - SharedErrorState::ResidualErrorMessage(err) => Debug::fmt(err, f), - } - } -} - -impl Display for SharedError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let state = self.0.lock().unwrap(); - match &*state { - SharedErrorState::Anyhow(err) => Display::fmt(err, f), - SharedErrorState::ResidualErrorMessage(err) => Display::fmt(err, f), - } - } -} - -impl From for SharedError { - fn from(err: E) -> Self { - Self(Arc::new(Mutex::new(SharedErrorState::Anyhow( - anyhow::Error::from(err), - )))) - } -} - -pub fn shared_ok(value: T) -> Result { - Ok(value) -} - -pub type SharedResult = Result; - -pub trait SharedResultExt { - fn anyhow_result(self) -> Result; -} - -impl SharedResultExt for Result { - fn anyhow_result(self) -> Result { - match self { - Ok(value) => Ok(value), - Err(err) => Err(err.extract_anyhow_error()), - } - } -} - -pub trait SharedResultExtRef<'a, T> { - fn anyhow_result(self) -> Result<&'a T, anyhow::Error>; -} - -impl<'a, T> SharedResultExtRef<'a, T> for &'a Result { - fn anyhow_result(self) -> Result<&'a T, anyhow::Error> { - match self { - Ok(value) => Ok(value), - Err(err) => Err(err.extract_anyhow_error()), - } - } -} - -pub fn invariance_violation() -> anyhow::Error { - anyhow::anyhow!("Invariance violation") -} - #[macro_export] macro_rules! api_bail { ( $fmt:literal $(, $($arg:tt)*)?) => { diff --git a/rust/cocoindex/src/setup/db_metadata.rs b/rust/cocoindex/src/setup/db_metadata.rs index ed62d589..88e7bdbb 100644 --- a/rust/cocoindex/src/setup/db_metadata.rs +++ b/rust/cocoindex/src/setup/db_metadata.rs @@ -1,9 +1,9 @@ use crate::prelude::*; use super::{ResourceSetupChange, ResourceSetupInfo, SetupChangeType, StateChange}; -use crate::utils::db::WriteAction; use axum::http::StatusCode; use sqlx::PgPool; +use utils::db::WriteAction; const SETUP_METADATA_TABLE_NAME: &str = "cocoindex_setup_metadata"; pub const FLOW_VERSION_RESOURCE_TYPE: &str = "__FlowVersion"; diff --git a/rust/utils/Cargo.toml b/rust/utils/Cargo.toml new file mode 100644 index 00000000..2257e32b --- /dev/null +++ b/rust/utils/Cargo.toml @@ -0,0 +1,48 @@ +[package] +name = "cocoindex-utils" +version = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +license = { workspace = true } + +[dependencies] +# Core dependencies +anyhow = { workspace = true } +async-trait = { workspace = true } +log = { workspace = true } + +# Serialization +serde = { workspace = true } +serde_json = { workspace = true } +base64 = { workspace = true } + +# Async runtime +tokio = { workspace = true } +tokio-util = { workspace = true } +futures = { workspace = true } + +# Hashing +blake2 = { workspace = true } + +# HTTP client and related +reqwest = { workspace = true } +async-openai = { workspace = true } +neo4rs = { workspace = true } + +# Database +sqlx = { workspace = true } + +# Text encoding +encoding_rs = { workspace = true } + +# Utilities +regex = { workspace = true } +itertools = { workspace = true } +indexmap = { workspace = true } +hex = { workspace = true } +serde_path_to_error = { workspace = true } +rand = { workspace = true } + +# YAML serialization +yaml-rust2 = { workspace = true } +indenter = { workspace = true } diff --git a/rust/cocoindex/src/utils/batching.rs b/rust/utils/src/batching.rs similarity index 99% rename from rust/cocoindex/src/utils/batching.rs rename to rust/utils/src/batching.rs index 6b9eace4..d3608a7b 100644 --- a/rust/cocoindex/src/utils/batching.rs +++ b/rust/utils/src/batching.rs @@ -1,4 +1,9 @@ -use crate::{prelude::*, service::error::ResidualError}; +use crate::error::ResidualError; +use anyhow::{Result, anyhow, bail}; +use async_trait::async_trait; +use log::error; +use serde::{Deserialize, Serialize}; +use std::sync::{Arc, Mutex}; use tokio::sync::{oneshot, watch}; use tokio_util::task::AbortOnDropHandle; diff --git a/rust/cocoindex/src/utils/bytes_decode.rs b/rust/utils/src/bytes_decode.rs similarity index 100% rename from rust/cocoindex/src/utils/bytes_decode.rs rename to rust/utils/src/bytes_decode.rs diff --git a/rust/cocoindex/src/utils/concur_control.rs b/rust/utils/src/concur_control.rs similarity index 99% rename from rust/cocoindex/src/utils/concur_control.rs rename to rust/utils/src/concur_control.rs index 88988864..4fa0fe8e 100644 --- a/rust/cocoindex/src/utils/concur_control.rs +++ b/rust/utils/src/concur_control.rs @@ -1,5 +1,4 @@ -use crate::prelude::*; - +use std::sync::Arc; use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore}; struct WeightedSemaphore { diff --git a/rust/cocoindex/src/utils/db.rs b/rust/utils/src/db.rs similarity index 100% rename from rust/cocoindex/src/utils/db.rs rename to rust/utils/src/db.rs diff --git a/rust/cocoindex/src/utils/deser.rs b/rust/utils/src/deser.rs similarity index 100% rename from rust/cocoindex/src/utils/deser.rs rename to rust/utils/src/deser.rs diff --git a/rust/utils/src/error.rs b/rust/utils/src/error.rs new file mode 100644 index 00000000..853c7d65 --- /dev/null +++ b/rust/utils/src/error.rs @@ -0,0 +1,136 @@ +use anyhow; +use std::{ + error::Error, + fmt::{Debug, Display}, + sync::{Arc, Mutex}, +}; + +pub struct ResidualErrorData { + message: String, + debug: String, +} + +#[derive(Clone)] +pub struct ResidualError(Arc); + +impl ResidualError { + pub fn new(err: &Err) -> Self { + Self(Arc::new(ResidualErrorData { + message: err.to_string(), + debug: err.to_string(), + })) + } +} + +impl Display for ResidualError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0.message) + } +} + +impl Debug for ResidualError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0.debug) + } +} + +impl Error for ResidualError {} + +enum SharedErrorState { + Anyhow(anyhow::Error), + ResidualErrorMessage(ResidualError), +} + +/// SharedError allows to be cloned. +/// The original `anyhow::Error` can be extracted once, and later it decays to `ResidualError` which preserves the message and debug information. +#[derive(Clone)] +pub struct SharedError(Arc>); + +impl SharedError { + pub fn new(err: anyhow::Error) -> Self { + Self(Arc::new(Mutex::new(SharedErrorState::Anyhow(err)))) + } + + fn extract_anyhow_error(&self) -> anyhow::Error { + let mut state = self.0.lock().unwrap(); + let mut_state = &mut *state; + + let residual_err = match mut_state { + SharedErrorState::ResidualErrorMessage(err) => { + return anyhow::Error::from(err.clone()); + } + SharedErrorState::Anyhow(err) => ResidualError::new(err), + }; + let orig_state = std::mem::replace( + mut_state, + SharedErrorState::ResidualErrorMessage(residual_err), + ); + let SharedErrorState::Anyhow(err) = orig_state else { + panic!("Expected anyhow error"); + }; + err + } +} +impl Debug for SharedError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let state = self.0.lock().unwrap(); + match &*state { + SharedErrorState::Anyhow(err) => Debug::fmt(err, f), + SharedErrorState::ResidualErrorMessage(err) => Debug::fmt(err, f), + } + } +} + +impl Display for SharedError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let state = self.0.lock().unwrap(); + match &*state { + SharedErrorState::Anyhow(err) => Display::fmt(err, f), + SharedErrorState::ResidualErrorMessage(err) => Display::fmt(err, f), + } + } +} + +impl From for SharedError { + fn from(err: E) -> Self { + Self(Arc::new(Mutex::new(SharedErrorState::Anyhow( + anyhow::Error::from(err), + )))) + } +} + +pub fn shared_ok(value: T) -> Result { + Ok(value) +} + +pub type SharedResult = Result; + +pub trait SharedResultExt { + fn anyhow_result(self) -> Result; +} + +impl SharedResultExt for Result { + fn anyhow_result(self) -> Result { + match self { + Ok(value) => Ok(value), + Err(err) => Err(err.extract_anyhow_error()), + } + } +} + +pub trait SharedResultExtRef<'a, T> { + fn anyhow_result(self) -> Result<&'a T, anyhow::Error>; +} + +impl<'a, T> SharedResultExtRef<'a, T> for &'a Result { + fn anyhow_result(self) -> Result<&'a T, anyhow::Error> { + match self { + Ok(value) => Ok(value), + Err(err) => Err(err.extract_anyhow_error()), + } + } +} + +pub fn invariance_violation() -> anyhow::Error { + anyhow::anyhow!("Invariance violation") +} diff --git a/rust/cocoindex/src/utils/fingerprint.rs b/rust/utils/src/fingerprint.rs similarity index 100% rename from rust/cocoindex/src/utils/fingerprint.rs rename to rust/utils/src/fingerprint.rs diff --git a/rust/cocoindex/src/utils/http.rs b/rust/utils/src/http.rs similarity index 94% rename from rust/cocoindex/src/utils/http.rs rename to rust/utils/src/http.rs index a4cbc2e3..5096b7a2 100644 --- a/rust/cocoindex/src/utils/http.rs +++ b/rust/utils/src/http.rs @@ -1,4 +1,4 @@ -use crate::utils::retryable::{self, IsRetryable}; +use crate::retryable::{self, IsRetryable}; pub async fn request( req_builder: impl Fn() -> reqwest::RequestBuilder, diff --git a/rust/cocoindex/src/utils/immutable.rs b/rust/utils/src/immutable.rs similarity index 100% rename from rust/cocoindex/src/utils/immutable.rs rename to rust/utils/src/immutable.rs diff --git a/rust/cocoindex/src/utils/mod.rs b/rust/utils/src/lib.rs similarity index 93% rename from rust/cocoindex/src/utils/mod.rs rename to rust/utils/src/lib.rs index 9c447385..81656ec8 100644 --- a/rust/cocoindex/src/utils/mod.rs +++ b/rust/utils/src/lib.rs @@ -3,6 +3,7 @@ pub mod bytes_decode; pub mod concur_control; pub mod db; pub mod deser; +pub mod error; pub mod fingerprint; pub mod http; pub mod immutable; diff --git a/rust/cocoindex/src/utils/retryable.rs b/rust/utils/src/retryable.rs similarity index 86% rename from rust/cocoindex/src/utils/retryable.rs rename to rust/utils/src/retryable.rs index 3711bf1d..622f29bc 100644 --- a/rust/cocoindex/src/utils/retryable.rs +++ b/rust/utils/src/retryable.rs @@ -39,6 +39,27 @@ impl IsRetryable for reqwest::Error { } } +// OpenAI errors - retryable if the underlying reqwest error is retryable +impl IsRetryable for async_openai::error::OpenAIError { + fn is_retryable(&self) -> bool { + match self { + async_openai::error::OpenAIError::Reqwest(e) => e.is_retryable(), + _ => false, + } + } +} + +// Neo4j errors - retryable on connection errors and transient errors +impl IsRetryable for neo4rs::Error { + fn is_retryable(&self) -> bool { + match self { + neo4rs::Error::ConnectionError => true, + neo4rs::Error::Neo4j(e) => e.kind() == neo4rs::Neo4jErrorKind::Transient, + _ => false, + } + } +} + impl Error { pub fn retryable>(error: E) -> Self { Self { diff --git a/rust/cocoindex/src/utils/str_sanitize.rs b/rust/utils/src/str_sanitize.rs similarity index 100% rename from rust/cocoindex/src/utils/str_sanitize.rs rename to rust/utils/src/str_sanitize.rs diff --git a/rust/cocoindex/src/utils/yaml_ser.rs b/rust/utils/src/yaml_ser.rs similarity index 100% rename from rust/cocoindex/src/utils/yaml_ser.rs rename to rust/utils/src/yaml_ser.rs