Skip to content

Commit 30afefb

Browse files
Merge branch 'apache:master' into s7
2 parents 5c862d7 + e85f839 commit 30afefb

File tree

450 files changed

+15160
-5502
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

450 files changed

+15160
-5502
lines changed

.github/workflows/build-extension.yml

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ jobs:
4040
outputs:
4141
broker_changes: ${{ steps.filter.outputs.broker_changes }}
4242
docs_changes: ${{ steps.filter.outputs.docs_changes }}
43+
cdc_client_changes: ${{ steps.filter.outputs.cdc_client_changes }}
4344
steps:
4445
- name: Checkout ${{ github.ref }}
4546
uses: actions/checkout@v3
@@ -53,9 +54,11 @@ jobs:
5354
with:
5455
filters: |
5556
broker_changes:
56-
- 'fs_brokers/**'
57+
- 'fs_brokers/apache_hdfs_broker/**'
5758
docs_changes:
5859
- 'docs/**'
60+
cdc_client_changes:
61+
- 'fs_brokers/cdc_client/**'
5962
build-broker:
6063
name: Build Broker
6164
needs: changes
@@ -92,6 +95,41 @@ jobs:
9295
- name: Build broker
9396
run: |
9497
cd fs_brokers/apache_hdfs_broker/ && /bin/bash build.sh
98+
build-cdc-client:
99+
name: Build Cdc Client
100+
needs: changes
101+
if: ${{ needs.changes.outputs.cdc_client_changes == 'true' }}
102+
runs-on: ubuntu-latest
103+
steps:
104+
- name: Checkout ${{ github.ref }}
105+
uses: actions/checkout@v3
106+
107+
- name: Setup java
108+
uses: actions/setup-java@v2
109+
with:
110+
distribution: adopt
111+
java-version: '17'
112+
113+
- name: Setup thrift
114+
run: |
115+
pushd thirdparty
116+
branch="${{ github.base_ref }}"
117+
if [[ -z "${branch}" ]] || [[ "${branch}" == 'master' || "${branch}" == 'branch-4.0' || "${branch}" == 'branch-3.0' || "${branch}" == 'branch-2.1' ]]; then
118+
curl -L https://github.com/apache/doris-thirdparty/releases/download/automation/doris-thirdparty-prebuilt-linux-x86_64.tar.xz \
119+
-o doris-thirdparty-prebuilt-linux-x86_64.tar.xz
120+
else
121+
curl -L "https://github.com/apache/doris-thirdparty/releases/download/automation-${branch/branch-/}/doris-thirdparty-prebuilt-linux-x86_64.tar.xz" \
122+
-o doris-thirdparty-prebuilt-linux-x86_64.tar.xz
123+
fi
124+
tar -xvf doris-thirdparty-prebuilt-linux-x86_64.tar.xz
125+
popd
126+
export PATH="${DEFAULT_DIR}/ldb-toolchain/bin/:$(pwd)/thirdparty/installed/bin/:${PATH}"
127+
128+
thrift --version
129+
130+
- name: Build cdc client
131+
run: |
132+
cd fs_brokers/cdc_client/ && /bin/bash build.sh
95133
# build-docs:
96134
# name: Build Documents
97135
# needs: changes

be/src/agent/task_worker_pool.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2399,7 +2399,7 @@ void clean_udf_cache_callback(const TAgentTaskRequest& req) {
23992399
if (doris::config::enable_java_support) {
24002400
LOG(INFO) << "clean udf cache start: " << req.clean_udf_cache_req.function_signature;
24012401
static_cast<void>(
2402-
JniUtil::clean_udf_class_load_cache(req.clean_udf_cache_req.function_signature));
2402+
Jni::Util::clean_udf_class_load_cache(req.clean_udf_cache_req.function_signature));
24032403
LOG(INFO) << "clean udf cache finish: " << req.clean_udf_cache_req.function_signature;
24042404
}
24052405
}

be/src/cloud/cloud_rowset_writer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ Status CloudRowsetWriter::_collect_packed_slice_location(io::FileWriter* file_wr
195195
}
196196

197197
rowset_meta->add_packed_slice_location(file_path, index.packed_file_path, index.offset,
198-
index.size);
198+
index.size, index.packed_file_size);
199199
LOG(INFO) << "collect packed file index: " << file_path << " -> " << index.packed_file_path
200200
<< ", offset: " << index.offset << ", size: " << index.size;
201201
return Status::OK();

