Skip to content

Commit fdeee1b

Browse files
authored
Merge branch 'master' into hidden-token_info
2 parents cb73784 + 961f70e commit fdeee1b

File tree

57 files changed

+2316
-738
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

57 files changed

+2316
-738
lines changed

be/src/io/cache/block_file_cache_downloader.cpp

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,34 @@ void FileCacheBlockDownloader::download_file_cache_block(
181181
<< ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id()
182182
<< ", offset=" << meta.offset() << ", size=" << meta.size()
183183
<< ", type=" << meta.cache_type();
184+
185+
// Helper to decrease inflight count on early return.
186+
// NOTE: This lambda captures 'this' pointer. It's safe because:
187+
// 1. download_segment_file() calls download_done synchronously
188+
// 2. ~FileCacheBlockDownloader() waits for all workers to finish via _workers->shutdown()
189+
// If this assumption changes (e.g., async callback), consider using shared_from_this pattern.
190+
auto decrease_inflight_count = [this, tablet_id = meta.tablet_id()]() {
191+
std::lock_guard lock(_inflight_mtx);
192+
auto it = _inflight_tablets.find(tablet_id);
193+
if (it == _inflight_tablets.end()) {
194+
LOG(WARNING) << "inflight ref cnt not exist, tablet id " << tablet_id;
195+
} else {
196+
it->second--;
197+
VLOG_DEBUG << "download_file_cache_block: inflight_tablets[" << tablet_id
198+
<< "] = " << it->second;
199+
if (it->second <= 0) {
200+
DCHECK_EQ(it->second, 0) << it->first;
201+
_inflight_tablets.erase(it);
202+
VLOG_DEBUG << "download_file_cache_block: erase inflight_tablets[" << tablet_id
203+
<< "]";
204+
}
205+
}
206+
};
207+
184208
CloudTabletSPtr tablet;
185209
if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), false); !res.has_value()) {
186210
LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : " << res.error();
211+
decrease_inflight_count();
187212
return;
188213
} else {
189214
tablet = std::move(res).value();
@@ -202,12 +227,14 @@ void FileCacheBlockDownloader::download_file_cache_block(
202227
if (find_it == id_to_rowset_meta_map.end()) {
203228
LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id()
204229
<< " rowset_id not found, rowset_id=" << meta.rowset_id();
230+
decrease_inflight_count();
205231
return;
206232
}
207233

208234
auto storage_resource = find_it->second->remote_storage_resource();
209235
if (!storage_resource) {
210236
LOG(WARNING) << storage_resource.error();
237+
decrease_inflight_count();
211238
return;
212239
}
213240
// Use RowsetMeta::fs() instead of storage_resource->fs to support packed file.
@@ -218,26 +245,15 @@ void FileCacheBlockDownloader::download_file_cache_block(
218245
if (!file_system) {
219246
LOG(WARNING) << "download_file_cache_block: failed to get file system for tablet_id="
220247
<< meta.tablet_id() << ", rowset_id=" << meta.rowset_id();
248+
decrease_inflight_count();
221249
return;
222250
}
223251

224-
auto download_done = [&, tablet_id = meta.tablet_id()](Status st) {
225-
std::lock_guard lock(_inflight_mtx);
226-
auto it = _inflight_tablets.find(tablet_id);
252+
// Capture decrease_inflight_count by value to ensure lifetime safety
253+
// even if download_done is called asynchronously in the future
254+
auto download_done = [decrease_inflight_count, tablet_id = meta.tablet_id()](Status st) {
227255
TEST_SYNC_POINT_CALLBACK("FileCacheBlockDownloader::download_file_cache_block");
228-
if (it == _inflight_tablets.end()) {
229-
LOG(WARNING) << "inflight ref cnt not exist, tablet id " << tablet_id;
230-
} else {
231-
it->second--;
232-
VLOG_DEBUG << "download_file_cache_block: inflight_tablets[" << tablet_id
233-
<< "] = " << it->second;
234-
if (it->second <= 0) {
235-
DCHECK_EQ(it->second, 0) << it->first;
236-
_inflight_tablets.erase(it);
237-
VLOG_DEBUG << "download_file_cache_block: erase inflight_tablets[" << tablet_id
238-
<< "]";
239-
}
240-
}
256+
decrease_inflight_count();
241257
LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id
242258
<< " status=" << st.to_string();
243259
};

be/src/runtime_filter/runtime_filter.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress
6262
RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len));
6363

6464
if (len > 0) {
65-
DCHECK(data != nullptr);
65+
if (data == nullptr) {
66+
return Status::InternalError(
67+
"data is nullptr after serialization with len > 0, filter: {}", debug_string());
68+
}
6669
merge_filter_callback->cntl_->request_attachment().append(data, len);
6770
}
6871

be/src/runtime_filter/runtime_filter.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,11 @@ class RuntimeFilter {
7070
auto in_filter = request->mutable_in_filter();
7171
RETURN_IF_ERROR(_to_protobuf(in_filter));
7272
} else if (real_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) {
73-
DCHECK(data != nullptr);
73+
if (data == nullptr) {
74+
return Status::InternalError(
75+
"data is nullptr for bloom filter serialization, filter_id: {}",
76+
_wrapper->filter_id());
77+
}
7478
RETURN_IF_ERROR(_to_protobuf(request->mutable_bloom_filter(), (char**)data, len));
7579
} else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER ||
7680
real_runtime_filter_type == RuntimeFilterType::MIN_FILTER ||
@@ -89,7 +93,12 @@ class RuntimeFilter {
8993
RuntimeFilter(const TRuntimeFilterDesc* desc)
9094
: _has_remote_target(desc->has_remote_targets),
9195
_runtime_filter_type(get_runtime_filter_type(desc)) {
92-
DCHECK_NE(desc->has_remote_targets, desc->has_local_targets);
96+
if (desc->has_remote_targets == desc->has_local_targets) {
97+
throw Exception(ErrorCode::INTERNAL_ERROR,
98+
"has_remote_targets ({}) should not equal has_local_targets ({}), "
99+
"filter_id: {}",
100+
desc->has_remote_targets, desc->has_local_targets, desc->filter_id);
101+
}
93102
}
94103

95104
virtual Status _init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options);

be/src/runtime_filter/runtime_filter_consumer.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,9 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
121121
_wrapper->minmax_func()->get_min(), min_literal));
122122
min_pred->add_child(probe_ctx->root());
123123
min_pred->add_child(min_literal);
124-
DCHECK(null_aware == false) << "only min predicate do not support null aware";
124+
if (null_aware) {
125+
return Status::InternalError("only min predicate do not support null aware");
126+
}
125127
container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
126128
min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware,
127129
_wrapper->filter_id()));
@@ -138,7 +140,9 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
138140
_wrapper->minmax_func()->get_max(), max_literal));
139141
max_pred->add_child(probe_ctx->root());
140142
max_pred->add_child(max_literal);
141-
DCHECK(null_aware == false) << "only max predicate do not support null aware";
143+
if (null_aware) {
144+
return Status::InternalError("only max predicate do not support null aware");
145+
}
142146
container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
143147
max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware,
144148
_wrapper->filter_id()));
@@ -207,22 +211,27 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
207211
auto bitmap_pred = vectorized::VBitmapPredicate::create_shared(node);
208212
bitmap_pred->set_filter(_wrapper->bitmap_filter_func());
209213
bitmap_pred->add_child(probe_ctx->root());
210-
DCHECK(null_aware == false) << "bitmap predicate do not support null aware";
214+
if (null_aware) {
215+
return Status::InternalError("bitmap predicate do not support null aware");
216+
}
211217
auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
212218
node, bitmap_pred, 0, null_aware, _wrapper->filter_id());
213219
container.push_back(wrapper);
214220
break;
215221
}
216222
default:
217-
DCHECK(false);
218-
break;
223+
return Status::InternalError("unknown runtime filter type: {}", int(real_filter_type));
219224
}
220225
return Status::OK();
221226
}
222227

