Skip to content

Commit 27a54d6

Browse files
committed
parallel data load
1 parent 9a49abe commit 27a54d6

File tree

5 files changed

+50
-27
lines changed

5 files changed

+50
-27
lines changed

cpp/include/ioutils.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,18 @@ class IoUtils {
3232
~IoUtils();
3333
bool Init(std::string opt_path);
3434
int LoadStreamFile(std::string filepath);
35-
std::pair<int, bool> ReadStreamForVocab(int num_lines);
35+
std::pair<int, int> ReadStreamForVocab(int num_lines, int num_threads);
3636
void GetWordVocab(int min_count);
3737
private:
3838
void ParseLine(std::string line, std::vector<std::string>& line_vec);
3939

40+
std::mutex global_lock_;
4041
std::ifstream stream_fin_;
4142
json11::Json opt_;
4243
std::shared_ptr<spdlog::logger> logger_;
4344
std::unordered_map<std::string, int> word_idmap_, word_count_;
4445
std::vector<std::string> word_list_;
46+
int num_lines_, remain_lines_;
4547
}; // class IoUtils
4648

4749
} // namespace cusim

cpp/src/ioutils.cc

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,28 +56,45 @@ int IoUtils::LoadStreamFile(std::string filepath) {
5656
word_idmap_.clear();
5757
word_list_.clear();
5858
word_count_.clear();
59+
num_lines_ = count;
60+
remain_lines_ = num_lines_;
5961
return count;
6062
}
6163

62-
std::pair<int, bool> IoUtils::ReadStreamForVocab(int num_lines) {
63-
int read_cnt = 0;
64-
std::string line;
65-
std::vector<std::string> line_vec;
66-
while (not stream_fin_.eof() and read_cnt < num_lines) {
67-
getline(stream_fin_, line);
68-
ParseLine(line, line_vec);
69-
for (auto& word: line_vec) {
70-
if (not word_count_.count(word)) word_count_[word] = 0;
71-
word_count_[word]++;
64+
std::pair<int, int> IoUtils::ReadStreamForVocab(int num_lines, int num_threads) {
65+
int read_lines = std::min(num_lines, remain_lines_);
66+
remain_lines_ -= read_lines;
67+
#pragma omp parallel num_threads(num_threads)
68+
{
69+
std::string line;
70+
std::vector<std::string> line_vec;
71+
std::unordered_map<std::string, int> word_count;
72+
#pragma omp for schedule(dynamic, 4)
73+
for (int i = 0; i < read_lines; ++i) {
74+
// get line thread-safely
75+
{
76+
std::unique_lock<std::mutex> lock(global_lock_);
77+
getline(stream_fin_, line);
78+
}
79+
80+
// seems to bottle-neck
81+
ParseLine(line, line_vec);
82+
83+
// update private word count
84+
for (auto& word: line_vec) {
85+
word_count[word]++;
86+
}
87+
}
88+
89+
// update word count to class variable
90+
{
91+
std::unique_lock<std::mutex> lock(global_lock_);
92+
for (auto& it: word_count) {
93+
word_count_[it.first] += it.second;
94+
}
7295
}
73-
read_cnt++;
74-
}
75-
bool finished = false;
76-
if (stream_fin_.eof()) {
77-
stream_fin_.close();
78-
finished = true;
7996
}
80-
return {read_cnt, finished};
97+
return {read_lines, remain_lines_};
8198
}
8299

83100
void IoUtils::GetWordVocab(int min_count) {

cusim/ioutils/bindings.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ class IoUtilsBind {
2727
return obj_.LoadStreamFile(filepath);
2828
}
2929

30-
std::pair<int, bool> ReadStreamForVocab(int num_lines) {
31-
return obj_.ReadStreamForVocab(num_lines);
30+
std::pair<int, int> ReadStreamForVocab(int num_lines, int num_threads) {
31+
return obj_.ReadStreamForVocab(num_lines, num_threads);
3232
}
3333

3434
void GetWordVocab(int min_count) {
@@ -46,7 +46,8 @@ PYBIND11_PLUGIN(ioutils_bind) {
4646
.def(py::init())
4747
.def("init", &IoUtilsBind::Init, py::arg("opt_path"))
4848
.def("load_stream_file", &IoUtilsBind::LoadStreamFile, py::arg("filepath"))
49-
.def("read_stream_for_vocab", &IoUtilsBind::ReadStreamForVocab, py::arg("num_lines"))
49+
.def("read_stream_for_vocab", &IoUtilsBind::ReadStreamForVocab,
50+
py::arg("num_lines"), py::arg("num_threads"))
5051
.def("get_word_vocab", &IoUtilsBind::GetWordVocab, py::arg("min_count"))
5152
.def("__repr__",
5253
[](const IoUtilsBind &a) {

cusim/ioutils/pyioutils.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@ def __init__(self, opt=None):
2929
assert self.obj.init(bytes(tmp.name, "utf8")), f"failed to load {tmp.name}"
3030
os.remove(tmp.name)
3131

32-
def load_stream_vocab(self, filepath, min_count, chunk_lines=100000):
32+
def load_stream_vocab(self, filepath, min_count,
33+
chunk_lines=100000, num_threads=4):
3334
full_num_lines = self.obj.load_stream_file(filepath)
3435
pbar = tqdm.trange(full_num_lines)
3536
while True:
36-
num_lines, finished = self.obj.read_stream_for_vocab(chunk_lines)
37-
pbar.update(num_lines)
38-
if finished:
37+
read_lines, remain_lines = \
38+
self.obj.read_stream_for_vocab(chunk_lines, num_threads)
39+
pbar.update(read_lines)
40+
if not remain_lines:
3941
break
4042
pbar.close()
4143
self.obj.get_word_vocab(min_count)

examples/example1.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414

1515
LOGGER = aux.get_logger()
1616
DOWNLOAD_PATH = "./res"
17-
DATASET = "wiki-english-20171001"
17+
# DATASET = "wiki-english-20171001"
18+
DATASET = "fake-news"
1819
DATA_PATH = f"./res/{DATASET}.stream.txt"
1920
MIN_COUNT = 5
2021

@@ -33,7 +34,7 @@ def download():
3334
def run():
3435
download()
3536
iou = IoUtils()
36-
iou.load_stream_vocab(DATA_PATH, 5, 10000)
37+
iou.load_stream_vocab(DATA_PATH, 5, 100000, 8)
3738

3839

3940
if __name__ == "__main__":

0 commit comments

Comments
 (0)