Skip to content

Commit b119cd5

Browse files
committed
[feature](vparquet-reader) Implements parquet file page cache.
1 parent 6ec35ec commit b119cd5

37 files changed

+1532
-78
lines changed

be/src/io/cache/cached_remote_file_reader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class CachedRemoteFileReader final : public FileReader {
5555

5656
static std::pair<size_t, size_t> s_align_size(size_t offset, size_t size, size_t length);
5757

58+
int64_t mtime() const override { return _remote_file_reader->mtime(); }
59+
5860
protected:
5961
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
6062
const IOContext* io_ctx) override;

be/src/io/file_factory.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,21 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
203203
const io::FileSystemProperties& system_properties,
204204
const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
205205
RuntimeProfile* profile) {
206+
auto reader_res = _create_file_reader_internal(system_properties, file_description,
207+
reader_options, profile);
208+
if (!reader_res.has_value()) {
209+
return unexpected(std::move(reader_res).error());
210+
}
211+
auto file_reader = std::move(reader_res).value();
212+
LOG_INFO("create file reader for path={}, size={}, mtime={}", file_description.path,
213+
file_description.file_size, file_description.mtime);
214+
return file_reader;
215+
}
216+
217+
Result<io::FileReaderSPtr> FileFactory::_create_file_reader_internal(
218+
const io::FileSystemProperties& system_properties,
219+
const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
220+
RuntimeProfile* profile) {
206221
TFileType::type type = system_properties.system_type;
207222
switch (type) {
208223
case TFileType::FILE_LOCAL: {

be/src/io/file_factory.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ class FileFactory {
126126

127127
private:
128128
static std::string _get_fs_name(const io::FileDescription& file_description);
129+
130+
/// Create FileReader without FS
131+
static Result<io::FileReaderSPtr> _create_file_reader_internal(
132+
const io::FileSystemProperties& system_properties,
133+
const io::FileDescription& file_description,
134+
const io::FileReaderOptions& reader_options, RuntimeProfile* profile = nullptr);
129135
};
130136

131137
} // namespace doris

be/src/io/fs/broker_file_reader.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ struct IOContext;
3939

4040
BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size,
4141
TBrokerFD fd,
42-
std::shared_ptr<BrokerServiceConnection> connection)
42+
std::shared_ptr<BrokerServiceConnection> connection,
43+
int64_t mtime)
4344
: _path(std::move(path)),
4445
_file_size(file_size),
4546
_broker_addr(broker_addr),
4647
_fd(fd),
47-
_connection(std::move(connection)) {
48+
_connection(std::move(connection)),
49+
_mtime(mtime) {
4850
DorisMetrics::instance()->broker_file_open_reading->increment(1);
4951
DorisMetrics::instance()->broker_file_reader_total->increment(1);
5052
}

be/src/io/fs/broker_file_reader.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ struct IOContext;
3838
class BrokerFileReader final : public FileReader {
3939
public:
4040
BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd,
41-
std::shared_ptr<BrokerServiceConnection> connection);
41+
std::shared_ptr<BrokerServiceConnection> connection, int64_t mtime = 0);
4242

4343
~BrokerFileReader() override;
4444

@@ -50,6 +50,8 @@ class BrokerFileReader final : public FileReader {
5050

5151
bool closed() const override { return _closed.load(std::memory_order_acquire); }
5252

53+
int64_t mtime() const override { return _mtime; }
54+
5355
protected:
5456
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
5557
const IOContext* io_ctx) override;
@@ -62,6 +64,7 @@ class BrokerFileReader final : public FileReader {
6264
TBrokerFD _fd;
6365

6466
std::shared_ptr<BrokerServiceConnection> _connection;
67+
int64_t _mtime;
6568
std::atomic<bool> _closed = false;
6669
};
6770
} // namespace doris::io

be/src/io/fs/broker_file_system.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* re
139139
error_msg(response->opStatus.message));
140140
}
141141
*reader = std::make_shared<BrokerFileReader>(_broker_addr, file, fsize, response->fd,
142-
_connection);
142+
_connection, opts.mtime);
143143
return Status::OK();
144144
}
145145