223228
void RuntimeFilterConsumer::collect_realtime_profile(RuntimeProfile* parent_operator_profile) {
224229
std::unique_lock<std::recursive_mutex> l(_rmtx);
225-
DCHECK(parent_operator_profile != nullptr);
230+
if (parent_operator_profile == nullptr) {
231+
LOG(WARNING) << "parent_operator_profile is nullptr in "
232+
"RuntimeFilterConsumer::collect_realtime_profile";
233+
return;
234+
}
226235
int filter_id = -1;
227236
{
228237
// since debug_string will read from RuntimeFilter::_wrapper

be/src/runtime_filter/runtime_filter_mgr.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,10 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
126126
filter_id);
127127
}
128128
*local_merge_filters = &iter->second;
129-
DCHECK(iter->second.merger);
129+
if (!iter->second.merger) {
130+
return Status::InternalError("local merge context merger is nullptr for filter_id: {}",
131+
filter_id);
132+
}
130133
return Status::OK();
131134
}
132135

@@ -354,7 +357,11 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
354357
int64_t merge_time,
355358
PUniqueId query_id,
356359
int execution_timeout) {
357-
DCHECK_GT(cnt_val.targetv2_info.size(), 0);
360+
if (cnt_val.targetv2_info.empty()) {
361+
return Status::InternalError(
362+
"_send_rf_to_target called with empty targetv2_info, filter: {}",
363+
cnt_val.merger ? cnt_val.merger->debug_string() : "unknown");
364+
}
358365

359366
if (cnt_val.done) {
360367
return Status::InternalError("Runtime filter has been sent",

be/src/runtime_filter/runtime_filter_producer.cpp

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,11 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table
8686
RETURN_IF_ERROR(do_merge());
8787
}
8888
} else {
89-
DCHECK(_is_broadcast_join);
89+
if (!_is_broadcast_join) {
90+
return Status::InternalError(
91+
"Expected broadcast join for non-build hash table path in publish, filter: {}",
92+
debug_string());
93+
}
9094
}
9195

9296
// wrapper may moved to rf merger, release wrapper here to make sure thread safe
@@ -148,7 +152,10 @@ void RuntimeFilterProducer::latch_dependency(
148152
if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
149153
return;
150154
}
151-
DCHECK(dependency != nullptr);
155+
if (dependency == nullptr) {
156+
throw Exception(ErrorCode::INTERNAL_ERROR,
157+
"dependency is nullptr in latch_dependency, filter: {}", debug_string());
158+
}
152159
_dependency = dependency;
153160
_dependency->add();
154161
}
@@ -158,7 +165,10 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt
158165
if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
159166
return Status::OK();
160167
}
161-
DCHECK(_dependency != nullptr);
168+
if (_dependency == nullptr) {
169+
return Status::InternalError("_dependency is nullptr in send_size, filter: {}",
170+
debug_string());
171+
}
162172
set_state(State::WAITING_FOR_SYNCED_SIZE);
163173

