diff --git a/src/base/duration.rs b/src/base/duration.rs
index e4e1734f9..61e8d8338 100644
--- a/src/base/duration.rs
+++ b/src/base/duration.rs
@@ -18,7 +18,7 @@ fn parse_components(
     // Parse digits and optional decimal point
     while let Some(&c) = iter.peek() {
-        if c.is_digit(10) || (c == '.' && !has_decimal) {
+        if c.is_ascii_digit() || (c == '.' && !has_decimal) {
             if c == '.' {
                 has_decimal = true;
             }
@@ -53,8 +53,8 @@ fn parse_components(
 /// Parses an ISO 8601 duration string into a `chrono::Duration`.
 fn parse_iso8601_duration(s: &str, original_input: &str) -> Result<Duration> {
-    let (is_negative, s_after_sign) = if s.starts_with('-') {
-        (true, &s[1..])
+    let (is_negative, s_after_sign) = if let Some(stripped) = s.strip_prefix('-') {
+        (true, stripped)
     } else {
         (false, s)
     };
@@ -193,28 +193,21 @@ mod tests {
     fn check_ok(res: Result<Duration>, expected: Duration, input_str: &str) {
         match res {
-            Ok(duration) => assert_eq!(duration, expected, "Input: '{}'", input_str),
-            Err(e) => panic!(
-                "Input: '{}', expected Ok({:?}), but got Err: {}",
-                input_str, expected, e
-            ),
+            Ok(duration) => assert_eq!(duration, expected, "Input: '{input_str}'"),
+            Err(e) => panic!("Input: '{input_str}', expected Ok({expected:?}), but got Err: {e}"),
         }
     }

     fn check_err_contains(res: Result<Duration>, expected_substring: &str, input_str: &str) {
         match res {
             Ok(d) => panic!(
-                "Input: '{}', expected error containing '{}', but got Ok({:?})",
-                input_str, expected_substring, d
+                "Input: '{input_str}', expected error containing '{expected_substring}', but got Ok({d:?})"
             ),
             Err(e) => {
                 let err_msg = e.to_string();
                 assert!(
                     err_msg.contains(expected_substring),
-                    "Input: '{}', error message '{}' does not contain expected substring '{}'",
-                    input_str,
-                    err_msg,
-                    expected_substring
+                    "Input: '{input_str}', error message '{err_msg}' does not contain expected substring '{expected_substring}'"
                 );
             }
         }
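A note on the `strip_prefix` change above: clippy's `manual_strip` lint flags the `starts_with` + manual-slicing pattern because the hard-coded index in `&s[1..]` has to be kept in sync with the prefix length by hand. A minimal standalone sketch of the two forms (the `parse_signed` helpers are hypothetical, not from this codebase):

fn parse_signed_old(s: &str) -> (bool, &str) {
    // The `1` in `&s[1..]` silently depends on '-' being one byte long.
    if s.starts_with('-') { (true, &s[1..]) } else { (false, s) }
}

fn parse_signed(s: &str) -> (bool, &str) {
    // `strip_prefix` returns Some(rest) only when the prefix matched,
    // so the sign test and the slicing can never disagree.
    if let Some(stripped) = s.strip_prefix('-') {
        (true, stripped)
    } else {
        (false, s)
    }
}

fn main() {
    assert_eq!(parse_signed("-P1D"), (true, "P1D"));
    assert_eq!(parse_signed("P1D"), (false, "P1D"));
    assert_eq!(parse_signed_old("-P1D"), parse_signed("-P1D"));
}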
diff --git a/src/base/schema.rs b/src/base/schema.rs
index c85a3d4c4..b51597761 100644
--- a/src/base/schema.rs
+++ b/src/base/schema.rs
@@ -86,7 +86,7 @@ impl std::fmt::Display for BasicValueType {
             BasicValueType::Vector(s) => {
                 write!(f, "Vector[{}", s.element_type)?;
                 if let Some(dimension) = s.dimension {
-                    write!(f, ", {}", dimension)?;
+                    write!(f, ", {dimension}")?;
                 }
                 write!(f, "]")
             }
@@ -97,7 +97,7 @@ impl std::fmt::Display for BasicValueType {
                         // Add type delimiter
                         write!(f, " | ")?;
                     }
-                    write!(f, "{}", typ)?;
+                    write!(f, "{typ}")?;
                 }
                 write!(f, "]")
             }
@@ -129,13 +129,14 @@ impl std::fmt::Display for StructSchema {
             if i > 0 {
                 write!(f, ", ")?;
             }
-            write!(f, "{}", field)?;
+            write!(f, "{field}")?;
         }
         write!(f, ")")
     }
 }

 #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
+#[allow(clippy::enum_variant_names)]
 pub enum TableKind {
     /// An table with unordered rows, without key.
     UTable,
@@ -307,9 +308,9 @@ impl std::fmt::Display for EnrichedValueType {
 impl std::fmt::Display for ValueType {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            ValueType::Basic(b) => write!(f, "{}", b),
-            ValueType::Struct(s) => write!(f, "{}", s),
-            ValueType::Table(c) => write!(f, "{}", c),
+            ValueType::Basic(b) => write!(f, "{b}"),
+            ValueType::Struct(s) => write!(f, "{s}"),
+            ValueType::Table(c) => write!(f, "{c}"),
         }
     }
 }
@@ -371,7 +372,7 @@ impl std::fmt::Display for CollectorSchema {
             if i > 0 {
                 write!(f, ", ")?;
             }
-            write!(f, "{}", field)?;
+            write!(f, "{field}")?;
         }
         write!(f, ")")
     }
diff --git a/src/base/spec.rs b/src/base/spec.rs
index 5eac22a07..a128c9885 100644
--- a/src/base/spec.rs
+++ b/src/base/spec.rs
@@ -67,7 +67,7 @@ pub struct OpArgName(pub Option<String>);
 impl fmt::Display for OpArgName {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         if let Some(arg_name) = &self.0 {
-            write!(f, "${}", arg_name)
+            write!(f, "${arg_name}")
         } else {
             write!(f, "?")
         }
@@ -113,7 +113,7 @@ impl fmt::Display for FieldMapping {
             if scope.is_empty() {
                 "".to_string()
             } else {
-                format!("{}.", scope)
+                format!("{scope}.")
             },
             self.field_path
         )
@@ -129,7 +129,7 @@ pub struct ConstantMapping {
 impl fmt::Display for ConstantMapping {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let value = serde_json::to_string(&self.value).unwrap_or("#serde_error".to_string());
-        write!(f, "{}", value)
+        write!(f, "{value}")
     }
 }
@@ -152,7 +152,7 @@ impl fmt::Display for StructMapping {
             .map(|field| field.name.clone())
             .collect::<Vec<_>>()
             .join(",");
-        write!(f, "{}", fields)
+        write!(f, "{fields}")
     }
 }
@@ -280,9 +280,9 @@ impl fmt::Display for SourceRefreshOptions {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let refresh = self
             .refresh_interval
-            .map(|d| format!("{:?}", d))
+            .map(|d| format!("{d:?}"))
             .unwrap_or("none".to_string());
-        write!(f, "{}", refresh)
+        write!(f, "{refresh}")
     }
 }
@@ -327,8 +327,8 @@ impl SpecFormatter for TransformOpSpec {
             .join(",");
         let op_str = self.op.format(mode);
         match mode {
-            OutputMode::Concise => format!("op={}, inputs={}", op_str, inputs),
-            OutputMode::Verbose => format!("op={}, inputs=[{}]", op_str, inputs),
+            OutputMode::Concise => format!("op={op_str}, inputs={inputs}"),
+            OutputMode::Verbose => format!("op={op_str}, inputs=[{inputs}]"),
         }
     }
 }
@@ -453,7 +453,7 @@ impl fmt::Display for IndexOptions {
             .map(|v| v.to_string())
             .collect::<Vec<_>>()
             .join(",");
-        write!(f, "keys={}, indexes={}", primary_keys, vector_indexes)
+        write!(f, "keys={primary_keys}, indexes={vector_indexes}")
     }
 }
@@ -494,7 +494,7 @@ impl SpecFormatter for ReactiveOpSpec {
         match self {
             ReactiveOpSpec::Transform(t) => format!("Transform: {}", t.format(mode)),
             ReactiveOpSpec::ForEach(fe) => match mode {
-                OutputMode::Concise => format!("{}", fe.get_label()),
+                OutputMode::Concise => fe.get_label().to_string(),
                 OutputMode::Verbose => format!("ForEach: {}", fe.format(mode)),
             },
             ReactiveOpSpec::Collect(c) => format!("Collect: {}", c.format(mode)),
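Most of the churn in this patch is clippy's `uninlined_format_args`: since Rust 2021, the `format!` family can capture identifiers directly in the format string. A short sketch with illustrative values (not tied to the spec types above):

fn main() {
    let op_str = "parse";
    let inputs = "a,b";
    // Before: positional arguments trail the format string.
    let old = format!("op={}, inputs={}", op_str, inputs);
    // After: identifiers are captured inline, so they cannot be misordered.
    let new = format!("op={op_str}, inputs={inputs}");
    assert_eq!(old, new);
    // Debug and other format specs capture the same way.
    assert_eq!(format!("{inputs:?}"), format!("{:?}", inputs));
}

Only bare identifiers can be captured, which is why expressions such as `write!(f, "Vector[{}", s.element_type)` stay positional throughout the patch.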
diff --git a/src/base/value.rs b/src/base/value.rs
index f594b1171..ba4987c0a 100644
--- a/src/base/value.rs
+++ b/src/base/value.rs
@@ -164,11 +164,11 @@ impl std::fmt::Display for KeyValue {
         match self {
             KeyValue::Bytes(v) => write!(f, "{}", BASE64_STANDARD.encode(v)),
             KeyValue::Str(v) => write!(f, "\"{}\"", v.escape_default()),
-            KeyValue::Bool(v) => write!(f, "{}", v),
-            KeyValue::Int64(v) => write!(f, "{}", v),
+            KeyValue::Bool(v) => write!(f, "{v}"),
+            KeyValue::Int64(v) => write!(f, "{v}"),
             KeyValue::Range(v) => write!(f, "[{}, {})", v.start, v.end),
-            KeyValue::Uuid(v) => write!(f, "{}", v),
-            KeyValue::Date(v) => write!(f, "{}", v),
+            KeyValue::Uuid(v) => write!(f, "{v}"),
+            KeyValue::Date(v) => write!(f, "{v}"),
             KeyValue::Struct(v) => {
                 write!(
                     f,
@@ -191,7 +191,7 @@ impl KeyValue {
             let field_values: FieldValues = FieldValues::from_json(value, fields_schema)?;
             Value::Struct(field_values)
         };
-        Ok(value.as_key()?)
+        value.as_key()
     }

     pub fn from_values<'a>(values: impl ExactSizeIterator<Item = &'a Value>) -> Result<Self> {
@@ -226,10 +226,10 @@ impl KeyValue {
             .next()
             .ok_or_else(|| api_error!("Key parts less than expected"))?;
         match basic_type {
-            BasicValueType::Bytes { .. } => {
+            BasicValueType::Bytes => {
                 KeyValue::Bytes(Bytes::from(BASE64_STANDARD.decode(v)?))
             }
-            BasicValueType::Str { .. } => KeyValue::Str(Arc::from(v)),
+            BasicValueType::Str => KeyValue::Str(Arc::from(v)),
             BasicValueType::Bool => KeyValue::Bool(v.parse()?),
             BasicValueType::Int64 => KeyValue::Int64(v.parse()?),
             BasicValueType::Range => {
@@ -1030,12 +1030,10 @@ impl serde::Serialize for BasicValue {
 impl BasicValue {
     pub fn from_json(value: serde_json::Value, schema: &BasicValueType) -> Result<Self> {
         let result = match (value, schema) {
-            (serde_json::Value::String(v), BasicValueType::Bytes { .. }) => {
+            (serde_json::Value::String(v), BasicValueType::Bytes) => {
                 BasicValue::Bytes(Bytes::from(BASE64_STANDARD.decode(v)?))
             }
-            (serde_json::Value::String(v), BasicValueType::Str { .. }) => {
-                BasicValue::Str(Arc::from(v))
-            }
+            (serde_json::Value::String(v), BasicValueType::Str) => BasicValue::Str(Arc::from(v)),
             (serde_json::Value::Bool(v), BasicValueType::Bool) => BasicValue::Bool(v),
             (serde_json::Value::Number(v), BasicValueType::Int64) => BasicValue::Int64(
                 v.as_i64()
diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs
index 470cd7297..0c57279f3 100644
--- a/src/builder/analyzer.rs
+++ b/src/builder/analyzer.rs
@@ -695,9 +695,9 @@ impl AnalyzerContext {
             .get_concur_control_options();
         let global_concurrency_controller = self.lib_ctx.global_concurrency_controller.clone();
         let result_fut = async move {
-            trace!("Start building executor for source op `{}`", op_name);
+            trace!("Start building executor for source op `{op_name}`");
             let executor = executor.await?;
-            trace!("Finished building executor for source op `{}`", op_name);
+            trace!("Finished building executor for source op `{op_name}`");
             Ok(AnalyzedImportOp {
                 executor,
                 output,
@@ -840,6 +840,7 @@ impl AnalyzerContext {
         Ok(result_fut)
     }

+    #[allow(clippy::too_many_arguments)]
     async fn analyze_export_op_group(
         &self,
         target_kind: &str,
@@ -847,7 +848,7 @@ impl AnalyzerContext {
         flow_inst: &FlowInstanceSpec,
         export_op_group: &AnalyzedExportTargetOpGroup,
         declarations: Vec,
-        targets_analyzed_ss: &mut Vec>,
+        targets_analyzed_ss: &mut [Option],
         declarations_analyzed_ss: &mut Vec,
     ) -> Result> + Send + use<>>> {
         let mut collection_specs = Vec::::new();
@@ -875,7 +876,7 @@ impl AnalyzerContext {
         let key_fields_schema = pk_fields_idx
             .iter()
-            .map(|idx| collector_schema.fields[*idx as usize].clone())
+            .map(|idx| collector_schema.fields[*idx].clone())
             .collect::<Vec<_>>();
         let primary_key_type = if pk_fields_idx.len() == 1 {
             key_fields_schema[0].value_type.typ.clone()
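On `analyze_export_op_group` taking `&mut [Option<…>]` instead of `&mut Vec<…>`: when a function only reads or rewrites elements in place (never grows or shrinks the container), clippy's `ptr_arg` suggests a slice, which also accepts arrays and sub-slices. A generic sketch (the `fill_missing` helper is invented for illustration):

fn fill_missing(slots: &mut [Option<String>]) {
    // Updates slots in place; never pushes or pops, so no Vec is needed.
    for (i, slot) in slots.iter_mut().enumerate() {
        if slot.is_none() {
            *slot = Some(format!("default-{i}"));
        }
    }
}

fn main() {
    let mut from_vec = vec![Some("a".to_string()), None];
    fill_missing(&mut from_vec); // &mut Vec<T> coerces to &mut [T]
    let mut from_array = [None, Some("b".to_string())];
    fill_missing(&mut from_array); // arrays work too
    assert_eq!(from_vec[1].as_deref(), Some("default-1"));
    assert_eq!(from_array[0].as_deref(), Some("default-0"));
}

The same hunk also drops a redundant `*idx as usize` cast; an index that is already `usize` can be used directly.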
diff --git a/src/builder/exec_ctx.rs b/src/builder/exec_ctx.rs
index 719a0eec6..41065760e 100644
--- a/src/builder/exec_ctx.rs
+++ b/src/builder/exec_ctx.rs
@@ -239,7 +239,7 @@ pub fn build_flow_setup_execution_context(
             .ok_or_else(invariance_violation)?;
         build_import_op_exec_ctx(
             &import_op.name,
-            &output_type,
+            output_type,
             source_states_by_name.get(&import_op.name.as_str()),
             &mut setup_state.metadata,
         )
diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs
index de1c03194..e9a8ed15a 100644
--- a/src/builder/flow_builder.rs
+++ b/src/builder/flow_builder.rs
@@ -46,7 +46,7 @@ impl std::fmt::Display for OpScopeRef {
 #[pymethods]
 impl OpScopeRef {
     pub fn __str__(&self) -> String {
-        format!("{}", self)
+        format!("{self}")
     }

     pub fn __repr__(&self) -> String {
@@ -105,7 +105,7 @@ impl DataSlice {
     }

     pub fn __str__(&self) -> String {
-        format!("{}", self)
+        format!("{self}")
     }

     pub fn __repr__(&self) -> String {
@@ -142,7 +142,7 @@ impl DataSlice {
                 .iter()
                 .find(|f| f.name == field_name)
                 .map(|f| f.spec.clone())
-                .ok_or_else(|| PyException::new_err(format!("field {} not found", field_name)))?,
+                .ok_or_else(|| PyException::new_err(format!("field {field_name} not found")))?,
             spec::ValueMapping::Constant { .. } => {
                 return Err(PyException::new_err(
@@ -191,7 +191,7 @@ pub struct DataCollector {
 #[pymethods]
 impl DataCollector {
     fn __str__(&self) -> String {
-        format!("{}", self)
+        format!("{self}")
     }

     fn __repr__(&self) -> String {
@@ -271,6 +271,7 @@ impl FlowBuilder {
     }

     #[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None, execution_options=None))]
+    #[allow(clippy::too_many_arguments)]
     pub fn add_source(
         &mut self,
         py: Python<'_>,
@@ -327,7 +328,7 @@ impl FlowBuilder {
         let schema = value_type.into_inner();
         let value = py::value_from_py_object(&schema.typ, &value)?;
         let slice = DataSlice {
-            scope: self.root_op_scope.clone().into(),
+            scope: self.root_op_scope.clone(),
             value: Arc::new(spec::ValueMapping::Constant(spec::ConstantMapping {
                 schema: schema.clone(),
                 value: serde_json::to_value(value).into_py_result()?,
@@ -571,7 +572,7 @@ impl FlowBuilder {
             let (_, field_schema) = scope_builder
                 .data
                 .find_field(field_name)
-                .ok_or_else(|| PyException::new_err(format!("field {} not found", field_name)))?;
+                .ok_or_else(|| PyException::new_err(format!("field {field_name} not found")))?;
             schema::EnrichedValueType::from_alternative(&field_schema.value_type)
                 .into_py_result()?
         };
@@ -671,7 +672,7 @@ impl FlowBuilder {
     }

     pub fn __str__(&self) -> String {
-        format!("{}", self)
+        format!("{self}")
     }

     pub fn __repr__(&self) -> String {
@@ -713,7 +714,7 @@ impl std::fmt::Display for FlowBuilder {
             )?;
         }
         if let Some(output) = &self.direct_output_value {
-            write!(f, "Direct output: {}\n\n", output)?;
+            write!(f, "Direct output: {output}\n\n")?;
         }
         Ok(())
     }
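`add_source` above (and several DB-tracking functions later in this patch) gains `#[allow(clippy::too_many_arguments)]` instead of being reshaped: the lint fires at eight or more parameters, and silencing it is the pragmatic choice when a one-off parameter struct would not be reused. A sketch of both options, with made-up parameters:

#[allow(clippy::too_many_arguments)]
fn export_row_wide(
    kind: &str, name: &str, table: &str, key: u32,
    ordered: bool, dry_run: bool, retries: u8, verbose: bool,
) -> String {
    format!("{kind}/{name}/{table}/{key} o={ordered} d={dry_run} r={retries} v={verbose}")
}

// The alternative: group flags that always travel together.
struct ExportOpts { ordered: bool, dry_run: bool, retries: u8, verbose: bool }

fn export_row(kind: &str, name: &str, table: &str, key: u32, opts: &ExportOpts) -> String {
    format!("{kind}/{name}/{table}/{key} o={} d={} r={} v={}",
        opts.ordered, opts.dry_run, opts.retries, opts.verbose)
}

fn main() {
    let opts = ExportOpts { ordered: true, dry_run: false, retries: 3, verbose: false };
    assert_eq!(
        export_row_wide("pg", "flow", "t", 1, true, false, 3, false),
        export_row("pg", "flow", "t", 1, &opts),
    );
}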
diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs
index b7424a125..5e11f0182 100644
--- a/src/execution/db_tracking.rs
+++ b/src/execution/db_tracking.rs
@@ -136,6 +136,7 @@ pub async fn read_source_tracking_info_for_precommit(
     Ok(precommit_tracking_info)
 }

+#[allow(clippy::too_many_arguments)]
 pub async fn precommit_source_tracking_info(
     source_id: i32,
     source_key_json: &serde_json::Value,
@@ -191,6 +192,7 @@ pub async fn read_source_tracking_info_for_commit(
     Ok(commit_tracking_info)
 }

+#[allow(clippy::too_many_arguments)]
 pub async fn commit_source_tracking_info(
     source_id: i32,
     source_key_json: &serde_json::Value,
diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs
index 8765bf919..a4909f452 100644
--- a/src/execution/db_tracking_setup.rs
+++ b/src/execution/db_tracking_setup.rs
@@ -181,7 +181,7 @@ impl TrackingTableSetupStatus {
             }
         } else {
             for lagacy_name in self.legacy_table_names.iter() {
-                let query = format!("DROP TABLE IF EXISTS {}", lagacy_name);
+                let query = format!("DROP TABLE IF EXISTS {lagacy_name}");
                 sqlx::query(&query).execute(pool).await?;
             }
             return Ok(());
diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs
index 52cfcd127..8079af45a 100644
--- a/src/execution/dumper.rs
+++ b/src/execution/dumper.rs
@@ -207,7 +207,7 @@ impl<'a> Dumper<'a> {
         let num_keys = keys.len();
         keys.into_iter().enumerate().map(move |(i, key)| {
             let extra_id = if num_keys > 1 {
-                Cow::Owned(format!(".{}", i))
+                Cow::Owned(format!(".{i}"))
             } else {
                 Cow::Borrowed("")
             };
diff --git a/src/execution/indexing_status.rs b/src/execution/indexing_status.rs
index 5bc5da9bd..2fd42efd1 100644
--- a/src/execution/indexing_status.rs
+++ b/src/execution/indexing_status.rs
@@ -35,7 +35,7 @@ pub async fn get_source_row_indexing_status(
         pool,
     );
     let current_fut = src_eval_ctx.import_op.executor.get_value(
-        &src_eval_ctx.key,
+        src_eval_ctx.key,
        &interface::SourceExecutorGetOptions {
            include_value: false,
            include_ordinal: true,
@@ -47,10 +47,9 @@ pub async fn get_source_row_indexing_status(
         source_ordinal: interface::Ordinal(l.processed_source_ordinal),
         processing_time: l
             .process_time_micros
-            .map(chrono::DateTime::<Utc>::from_timestamp_micros)
-            .flatten(),
+            .and_then(chrono::DateTime::<Utc>::from_timestamp_micros),
         is_logic_current: Some(src_eval_ctx.plan.logic_fingerprint.0.as_slice())
-            == l.process_logic_fingerprint.as_ref().map(|b| b.as_slice()),
+            == l.process_logic_fingerprint.as_deref(),
     });
     let current = SourceRowInfo {
         ordinal: current
diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs
index 43dc6b369..021789e4d 100644
--- a/src/execution/live_updater.rs
+++ b/src/execution/live_updater.rs
@@ -238,16 +238,10 @@ impl FlowLiveUpdater {
         while let Some(result) = self.tasks.join_next().await {
             match result {
                 Err(e) if !e.is_cancelled() => {
-                    error!(
-                        "A background task in FlowLiveUpdater failed to join: {:?}",
-                        e
-                    );
+                    error!("A background task in FlowLiveUpdater failed to join: {e:?}");
                 }
                 Ok(Err(e)) => {
-                    error!(
-                        "Error reported by a source update task during live update: {:?}",
-                        e
-                    );
+                    error!("Error reported by a source update task during live update: {e:?}");
                 }
                 _ => {}
             }
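The `indexing_status.rs` hunk above packs in two small `Option` idioms: `map(f).flatten()` collapses to `and_then(f)`, and `as_ref().map(|b| b.as_slice())` collapses to `as_deref()`. A self-contained sketch with stand-in data:

fn parse_even(s: &str) -> Option<i64> {
    s.parse::<i64>().ok().filter(|n| n % 2 == 0)
}

fn main() {
    let raw: Option<&str> = Some("42");
    // map(f) yields Option<Option<i64>>; flatten unnests. and_then does both at once.
    assert_eq!(raw.map(parse_even).flatten(), raw.and_then(parse_even));

    // as_deref: Option<Vec<u8>> -> Option<&[u8]> without a closure, which is
    // exactly the shape needed to compare against another Option<&[u8]>.
    let stored: Option<Vec<u8>> = Some(vec![1, 2, 3]);
    let current: Option<&[u8]> = Some(&[1, 2, 3]);
    assert!(current == stored.as_deref());
}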
update: {e:?}"); } _ => {} } diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index d947c771b..e8dad9edf 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -26,7 +26,7 @@ pub fn extract_primary_key( ) -> Result { match primary_key_def { AnalyzedPrimaryKeyDef::Fields(fields) => { - KeyValue::from_values(fields.iter().map(|field| &record.fields[*field as usize])) + KeyValue::from_values(fields.iter().map(|field| &record.fields[*field])) } } } @@ -175,6 +175,7 @@ struct PrecommitOutput { target_mutations: HashMap, } +#[allow(clippy::too_many_arguments)] async fn precommit_source_tracking_info( source_id: i32, source_key_json: &serde_json::Value, @@ -413,6 +414,7 @@ async fn precommit_source_tracking_info( })) } +#[allow(clippy::too_many_arguments)] async fn commit_source_tracking_info( source_id: i32, source_key_json: &serde_json::Value, @@ -500,6 +502,7 @@ async fn commit_source_tracking_info( Ok(()) } +#[allow(clippy::too_many_arguments)] async fn try_content_hash_optimization( source_id: i32, src_eval_ctx: &SourceRowEvaluationContext<'_>, @@ -513,17 +516,17 @@ async fn try_content_hash_optimization( pool: &PgPool, ) -> Result>> { // Check if we can use content hash optimization - if !existing_version + if existing_version .as_ref() - .map_or(false, |v| v.kind == SourceVersionKind::CurrentLogic) + .is_none_or(|v| v.kind != SourceVersionKind::CurrentLogic) { return Ok(None); } - if !tracking_info + if tracking_info .max_process_ordinal .zip(tracking_info.process_ordinal) - .map_or(false, |(max_ord, proc_ord)| max_ord == proc_ord) + .is_none_or(|(max_ord, proc_ord)| max_ord != proc_ord) { return Ok(None); } diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index f8c61ac16..e3fd35372 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -339,7 +339,7 @@ impl SourceIndexingContext { while let Some(result) = join_set.join_next().await { if let Err(e) = result { if !e.is_cancelled() { - error!("{:?}", e); + error!("{e:?}"); } } } @@ -375,7 +375,7 @@ impl SourceIndexingContext { while let Some(result) = join_set.join_next().await { if let Err(e) = result { if !e.is_cancelled() { - error!("{:?}", e); + error!("{e:?}"); } } } diff --git a/src/execution/stats.rs b/src/execution/stats.rs index 64e814a64..e084e2a6e 100644 --- a/src/execution/stats.rs +++ b/src/execution/stats.rs @@ -106,7 +106,7 @@ impl std::fmt::Display for UpdateStats { let num_skipped = self.num_no_change.get(); if num_skipped > 0 { - messages.push(format!("{} source rows NO CHANGE", num_skipped)); + messages.push(format!("{num_skipped} source rows NO CHANGE")); } let num_insertions = self.num_insertions.get(); @@ -150,7 +150,7 @@ pub struct IndexUpdateInfo { impl std::fmt::Display for IndexUpdateInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for source in self.sources.iter() { - writeln!(f, "{}", source)?; + writeln!(f, "{source}")?; } Ok(()) } diff --git a/src/lib_context.rs b/src/lib_context.rs index 9ce02a82c..11414553b 100644 --- a/src/lib_context.rs +++ b/src/lib_context.rs @@ -78,7 +78,7 @@ impl FlowExecutionContext { source_idx: usize, pool: &PgPool, ) -> Result<&Arc> { - Ok(self.source_indexing_contexts[source_idx] + self.source_indexing_contexts[source_idx] .get_or_try_init(|| async move { anyhow::Ok(Arc::new( SourceIndexingContext::load( @@ -90,7 +90,7 @@ impl FlowExecutionContext { .await?, )) }) - .await?) 
diff --git a/src/lib_context.rs b/src/lib_context.rs
index 9ce02a82c..11414553b 100644
--- a/src/lib_context.rs
+++ b/src/lib_context.rs
@@ -78,7 +78,7 @@ impl FlowExecutionContext {
         source_idx: usize,
         pool: &PgPool,
     ) -> Result<&Arc<SourceIndexingContext>> {
-        Ok(self.source_indexing_contexts[source_idx]
+        self.source_indexing_contexts[source_idx]
             .get_or_try_init(|| async move {
                 anyhow::Ok(Arc::new(
                     SourceIndexingContext::load(
@@ -90,7 +90,7 @@ impl FlowExecutionContext {
                     .await?,
                 ))
             })
-            .await?)
+            .await
     }
 }
@@ -151,9 +151,12 @@ impl FlowContext {
 static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
 static AUTH_REGISTRY: LazyLock<Arc<AuthRegistry>> = LazyLock::new(|| Arc::new(AuthRegistry::new()));

+type PoolKey = (String, Option<String>);
+type PoolValue = Arc<OnceCell<PgPool>>;
+
 #[derive(Default)]
 pub struct DbPools {
-    pub pools: Mutex<HashMap<(String, Option<String>), Arc<OnceCell<PgPool>>>>,
+    pub pools: Mutex<HashMap<PoolKey, PoolValue>>,
 }

 impl DbPools {
diff --git a/src/llm/anthropic.rs b/src/llm/anthropic.rs
index 49d6cdd36..1ce3bc4fe 100644
--- a/src/llm/anthropic.rs
+++ b/src/llm/anthropic.rs
@@ -6,7 +6,6 @@ use crate::llm::{
     ToJsonSchemaOptions, detect_image_mime_type,
 };
 use anyhow::Context;
-use json5;
 use urlencoding::encode;

 pub struct Client {
diff --git a/src/llm/ollama.rs b/src/llm/ollama.rs
index 1e62255cb..6b22febb4 100644
--- a/src/llm/ollama.rs
+++ b/src/llm/ollama.rs
@@ -41,7 +41,7 @@ impl Client {
             None => OLLAMA_DEFAULT_ADDRESS,
         };
         Ok(Self {
-            generate_url: format!("{}/api/generate", address),
+            generate_url: format!("{address}/api/generate"),
             reqwest_client: reqwest::Client::new(),
         })
     }
@@ -56,7 +56,7 @@ impl LlmGenerationClient for Client {
         let req = OllamaRequest {
             model: request.model,
             prompt: request.user_prompt.as_ref(),
-            images: request.image.as_deref().map(|img| vec![img.as_ref()]),
+            images: request.image.as_deref().map(|img| vec![img]),
             format: request.output_format.as_ref().map(
                 |super::OutputFormat::JsonSchema { schema, .. }| {
                     OllamaFormat::JsonSchema(schema.as_ref())
diff --git a/src/llm/openai.rs b/src/llm/openai.rs
index 7600962aa..27755f6e4 100644
--- a/src/llm/openai.rs
+++ b/src/llm/openai.rs
@@ -71,7 +71,7 @@ impl LlmGenerationClient for Client {
             Some(img_bytes) => {
                 let base64_image = BASE64_STANDARD.encode(img_bytes.as_ref());
                 let mime_type = detect_image_mime_type(img_bytes.as_ref())?;
-                let image_url = format!("data:{};base64,{}", mime_type, base64_image);
+                let image_url = format!("data:{mime_type};base64,{base64_image}");
                 ChatCompletionRequestUserMessageContent::Array(vec![
                     ChatCompletionRequestUserMessageContentPart::Text(
                         ChatCompletionRequestMessageContentPartText {
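The `PoolKey`/`PoolValue` aliases in `lib_context.rs` are the standard answer to clippy's `type_complexity`: name the pieces once so the field declaration stays legible. A trimmed-down sketch of the same shape — note it substitutes std's `OnceLock` and a `String` for the async cell and `PgPool` in the real code:

use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

type PoolKey = (String, Option<String>); // e.g. (database url, user)
type PoolValue = Arc<OnceLock<String>>;  // lazily initialized "connection"

#[derive(Default)]
struct DbPools {
    // Without the aliases this line is a wall of angle brackets.
    pools: Mutex<HashMap<PoolKey, PoolValue>>,
}

fn main() {
    let db = DbPools::default();
    let key: PoolKey = ("postgres://localhost".into(), None);
    let cell = db.pools.lock().unwrap().entry(key).or_default().clone();
    let conn = cell.get_or_init(|| "connected".to_string());
    assert_eq!(conn, "connected");
}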
diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs
index d20412c9d..b213c3fb6 100644
--- a/src/ops/factory_bases.rs
+++ b/src/ops/factory_bases.rs
@@ -331,11 +331,11 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
     fn describe_resource(&self, key: &Self::Key) -> Result<String>;

-    fn extract_additional_key<'ctx>(
+    fn extract_additional_key(
         &self,
         _key: &value::KeyValue,
         _value: &value::FieldValues,
-        _export_context: &'ctx Self::ExportContext,
+        _export_context: &Self::ExportContext,
     ) -> Result<serde_json::Value> {
         Ok(serde_json::Value::Null)
     }
@@ -461,11 +461,11 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
         Ok(result)
     }

-    fn extract_additional_key<'ctx>(
+    fn extract_additional_key(
         &self,
         key: &value::KeyValue,
         value: &value::FieldValues,
-        export_context: &'ctx (dyn Any + Send + Sync),
+        export_context: &(dyn Any + Send + Sync),
     ) -> Result<serde_json::Value> {
         StorageFactoryBase::extract_additional_key(
             self,
diff --git a/src/ops/functions/embed_text.rs b/src/ops/functions/embed_text.rs
index c60b0f2af..8688bb366 100644
--- a/src/ops/functions/embed_text.rs
+++ b/src/ops/functions/embed_text.rs
@@ -142,11 +142,11 @@ mod tests {
                 for item in arc_vec.iter() {
                     match item {
                         BasicValue::Float32(_) => {}
-                        _ => panic!("Embedding vector element is not Float32: {:?}", item),
+                        _ => panic!("Embedding vector element is not Float32: {item:?}"),
                     }
                 }
             }
-            _ => panic!("Expected Value::Basic(BasicValue::Vector), got {:?}", value),
+            _ => panic!("Expected Value::Basic(BasicValue::Vector), got {value:?}"),
         }
     }
 }
diff --git a/src/ops/functions/extract_by_llm.rs b/src/ops/functions/extract_by_llm.rs
index 5a399f946..bbd95811d 100644
--- a/src/ops/functions/extract_by_llm.rs
+++ b/src/ops/functions/extract_by_llm.rs
@@ -245,7 +245,7 @@ mod tests {
                     }
                 }
             }
-            _ => panic!("Expected Value::Struct, got {:?}", value),
+            _ => panic!("Expected Value::Struct, got {value:?}"),
         }
     }
 }
diff --git a/src/ops/functions/parse_json.rs b/src/ops/functions/parse_json.rs
index 946f64764..aa883c251 100644
--- a/src/ops/functions/parse_json.rs
+++ b/src/ops/functions/parse_json.rs
@@ -14,8 +14,8 @@ struct LanguageConfig {
     parse_fn: ParseFn,
 }

-fn add_language<'a>(
-    output: &'a mut HashMap, Arc>,
+fn add_language(
+    output: &mut HashMap, Arc>,
     name: &'static str,
     aliases: impl IntoIterator,
     parse_fn: ParseFn,
@@ -141,7 +141,7 @@ mod tests {
                 "Parsed JSON value mismatch with specified language"
             );
         }
-        _ => panic!("Expected Value::Basic(BasicValue::Json), got {:?}", value),
+        _ => panic!("Expected Value::Basic(BasicValue::Json), got {value:?}"),
     }
 }
diff --git a/src/ops/functions/split_recursively.rs b/src/ops/functions/split_recursively.rs
index af62b7196..2d5842a90 100644
--- a/src/ops/functions/split_recursively.rs
+++ b/src/ops/functions/split_recursively.rs
@@ -3,7 +3,6 @@
 use log::{error, trace};
 use regex::{Matches, Regex};
 use std::collections::HashSet;
 use std::sync::LazyLock;
-use std::usize;
 use std::{collections::HashMap, sync::Arc};
 use unicase::UniCase;
@@ -389,7 +388,7 @@ fn line_break_level(c: &str) -> LineBreakLevel {
     while let Some(c) = iter.next() {
         if c == '\n' || c == '\r' {
             lb_level = LineBreakLevel::Newline;
-            while let Some(c2) = iter.next() {
+            for c2 in iter.by_ref() {
                 if c2 == '\n' || c2 == '\r' {
                     if c == c2 {
                         return LineBreakLevel::DoubleNewline;
@@ -568,7 +567,8 @@ impl<'t, 's: 't> RecursiveChunker<'s> {
                 next_regexp_sep_id,
             } => {
                 if next_regexp_sep_id >= lang_config.separator_regex.len() {
-                    Ok(atom_collector.collect(chunk.range))
+                    atom_collector.collect(chunk.range);
+                    Ok(())
                 } else {
                     self.collect_atom_chunks_from_iter(
                         TextChunksIter::new(lang_config, &chunk, next_regexp_sep_id),
@@ -619,7 +619,7 @@ impl<'t, 's: 't> RecursiveChunker<'s> {
             }
         };

-        for (i, chunk) in (&atom_chunks[0..atom_chunks.len() - 1]).iter().enumerate() {
+        for (i, chunk) in atom_chunks[0..atom_chunks.len() - 1].iter().enumerate() {
             let mut min_cost = usize::MAX;
             let mut arg_min_start_idx: usize = 0;
             let mut arg_min_prev_plan_idx: usize = 0;
@@ -1047,14 +1047,12 @@ mod tests {
         // Assert that the expected text matches the text provided in the chunk.
         assert_eq!(
             actual_chunk.text, expected_text,
-            "Provided chunk text mismatch - {}",
-            context
+            "Provided chunk text mismatch - {context}"
         );

         // Assert that the expected text also matches the text extracted using the chunk's range.
         assert_eq!(
             extracted_text, expected_text,
-            "Range inconsistency: extracted text mismatch - {}",
-            context
+            "Range inconsistency: extracted text mismatch - {context}"
         );
     }
@@ -1118,16 +1116,17 @@ mod tests {
                     let key: KeyValue = range.into();
                     match table.get(&key) {
                         Some(scope_value_ref) => {
-                            let chunk_text = scope_value_ref.0.fields[0]
-                                .as_str()
-                                .expect(&format!("Chunk text not a string for key {:?}", key));
+                            let chunk_text =
+                                scope_value_ref.0.fields[0].as_str().unwrap_or_else(|_| {
+                                    panic!("Chunk text not a string for key {key:?}")
+                                });
                             assert_eq!(**chunk_text, *expected_text);
                         }
-                        None => panic!("Expected row value for key {:?}, not found", key),
+                        None => panic!("Expected row value for key {key:?}, not found"),
                     }
                 }
             }
-            other => panic!("Expected Value::KTable, got {:?}", other),
+            other => panic!("Expected Value::KTable, got {other:?}"),
         }
     }
diff --git a/src/ops/interface.rs b/src/ops/interface.rs
index 90acdb346..f06ba52f4 100644
--- a/src/ops/interface.rs
+++ b/src/ops/interface.rs
@@ -287,11 +287,11 @@ pub trait ExportTargetFactory: Send + Sync {
     fn describe_resource(&self, key: &serde_json::Value) -> Result<String>;

-    fn extract_additional_key<'ctx>(
+    fn extract_additional_key(
         &self,
         key: &KeyValue,
         value: &FieldValues,
-        export_context: &'ctx (dyn Any + Send + Sync),
+        export_context: &(dyn Any + Send + Sync),
     ) -> Result<serde_json::Value>;

     async fn apply_mutation(
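In the `split_recursively.rs` tests above, `expect(&format!(…))` becomes `unwrap_or_else(|_| panic!(…))`. Clippy's `expect_fun_call` points out that the `format!` argument is built eagerly on every call, even when the value is present; `unwrap_or_else` defers building the message to the failure branch. A sketch (hypothetical `lookup` helper):

fn lookup(key: u32) -> Result<&'static str, String> {
    match key {
        1 => Ok("one"),
        _ => Err(format!("no entry for {key}")),
    }
}

fn main() {
    // Eager: this format! runs even though the lookup succeeds.
    let a = lookup(1).expect(&format!("lookup failed for key {}", 1));
    // Lazy: the message is only built if the Result is an Err.
    let b = lookup(1).unwrap_or_else(|e| panic!("lookup failed for key 1: {e}"));
    assert_eq!(a, b);
}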
diff --git a/src/ops/py_factory.rs b/src/ops/py_factory.rs
index 8092323fe..cfa0f017a 100644
--- a/src/ops/py_factory.rs
+++ b/src/ops/py_factory.rs
@@ -11,7 +11,7 @@
 use pythonize::pythonize;
 use crate::{
     base::{schema, value},
     builder::plan,
-    py::{self, FromPyResult},
+    py::{self, ToResultWithPyTrace},
 };
 use anyhow::{Result, anyhow};
@@ -83,7 +83,7 @@ impl PyFunctionExecutor {
                     .transpose()?
                     .as_ref(),
             )
-            .from_py_result(py)?;
+            .to_result_with_py_trace(py)?;
             Ok(result.into_bound(py))
         }
     }
@@ -103,7 +103,7 @@ impl SimpleFunctionExecutor for Arc<PyFunctionExecutor> {
         })?;
         let result = result_fut.await;
         Python::with_gil(|py| -> Result<_> {
-            let result = result.from_py_result(py)?;
+            let result = result.to_result_with_py_trace(py)?;
             Ok(py::value_from_py_object(
                 &self.result_type.typ,
                 &result.into_bound(py),
@@ -167,7 +167,7 @@ impl SimpleFunctionFactory for PyFunctionFactory {
                     PyTuple::new(py, args.into_iter())?,
                     Some(&kwargs.into_py_dict(py)?),
                 )
-                .from_py_result(py)?;
+                .to_result_with_py_trace(py)?;
             let (result_type, executor) = result
                 .extract::<(crate::py::Pythonized, Py)>(py)?;
             Ok((
@@ -190,7 +190,7 @@ impl SimpleFunctionFactory for PyFunctionFactory {
             Python::with_gil(|py| -> anyhow::Result<_> {
                 let prepare_coro = executor
                     .call_method(py, "prepare", (), None)
-                    .from_py_result(py)?;
+                    .to_result_with_py_trace(py)?;
                 let prepare_fut = pyo3_async_runtimes::into_future_with_locals(
                     &pyo3_async_runtimes::TaskLocals::new(
                         py_exec_ctx.event_loop.bind(py).clone(),
                     ),
                 )?;
                 let enable_cache = executor
                     .call_method(py, "enable_cache", (), None)
-                    .from_py_result(py)?
+                    .to_result_with_py_trace(py)?
                     .extract::<bool>(py)?;
                 let behavior_version = executor
                     .call_method(py, "behavior_version", (), None)
-                    .from_py_result(py)?
+                    .to_result_with_py_trace(py)?
                     .extract::<Option<u32>>(py)?;
                 Ok((prepare_fut, enable_cache, behavior_version))
             })?;
diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs
index 8f1f1ca47..fdd0a70b1 100644
--- a/src/ops/sources/amazon_s3.rs
+++ b/src/ops/sources/amazon_s3.rs
@@ -138,7 +138,7 @@ impl SourceExecutor for Executor {
             .send()
             .await;
         let obj = match resp {
-            Err(e) if e.as_service_error().map_or(false, |e| e.is_no_such_key()) => {
+            Err(e) if e.as_service_error().is_some_and(|e| e.is_no_such_key()) => {
                 return Ok(PartialSourceRowData {
                     value: Some(SourceValue::NonExistence),
                     ordinal: Some(Ordinal::unavailable()),
@@ -174,7 +174,7 @@ impl SourceExecutor for Executor {
         };
         let stream = stream! {
             loop {
-                match self.poll_sqs(&sqs_context).await {
+                match self.poll_sqs(sqs_context).await {
                     Ok(messages) => {
                         for message in messages {
                             yield Ok(message);
@@ -251,7 +251,7 @@ impl Executor {
             if !self
                 .prefix
                 .as_ref()
-                .map_or(true, |prefix| s3.object.key.starts_with(prefix))
+                .is_none_or(|prefix| s3.object.key.starts_with(prefix))
             {
                 continue;
             }
diff --git a/src/ops/targets/kuzu.rs b/src/ops/targets/kuzu.rs
index 6fc91c52d..931b4778d 100644
--- a/src/ops/targets/kuzu.rs
+++ b/src/ops/targets/kuzu.rs
@@ -173,9 +173,9 @@ struct SetupState {
     referenced_node_tables: Option<(ReferencedNodeTable, ReferencedNodeTable)>,
 }

-impl<'a> Into<Cow<'a, TableColumnsSchema<String>>> for &'a SetupState {
-    fn into(self) -> Cow<'a, TableColumnsSchema<String>> {
-        Cow::Borrowed(&self.schema)
+impl<'a> From<&'a SetupState> for Cow<'a, TableColumnsSchema<String>> {
+    fn from(val: &'a SetupState) -> Self {
+        Cow::Borrowed(&val.schema)
     }
 }
@@ -204,18 +204,18 @@ fn append_drop_table(
     if !setup_status.actions.drop_existing {
         return Ok(());
     }
-    write!(
+    writeln!(
         cypher.query_mut(),
-        "DROP TABLE IF EXISTS {};\n",
+        "DROP TABLE IF EXISTS {};",
         elem_type.label()
     )?;
     Ok(())
 }

 fn append_delete_orphaned_nodes(cypher: &mut CypherBuilder, node_table: &str) -> Result<()> {
-    write!(
+    writeln!(
         cypher.query_mut(),
-        "MATCH (n:{node_table}) WITH n WHERE NOT (n)--() DELETE n;\n"
+        "MATCH (n:{node_table}) WITH n WHERE NOT (n)--() DELETE n;"
     )?;
     Ok(())
 }
@@ -244,7 +244,7 @@ fn append_upsert_table(
     cypher.query_mut().push_str(
         keys.iter()
             .chain(values.iter())
-            .map(|(name, kuzu_type)| format!("{} {}", name, kuzu_type))
+            .map(|(name, kuzu_type)| format!("{name} {kuzu_type}"))
             .join(", ")
             .as_str(),
     );
@@ -269,15 +269,15 @@ fn append_upsert_table(
         .iter()
         .chain(columns_to_upsert.iter().map(|(name, _)| name))
     {
-        write!(
+        writeln!(
             cypher.query_mut(),
-            "ALTER TABLE {table_name} DROP IF EXISTS {name};\n"
+            "ALTER TABLE {table_name} DROP IF EXISTS {name};"
         )?;
     }
     for (name, kuzu_type) in columns_to_upsert.iter() {
-        write!(
+        writeln!(
             cypher.query_mut(),
-            "ALTER TABLE {table_name} ADD {name} {kuzu_type};\n",
+            "ALTER TABLE {table_name} ADD {name} {kuzu_type};",
         )?;
     }
 }
@@ -305,7 +305,7 @@ fn append_string_literal(cypher: &mut CypherBuilder, s: &str) -> Result<()> {
                 let code = c as u32;
                 let high = 0xD800 + ((code - 0x10000) >> 10);
                 let low = 0xDC00 + ((code - 0x10000) & 0x3FF);
-                write!(out, "\\u{:04X}\\u{:04X}", high, low)?;
+                write!(out, "\\u{high:04X}\\u{low:04X}")?;
             }
         }
     }
@@ -318,7 +318,7 @@ fn append_basic_value(cypher: &mut CypherBuilder, basic_value: &BasicValue) -> R
         BasicValue::Bytes(bytes) => {
             write!(cypher.query_mut(), "BLOB(")?;
             for byte in bytes {
-                write!(cypher.query_mut(), "\\\\x{:02X}", byte)?;
+                write!(cypher.query_mut(), "\\\\x{byte:02X}")?;
             }
             write!(cypher.query_mut(), ")")?;
         }
@@ -326,46 +326,39 @@ fn append_basic_value(cypher: &mut CypherBuilder, basic_value: &BasicValue) -> R
             append_string_literal(cypher, s)?;
         }
         BasicValue::Bool(b) => {
-            write!(cypher.query_mut(), "{}", b)?;
+            write!(cypher.query_mut(), "{b}")?;
         }
         BasicValue::Int64(i) => {
-            write!(cypher.query_mut(), "{}", i)?;
+            write!(cypher.query_mut(), "{i}")?;
         }
         BasicValue::Float32(f) => {
-            write!(cypher.query_mut(), "{}", f)?;
+            write!(cypher.query_mut(), "{f}")?;
         }
         BasicValue::Float64(f) => {
-            write!(cypher.query_mut(), "{}", f)?;
+            write!(cypher.query_mut(), "{f}")?;
         }
         BasicValue::Range(r) => {
             write!(cypher.query_mut(), "[{}, {}]", r.start, r.end)?;
         }
         BasicValue::Uuid(u) => {
-            write!(cypher.query_mut(), "UUID(\"{}\")", u)?;
+            write!(cypher.query_mut(), "UUID(\"{u}\")")?;
         }
         BasicValue::Date(d) => {
-            write!(cypher.query_mut(), "DATE(\"{}\")", d)?;
+            write!(cypher.query_mut(), "DATE(\"{d}\")")?;
         }
-        BasicValue::LocalDateTime(dt) => {
-            write!(cypher.query_mut(), "TIMESTAMP(\"{}\")", dt)?;
-        }
-        BasicValue::OffsetDateTime(dt) => {
-            write!(cypher.query_mut(), "TIMESTAMP(\"{}\")", dt)?;
-        }
+        BasicValue::LocalDateTime(dt) => write!(cypher.query_mut(), "TIMESTAMP(\"{dt}\")")?,
+        BasicValue::OffsetDateTime(dt) => write!(cypher.query_mut(), "TIMESTAMP(\"{dt}\")")?,
         BasicValue::TimeDelta(td) => {
             let num_days = td.num_days();
             let sub_day_duration = *td - TimeDelta::days(num_days);
             write!(cypher.query_mut(), "INTERVAL(\"")?;
             if num_days != 0 {
-                write!(cypher.query_mut(), "{} days ", num_days)?;
+                write!(cypher.query_mut(), "{num_days} days ")?;
             }
-            write!(
-                cypher.query_mut(),
-                "{} microseconds\")",
-                sub_day_duration
-                    .num_microseconds()
-                    .ok_or_else(invariance_violation)?
-            )?;
+            let microseconds = sub_day_duration
+                .num_microseconds()
+                .ok_or_else(invariance_violation)?;
+            write!(cypher.query_mut(), "{microseconds} microseconds\")")?;
         }
         BasicValue::Vector(v) => {
             write!(cypher.query_mut(), "[")?;
@@ -546,10 +539,10 @@ fn append_upsert_node(
         NODE_VAR_NAME,
         &data_coll.schema.value_fields,
         &data_coll.value_fields_input_idx,
-        &upsert_entry,
+        upsert_entry,
         true,
     )?;
-    write!(cypher.query_mut(), ";\n")?;
+    writeln!(cypher.query_mut(), ";")?;
     Ok(())
 }
@@ -581,10 +574,10 @@ fn append_merge_node_for_rel(
         var_name,
         &field_mapping.schema.value_fields,
         &field_mapping.fields_input_idx.value,
-        &upsert_entry,
+        upsert_entry,
         false,
     )?;
-    write!(cypher.query_mut(), "\n")?;
+    writeln!(cypher.query_mut())?;
     Ok(())
 }
@@ -602,8 +595,8 @@ fn append_upsert_rel(
     } else {
         return Ok(());
     };
-    append_merge_node_for_rel(cypher, SRC_NODE_VAR_NAME, &rel_info.source, &upsert_entry)?;
-    append_merge_node_for_rel(cypher, TGT_NODE_VAR_NAME, &rel_info.target, &upsert_entry)?;
+    append_merge_node_for_rel(cypher, SRC_NODE_VAR_NAME, &rel_info.source, upsert_entry)?;
+    append_merge_node_for_rel(cypher, TGT_NODE_VAR_NAME, &rel_info.target, upsert_entry)?;
     {
         let rel_type = data_coll.schema.elem_type.label();
         write!(
@@ -625,10 +618,10 @@ fn append_upsert_rel(
         REL_VAR_NAME,
         &data_coll.schema.value_fields,
         &data_coll.value_fields_input_idx,
-        &upsert_entry,
+        upsert_entry,
         false,
     )?;
-    write!(cypher.query_mut(), ";\n")?;
+    writeln!(cypher.query_mut(), ";")?;
     Ok(())
 }
@@ -646,16 +639,16 @@ fn append_delete_node(
         key.fields_iter(data_coll.schema.key_fields.len())?
             .map(|f| Cow::Owned(value::Value::from(f))),
     )?;
-    write!(cypher.query_mut(), ")\n")?;
-    write!(
+    writeln!(cypher.query_mut(), ")")?;
+    writeln!(
         cypher.query_mut(),
-        "WITH {NODE_VAR_NAME} SET {NODE_VAR_NAME}.{SELF_CONTAINED_TAG_FIELD_NAME} = NULL\n"
+        "WITH {NODE_VAR_NAME} SET {NODE_VAR_NAME}.{SELF_CONTAINED_TAG_FIELD_NAME} = NULL"
     )?;
-    write!(
+    writeln!(
         cypher.query_mut(),
-        "WITH {NODE_VAR_NAME} WHERE NOT ({NODE_VAR_NAME})--() DELETE {NODE_VAR_NAME}\n"
+        "WITH {NODE_VAR_NAME} WHERE NOT ({NODE_VAR_NAME})--() DELETE {NODE_VAR_NAME}"
     )?;
-    write!(cypher.query_mut(), ";\n")?;
+    writeln!(cypher.query_mut(), ";")?;
     Ok(())
 }
@@ -708,7 +701,7 @@ fn append_delete_rel(
             .map(|k| Cow::Owned(value::Value::from(k))),
     )?;
     write!(cypher.query_mut(), ") DELETE {REL_VAR_NAME}")?;
-    write!(cypher.query_mut(), ";\n")?;
+    writeln!(cypher.query_mut(), ";")?;
     Ok(())
 }
@@ -726,12 +719,12 @@ fn append_maybe_gc_node(
         key.fields_iter(schema.key_fields.len())?
             .map(|f| Cow::Owned(value::Value::from(f))),
     )?;
-    write!(cypher.query_mut(), ")\n")?;
+    writeln!(cypher.query_mut(), ")")?;
     write!(
         cypher.query_mut(),
         "WITH {NODE_VAR_NAME} WHERE NOT ({NODE_VAR_NAME})--() DELETE {NODE_VAR_NAME}"
     )?;
-    write!(cypher.query_mut(), ";\n")?;
+    writeln!(cypher.query_mut(), ";")?;
     Ok(())
 }
@@ -856,7 +849,7 @@ impl StorageFactoryBase for Factory {
         existing: CombinedState<SetupState>,
         _auth_registry: &Arc<AuthRegistry>,
     ) -> Result<GraphElementDataSetupStatus> {
-        let existing_invalidated = desired.as_ref().map_or(false, |desired| {
+        let existing_invalidated = desired.as_ref().is_some_and(|desired| {
             existing
                 .possible_versions()
                 .any(|v| v.referenced_node_tables != desired.referenced_node_tables)
@@ -875,8 +868,7 @@ impl StorageFactoryBase for Factory {
         Ok(GraphElementDataSetupStatus {
             actions,
             referenced_node_tables: desired
-                .map(|desired| desired.referenced_node_tables)
-                .flatten()
+                .and_then(|desired| desired.referenced_node_tables)
                 .map(|(src, tgt)| (src.table_name, tgt.table_name)),
             drop_affected_referenced_node_tables,
         })
@@ -904,11 +896,11 @@ impl StorageFactoryBase for Factory {
         ))
     }

-    fn extract_additional_key<'ctx>(
+    fn extract_additional_key(
         &self,
         _key: &KeyValue,
         value: &FieldValues,
-        export_context: &'ctx ExportContext,
+        export_context: &ExportContext,
     ) -> Result<serde_json::Value> {
         let additional_key = if let Some(rel_info) = &export_context.analyzed_data_coll.rel {
             serde_json::to_value((
@@ -935,7 +927,7 @@ impl StorageFactoryBase for Factory {
         for mutations in mutations_by_conn.into_values() {
             let kuzu_client = &mutations[0].export_context.kuzu_client;
             let mut cypher = CypherBuilder::new();
-            write!(cypher.query_mut(), "BEGIN TRANSACTION;\n")?;
+            writeln!(cypher.query_mut(), "BEGIN TRANSACTION;")?;

             let (mut rel_mutations, nodes_mutations): (Vec<_>, Vec<_>) = mutations
                 .into_iter()
@@ -1039,7 +1031,7 @@ impl StorageFactoryBase for Factory {
                 }
             }

-            write!(cypher.query_mut(), "COMMIT;\n")?;
+            writeln!(cypher.query_mut(), "COMMIT;")?;
             kuzu_client.run_cypher(cypher).await?;
         }
         Ok(())
@@ -1074,7 +1066,7 @@ impl StorageFactoryBase for Factory {
             if !change.setup_status.actions.drop_existing {
                 continue;
             }
-            append_drop_table(&mut cypher, &change.setup_status, &change.key.typ)?;
+            append_drop_table(&mut cypher, change.setup_status, &change.key.typ)?;

             partial_affected_node_tables.extend(
                 change
@@ -1088,11 +1080,11 @@ impl StorageFactoryBase for Factory {
         }

         // Nodes first when creating.
         for change in node_changes.iter().chain(rel_changes.iter()) {
-            append_upsert_table(&mut cypher, &change.setup_status, &change.key.typ)?;
+            append_upsert_table(&mut cypher, change.setup_status, &change.key.typ)?;
         }

         for table in partial_affected_node_tables {
-            append_delete_orphaned_nodes(&mut cypher, &table)?;
+            append_delete_orphaned_nodes(&mut cypher, table)?;
         }

         kuzu_client.run_cypher(cypher).await?;
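A fix repeated throughout the Kuzu target above: `write!(…, "…;\n")` becomes `writeln!(…, "…;")`, per clippy's `write_with_newline`. The output bytes are identical; the macro just makes the trailing newline explicit instead of burying it in the format string. A sketch against a plain `String` buffer:

use std::fmt::Write;

fn main() -> std::fmt::Result {
    let (mut a, mut b) = (String::new(), String::new());
    let table = "Document";
    write!(a, "DROP TABLE IF EXISTS {};\n", table)?; // what clippy flags
    writeln!(b, "DROP TABLE IF EXISTS {table};")?;   // preferred form
    assert_eq!(a, b);
    writeln!(b)?; // bare writeln! emits just the newline, as in append_merge_node_for_rel
    Ok(())
}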
diff --git a/src/ops/targets/neo4j.rs b/src/ops/targets/neo4j.rs
index b685ea9fa..d098b9d67 100644
--- a/src/ops/targets/neo4j.rs
+++ b/src/ops/targets/neo4j.rs
@@ -167,8 +167,8 @@ fn field_values_to_bolt<'a>(
 }

 fn mapped_field_values_to_bolt(
-    fields_schema: &Vec<schema::FieldSchema>,
-    fields_input_idx: &Vec<usize>,
+    fields_schema: &[schema::FieldSchema],
+    fields_input_idx: &[usize],
     field_values: &FieldValues,
 ) -> Result<BoltType> {
     let bolt_value = BoltType::Map(neo4rs::BoltMap {
@@ -298,8 +298,8 @@ impl ExportContext {
             .into_iter()
             .enumerate()
             .map(|(i, name)| {
-                let param = format!("{}_{}", param_prefix, i);
-                let item = format!("{}: ${}", name, param);
+                let param = format!("{param_prefix}_{i}");
+                let item = format!("{name}: ${param}");
                 (param, item)
             })
             .unzip();
@@ -750,7 +750,7 @@ impl components::SetupOperator for SetupComponentOperator {
                 format!(
                     "CREATE CONSTRAINT {name} IF NOT EXISTS FOR {matcher} REQUIRE {field_names} IS {key_type} KEY",
                     name = key.name,
-                    field_names = build_composite_field_names(qualifier, &field_names),
+                    field_names = build_composite_field_names(qualifier, field_names),
                 )
             }
             IndexDef::VectorIndex {
@@ -796,7 +796,7 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin
     if field_names.len() == 1 {
         strs
     } else {
-        format!("({})", strs)
+        format!("({strs})")
     }
 }

 #[derive(Debug)]
@@ -991,7 +991,6 @@ impl StorageFactoryBase for Factory {
             })
             .collect::<Result<Vec<_>>>()?;
         let decl_output = std::iter::zip(declarations, declared_graph_elements)
-            .into_iter()
             .map(|(decl, graph_elem_schema)| {
                 let setup_state =
                     SetupState::new(&graph_elem_schema, &decl.decl.index_options, vec![])?;
diff --git a/src/ops/targets/postgres.rs b/src/ops/targets/postgres.rs
index 9b1766cb5..aaab00481 100644
--- a/src/ops/targets/postgres.rs
+++ b/src/ops/targets/postgres.rs
@@ -385,7 +385,7 @@ fn to_column_type_sql(column_type: &ValueType) -> String {
             BasicValueType::Json => "jsonb".into(),
             BasicValueType::Vector(vec_schema) => {
                 if convertible_to_pgvector(vec_schema) {
-                    format!("vector({})", vec_schema.dimension.unwrap_or(0)).into()
+                    format!("vector({})", vec_schema.dimension.unwrap_or(0))
                 } else {
                     "jsonb".into()
                 }
@@ -396,16 +396,16 @@ fn to_column_type_sql(column_type: &ValueType) -> String {
     }
 }

-impl<'a> Into<Cow<'a, TableColumnsSchema<String>>> for &'a SetupState {
-    fn into(self) -> Cow<'a, TableColumnsSchema<String>> {
+impl<'a> From<&'a SetupState> for Cow<'a, TableColumnsSchema<String>> {
+    fn from(val: &'a SetupState) -> Self {
         Cow::Owned(TableColumnsSchema {
-            key_columns: self
+            key_columns: val
                 .columns
                 .key_columns
                 .iter()
                 .map(|(k, v)| (k.clone(), to_column_type_sql(v)))
                 .collect(),
-            value_columns: self
+            value_columns: val
                 .columns
                 .value_columns
                 .iter()
@@ -741,7 +741,7 @@ impl StorageFactoryBase for Factory {
         auth_registry: &Arc<AuthRegistry>,
     ) -> Result<()> {
         for change in changes.iter() {
-            let db_pool = get_db_pool(change.key.database.as_ref(), &auth_registry).await?;
+            let db_pool = get_db_pool(change.key.database.as_ref(), auth_registry).await?;
             change
                 .setup_status
                 .apply_change(&db_pool, &change.key.table_name)
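Both `kuzu.rs` and `postgres.rs` flip a hand-written `Into` impl into the equivalent `From`, per clippy's `from_over_into`: implementing `From` yields the `Into` direction for free through the standard library's blanket impl, and `From` is the form that trait bounds and `?`-conversions conventionally name. A minimal sketch mirroring the `Cow` shape used here (simplified to `Cow<str>`):

use std::borrow::Cow;

struct SetupState {
    schema: String,
}

// Implementing From<&SetupState> also provides &SetupState: Into<Cow<str>>.
impl<'a> From<&'a SetupState> for Cow<'a, str> {
    fn from(val: &'a SetupState) -> Self {
        Cow::Borrowed(&val.schema)
    }
}

fn main() {
    let state = SetupState { schema: "id INT64".to_string() };
    let direct: Cow<str> = Cow::from(&state);
    let via_into: Cow<str> = (&state).into(); // the blanket impl at work
    assert_eq!(direct, via_into);
    assert!(matches!(direct, Cow::Borrowed(_)));
}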
diff --git a/src/ops/targets/qdrant.rs b/src/ops/targets/qdrant.rs
index 518e7011a..558cc12ed 100644
--- a/src/ops/targets/qdrant.rs
+++ b/src/ops/targets/qdrant.rs
@@ -437,7 +437,7 @@ impl StorageFactoryBase for Factory {
             key.collection_name,
             key.connection
                 .as_ref()
-                .map_or_else(|| "".to_string(), |auth_entry| format!(" @ {}", auth_entry))
+                .map_or_else(|| "".to_string(), |auth_entry| format!(" @ {auth_entry}"))
         ))
     }
diff --git a/src/ops/targets/shared/property_graph.rs b/src/ops/targets/shared/property_graph.rs
index 3f447ff20..cc3b12619 100644
--- a/src/ops/targets/shared/property_graph.rs
+++ b/src/ops/targets/shared/property_graph.rs
@@ -441,10 +441,7 @@ pub fn analyze_graph_mappings<'a, AuthEntry: 'a>(
                 .collect(),
         )?;

-        if !(0..input_fields_idx.key.len())
-            .into_iter()
-            .eq(input_fields_idx.key.into_iter())
-        {
+        if !(0..input_fields_idx.key.len()).eq(input_fields_idx.key.into_iter()) {
             return Err(invariance_violation());
         }
         DataCollectionProcessedInfo {
diff --git a/src/ops/targets/shared/table_columns.rs b/src/ops/targets/shared/table_columns.rs
index 066b30dbe..22c499500 100644
--- a/src/ops/targets/shared/table_columns.rs
+++ b/src/ops/targets/shared/table_columns.rs
@@ -57,9 +57,9 @@ impl TableMainSetupAction {
             desired_state.map(|desired| desired.into());
         let possible_existing_cols: Vec<Cow<'_, TableColumnsSchema<String>>> = existing
             .possible_versions()
-            .map(|v| Into::<Cow<'_, TableColumnsSchema<String>>>::into(v))
+            .map(Into::<Cow<'_, TableColumnsSchema<String>>>::into)
             .collect();
-        let drop_existing = desired_cols.as_ref().map_or(true, |desired| {
+        let drop_existing = desired_cols.as_ref().is_none_or(|desired| {
             existing_invalidated
                 || possible_existing_cols
                     .iter()
diff --git a/src/py/convert.rs b/src/py/convert.rs
index 6dcbaad06..a60c39c31 100644
--- a/src/py/convert.rs
+++ b/src/py/convert.rs
@@ -76,7 +76,7 @@ fn basic_value_to_py_object<'py>(
         value::BasicValue::Json(v) => pythonize(py, v).into_py_result()?,
         value::BasicValue::Vector(v) => handle_vector_to_py(py, v)?,
         value::BasicValue::UnionVariant { tag_id, value } => {
-            (*tag_id, basic_value_to_py_object(py, &value)?).into_bound_py_any(py)?
+            (*tag_id, basic_value_to_py_object(py, value)?).into_bound_py_any(py)?
         }
     };
     Ok(result)
@@ -218,10 +218,10 @@ fn handle_ndarray_from_py<'py>(
         };
     }

-    match elem_type {
-        &schema::BasicValueType::Float32 => try_convert!(f32, value::BasicValue::Float32),
-        &schema::BasicValueType::Float64 => try_convert!(f64, value::BasicValue::Float64),
-        &schema::BasicValueType::Int64 => try_convert!(i64, value::BasicValue::Int64),
+    match *elem_type {
+        schema::BasicValueType::Float32 => try_convert!(f32, value::BasicValue::Float32),
+        schema::BasicValueType::Float64 => try_convert!(f64, value::BasicValue::Float64),
+        schema::BasicValueType::Int64 => try_convert!(i64, value::BasicValue::Int64),
         _ => {}
     }
@@ -371,11 +371,11 @@ mod tests {
         let py_object = value_to_py_object(py, original_value)
             .expect("Failed to convert Rust value to Python object");

-        println!("Python object: {:?}", py_object);
+        println!("Python object: {py_object:?}");

         let roundtripped_value = value_from_py_object(value_type, &py_object)
             .expect("Failed to convert Python object back to Rust value");

-        println!("Roundtripped value: {:?}", roundtripped_value);
+        println!("Roundtripped value: {roundtripped_value:?}");

         assert_eq!(
             original_value, &roundtripped_value,
             "Value mismatch after roundtrip"
diff --git a/src/py/mod.rs b/src/py/mod.rs
index 5e5d60024..80488e9f5 100644
--- a/src/py/mod.rs
+++ b/src/py/mod.rs
@@ -28,16 +28,16 @@ impl PythonExecutionContext {
     }
 }

-pub trait FromPyResult<T> {
-    fn from_py_result(self, py: Python<'_>) -> anyhow::Result<T>;
+pub trait ToResultWithPyTrace<T> {
+    fn to_result_with_py_trace(self, py: Python<'_>) -> anyhow::Result<T>;
 }

-impl<T> FromPyResult<T> for Result<T, PyErr> {
-    fn from_py_result(self, py: Python<'_>) -> anyhow::Result<T> {
+impl<T> ToResultWithPyTrace<T> for Result<T, PyErr> {
+    fn to_result_with_py_trace(self, py: Python<'_>) -> anyhow::Result<T> {
         match self {
             Ok(value) => Ok(value),
             Err(err) => {
-                let mut err_str = format!("Error calling Python function: {}", err);
+                let mut err_str = format!("Error calling Python function: {err}");
                 if let Some(tb) = err.traceback(py) {
                     write!(&mut err_str, "\n{}", tb.format()?)?;
                 }
@@ -54,7 +54,7 @@ impl IntoPyResult for Result {
     fn into_py_result(self) -> PyResult {
         match self {
             Ok(value) => Ok(value),
-            Err(err) => Err(PyException::new_err(format!("{:?}", err))),
+            Err(err) => Err(PyException::new_err(format!("{err:?}"))),
         }
     }
 }
@@ -299,7 +299,7 @@ impl Flow {
             let field_name = format!("{}{}", prefix, field.name);

             let mut field_type = match &field.value_type.typ {
-                ValueType::Basic(basic) => format!("{}", basic),
+                ValueType::Basic(basic) => format!("{basic}"),
                 ValueType::Table(t) => format!("{}", t.kind),
                 ValueType::Struct(_) => "Struct".to_string(),
             };
@@ -324,10 +324,10 @@ impl Flow {
             match &field.value_type.typ {
                 ValueType::Struct(s) => {
-                    process_fields(&s.fields, &format!("{}.", field_name), result);
+                    process_fields(&s.fields, &format!("{field_name}."), result);
                 }
                 ValueType::Table(t) => {
-                    process_fields(&t.row.fields, &format!("{}[].", field_name), result);
+                    process_fields(&t.row.fields, &format!("{field_name}[]."), result);
                 }
                 ValueType::Basic(_) => {}
             }
@@ -487,7 +487,7 @@ fn seder_roundtrip<'py>(
     let typ = typ.into_inner();
     let value = value_from_py_object(&typ, &value)?;
     let value = value::test_util::seder_roundtrip(&value, &typ).into_py_result()?;
-    Ok(value_to_py_object(py, &value)?)
+    value_to_py_object(py, &value)
 }

 /// A Python module implemented in Rust.
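`FromPyResult::from_py_result` becomes `ToResultWithPyTrace::to_result_with_py_trace` — the same extension-trait pattern, renamed so the call site says what you get (an `anyhow::Result` carrying the Python traceback) rather than where it came from. A dependency-free sketch of the pattern; `FakePyErr` is a stand-in for `pyo3::PyErr`, not a real type:

#[derive(Debug)]
struct FakePyErr {
    msg: String,
    traceback: String,
}

// An extension trait adds a method to a foreign type, here Result<T, FakePyErr>.
trait ToResultWithTrace<T> {
    fn to_result_with_trace(self) -> Result<T, String>;
}

impl<T> ToResultWithTrace<T> for Result<T, FakePyErr> {
    fn to_result_with_trace(self) -> Result<T, String> {
        // Fold the traceback into the error text so callers see the full context.
        self.map_err(|e| format!("Error calling Python function: {}\n{}", e.msg, e.traceback))
    }
}

fn main() {
    let err: Result<i32, FakePyErr> = Err(FakePyErr {
        msg: "ValueError".into(),
        traceback: "  File \"f.py\", line 3".into(),
    });
    let msg = err.to_result_with_trace().unwrap_err();
    assert!(msg.contains("ValueError") && msg.contains("line 3"));
}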
diff --git a/src/settings.rs b/src/settings.rs
index 42a8fcc8b..7cba1afd1 100644
--- a/src/settings.rs
+++ b/src/settings.rs
@@ -27,7 +27,6 @@ pub struct Settings {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use serde_json;

     #[test]
     fn test_settings_deserialize_with_database() {
diff --git a/src/setup/components.rs b/src/setup/components.rs
index b0cf74b89..c34b11fe6 100644
--- a/src/setup/components.rs
+++ b/src/setup/components.rs
@@ -87,7 +87,7 @@ impl SetupStatus {
         let is_up_to_date = existing_component_states.always_exists()
             && existing_component_states.possible_versions().all(|v| {
                 v.get(&key)
-                    .map_or(false, |s| desc.is_up_to_date(s, &desired_comp_state))
+                    .is_some_and(|s| desc.is_up_to_date(s, &desired_comp_state))
             });
         if !is_up_to_date {
             let already_exists = existing_component_states
diff --git a/src/setup/db_metadata.rs b/src/setup/db_metadata.rs
index b7e1fb185..569d30b8d 100644
--- a/src/setup/db_metadata.rs
+++ b/src/setup/db_metadata.rs
@@ -238,7 +238,7 @@ pub async fn stage_changes_for_flow(
         };
         let mut new_staging_changes = vec![];
         if let Some(legacy_key) = &update_info.legacy_key {
-            if let Some(legacy_record) = existing_records.remove(&legacy_key) {
+            if let Some(legacy_record) = existing_records.remove(legacy_key) {
                 new_staging_changes.extend(legacy_record.staging_changes.0);
                 delete_state(flow_name, legacy_key, &mut *txn).await?;
             }
diff --git a/src/setup/driver.rs b/src/setup/driver.rs
index 8e5aa048c..ec2bec96d 100644
--- a/src/setup/driver.rs
+++ b/src/setup/driver.rs
@@ -33,7 +33,7 @@ impl Display for MetadataRecordType {
             MetadataRecordType::FlowVersion => f.write_str(db_metadata::FLOW_VERSION_RESOURCE_TYPE),
             MetadataRecordType::FlowMetadata => write!(f, "FlowMetadata"),
             MetadataRecordType::TrackingTable => write!(f, "TrackingTable"),
-            MetadataRecordType::Target(target_id) => write!(f, "Target:{}", target_id),
+            MetadataRecordType::Target(target_id) => write!(f, "Target:{target_id}"),
         }
     }
 }
@@ -228,7 +228,7 @@ fn group_resource_states<'a>(
             .existing
             .legacy_state_key
             .as_ref()
-            .map_or(false, |v| v != legacy_state_key)
+            .is_some_and(|v| v != legacy_state_key)
         {
             warn!(
                 "inconsistent legacy key: {:?}, {:?}",
@@ -387,7 +387,7 @@ async fn maybe_update_resource_setup<
         });
         writeln!(write, "{}:", resource.description)?;
         for change in setup_status.describe_changes() {
-            writeln!(write, " - {}", change)?;
+            writeln!(write, " - {change}")?;
         }
     }
 }
@@ -571,7 +571,7 @@ async fn apply_changes_for_flow(
         });
     }

-    writeln!(write, "Done for flow {}", flow_name)?;
+    writeln!(write, "Done for flow {flow_name}")?;
     Ok(())
 }
@@ -592,7 +592,7 @@ async fn apply_global_changes(
         .metadata_table
         .setup_status
         .as_ref()
-        .map_or(false, |c| c.change_type() == SetupChangeType::Create)
+        .is_some_and(|c| c.change_type() == SetupChangeType::Create)
     {
         all_setup_states.has_metadata_table = true;
     }
diff --git a/src/setup/states.rs b/src/setup/states.rs
index 167e972f8..2a333185f 100644
--- a/src/setup/states.rs
+++ b/src/setup/states.rs
@@ -292,10 +292,10 @@ impl std::fmt::Display for ResourceSetupInfo
                 "INVALID",
             None => "USER MANAGED",
         };
-        let status_str = format!("[ {:^9} ]", status_code);
+        let status_str = format!("[ {status_code:^9} ]");
         let status_full = status_str.color(AnsiColors::Cyan);
         let desc_colored = &self.description;
-        writeln!(f, "{} {}", status_full, desc_colored)?;
+        writeln!(f, "{status_full} {desc_colored}")?;
         if let Some(setup_status) = &self.setup_status {
             let changes = setup_status.describe_changes();
             if !changes.is_empty() {
@@ -424,9 +424,9 @@ impl std::fmt::Display for FormattedFlowSetupStatus<'_> {
             return Ok(());
         }

-        write!(
+        writeln!(
             f,
-            "{} Flow: {}\n",
+            "{} Flow: {}",
             ObjectSetupStatusCode(flow_ssc)
                 .to_string()
                 .color(AnsiColors::Cyan),
@@ -435,10 +435,10 @@ impl std::fmt::Display for FormattedFlowSetupStatus<'_> {
         let mut f = indented(f).with_str(INDENT);
         if let Some(tracking_table) = &flow_ssc.tracking_table {
-            write!(f, "{}", tracking_table)?;
+            write!(f, "{tracking_table}")?;
         }
         for target_resource in &flow_ssc.target_resources {
-            write!(f, "{}", target_resource)?;
+            write!(f, "{target_resource}")?;
         }
         for resource in &flow_ssc.unknown_resources {
             writeln!(f, "[ UNKNOWN ] {resource}")?;
diff --git a/src/utils/concur_control.rs b/src/utils/concur_control.rs
index 7f2a5de1f..889888642 100644
--- a/src/utils/concur_control.rs
+++ b/src/utils/concur_control.rs
@@ -28,8 +28,8 @@ impl WeightedSemaphore {
         self.sem.clone().acquire_owned().await
     }

-    async fn acquire<'a>(
-        &'a self,
+    async fn acquire(
+        &self,
         weight: usize,
         reserved: bool,
     ) -> Result, AcquireError> {
@@ -71,9 +71,7 @@ impl ConcurrencyController {
             inflight_count_sem: exec_options
                 .max_inflight_rows
                 .map(|max| Arc::new(Semaphore::new(max))),
-            inflight_bytes_sem: exec_options
-                .max_inflight_bytes
-                .map(|max| WeightedSemaphore::new(max)),
+            inflight_bytes_sem: exec_options.max_inflight_bytes.map(WeightedSemaphore::new),
         }
     }
@@ -104,8 +102,8 @@ impl ConcurrencyController {
         })
     }

-    pub async fn acquire_bytes_with_reservation<'a>(
-        &'a self,
+    pub async fn acquire_bytes_with_reservation(
+        &self,
         bytes_fn: impl FnOnce() -> usize,
     ) -> Result, AcquireError> {
         if let Some(sem) = &self.inflight_bytes_sem {
@@ -158,8 +156,8 @@ impl CombinedConcurrencyController {
         })
     }

-    pub async fn acquire_bytes_with_reservation<'a>(
-        &'a self,
+    pub async fn acquire_bytes_with_reservation(
+        &self,
         bytes_fn: impl FnOnce() -> usize,
     ) -> Result<(Option, Option), AcquireError> {
         let num_bytes = bytes_fn();
diff --git a/src/utils/retryable.rs b/src/utils/retryable.rs
index 0ea7185c6..b8d72abc9 100644
--- a/src/utils/retryable.rs
+++ b/src/utils/retryable.rs
@@ -115,7 +115,7 @@ pub async fn run<
             if !err.is_retryable()
                 || options
                     .max_retries
-                    .map_or(false, |max_retries| retries >= max_retries)
+                    .is_some_and(|max_retries| retries >= max_retries)
             {
                 return Result::Err(err);
             }
diff --git a/src/utils/yaml_ser.rs b/src/utils/yaml_ser.rs
index 99706c20b..12ad7f1b0 100644
--- a/src/utils/yaml_ser.rs
+++ b/src/utils/yaml_ser.rs
@@ -381,10 +381,7 @@ mod tests {
     fn assert_yaml_serialization<T: Serialize>(value: T, expected_yaml: Yaml) {
         let result = YamlSerializer::serialize(&value);

-        println!(
-            "Serialized value: {:?}, Expected value: {:?}",
-            result, expected_yaml
-        );
+        println!("Serialized value: {result:?}, Expected value: {expected_yaml:?}");

         assert!(
             result.is_ok(),
@@ -713,7 +710,7 @@ mod tests {
             msg: "A test error message".to_string(),
         };
         assert_eq!(
-            format!("{}", error),
+            format!("{error}"),
             "YamlSerializerError: A test error message"
         );
     }
@@ -723,7 +720,7 @@ mod tests {
         let error = YamlSerializerError::custom("Custom error detail");
         assert_eq!(error.msg, "Custom error detail");
         assert_eq!(
-            format!("{}", error),
+            format!("{error}"),
             "YamlSerializerError: Custom error detail"
         );
         let _err_trait_obj: Box<dyn std::error::Error> = Box::new(error);
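One last recurring pattern, from `concur_control.rs` above: `.map(|max| WeightedSemaphore::new(max))` becomes `.map(WeightedSemaphore::new)` (clippy's `redundant_closure`). Any function path whose signature matches can stand in for the closure. A sketch with a stub type of the same name:

struct WeightedSemaphore(usize);

impl WeightedSemaphore {
    fn new(capacity: usize) -> Self {
        WeightedSemaphore(capacity)
    }
}

fn main() {
    let max_bytes: Option<usize> = Some(1024);
    let a = max_bytes.map(|max| WeightedSemaphore::new(max)); // closure form
    let b = max_bytes.map(WeightedSemaphore::new);            // path form
    assert_eq!(a.map(|s| s.0), b.map(|s| s.0));
}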