Skip to content

Commit 16da847

Browse files
committed
[enhancement](be_metrics) update scan bytes metric in file_scanner.
1 parent 9ef1b5d commit 16da847

21 files changed

+276
-91
lines changed

be/src/io/fs/tracing_file_reader.h

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
#include "common/status.h"
20+
#include "io/fs/file_reader.h"
21+
#include "util/runtime_profile.h"
22+
23+
namespace doris {
24+
25+
namespace io {
26+
27+
class TracingFileReader : public FileReader {
28+
public:
29+
TracingFileReader(doris::io::FileReaderSPtr inner, FileReaderStats* stats)
30+
: _inner(std::move(inner)), _stats(stats) {}
31+
32+
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
33+
const IOContext* io_ctx) override {
34+
SCOPED_RAW_TIMER(&_stats->read_time_ns);
35+
Status st = _inner->read_at(offset, result, bytes_read, io_ctx);
36+
_stats->read_calls++;
37+
_stats->read_bytes += *bytes_read;
38+
return st;
39+
}
40+
41+
Status close() override { return _inner->close(); }
42+
const doris::io::Path& path() const override { return _inner->path(); }
43+
size_t size() const override { return _inner->size(); }
44+
bool closed() const override { return _inner->closed(); }
45+
const std::string& get_data_dir_path() override { return _inner->get_data_dir_path(); }
46+
47+
void _collect_profile_at_runtime() override { return _inner->collect_profile_at_runtime(); }
48+
void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); }
49+
50+
FileReaderStats* stats() const { return _stats; }
51+
52+
private:
53+
doris::io::FileReaderSPtr _inner;
54+
FileReaderStats* _stats;
55+
};
56+
57+
} // namespace io
58+
} // namespace doris

