Skip to content

Commit 6eed2f8

Browse files
committed
dump keys
1 parent 27a54d6 commit 6eed2f8

File tree

7 files changed

+120
-19
lines changed

7 files changed

+120
-19
lines changed

cpp/include/ioutils.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,15 @@ class IoUtils {
3333
bool Init(std::string opt_path);
3434
int LoadStreamFile(std::string filepath);
3535
std::pair<int, int> ReadStreamForVocab(int num_lines, int num_threads);
36-
void GetWordVocab(int min_count);
36+
std::pair<int, int> TokenizeStream(int num_lines, int num_threads);
37+
void GetWordVocab(int min_count, std::string keys_path);
38+
void GetToken(int* indices, int* indptr, int offset);
3739
private:
3840
void ParseLine(std::string line, std::vector<std::string>& line_vec);
41+
void ParseLineImpl(std::string line, std::vector<std::string>& line_vec);
3942

43+
std::vector<std::vector<int>> indices_;
44+
std::vector<int> indptr_;
4045
std::mutex global_lock_;
4146
std::ifstream stream_fin_;
4247
json11::Json opt_;

cpp/src/ioutils.cc

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,19 @@ bool IoUtils::Init(std::string opt_path) {
2828
}
2929

3030
void IoUtils::ParseLine(std::string line, std::vector<std::string>& ret) {
31+
ParseLineImpl(line, ret);
32+
}
33+
34+
35+
void IoUtils::ParseLineImpl(std::string line, std::vector<std::string>& ret) {
3136
ret.clear();
3237
int n = line.size();
3338
std::string element;
3439
for (int i = 0; i < n; ++i) {
35-
if (line[i] == ' ') {
40+
if (line[i] == ' ' or line[i] == ',') {
3641
ret.push_back(element);
3742
element.clear();
38-
} else {
43+
} else if (line[i] != '"') {
3944
element += line[i];
4045
}
4146
}
@@ -61,6 +66,56 @@ int IoUtils::LoadStreamFile(std::string filepath) {
6166
return count;
6267
}
6368

69+
std::pair<int, int> IoUtils::TokenizeStream(int num_lines, int num_threads) {
70+
int read_lines = std::min(num_lines, remain_lines_);
71+
if (not read_lines) return {0, 0};
72+
remain_lines_ -= read_lines;
73+
indices_.clear();
74+
indices_.resize(read_lines);
75+
indptr_.resize(read_lines);
76+
std::fill(indptr_.begin(), indptr_.end(), 0);
77+
#pragma omp parallel num_threads(num_threads)
78+
{
79+
std::string line;
80+
std::vector<std::string> line_vec;
81+
#pragma omp for schedule(dynamic, 4)
82+
for (int i = 0; i < read_lines; ++i) {
83+
// get line thread-safely
84+
{
85+
std::unique_lock<std::mutex> lock(global_lock_);
86+
getline(stream_fin_, line);
87+
}
88+
89+
// seems to be bottle-neck
90+
ParseLine(line, line_vec);
91+
92+
// tokenize
93+
for (auto& word: line_vec) {
94+
if (word_count_.count(word)) continue;
95+
indices_[i].push_back(word_count_[word]);
96+
}
97+
}
98+
}
99+
int cumsum = 0;
100+
for (int i = 0; i < read_lines; ++i) {
101+
cumsum += indices_[i].size();
102+
indptr_[i] = cumsum;
103+
}
104+
return {read_lines, indptr_[read_lines - 1]};
105+
}
106+
107+
void IoUtils::GetToken(int* indices, int* indptr, int offset) {
108+
int n = indices_.size();
109+
for (int i = 0; i < n; ++i) {
110+
int beg = i == 0? 0: indptr_[i - 1];
111+
int end = indptr_[i];
112+
for (int j = beg; j < end; ++j) {
113+
indices[j] = indices_[i][j - beg];
114+
}
115+
indptr[i] = offset + indptr_[i];
116+
}
117+
}
118+
64119
std::pair<int, int> IoUtils::ReadStreamForVocab(int num_lines, int num_threads) {
65120
int read_lines = std::min(num_lines, remain_lines_);
66121
remain_lines_ -= read_lines;
@@ -77,7 +132,7 @@ std::pair<int, int> IoUtils::ReadStreamForVocab(int num_lines, int num_threads)
77132
getline(stream_fin_, line);
78133
}
79134

80-
// seems to bottle-neck
135+
// seems to be bottle-neck
81136
ParseLine(line, line_vec);
82137

83138
// update private word count
@@ -94,10 +149,10 @@ std::pair<int, int> IoUtils::ReadStreamForVocab(int num_lines, int num_threads)
94149
}
95150
}
96151
}
97-
return {read_lines, remain_lines_};
152+
return {read_lines, word_count_.size()};
98153
}
99154