be/src/io/fs/buffered_reader.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ class RangeCacheFileReader : public io::FileReader {
160160

161161
bool closed() const override { return _closed; }
162162

163+
int64_t mtime() const override { return _inner_reader->mtime(); }
164+
163165
protected:
164166
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
165167
const IOContext* io_ctx) override;
@@ -329,6 +331,8 @@ class MergeRangeFileReader : public io::FileReader {
329331

330332
bool closed() const override { return _closed; }
331333

334+
int64_t mtime() const override { return _reader->mtime(); }
335+
332336
// for test only
333337
size_t buffer_remaining() const { return _remaining; }
334338

@@ -532,6 +536,8 @@ class PrefetchBufferedReader final : public io::FileReader {
532536

533537
bool closed() const override { return _closed; }
534538

539+
int64_t mtime() const override { return _reader->mtime(); }
540+
535541
void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
536542
_random_access_ranges = random_access_ranges;
537543
for (auto& _pre_buffer : _pre_buffers) {
@@ -592,6 +598,8 @@ class InMemoryFileReader final : public io::FileReader {
592598

593599
bool closed() const override { return _closed; }
594600

601+
int64_t mtime() const override { return _reader->mtime(); }
602+
595603
protected:
596604
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
597605
const IOContext* io_ctx) override;
@@ -626,6 +634,8 @@ class BufferedStreamReader {
626634
virtual ~BufferedStreamReader() = default;
627635
// return the file path
628636
virtual std::string path() = 0;
637+
638+
virtual int64_t mtime() const = 0;
629639
};
630640

631641
class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector {
@@ -639,6 +649,8 @@ class BufferedFileStreamReader : public BufferedStreamReader, public ProfileColl
639649
Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override;
640650
std::string path() override { return _file->path(); }
641651

652+
int64_t mtime() const override { return _file->mtime(); }
653+
642654
protected:
643655
void _collect_profile_before_close() override {
644656
if (_file != nullptr) {

be/src/io/fs/file_reader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ class FileReader : public doris::ProfileCollector {
9090

9191
virtual const std::string& get_data_dir_path() { return VIRTUAL_REMOTE_DATA_DIR; }
9292

93+
// File modification time (seconds since epoch). Default to 0 meaning unknown.
94+
virtual int64_t mtime() const = 0;
95+
9396
protected:
9497
virtual Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
9598
const IOContext* io_ctx) = 0;

be/src/io/fs/hdfs_file_reader.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,17 @@ Result<FileReaderSPtr> HdfsFileReader::create(Path full_path, const hdfsFS& fs,
6666
auto path = convert_path(full_path, fs_name);
6767
return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&& accessor) {
6868
return std::make_shared<HdfsFileReader>(std::move(path), std::move(fs_name),
69-
std::move(accessor), profile);
69+
std::move(accessor), profile, opts.mtime);
7070
});
7171
}
7272

7373
HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
74-
RuntimeProfile* profile)
74+
RuntimeProfile* profile, int64_t mtime)
7575
: _path(std::move(path)),
7676
_fs_name(std::move(fs_name)),
7777
_accessor(std::move(accessor)),
78-
_profile(profile) {
78+
_profile(profile),
79+
_mtime(mtime) {
7980
_handle = _accessor.get();
8081

8182
DorisMetrics::instance()->hdfs_file_open_reading->increment(1);

be/src/io/fs/hdfs_file_reader.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class HdfsFileReader final : public FileReader {
4545
const FileReaderOptions& opts, RuntimeProfile* profile);
4646

4747
HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
48-
RuntimeProfile* profile);
48+
RuntimeProfile* profile, int64_t mtime = 0);
4949

5050
~HdfsFileReader() override;
5151

@@ -57,6 +57,8 @@ class HdfsFileReader final : public FileReader {
5757

5858
bool closed() const override { return _closed.load(std::memory_order_acquire); }
5959

60+
int64_t mtime() const override { return _mtime; }
61+
6062
protected:
6163
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
6264
const IOContext* io_ctx) override;
@@ -86,6 +88,7 @@ class HdfsFileReader final : public FileReader {
8688
CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
8789
std::atomic<bool> _closed = false;
8890
RuntimeProfile* _profile = nullptr;
91+
int64_t _mtime;
8992
#ifdef USE_HADOOP_HDFS
9093
HDFSProfile _hdfs_profile;
9194
#endif

0 commit comments

Comments
 (0)