Merged
21 changes: 7 additions & 14 deletions src/base/duration.rs
@@ -18,7 +18,7 @@ fn parse_components(

// Parse digits and optional decimal point
while let Some(&c) = iter.peek() {
if c.is_digit(10) || (c == '.' && !has_decimal) {
if c.is_ascii_digit() || (c == '.' && !has_decimal) {
if c == '.' {
has_decimal = true;
}
@@ -53,8 +53,8 @@ fn parse_components(

/// Parses an ISO 8601 duration string into a `chrono::Duration`.
fn parse_iso8601_duration(s: &str, original_input: &str) -> Result<Duration> {
let (is_negative, s_after_sign) = if s.starts_with('-') {
(true, &s[1..])
let (is_negative, s_after_sign) = if let Some(stripped) = s.strip_prefix('-') {
(true, stripped)
} else {
(false, s)
};
@@ -193,28 +193,21 @@ mod tests {

fn check_ok(res: Result<Duration>, expected: Duration, input_str: &str) {
match res {
Ok(duration) => assert_eq!(duration, expected, "Input: '{}'", input_str),
Err(e) => panic!(
"Input: '{}', expected Ok({:?}), but got Err: {}",
input_str, expected, e
),
Ok(duration) => assert_eq!(duration, expected, "Input: '{input_str}'"),
Err(e) => panic!("Input: '{input_str}', expected Ok({expected:?}), but got Err: {e}"),
}
}

fn check_err_contains(res: Result<Duration>, expected_substring: &str, input_str: &str) {
match res {
Ok(d) => panic!(
"Input: '{}', expected error containing '{}', but got Ok({:?})",
input_str, expected_substring, d
"Input: '{input_str}', expected error containing '{expected_substring}', but got Ok({d:?})"
),
Err(e) => {
let err_msg = e.to_string();
assert!(
err_msg.contains(expected_substring),
"Input: '{}', error message '{}' does not contain expected substring '{}'",
input_str,
err_msg,
expected_substring
"Input: '{input_str}', error message '{err_msg}' does not contain expected substring '{expected_substring}'"
);
}
}
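Note on the duration.rs hunks: both edits are standard clippy cleanups — `char::is_ascii_digit()` replaces `is_digit(10)`, and `strip_prefix('-')` replaces the `starts_with` check plus manual `&s[1..]` slice. A minimal standalone sketch of the same two patterns, using hypothetical helpers rather than the crate's actual parser:

fn split_sign(s: &str) -> (bool, &str) {
    // clippy::manual_strip: strip_prefix expresses the intent directly and
    // avoids the hand-written `&s[1..]` slice.
    if let Some(stripped) = s.strip_prefix('-') {
        (true, stripped)
    } else {
        (false, s)
    }
}

fn leading_digits(s: &str) -> &str {
    // clippy prefers is_ascii_digit() over is_digit(10) for base-10 checks.
    let end = s.find(|c: char| !c.is_ascii_digit()).unwrap_or(s.len());
    &s[..end]
}

fn main() {
    assert_eq!(split_sign("-P1D"), (true, "P1D"));
    assert_eq!(leading_digits("12.5S"), "12");
}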
15 changes: 8 additions & 7 deletions src/base/schema.rs
@@ -86,7 +86,7 @@ impl std::fmt::Display for BasicValueType {
BasicValueType::Vector(s) => {
write!(f, "Vector[{}", s.element_type)?;
if let Some(dimension) = s.dimension {
write!(f, ", {}", dimension)?;
write!(f, ", {dimension}")?;
}
write!(f, "]")
}
@@ -97,7 +97,7 @@ impl std::fmt::Display for BasicValueType {
// Add type delimiter
write!(f, " | ")?;
}
write!(f, "{}", typ)?;
write!(f, "{typ}")?;
}
write!(f, "]")
}
@@ -129,13 +129,14 @@ impl std::fmt::Display for StructSchema {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{}", field)?;
write!(f, "{field}")?;
}
write!(f, ")")
}
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[allow(clippy::enum_variant_names)]
pub enum TableKind {
/// An table with unordered rows, without key.
UTable,
@@ -307,9 +308,9 @@ impl std::fmt::Display for EnrichedValueType {
impl std::fmt::Display for ValueType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ValueType::Basic(b) => write!(f, "{}", b),
ValueType::Struct(s) => write!(f, "{}", s),
ValueType::Table(c) => write!(f, "{}", c),
ValueType::Basic(b) => write!(f, "{b}"),
ValueType::Struct(s) => write!(f, "{s}"),
ValueType::Table(c) => write!(f, "{c}"),
}
}
}
@@ -371,7 +372,7 @@ impl std::fmt::Display for CollectorSchema {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{}", field)?;
write!(f, "{field}")?;
}
write!(f, ")")
}
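Note on the schema.rs hunks: these are mechanical applications of clippy::uninlined_format_args — local bindings are captured directly inside the format string — plus an `#[allow(clippy::enum_variant_names)]` that presumably silences the lint fired by the shared `Table` suffix on the `TableKind` variants. A small self-contained sketch of the inlined-argument style, with an invented `VectorType` stand-in rather than the real schema types:

use std::fmt;

struct VectorType {
    element: String,
    dimension: Option<usize>,
}

impl fmt::Display for VectorType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let element = &self.element;
        write!(f, "Vector[{element}")?;
        if let Some(dimension) = self.dimension {
            // `{dimension}` captures the local binding; no trailing positional
            // argument is needed, which is what the diff changes amount to.
            write!(f, ", {dimension}")?;
        }
        write!(f, "]")
    }
}

fn main() {
    let t = VectorType { element: "Float32".into(), dimension: Some(384) };
    assert_eq!(t.to_string(), "Vector[Float32, 384]");
}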
20 changes: 10 additions & 10 deletions src/base/spec.rs
@@ -67,7 +67,7 @@ pub struct OpArgName(pub Option<String>);
impl fmt::Display for OpArgName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(arg_name) = &self.0 {
write!(f, "${}", arg_name)
write!(f, "${arg_name}")
} else {
write!(f, "?")
}
@@ -113,7 +113,7 @@ impl fmt::Display for FieldMapping {
if scope.is_empty() {
"".to_string()
} else {
format!("{}.", scope)
format!("{scope}.")
},
self.field_path
)
@@ -129,7 +129,7 @@ pub struct ConstantMapping {
impl fmt::Display for ConstantMapping {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let value = serde_json::to_string(&self.value).unwrap_or("#serde_error".to_string());
write!(f, "{}", value)
write!(f, "{value}")
}
}

@@ -152,7 +152,7 @@ impl fmt::Display for StructMapping {
.map(|field| field.name.clone())
.collect::<Vec<_>>()
.join(",");
write!(f, "{}", fields)
write!(f, "{fields}")
}
}

@@ -280,9 +280,9 @@ impl fmt::Display for SourceRefreshOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let refresh = self
.refresh_interval
.map(|d| format!("{:?}", d))
.map(|d| format!("{d:?}"))
.unwrap_or("none".to_string());
write!(f, "{}", refresh)
write!(f, "{refresh}")
}
}

@@ -327,8 +327,8 @@ impl SpecFormatter for TransformOpSpec {
.join(",");
let op_str = self.op.format(mode);
match mode {
OutputMode::Concise => format!("op={}, inputs={}", op_str, inputs),
OutputMode::Verbose => format!("op={}, inputs=[{}]", op_str, inputs),
OutputMode::Concise => format!("op={op_str}, inputs={inputs}"),
OutputMode::Verbose => format!("op={op_str}, inputs=[{inputs}]"),
}
}
}
@@ -453,7 +453,7 @@ impl fmt::Display for IndexOptions {
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(",");
write!(f, "keys={}, indexes={}", primary_keys, vector_indexes)
write!(f, "keys={primary_keys}, indexes={vector_indexes}")
}
}

@@ -494,7 +494,7 @@ impl SpecFormatter for ReactiveOpSpec {
match self {
ReactiveOpSpec::Transform(t) => format!("Transform: {}", t.format(mode)),
ReactiveOpSpec::ForEach(fe) => match mode {
OutputMode::Concise => format!("{}", fe.get_label()),
OutputMode::Concise => fe.get_label().to_string(),
OutputMode::Verbose => format!("ForEach: {}", fe.format(mode)),
},
ReactiveOpSpec::Collect(c) => format!("Collect: {}", c.format(mode)),
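Note on the spec.rs hunks: besides more inlined format arguments, the `ReactiveOpSpec::ForEach` arm replaces `format!("{}", fe.get_label())` with `.to_string()`, which clippy (useless_format) prefers whenever the whole format string is just "{}". A hedged sketch of both changes, with illustrative values only:

fn describe(label: &str, op: &str, inputs: &str, verbose: bool) -> String {
    if verbose {
        // Inlined named arguments, as in TransformOpSpec::format.
        format!("op={op}, inputs=[{inputs}]")
    } else {
        // was: format!("{}", label) -- when the format string is only "{}",
        // to_string() on a Display value produces the same result.
        label.to_string()
    }
}

fn main() {
    assert_eq!(describe("ForEach #1", "", "", false), "ForEach #1");
    assert_eq!(
        describe("", "extract", "doc.content", true),
        "op=extract, inputs=[doc.content]"
    );
}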
20 changes: 9 additions & 11 deletions src/base/value.rs
@@ -164,11 +164,11 @@ impl std::fmt::Display for KeyValue {
match self {
KeyValue::Bytes(v) => write!(f, "{}", BASE64_STANDARD.encode(v)),
KeyValue::Str(v) => write!(f, "\"{}\"", v.escape_default()),
KeyValue::Bool(v) => write!(f, "{}", v),
KeyValue::Int64(v) => write!(f, "{}", v),
KeyValue::Bool(v) => write!(f, "{v}"),
KeyValue::Int64(v) => write!(f, "{v}"),
KeyValue::Range(v) => write!(f, "[{}, {})", v.start, v.end),
KeyValue::Uuid(v) => write!(f, "{}", v),
KeyValue::Date(v) => write!(f, "{}", v),
KeyValue::Uuid(v) => write!(f, "{v}"),
KeyValue::Date(v) => write!(f, "{v}"),
KeyValue::Struct(v) => {
write!(
f,
@@ -191,7 +191,7 @@ impl KeyValue {
let field_values: FieldValues = FieldValues::from_json(value, fields_schema)?;
Value::Struct(field_values)
};
Ok(value.as_key()?)
value.as_key()
}

pub fn from_values<'a>(values: impl ExactSizeIterator<Item = &'a Value>) -> Result<Self> {
@@ -226,10 +226,10 @@ impl KeyValue {
.next()
.ok_or_else(|| api_error!("Key parts less than expected"))?;
match basic_type {
BasicValueType::Bytes { .. } => {
BasicValueType::Bytes => {
KeyValue::Bytes(Bytes::from(BASE64_STANDARD.decode(v)?))
}
BasicValueType::Str { .. } => KeyValue::Str(Arc::from(v)),
BasicValueType::Str => KeyValue::Str(Arc::from(v)),
BasicValueType::Bool => KeyValue::Bool(v.parse()?),
BasicValueType::Int64 => KeyValue::Int64(v.parse()?),
BasicValueType::Range => {
@@ -1030,12 +1030,10 @@ impl serde::Serialize for BasicValue {
impl BasicValue {
pub fn from_json(value: serde_json::Value, schema: &BasicValueType) -> Result<Self> {
let result = match (value, schema) {
(serde_json::Value::String(v), BasicValueType::Bytes { .. }) => {
(serde_json::Value::String(v), BasicValueType::Bytes) => {
BasicValue::Bytes(Bytes::from(BASE64_STANDARD.decode(v)?))
}
(serde_json::Value::String(v), BasicValueType::Str { .. }) => {
BasicValue::Str(Arc::from(v))
}
(serde_json::Value::String(v), BasicValueType::Str) => BasicValue::Str(Arc::from(v)),
(serde_json::Value::Bool(v), BasicValueType::Bool) => BasicValue::Bool(v),
(serde_json::Value::Number(v), BasicValueType::Int64) => BasicValue::Int64(
v.as_i64()
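Note on the value.rs hunks: two more clippy patterns. `Ok(value.as_key()?)` collapses to `value.as_key()` (needless_question_mark), and `BasicValueType::Bytes`/`Str` are now matched without a `{ .. }` rest pattern, which only works if those variants are field-less — assumed in this sketch:

#[derive(Debug, PartialEq)]
enum Kind {
    Bytes, // field-less in this sketch, matching how the arms now spell it
    Str,
}

#[derive(Debug)]
struct ParseError;

fn classify(tag: &str) -> Result<Kind, ParseError> {
    match tag {
        "bytes" => Ok(Kind::Bytes), // no `Kind::Bytes { .. }` needed
        "str" => Ok(Kind::Str),
        _ => Err(ParseError),
    }
}

fn classify_checked(tag: &str) -> Result<Kind, ParseError> {
    // was the shape of `Ok(value.as_key()?)`: the Ok/? pair adds nothing when
    // the error type already matches, so the call is returned directly.
    classify(tag)
}

fn main() {
    assert_eq!(classify_checked("str").unwrap(), Kind::Str);
}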
9 changes: 5 additions & 4 deletions src/builder/analyzer.rs
@@ -695,9 +695,9 @@ impl AnalyzerContext {
.get_concur_control_options();
let global_concurrency_controller = self.lib_ctx.global_concurrency_controller.clone();
let result_fut = async move {
trace!("Start building executor for source op `{}`", op_name);
trace!("Start building executor for source op `{op_name}`");
let executor = executor.await?;
trace!("Finished building executor for source op `{}`", op_name);
trace!("Finished building executor for source op `{op_name}`");
Ok(AnalyzedImportOp {
executor,
output,
@@ -840,14 +840,15 @@ impl AnalyzerContext {
Ok(result_fut)
}

#[allow(clippy::too_many_arguments)]
async fn analyze_export_op_group(
&self,
target_kind: &str,
op_scope: &Arc<OpScope>,
flow_inst: &FlowInstanceSpec,
export_op_group: &AnalyzedExportTargetOpGroup,
declarations: Vec<serde_json::Value>,
targets_analyzed_ss: &mut Vec<Option<exec_ctx::AnalyzedTargetSetupState>>,
targets_analyzed_ss: &mut [Option<exec_ctx::AnalyzedTargetSetupState>],
declarations_analyzed_ss: &mut Vec<exec_ctx::AnalyzedTargetSetupState>,
) -> Result<Vec<impl Future<Output = Result<AnalyzedExportOp>> + Send + use<>>> {
let mut collection_specs = Vec::<interface::ExportDataCollectionSpec>::new();
Expand Down Expand Up @@ -875,7 +876,7 @@ impl AnalyzerContext {

let key_fields_schema = pk_fields_idx
.iter()
.map(|idx| collector_schema.fields[*idx as usize].clone())
.map(|idx| collector_schema.fields[*idx].clone())
.collect::<Vec<_>>();
let primary_key_type = if pk_fields_idx.len() == 1 {
key_fields_schema[0].value_type.typ.clone()
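Note on the analyzer.rs hunks: the new `#[allow(clippy::too_many_arguments)]` (also added to two db_tracking.rs functions below) opts out of the lint rather than bundling parameters into a struct; the `&mut Vec<...>` parameter becomes `&mut [...]`, presumably because the function only writes into existing slots (clippy::ptr_arg); and `[*idx as usize]` drops a cast that is redundant once `idx` is already a `usize`. A sketch of the slice-parameter change with made-up types:

#[derive(Debug, Default, Clone)]
struct SetupState {
    ready: bool,
}

// Taking `&mut [Option<SetupState>]` instead of `&mut Vec<Option<SetupState>>`
// is enough when the callee never grows or shrinks the container, and it also
// accepts arrays and sub-slices at the call site.
fn mark_ready(states: &mut [Option<SetupState>], idx: usize) {
    if let Some(slot) = states.get_mut(idx) {
        *slot = Some(SetupState { ready: true });
    }
}

fn main() {
    let mut states: Vec<Option<SetupState>> = vec![None; 3];
    mark_ready(&mut states, 1); // &mut Vec<_> coerces to &mut [_] at the call
    assert!(states[1].as_ref().is_some_and(|s| s.ready));
}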
2 changes: 1 addition & 1 deletion src/builder/exec_ctx.rs
@@ -239,7 +239,7 @@ pub fn build_flow_setup_execution_context(
.ok_or_else(invariance_violation)?;
build_import_op_exec_ctx(
&import_op.name,
&output_type,
output_type,
source_states_by_name.get(&import_op.name.as_str()),
&mut setup_state.metadata,
)
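Note on the exec_ctx.rs hunk: dropping the `&` in `&output_type` looks like clippy::needless_borrow — borrowing a binding that is, presumably, already a reference of the type the callee wants. Schematically:

fn type_name_len(t: &str) -> usize {
    t.len()
}

fn main() {
    let output_type: &str = "Int64"; // already a reference in this sketch
    // was: type_name_len(&output_type) -- an extra borrow that the compiler
    // would deref-coerce away; clippy prefers passing the reference as-is.
    let n = type_name_len(output_type);
    assert_eq!(n, 5);
}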
17 changes: 9 additions & 8 deletions src/builder/flow_builder.rs
@@ -46,7 +46,7 @@ impl std::fmt::Display for OpScopeRef {
#[pymethods]
impl OpScopeRef {
pub fn __str__(&self) -> String {
format!("{}", self)
format!("{self}")
}

pub fn __repr__(&self) -> String {
@@ -105,7 +105,7 @@ impl DataSlice {
}

pub fn __str__(&self) -> String {
format!("{}", self)
format!("{self}")
}

pub fn __repr__(&self) -> String {
@@ -142,7 +142,7 @@ impl DataSlice {
.iter()
.find(|f| f.name == field_name)
.map(|f| f.spec.clone())
.ok_or_else(|| PyException::new_err(format!("field {} not found", field_name)))?,
.ok_or_else(|| PyException::new_err(format!("field {field_name} not found")))?,

spec::ValueMapping::Constant { .. } => {
return Err(PyException::new_err(
@@ -191,7 +191,7 @@ pub struct DataCollector {
#[pymethods]
impl DataCollector {
fn __str__(&self) -> String {
format!("{}", self)
format!("{self}")
}

fn __repr__(&self) -> String {
@@ -271,6 +271,7 @@ impl FlowBuilder {
}

#[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None, execution_options=None))]
#[allow(clippy::too_many_arguments)]
pub fn add_source(
&mut self,
py: Python<'_>,
@@ -327,7 +328,7 @@ impl FlowBuilder {
let schema = value_type.into_inner();
let value = py::value_from_py_object(&schema.typ, &value)?;
let slice = DataSlice {
scope: self.root_op_scope.clone().into(),
scope: self.root_op_scope.clone(),
value: Arc::new(spec::ValueMapping::Constant(spec::ConstantMapping {
schema: schema.clone(),
value: serde_json::to_value(value).into_py_result()?,
@@ -571,7 +572,7 @@ impl FlowBuilder {
let (_, field_schema) = scope_builder
.data
.find_field(field_name)
.ok_or_else(|| PyException::new_err(format!("field {} not found", field_name)))?;
.ok_or_else(|| PyException::new_err(format!("field {field_name} not found")))?;
schema::EnrichedValueType::from_alternative(&field_schema.value_type)
.into_py_result()?
};
@@ -671,7 +672,7 @@ impl FlowBuilder {
}

pub fn __str__(&self) -> String {
format!("{}", self)
format!("{self}")
}

pub fn __repr__(&self) -> String {
@@ -713,7 +714,7 @@ impl std::fmt::Display for FlowBuilder {
)?;
}
if let Some(output) = &self.direct_output_value {
write!(f, "Direct output: {}\n\n", output)?;
write!(f, "Direct output: {output}\n\n")?;
}
Ok(())
}
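Note on the flow_builder.rs hunks: the pyo3 `__str__` methods now inline `self` into the format string, and `self.root_op_scope.clone().into()` drops a no-op `.into()` (clippy::useless_conversion — the removal only compiles if the value is already the field's type). A dependency-free sketch of the `__str__` shape, with a stand-in type and pyo3 itself left out:

use std::fmt;

struct OpScopeRef {
    name: String,
}

impl fmt::Display for OpScopeRef {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = &self.name;
        write!(f, "scope({name})")
    }
}

impl OpScopeRef {
    // Mirrors the pyo3 __str__ methods: `{self}` captures the receiver
    // directly instead of passing it as a positional argument.
    fn __str__(&self) -> String {
        format!("{self}")
    }
}

fn main() {
    let s = OpScopeRef { name: "root".into() };
    assert_eq!(s.__str__(), "scope(root)");
}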
2 changes: 2 additions & 0 deletions src/execution/db_tracking.rs
@@ -136,6 +136,7 @@ pub async fn read_source_tracking_info_for_precommit(
Ok(precommit_tracking_info)
}

#[allow(clippy::too_many_arguments)]
pub async fn precommit_source_tracking_info(
source_id: i32,
source_key_json: &serde_json::Value,
@@ -191,6 +192,7 @@ pub async fn read_source_tracking_info_for_commit(
Ok(commit_tracking_info)
}

#[allow(clippy::too_many_arguments)]
pub async fn commit_source_tracking_info(
source_id: i32,
source_key_json: &serde_json::Value,
2 changes: 1 addition & 1 deletion src/execution/db_tracking_setup.rs
@@ -181,7 +181,7 @@ impl TrackingTableSetupStatus {
}
} else {
for lagacy_name in self.legacy_table_names.iter() {
let query = format!("DROP TABLE IF EXISTS {}", lagacy_name);
let query = format!("DROP TABLE IF EXISTS {lagacy_name}");
sqlx::query(&query).execute(pool).await?;
}
return Ok(());
2 changes: 1 addition & 1 deletion src/execution/dumper.rs
@@ -207,7 +207,7 @@ impl<'a> Dumper<'a> {
let num_keys = keys.len();
keys.into_iter().enumerate().map(move |(i, key)| {
let extra_id = if num_keys > 1 {
Cow::Owned(format!(".{}", i))
Cow::Owned(format!(".{i}"))
} else {
Cow::Borrowed("")
};
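Note on the dumper.rs hunk: the same argument inlining applied inside a `Cow` — an owned `.N` suffix is only allocated when there is more than one key, presumably to disambiguate the output entries. A small sketch with an invented `suffix` helper:

use std::borrow::Cow;

// Hypothetical helper; in the real code the suffix distinguishes entries when
// one source row yields several keys.
fn suffix(i: usize, num_keys: usize) -> Cow<'static, str> {
    if num_keys > 1 {
        Cow::Owned(format!(".{i}")) // allocate only in the multi-key case
    } else {
        Cow::Borrowed("")
    }
}

fn main() {
    assert_eq!(suffix(0, 1), "");
    assert_eq!(suffix(2, 3), ".2");
}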