Skip to content

Commit d0f0e4b

Browse files
authored
chore: improve the unwrap error (#18571)
1 parent 432a428 commit d0f0e4b

File tree

17 files changed

+211
-73
lines changed

17 files changed

+211
-73
lines changed

src/query/pipeline/core/src/finished_chain.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ impl FinishedCallbackChain {
186186

187187
fn log_states(apply_states: &[ApplyState]) {
188188
let mut message = String::new();
189-
writeln!(&mut message, "Executor apply finished callback state:").unwrap();
189+
let _ = writeln!(&mut message, "Executor apply finished callback state:");
190190
for apply_state in apply_states {
191191
let execute_state = match apply_state.successfully {
192192
true => "\u{2705}",
@@ -201,7 +201,7 @@ impl FinishedCallbackChain {
201201
},
202202
};
203203

204-
writeln!(
204+
let _ = writeln!(
205205
&mut message,
206206
"├──{}:{:?} - {}{}:{}:{}",
207207
execute_state,
@@ -210,8 +210,7 @@ impl FinishedCallbackChain {
210210
apply_state.location.file(),
211211
apply_state.location.line(),
212212
apply_state.location.column()
213-
)
214-
.unwrap();
213+
);
215214
}
216215

217216
info!("{}", message);

src/query/pipeline/core/src/pipeline.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,9 @@ impl Pipeline {
140140
}
141141

142142
pub fn finalize(mut self, root_scope: Option<Arc<PlanScope>>) -> Pipeline {
143-
if root_scope.is_none() {
143+
let Some(root_scope) = root_scope else {
144144
return self;
145-
}
146-
147-
let root_scope = root_scope.unwrap();
145+
};
148146

149147
for node in self.graph.node_weights_mut() {
150148
let Some(scope) = node.scope.as_mut() else {
@@ -598,7 +596,10 @@ impl Pipeline {
598596
if let Some((source, target)) = other.graph.edge_endpoints(index) {
599597
let source = NodeIndex::new(offset + source.index());
600598
let target = NodeIndex::new(offset + target.index());
601-
let edge_weight = other.graph.edge_weight(index).unwrap();
599+
let edge_weight = other
600+
.graph
601+
.edge_weight(index)
602+
.expect("Edge weight must exist for valid edge index");
602603
self.graph.add_edge(source, target, edge_weight.clone());
603604
}
604605
}

src/query/pipeline/core/src/processors/duplicate_processor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,11 @@ impl Processor for DuplicateProcessor {
8181

8282
self.input.set_need_data();
8383
if self.input.has_data() {
84-
let block = self.input.pull_data().unwrap();
84+
let block = self.input.pull_data().ok_or_else(|| {
85+
databend_common_exception::ErrorCode::Internal(
86+
"Failed to pull data from input port when data is available",
87+
)
88+
})?;
8589
for output in self.outputs.iter() {
8690
if !output.is_finished() {
8791
output.push_data(block.clone());

src/query/pipeline/core/src/processors/port_trigger.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,20 @@ pub enum DirectedEdge {
3939
}
4040

4141
impl DirectedEdge {
42-
pub fn get_source<N, E>(&self, graph: &StableGraph<N, E>) -> NodeIndex {
42+
pub fn get_source<N, E>(&self, graph: &StableGraph<N, E>) -> Result<NodeIndex> {
4343
match self {
44-
DirectedEdge::Source(edge_index) => graph.edge_endpoints(*edge_index).unwrap().0,
45-
DirectedEdge::Target(edge_index) => graph.edge_endpoints(*edge_index).unwrap().1,
44+
DirectedEdge::Source(edge_index) => graph
45+
.edge_endpoints(*edge_index)
46+
.map(|(source, _)| source)
47+
.ok_or_else(|| {
48+
ErrorCode::Internal(format!("Edge not found in graph: {:?}", edge_index))
49+
}),
50+
DirectedEdge::Target(edge_index) => graph
51+
.edge_endpoints(*edge_index)
52+
.map(|(_, target)| target)
53+
.ok_or_else(|| {
54+
ErrorCode::Internal(format!("Edge not found in graph: {:?}", edge_index))
55+
}),
4656
}
4757
}
4858

@@ -105,13 +115,20 @@ impl UpdateList {
105115
/// # Safety
106116
///
107117
/// Must be thread safe call. In other words, it needs to be called in single thread or in mutex guard.
108-
pub unsafe fn create_trigger(self: &Arc<Self>, edge_index: EdgeIndex) -> *mut UpdateTrigger {
118+
pub unsafe fn create_trigger(
119+
self: &Arc<Self>,
120+
edge_index: EdgeIndex,
121+
) -> Result<*mut UpdateTrigger> {
109122
let inner = &mut *self.inner.get();
110123
let update_trigger = UpdateTrigger::create(edge_index, self.inner.get());
111124
inner
112125
.updated_triggers
113126
.push(Arc::new(UnsafeCell::new(update_trigger)));
114-
inner.updated_triggers.last().unwrap().get()
127+
inner
128+
.updated_triggers
129+
.last()
130+
.map(|trigger| trigger.get())
131+
.ok_or_else(|| ErrorCode::Internal("Failed to get last trigger after push"))
115132
}
116133
}
117134

src/query/pipeline/core/src/processors/resize_processor.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,11 @@ impl Processor for ResizeProcessor {
112112
}
113113

114114
while !self.waiting_outputs.is_empty() && !self.waiting_inputs.is_empty() {
115-
let output_index = self.waiting_outputs.pop_front().unwrap();
115+
let output_index = self.waiting_outputs.pop_front().ok_or_else(|| {
116+
databend_common_exception::ErrorCode::Internal(
117+
"Waiting outputs queue should not be empty",
118+
)
119+
})?;
116120

117121
// Port is finished when waiting.
118122
if self.outputs[output_index].port.is_finished() {
@@ -124,11 +128,18 @@ impl Processor for ResizeProcessor {
124128
continue;
125129
}
126130

127-
let input_index = self.waiting_inputs.pop_front().unwrap();
128-
129-
self.outputs[output_index]
130-
.port
131-
.push_data(self.inputs[input_index].port.pull_data().unwrap());
131+
let input_index = self.waiting_inputs.pop_front().ok_or_else(|| {
132+
databend_common_exception::ErrorCode::Internal(
133+
"Waiting inputs queue should not be empty",
134+
)
135+
})?;
136+
137+
let data = self.inputs[input_index].port.pull_data().ok_or_else(|| {
138+
databend_common_exception::ErrorCode::Internal(
139+
"Failed to pull data from input port",
140+
)
141+
})?;
142+
self.outputs[output_index].port.push_data(data);
132143
self.inputs[input_index].status = PortStatus::Idle;
133144
self.outputs[output_index].status = PortStatus::Idle;
134145

src/query/pipeline/core/src/processors/sequence_group.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,11 @@ impl Processor for SequenceGroupProcessor {
116116
}
117117
} else if input.port.has_data() {
118118
if self.ignore_output {
119-
let _ignore = input.port.pull_data().unwrap()?;
119+
let _ignore = input.port.pull_data().ok_or_else(|| {
120+
databend_common_exception::ErrorCode::Internal(
121+
"Failed to pull data from input port when ignoring output",
122+
)
123+
})??;
120124
input.status = PortStatus::Idle;
121125

122126
if input.port.is_finished() {
@@ -135,7 +139,11 @@ impl Processor for SequenceGroupProcessor {
135139
}
136140

137141
while !self.waiting_outputs.is_empty() && !self.waiting_inputs.is_empty() {
138-
let output_index = self.waiting_outputs.pop_front().unwrap();
142+
let output_index = self.waiting_outputs.pop_front().ok_or_else(|| {
143+
databend_common_exception::ErrorCode::Internal(
144+
"Waiting outputs queue should not be empty",
145+
)
146+
})?;
139147

140148
// Port is finished when waiting.
141149
if self.outputs[output_index].port.is_finished() {
@@ -147,11 +155,18 @@ impl Processor for SequenceGroupProcessor {
147155
continue;
148156
}
149157

150-
let input_index = self.waiting_inputs.pop_front().unwrap();
151-
152-
self.outputs[output_index]
153-
.port
154-
.push_data(self.inputs[input_index].port.pull_data().unwrap());
158+
let input_index = self.waiting_inputs.pop_front().ok_or_else(|| {
159+
databend_common_exception::ErrorCode::Internal(
160+
"Waiting inputs queue should not be empty",
161+
)
162+
})?;
163+
164+
let data = self.inputs[input_index].port.pull_data().ok_or_else(|| {
165+
databend_common_exception::ErrorCode::Internal(
166+
"Failed to pull data from input port",
167+
)
168+
})?;
169+
self.outputs[output_index].port.push_data(data);
155170
self.inputs[input_index].status = PortStatus::Idle;
156171
self.outputs[output_index].status = PortStatus::Idle;
157172

src/query/pipeline/core/src/processors/shuffle_processor.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,12 @@ impl Processor for ShuffleProcessor {
131131
}
132132

133133
if input.has_data() {
134-
output.push_data(input.pull_data().unwrap());
134+
let data = input.pull_data().ok_or_else(|| {
135+
databend_common_exception::ErrorCode::Internal(
136+
"Failed to pull data from input port in shuffle processor",
137+
)
138+
})?;
139+
output.push_data(data);
135140
return Ok(Event::NeedConsume);
136141
}
137142

@@ -230,7 +235,12 @@ impl<T: Exchange> Processor for PartitionProcessor<T> {
230235
}
231236

232237
if self.input.has_data() {
233-
self.input_data = Some(self.input.pull_data().unwrap()?);
238+
let data = self.input.pull_data().ok_or_else(|| {
239+
databend_common_exception::ErrorCode::Internal(
240+
"Failed to pull data from input port in partition processor",
241+
)
242+
})??;
243+
self.input_data = Some(data);
234244
return Ok(Event::Sync);
235245
}
236246

@@ -327,12 +337,22 @@ impl<T: Exchange> Processor for MergePartitionProcessor<T> {
327337
match T::STRATEGY {
328338
MultiwayStrategy::Random => {
329339
if self.output.can_push() {
330-
self.output.push_data(Ok(input.pull_data().unwrap()?));
340+
let data = input.pull_data().ok_or_else(|| {
341+
databend_common_exception::ErrorCode::Internal(
342+
"Failed to pull data from input port in multiway random strategy"
343+
)
344+
})??;
345+
self.output.push_data(Ok(data));
331346
}
332347
}
333348
MultiwayStrategy::Custom => {
334349
if self.inputs_data[index].is_none() {
335-
self.inputs_data[index] = Some(input.pull_data().unwrap()?);
350+
let data = input.pull_data().ok_or_else(|| {
351+
databend_common_exception::ErrorCode::Internal(
352+
"Failed to pull data from input port in multiway custom strategy"
353+
)
354+
})??;
355+
self.inputs_data[index] = Some(data);
336356
}
337357
}
338358
}

src/query/pipeline/sinks/src/async_mpsc_sink.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,11 @@ impl<T: AsyncMpscSink + 'static> Processor for AsyncMpscSinker<T> {
140140
Ok(Event::Async)
141141
}
142142
} else {
143-
let block = input.pull_data().unwrap()?;
143+
let block = input.pull_data().ok_or_else(|| {
144+
databend_common_exception::ErrorCode::Internal(
145+
"Failed to pull data from input port in async mpsc sink",
146+
)
147+
})??;
144148
self.input_data = Some(block);
145149
Ok(Event::Async)
146150
}

src/query/pipeline/sinks/src/async_sink.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,12 @@ impl<T: AsyncSink + 'static> Processor for AsyncSinker<T> {
135135
match self.input.has_data() {
136136
true => {
137137
// Wake up upstream while executing async work
138-
self.input_data = Some(self.input.pull_data().unwrap()?);
138+
let data = self.input.pull_data().ok_or_else(|| {
139+
databend_common_exception::ErrorCode::Internal(
140+
"Failed to pull data from input port in async sink",
141+
)
142+
})??;
143+
self.input_data = Some(data);
139144
self.input.set_need_data();
140145
Ok(Event::Async)
141146
}
@@ -150,12 +155,37 @@ impl<T: AsyncSink + 'static> Processor for AsyncSinker<T> {
150155
async fn async_process(&mut self) -> Result<()> {
151156
if !self.called_on_start {
152157
self.called_on_start = true;
153-
self.inner.as_mut().unwrap().on_start().await?;
158+
self.inner
159+
.as_mut()
160+
.ok_or_else(|| {
161+
databend_common_exception::ErrorCode::Internal(
162+
"AsyncSink inner is None when calling on_start",
163+
)
164+
})?
165+
.on_start()
166+
.await?;
154167
} else if let Some(data_block) = self.input_data.take() {
155-
self.finished = self.inner.as_mut().unwrap().consume(data_block).await?;
168+
self.finished = self
169+
.inner
170+
.as_mut()
171+
.ok_or_else(|| {
172+
databend_common_exception::ErrorCode::Internal(
173+
"AsyncSink inner is None when consuming data",
174+
)
175+
})?
176+
.consume(data_block)
177+
.await?;
156178
} else if !self.called_on_finish {
157179
self.called_on_finish = true;
158-
self.inner.as_mut().unwrap().on_finish().await?;
180+
self.inner
181+
.as_mut()
182+
.ok_or_else(|| {
183+
databend_common_exception::ErrorCode::Internal(
184+
"AsyncSink inner is None when calling on_finish",
185+
)
186+
})?
187+
.on_finish()
188+
.await?;
159189
}
160190

161191
Ok(())

src/query/pipeline/sinks/src/sync_mpsc_sink.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,11 @@ impl<T: SyncMpscSink + 'static> Processor for SyncMpscSinker<T> {
138138
Ok(Event::Sync)
139139
}
140140
} else {
141-
let block = input.pull_data().unwrap()?;
141+
let block = input.pull_data().ok_or_else(|| {
142+
databend_common_exception::ErrorCode::Internal(
143+
"Failed to pull data from input port in sync mpsc sink",
144+
)
145+
})??;
142146
self.input_data = Some(block);
143147
Ok(Event::Sync)
144148
}

0 commit comments

Comments
 (0)