Skip to content

Commit ebf8214

Browse files
committed
Improve the speed of text file position restore
1 parent 7a3de70 commit ebf8214

File tree

15 files changed

+278
-50
lines changed

15 files changed

+278
-50
lines changed

native/python/src/fairseq2n/bindings/data/text/text_reader.cc

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,14 @@ def_text_reader(py::module_ &text_module)
5353
return read_text(std::move(path), std::move(key), std::move(opts));
5454
},
5555
py::arg("path"),
56-
py::arg("key") = std::nullopt,
57-
py::arg("encoding") = std::nullopt,
58-
py::arg("line_ending") = line_ending::infer,
59-
py::arg("ltrim") = false,
60-
py::arg("rtrim") = false,
61-
py::arg("skip_empty") = false,
62-
py::arg("memory_map") = false,
63-
py::arg("block_size") = std::nullopt);
56+
py::arg("key") = std::nullopt,
57+
py::arg("encoding") = std::nullopt,
58+
py::arg("line_ending") = line_ending::infer,
59+
py::arg("ltrim") = false,
60+
py::arg("rtrim") = false,
61+
py::arg("skip_empty") = false,
62+
py::arg("memory_map") = false,
63+
py::arg("block_size") = std::nullopt);
6464
}
6565

6666
} // namespace fairseq2n

native/src/fairseq2n/data/byte_stream.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include "fairseq2n/api.h"
1212
#include "fairseq2n/memory.h"
13+
#include "fairseq2n/data/tape.h"
1314

1415
namespace fairseq2n {
1516

@@ -29,8 +30,23 @@ class FAIRSEQ2_API byte_stream {
2930
virtual memory_block
3031
read_chunk() = 0;
3132

33+
virtual void
34+
seek(std::size_t offset) = 0;
35+
36+
virtual std::size_t
37+
position() const = 0;
38+
3239
virtual void
3340
reset() = 0;
41+
42+
virtual void
43+
record_position(tape &t) const = 0;
44+
45+
virtual void
46+
reload_position(tape &t) = 0;
47+
48+
virtual bool
49+
supports_seek() const noexcept = 0;
3450
};
3551

3652
class FAIRSEQ2_API byte_stream_error : public std::runtime_error {

native/src/fairseq2n/data/file.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,10 @@ open_file(const std::filesystem::path &path, const file_options &opts)
8383
} else
8484
stream = std::make_unique<file_stream>(std::move(fd), path, chunk_size);
8585

86-
if (opts.mode() == file_mode::text)
87-
stream = std::make_unique<utf8_stream>(
88-
std::move(stream), opts.maybe_text_encoding(), chunk_size);
86+
// TODO: fix!
87+
// if (opts.mode() == file_mode::text)
88+
// stream = std::make_unique<utf8_stream>(
89+
// std::move(stream), opts.maybe_text_encoding(), chunk_size);
8990

9091
return stream;
9192
}

native/src/fairseq2n/data/file.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class file_options {
8383
return maybe_text_encoding_;
8484
}
8585

86+
8687
private:
8788
file_mode mode_ = file_mode::binary;
8889
bool memory_map_ = false;

native/src/fairseq2n/data/file_stream.cc

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,23 +61,48 @@ file_stream::read_chunk()
6161
return chunk.share_first(chunk.size() - remaining_space.size());
6262
}
6363

64+
void
65+
file_stream::seek(std::size_t offset)
66+
{
67+
seek(offset, /*from_current=*/false);
68+
}
69+
70+
std::size_t
71+
file_stream::position() const
72+
{
73+
return seek(0, /*from_current=*/true);
74+
}
75+
6476
void
6577
file_stream::reset()
6678
{
67-
::off_t offset = ::lseek(fd_.get(), 0, SEEK_SET);
68-
if (offset == -1) {
69-
std::error_code err = last_error();
79+
seek(0);
7080

71-
if (err == std::errc::invalid_seek)
72-
throw_<byte_stream_error>("'{}' is not seekable and cannot be reset.", path_.string());
81+
is_eod_ = false;
82+
}
7383

74-
throw_system_error(err,
75-
"'{}' cannot be reset", path_.string());
76-
}
84+
void
85+
file_stream::record_position(tape &t) const
86+
{
87+
std::size_t offset = position();
88+
89+
t.record(offset);
90+
}
91+
92+
void
93+
file_stream::reload_position(tape &t)
94+
{
95+
seek(t.read<std::size_t>());
7796

7897
is_eod_ = false;
7998
}
8099

100+
bool
101+
file_stream::supports_seek() const noexcept
102+
{
103+
return true;
104+
}
105+
81106
std::size_t
82107
file_stream::fill_chunk(writable_memory_span chunk)
83108
{
@@ -89,4 +114,22 @@ file_stream::fill_chunk(writable_memory_span chunk)
89114
return static_cast<std::size_t>(num_bytes_read);
90115
}
91116

117+
std::size_t
118+
file_stream::seek(std::size_t offset, bool from_current) const
119+
{
120+
::off_t off = ::lseek(
121+
fd_.get(), static_cast<::off_t>(offset), from_current ? SEEK_CUR : SEEK_SET);
122+
123+
if (off != -1)
124+
return static_cast<std::size_t>(off);
125+
126+
std::error_code err = last_error();
127+
128+
if (err == std::errc::invalid_seek)
129+
throw_<byte_stream_error>("'{}' is not seekable.", path_.string());
130+
131+
throw_system_error(err,
132+
"'{}' cannot be read", path_.string());
133+
}
134+
92135
} // namespace fairseq2n::detail