be/src/exec/olap_utils.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,23 +104,23 @@ inline SQLFilterOp to_olap_filter_type(TExprOpcode::type type, bool opposite) {
104104
return FILTER_IN;
105105
}
106106

107-
inline SQLFilterOp to_olap_filter_type(const std::string& function_name, bool opposite) {
107+
inline SQLFilterOp to_olap_filter_type(const std::string& function_name) {
108108
if (function_name == "lt") {
109-
return opposite ? FILTER_LARGER : FILTER_LESS;
109+
return FILTER_LESS;
110110
} else if (function_name == "gt") {
111-
return opposite ? FILTER_LESS : FILTER_LARGER;
111+
return FILTER_LARGER;
112112
} else if (function_name == "le") {
113-
return opposite ? FILTER_LARGER_OR_EQUAL : FILTER_LESS_OR_EQUAL;
113+
return FILTER_LESS_OR_EQUAL;
114114
} else if (function_name == "ge") {
115-
return opposite ? FILTER_LESS_OR_EQUAL : FILTER_LARGER_OR_EQUAL;
115+
return FILTER_LARGER_OR_EQUAL;
116116
} else if (function_name == "eq") {
117-
return opposite ? FILTER_NOT_IN : FILTER_IN;
117+
return FILTER_IN;
118118
} else if (function_name == "ne") {
119-
return opposite ? FILTER_IN : FILTER_NOT_IN;
119+
return FILTER_NOT_IN;
120120
} else if (function_name == "in") {
121-
return opposite ? FILTER_NOT_IN : FILTER_IN;
121+
return FILTER_IN;
122122
} else if (function_name == "not_in") {
123-
return opposite ? FILTER_IN : FILTER_NOT_IN;
123+
return FILTER_NOT_IN;
124124
} else {
125125
DCHECK(false) << "Function Name: " << function_name;
126126
return FILTER_IN;

be/src/exprs/create_predicate_function.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,13 @@ std::shared_ptr<ColumnPredicate> create_olap_column_predicate(
268268
const TabletColumn* column, bool) {
269269
// currently only support like predicate
270270
if constexpr (PT == TYPE_CHAR) {
271-
return LikeColumnPredicate<TYPE_CHAR>::create_shared(
272-
filter->_opposite, column_id, filter->_fn_ctx, filter->_string_param);
271+
return LikeColumnPredicate<TYPE_CHAR>::create_shared(filter->_opposite, column_id,
272+
column->name(), filter->_fn_ctx,
273+
filter->_string_param);
273274
} else if constexpr (PT == TYPE_VARCHAR || PT == TYPE_STRING) {
274-
return LikeColumnPredicate<TYPE_STRING>::create_shared(
275-
filter->_opposite, column_id, filter->_fn_ctx, filter->_string_param);
275+
return LikeColumnPredicate<TYPE_STRING>::create_shared(filter->_opposite, column_id,
276+
column->name(), filter->_fn_ctx,
277+
filter->_string_param);
276278
}
277279
throw Exception(ErrorCode::INTERNAL_ERROR, "function filter do not support type {}", PT);
278280
}

be/src/io/fs/benchmark/benchmark_factory.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class MultiBenchmark {
104104
Status status = Status::OK();
105105
if (doris::config::enable_java_support) {
106106
// Init jni
107-
status = doris::JniUtil::Init();
107+
status = doris::Jni::Util::Init();
108108
if (!status.ok()) {
109109
LOG(WARNING) << "Failed to initialize JNI: " << status;
110110
exit(1);

be/src/io/fs/hdfs_file_writer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class HdfsWriteMemUsageRecorder {
128128
private:
129129
// clang-format off
130130
size_t max_jvm_heap_size() const {
131-
return JniUtil::get_max_jni_heap_memory_size();
131+
return Jni::Util::get_max_jni_heap_memory_size();
132132
}
133133
// clang-format on
134134
[[maybe_unused]] std::size_t cur_memory_comsuption {0};

be/src/io/fs/packed_file_manager.cpp

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ Status PackedFileManager::create_new_packed_file_context(
150150
// Create file writer for the packed file
151151
FileWriterPtr new_writer;
152152
FileWriterOptions opts;
153+
// enable write file cache for packed file
154+
opts.write_file_cache = true;
153155
RETURN_IF_ERROR(
154156
packed_file_ctx->file_system->create_file(Path(relative_path), &new_writer, &opts));
155157
packed_file_ctx->writer = std::move(new_writer);
@@ -256,6 +258,7 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice
256258
location.packed_file_path = active_state->packed_file_path;
257259
location.offset = active_state->current_offset;
258260
location.size = data.get_size();
261+
location.create_time = std::time(nullptr);
259262
location.tablet_id = info.tablet_id;
260263
location.rowset_id = info.rowset_id;
261264
location.resource_id = info.resource_id;
@@ -609,6 +612,15 @@ void PackedFileManager::process_uploading_packed_files() {
609612
first_slice = false;
610613
slices_stream << small_file_path << "(txn=" << index.txn_id
611614
<< ", offset=" << index.offset << ", size=" << index.size << ")";
615+
616+
// Update packed_file_size in global index
617+
{
618+
std::lock_guard<std::mutex> global_lock(_global_index_mutex);
619+
auto it = _global_slice_locations.find(small_file_path);
620+
if (it != _global_slice_locations.end()) {
621+
it->second.packed_file_size = packed_file->total_size;
622+
}
623+
}
612624
}
613625
LOG(INFO) << "Packed file " << packed_file->packed_file_path
614626
<< " uploaded; slices=" << packed_file->slice_locations.size()
@@ -809,30 +821,12 @@ void PackedFileManager::cleanup_expired_data() {
809821

810822
// Clean up expired global index entries
811823
{
812-
std::unordered_set<std::string> active_packed_files;
813-
{
814-
std::lock_guard<std::timed_mutex> current_lock(_current_packed_file_mutex);
815-
for (const auto& [resource_id, state] : _current_packed_files) {
816-
if (state) {
817-
active_packed_files.insert(state->packed_file_path);
818-
}
819-
}
820-
}
821-
{
822-
std::lock_guard<std::mutex> merge_lock(_packed_files_mutex);
823-
for (const auto& [path, state] : _uploading_packed_files) {
824-
active_packed_files.insert(path);
825-
}
826-
for (const auto& [path, state] : _uploaded_packed_files) {
827-
active_packed_files.insert(path);
828-
}
829-
}
830-
831824
std::lock_guard<std::mutex> global_lock(_global_index_mutex);
832825
auto it = _global_slice_locations.begin();
833826
while (it != _global_slice_locations.end()) {
834827
const auto& index = it->second;
835-
if (active_packed_files.find(index.packed_file_path) == active_packed_files.end()) {
828+
if (index.create_time > 0 &&
829+
current_time - index.create_time > config::uploaded_file_retention_seconds) {
836830
it = _global_slice_locations.erase(it);
837831
} else {
838832
++it;

be/src/io/fs/packed_file_manager.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,12 @@ struct PackedSliceLocation {
4545
std::string packed_file_path;
4646
int64_t offset;
4747
int64_t size;
48+
int64_t create_time = 0;
4849
int64_t tablet_id = 0;
4950
std::string rowset_id;
5051
std::string resource_id;
5152
int64_t txn_id = 0;
53+
int64_t packed_file_size = -1; // Total size of the packed file, -1 means not set
5254
};
5355

5456
struct PackedAppendContext {

be/src/io/fs/packed_file_system.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,11 @@ Status PackedFileSystem::open_file_impl(const Path& file, FileReaderSPtr* reader
7171
FileReaderSPtr inner_reader;
7272
// Create a new FileReaderOptions with the correct file size
7373
FileReaderOptions local_opts = opts ? *opts : FileReaderOptions();
74-
// DCHECK(opts->file_size == -1 || opts->file_size == index.size)
75-
// << "file size is not correct, expected: " << index.size
76-
// << ", actual: " << opts->file_size;
77-
// local_opts.file_size = index.size + index.offset;
78-
local_opts.file_size = -1;
74+
// Set file_size to packed file size to avoid head object request
75+
local_opts.file_size = index.packed_file_size;
7976
LOG(INFO) << "open packed file: " << index.packed_file_path << ", file: " << file.native()
80-
<< ", offset: " << index.offset << ", size: " << index.size;
77+
<< ", offset: " << index.offset << ", size: " << index.size
78+
<< ", packed_file_size: " << index.packed_file_size;
8179
RETURN_IF_ERROR(
8280
_inner_fs->open_file(Path(index.packed_file_path), &inner_reader, &local_opts));
8381

0 commit comments

Comments (0)