be/src/io/io_common.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ enum class ReaderType : uint8_t {
3535

3636
namespace io {
3737

38+
// Plain counters describing read activity on a file reader. An instance is
// referenced from IOContext::file_reader_stats and updated by
// io::TracingFileReader::read_at_impl.
struct FileReaderStats {
39+
// Number of reads issued through TracingFileReader (incremented once per
// read_at_impl call).
size_t read_calls = 0;
40+
// Sum of the *bytes_read values reported by those reads.
size_t read_bytes = 0;
41+
// Target of SCOPED_RAW_TIMER around each wrapped read; presumably elapsed
// nanoseconds, going by the name.
int64_t read_time_ns = 0;
42+
// NOTE(review): not written by TracingFileReader; presumably maintained by
// the format readers/scanner - confirm against callers.
size_t read_rows = 0;
43+
};
44+
3845
struct FileCacheStatistics {
3946
int64_t num_local_io_total = 0;
4047
int64_t num_remote_io_total = 0;
@@ -73,6 +80,7 @@ struct IOContext {
7380
int64_t expiration_time = 0;
7481
const TUniqueId* query_id = nullptr; // Ref
7582
FileCacheStatistics* file_cache_stats = nullptr; // Ref
83+
FileReaderStats* file_reader_stats = nullptr; // Ref
7684
bool is_inverted_index = false;
7785
// if is_dryrun, read IO will download data to cache but return no data to reader
7886
// useful to skip cache data read from local disk to accelerate warm up

be/src/pipeline/exec/file_scan_operator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
5555
std::string name_suffix() const override;
5656

5757
private:
58+
friend class vectorized::VFileScanner;
5859
std::shared_ptr<vectorized::SplitSourceConnector> _split_source = nullptr;
5960
int _max_scanners;
6061
// A in memory cache to save some common components

be/src/service/internal_service.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,8 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
825825
io::IOContext io_ctx;
826826
io::FileCacheStatistics file_cache_statis;
827827
io_ctx.file_cache_stats = &file_cache_statis;
828+
io::FileReaderStats file_reader_stats;
829+
io_ctx.file_reader_stats = &file_reader_stats;
828830
// file_slots is no use, but the lifetime should be longer than reader
829831
std::vector<SlotDescriptor*> file_slots;
830832
switch (params.format_type) {

be/src/vec/exec/format/arrow/arrow_stream_reader.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "arrow_pip_input_stream.h"
2626
#include "common/logging.h"
2727
#include "io/fs/stream_load_pipe.h"
28+
#include "io/fs/tracing_file_reader.h"
2829
#include "runtime/descriptors.h"
2930
#include "runtime/runtime_state.h"
3031
#include "vec/core/block.h"
@@ -43,14 +44,22 @@ ArrowStreamReader::ArrowStreamReader(RuntimeState* state, RuntimeProfile* profil
4344
const TFileRangeDesc& range,
4445
const std::vector<SlotDescriptor*>& file_slot_descs,
4546
io::IOContext* io_ctx)
46-
: _state(state), _range(range), _file_slot_descs(file_slot_descs), _file_reader(nullptr) {
47+
: _state(state),
48+
_range(range),
49+
_file_slot_descs(file_slot_descs),
50+
_io_ctx(io_ctx),
51+
_file_reader(nullptr) {
4752
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
4853
}
4954

5055
ArrowStreamReader::~ArrowStreamReader() = default;
5156

5257
// Creates the stream-load pipe reader for this range and hands it to the Arrow
// input stream. When the IOContext carries a FileReaderStats sink, the pipe
// reader is wrapped in a TracingFileReader so that reads are recorded there.
//
// Returns the error from FileFactory::create_pipe_reader if the pipe cannot be
// created, Status::OK() otherwise.
Status ArrowStreamReader::init_reader() {
    io::FileReaderSPtr file_reader;
    RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &file_reader, _state, false));

    // IOContext::file_reader_stats defaults to nullptr. Wrapping with a null
    // sink would only add an indirection and hand TracingFileReader a pointer
    // it dereferences on every read, so wrap only when there is a sink.
    if (_io_ctx != nullptr && _io_ctx->file_reader_stats != nullptr) {
        _file_reader = std::make_shared<io::TracingFileReader>(std::move(file_reader),
                                                               _io_ctx->file_reader_stats);
    } else {
        _file_reader = std::move(file_reader);
    }

    _pip_stream = ArrowPipInputStream::create_unique(_file_reader);
    return Status::OK();
}
@@ -121,4 +130,4 @@ Status ArrowStreamReader::get_columns(std::unordered_map<std::string, TypeDescri
121130
}
122131

123132
#include "common/compile_check_end.h"
124-
} // namespace doris::vectorized
133+
} // namespace doris::vectorized

be/src/vec/exec/format/arrow/arrow_stream_reader.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class ArrowStreamReader : public GenericReader {
6565
RuntimeState* _state;
6666
const TFileRangeDesc& _range;
6767
const std::vector<SlotDescriptor*>& _file_slot_descs;
68+
io::IOContext* _io_ctx;
6869
io::FileReaderSPtr _file_reader;
6970
std::unique_ptr<doris::vectorized::ArrowPipInputStream> _pip_stream;
7071
cctz::time_zone _ctzz;

be/src/vec/exec/format/csv/csv_reader.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "io/fs/buffered_reader.h"
4040
#include "io/fs/file_reader.h"
4141
#include "io/fs/s3_file_reader.h"
42+
#include "io/fs/tracing_file_reader.h"
4243
#include "runtime/descriptors.h"
4344
#include "runtime/runtime_state.h"
4445
#include "util/string_util.h"
@@ -541,10 +542,13 @@ Status CsvReader::_create_file_reader(bool need_schema) {
541542
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
542543
io::FileReaderOptions reader_options =
543544
FileFactory::get_reader_options(_state, _file_description);
544-
_file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
545+
auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
545546
_profile, _system_properties, _file_description, reader_options,
546547
io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
547548
io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
549+
_file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
550+
_io_ctx->file_reader_stats)
551+
: file_reader;
548552
}
549553
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
550554
_params.file_type != TFileType::FILE_BROKER) {

be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,8 @@ NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
221221
_more_input_bytes(0),
222222
_more_output_bytes(0),
223223
_current_offset(current_offset),
224-
_bytes_read_counter(nullptr),
225-
_read_timer(nullptr),
226224
_bytes_decompress_counter(nullptr),
227225
_decompress_timer(nullptr) {
228-
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
229-
_read_timer = ADD_TIMER(_profile, "FileReadTime");
230226
_bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed", TUnit::BYTES);
231227
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
232228
}
@@ -384,16 +380,12 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool
384380
}
385381
}
386382

387-
{
388-
SCOPED_TIMER(_read_timer);
389-
Slice file_slice(file_buf, buffer_len);
390-
RETURN_IF_ERROR(
391-
_file_reader->read_at(_current_offset, file_slice, &read_len, io_ctx));
392-
_current_offset += read_len;
393-
if (read_len == 0) {
394-
_file_eof = true;
395-
}
396-
COUNTER_UPDATE(_bytes_read_counter, read_len);
383+
Slice file_slice(file_buf, buffer_len);
384+
RETURN_IF_ERROR(
385+
_file_reader->read_at(_current_offset, file_slice, &read_len, io_ctx));
386+
_current_offset += read_len;
387+
if (read_len == 0) {
388+
_file_eof = true;
397389
}
398390
if (_file_eof || read_len == 0) {
399391
if (!stream_end) {

be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,6 @@ class NewPlainTextLineReader : public LineReader {
332332
size_t _current_offset;
333333

334334
// Profile counters
335-
RuntimeProfile::Counter* _bytes_read_counter = nullptr;
336-
RuntimeProfile::Counter* _read_timer = nullptr;
337335
RuntimeProfile::Counter* _bytes_decompress_counter = nullptr;
338336
RuntimeProfile::Counter* _decompress_timer = nullptr;
339337
};

0 commit comments

Comments
 (0)