100-
void IoUtils::GetWordVocab(int min_count) {
155+
void IoUtils::GetWordVocab(int min_count, std::string keys_path) {
101156
INFO("number of raw words: {}", word_count_.size());
102157
for (auto& it: word_count_) {
103158
if (it.second >= min_count) {
@@ -106,6 +161,18 @@ void IoUtils::GetWordVocab(int min_count) {
106161
}
107162
}
108163
INFO("number of words after filtering: {}", word_list_.size());
164+
165+
// write keys to csv file
166+
std::ofstream fout(keys_path.c_str());
167+
INFO("dump keys to {}", keys_path);
168+
std::string header = "index,key\n";
169+
fout.write(header.c_str(), header.size());
170+
int n = word_list_.size();
171+
for (int i = 0; i < n; ++i) {
172+
std::string line = std::to_string(i) + ",\"" + word_list_[i] + "\"\n";
173+
fout.write(line.c_str(), line.size());
174+
}
175+
fout.close();
109176
}
110177

111178
} // namespace cusim

cusim/ioutils/bindings.cc

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,18 @@ class IoUtilsBind {
3131
return obj_.ReadStreamForVocab(num_lines, num_threads);
3232
}
3333

34-
void GetWordVocab(int min_count) {
35-
return obj_.GetWordVocab(min_count);
34+
std::pair<int, int> TokenizeStream(int num_lines, int num_threads) {
35+
return obj_.TokenizeStream(num_lines, num_threads);
36+
}
37+
38+
void GetWordVocab(int min_count, std::string keys_path) {
39+
obj_.GetWordVocab(min_count, keys_path);
40+
}
41+
42+
void GetToken(py::object& indices, py::object& indptr, int offset) {
43+
int_array _indices(indices);
44+
int_array _indptr(indptr);
45+
obj_.GetToken(_indices.mutable_data(0), _indptr.mutable_data(0), offset);
3646
}
3747

3848
private:
@@ -48,7 +58,12 @@ PYBIND11_PLUGIN(ioutils_bind) {
4858
.def("load_stream_file", &IoUtilsBind::LoadStreamFile, py::arg("filepath"))
4959
.def("read_stream_for_vocab", &IoUtilsBind::ReadStreamForVocab,
5060
py::arg("num_lines"), py::arg("num_threads"))
51-
.def("get_word_vocab", &IoUtilsBind::GetWordVocab, py::arg("min_count"))
61+
.def("tokenize_stream", &IoUtilsBind::TokenizeStream,
62+
py::arg("num_lines"), py::arg("num_threads"))
63+
.def("get_word_vocab", &IoUtilsBind::GetWordVocab,
64+
py::arg("min_count"), py::arg("keys_path"))
65+
.def("get_token", &IoUtilsBind::GetToken,
66+
py::arg("indices"), py::arg("indptr"), py::arg("offset"))
5267
.def("__repr__",
5368
[](const IoUtilsBind &a) {
5469
return "<IoUtilsBind>";

cusim/ioutils/pyioutils.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66

77
# pylint: disable=no-name-in-module,too-few-public-methods,no-member
88
import os
9+
from os.path import join as pjoin
10+
911
import json
1012
import tempfile
1113
import tqdm
12-
1314
from cusim import aux
1415
from cusim.ioutils.ioutils_bind import IoUtilsBind
1516
from cusim.config_pb2 import IoUtilsConfigProto
@@ -29,15 +30,24 @@ def __init__(self, opt=None):
2930
assert self.obj.init(bytes(tmp.name, "utf8")), f"failed to load {tmp.name}"
3031
os.remove(tmp.name)
3132

32-
def load_stream_vocab(self, filepath, min_count,
33-
chunk_lines=100000, num_threads=4):
33+
def load_stream_vocab(self, filepath, min_count, keys_path):
3434
full_num_lines = self.obj.load_stream_file(filepath)
35-
pbar = tqdm.trange(full_num_lines)
35+
pbar = tqdm.trange(full_num_lines, unit="line",
36+
postfix={"word_count": 0})
37+
processed = 0
3638
while True:
37-
read_lines, remain_lines = \
38-
self.obj.read_stream_for_vocab(chunk_lines, num_threads)
39+
read_lines, word_count = \
40+
self.obj.read_stream_for_vocab(
41+
self.opt.chunk_lines, self.opt.num_threads)
42+
processed += read_lines
43+
pbar.set_postfix({"word_count": word_count}, refresh=False)
3944
pbar.update(read_lines)
40-
if not remain_lines:
45+
if processed == full_num_lines:
4146
break
4247
pbar.close()
43-
self.obj.get_word_vocab(min_count)
48+
self.obj.get_word_vocab(min_count, keys_path)
49+
50+
def convert_stream_to_h5(self, filepath, min_count, out_dir):
51+
os.makedirs(out_dir, exist_ok=True)
52+
keys_path = pjoin(out_dir, "keys.csv")
53+
self.load_stream_vocab(filepath, min_count, keys_path)

cusim/proto/config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,6 @@ syntax = "proto2";
99
message IoUtilsConfigProto {
1010
optional int32 py_log_level = 1 [default = 2];
1111
optional int32 c_log_level = 2 [default = 2];
12+
optional int32 chunk_lines = 3 [default = 100000];
13+
optional int32 num_threads = 4 [default = 4];
1214
}

examples/example1.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# DATASET = "wiki-english-20171001"
1818
DATASET = "fake-news"
1919
DATA_PATH = f"./res/{DATASET}.stream.txt"
20+
DATA_PATH2 = f"./res/{DATASET}-converted"
2021
MIN_COUNT = 5
2122

2223
def download():
@@ -33,8 +34,8 @@ def download():
3334

3435
def run():
3536
download()
36-
iou = IoUtils()
37-
iou.load_stream_vocab(DATA_PATH, 5, 100000, 8)
37+
iou = IoUtils(opt={"chunk_lines": 10000, "num_threads": 8})
38+
iou.convert_stream_to_h5(DATA_PATH, 5, DATA_PATH2)
3839

3940

4041
if __name__ == "__main__":

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
tqdm
12
jsmin
23
numpy
34
pybind11

0 commit comments

Comments
 (0)