native/src/fairseq2n/data/file_stream.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,31 @@ class file_stream final : public byte_stream {
2929
memory_block
3030
read_chunk() override;
3131

32+
void
33+
seek(std::size_t offset) override;
34+
35+
std::size_t
36+
position() const override;
37+
3238
void
3339
reset() override;
3440

41+
void
42+
record_position(tape &t) const override;
43+
44+
void
45+
reload_position(tape &t) override;
46+
47+
bool
48+
supports_seek() const noexcept override;
49+
3550
private:
3651
std::size_t
3752
fill_chunk(writable_memory_span chunk);
3853

54+
std::size_t
55+
seek(std::size_t offset, bool from_current) const;
56+
3957
private:
4058
file_desc fd_;
4159
std::filesystem::path path_;

native/src/fairseq2n/data/memory_stream.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,47 @@ memory_stream::read_chunk()
1414
return std::exchange(block_, {});
1515
}
1616

17+
void
18+
memory_stream::seek(std::size_t offset)
19+
{
20+
if (offset >= block_.size())
21+
block_ = {};
22+
else
23+
block_ = block_.share_slice(offset);
24+
}
25+
26+
std::size_t
27+
memory_stream::position() const
28+
{
29+
return original_block_.size() - block_.size();
30+
}
31+
1732
void
1833
memory_stream::reset()
1934
{
2035
block_ = original_block_;
2136
}
2237

38+
void
39+
memory_stream::record_position(tape &t) const
40+
{
41+
std::size_t offset = position();
42+
43+
t.record(offset);
44+
}
45+
46+
void
47+
memory_stream::reload_position(tape &t)
48+
{
49+
block_ = original_block_;
50+
51+
seek(t.read<std::size_t>());
52+
}
53+
54+
bool
55+
memory_stream::supports_seek() const noexcept
56+
{
57+
return true;
58+
}
59+
2360
} // namespace fairseq2n::detail

native/src/fairseq2n/data/memory_stream.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,24 @@ class memory_stream final : public byte_stream {
2525
memory_block
2626
read_chunk() override;
2727

28+
void
29+
seek(std::size_t offset) override;
30+
31+
std::size_t
32+
position() const override;
33+
2834
void
2935
reset() override;
3036

37+
void
38+
record_position(tape &t) const override;
39+
40+
void
41+
reload_position(tape &t) override;
42+
43+
bool
44+
supports_seek() const noexcept override;
45+
3146
private:
3247
memory_block block_;
3348
memory_block original_block_;

native/src/fairseq2n/data/record_reader.cc

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "fairseq2n/data/record_reader.h"
88

99
#include <algorithm>
10+
#include <stdexcept>
1011

1112
#include <fmt/format.h>
1213

@@ -34,11 +35,41 @@ record_reader::next()
3435
void
3536
record_reader::reset()
3637
{
38+
stream_->reset();
39+
3740
current_chunk_ = {};
3841

39-
previous_chunks_.clear();
42+
stream_offset_ = 0;
4043

41-
stream_->reset();
44+
chunk_offset_ = 0;
45+
}
46+
47+
void
48+
record_reader::record_position(tape &t) const
49+
{
50+
t.record(stream_offset_);
51+
52+
t.record(chunk_offset_);
53+
}
54+
55+
void
56+
record_reader::reload_position(tape &t)
57+
{
58+
stream_offset_ = t.read<std::size_t>();
59+
60+
stream_->seek(stream_offset_);
61+
62+
chunk_offset_ = t.read<std::size_t>();
63+
if (chunk_offset_ > 0) {
64+
current_chunk_ = stream_->read_chunk();
65+
66+
if (chunk_offset_ >= current_chunk_.size())
67+
throw_<std::invalid_argument>(
68+
"The tape is corrupt. The state of the record reader cannot be restored.");
69+
70+
current_chunk_ = current_chunk_.share_slice(chunk_offset_);
71+
} else
72+
current_chunk_ = {};
4273
}
4374

4475
bool
@@ -52,6 +83,8 @@ record_reader::load_next_record()
5283

5384
// Load and store memory chunks until we find the end of the next record.
5485
while (!(maybe_record_end_offset = maybe_find_record_end(current_chunk_, first_chunk))) {
86+
stream_offset_ = stream_->position();
87+
5588
memory_block next_chunk = stream_->read_chunk();
5689
if (next_chunk.empty()) {
5790
// If `next_chunk` is empty and we don't have any partial record
@@ -71,6 +104,8 @@ record_reader::load_next_record()
71104

72105
current_chunk_ = std::move(next_chunk);
73106

107+
chunk_offset_ = 0;
108+
74109
first_chunk = false;
75110
}
76111

@@ -117,6 +152,8 @@ record_reader::move_to_next_record()
117152
{
118153
current_chunk_ = current_chunk_.share_slice(record_end_offset_);
119154

155+
chunk_offset_ += record_end_offset_;
156+
120157
previous_chunks_.clear();
121158
}
122159

native/src/fairseq2n/data/record_reader.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ class FAIRSEQ2_API record_reader {
4242
void
4343
reset();
4444

45+
void
46+
record_position(tape &t) const;
47+
48+
void
49+
reload_position(tape &);
50+
4551
private:
4652
bool
4753
load_next_record();
@@ -62,6 +68,8 @@ class FAIRSEQ2_API record_reader {
6268
std::unique_ptr<byte_stream> stream_;
6369
memory_block current_chunk_{};
6470
std::vector<memory_block> previous_chunks_{};
71+
std::size_t stream_offset_ = 0;
72+
std::size_t chunk_offset_ = 0;
6573
std::size_t record_len_ = 0;
6674
std::size_t record_end_offset_ = 0;
6775
};

0 commit comments

Comments
 (0)