164174
// two case we need do local merge:
@@ -239,7 +249,10 @@ void RuntimeFilterProducer::set_synced_size(uint64_t global_size) {
239249
}
240250

241251
_synced_size = global_size;
242-
DCHECK(_dependency != nullptr);
252+
if (_dependency == nullptr) {
253+
throw Exception(ErrorCode::INTERNAL_ERROR,
254+
"_dependency is nullptr in set_synced_size, filter: {}", debug_string());
255+
}
243256
_dependency->sub();
244257
}
245258

be/src/runtime_filter/runtime_filter_producer_helper.cpp

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ Status RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, size
7474
for (int i = 0; i < _producers.size(); i++) {
7575
auto filter = _producers[i];
7676
int result_column_id = _filter_expr_contexts[i]->get_last_result_column_id();
77-
DCHECK_NE(result_column_id, -1);
77+
if (result_column_id == -1) {
78+
return Status::InternalError(
79+
"runtime filter producer _insert got invalid result_column_id -1");
80+
}
7881
const auto& column = block->get_by_position(result_column_id).column;
7982
RETURN_IF_ERROR(filter->insert(column, start));
8083
}
@@ -108,12 +111,25 @@ Status RuntimeFilterProducerHelper::build(
108111

109112
for (const auto& filter : _producers) {
110113
if (use_shared_table) {
111-
DCHECK(_is_broadcast_join);
114+
if (!_is_broadcast_join) {
115+
return Status::InternalError(
116+
"use_shared_table is true but _is_broadcast_join is false");
117+
}
112118
if (_should_build_hash_table) {
113-
DCHECK(!runtime_filters.contains(filter->wrapper()->filter_id()));
119+
if (runtime_filters.contains(filter->wrapper()->filter_id())) {
120+
return Status::InternalError(
121+
"runtime_filters already contains filter_id {} when building hash "
122+
"table",
123+
filter->wrapper()->filter_id());
124+
}
114125
runtime_filters[filter->wrapper()->filter_id()] = filter->wrapper();
115126
} else {
116-
DCHECK(runtime_filters.contains(filter->wrapper()->filter_id()));
127+
if (!runtime_filters.contains(filter->wrapper()->filter_id())) {
128+
return Status::InternalError(
129+
"runtime_filters does not contain filter_id {} when not building "
130+
"hash table",
131+
filter->wrapper()->filter_id());
132+
}
117133
filter->set_wrapper(runtime_filters[filter->wrapper()->filter_id()]);
118134
}
119135
}
@@ -147,8 +163,9 @@ Status RuntimeFilterProducerHelper::skip_process(RuntimeState* state) {
147163

148164
void RuntimeFilterProducerHelper::collect_realtime_profile(
149165
RuntimeProfile* parent_operator_profile) {
150-
DCHECK(parent_operator_profile != nullptr);
151166
if (parent_operator_profile == nullptr) {
167+
LOG(WARNING) << "parent_operator_profile is nullptr in "
168+
"RuntimeFilterProducerHelper::collect_realtime_profile";
152169
return;
153170
}
154171

be/src/runtime_filter/runtime_filter_producer_helper_cross.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ class RuntimeFilterProducerHelperCross : public RuntimeFilterProducerHelper {
5151
for (const auto& vexpr_ctx : _filter_expr_contexts) {
5252
int result_column_id = -1;
5353
RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id));
54-
DCHECK_NE(result_column_id, -1) << vexpr_ctx->root()->debug_string();
54+
if (result_column_id == -1) {
55+
return Status::InternalError(
56+
"runtime filter cross join _process_block got invalid result_column_id "
57+
"-1, expr: {}",
58+
vexpr_ctx->root()->debug_string());
59+
}
5560
block->get_by_position(result_column_id).column =
5661
block->get_by_position(result_column_id)
5762
.column->convert_to_full_column_if_const();

0 commit comments

Comments
 (0)