Skip to content

Commit 00acb20

Browse files
authored
Merge pull request #1 from js1010/task/implement-io
Task/implement io
2 parents 482646c + 68915a8 commit 00acb20

File tree

19 files changed

+1144
-31
lines changed

19 files changed

+1144
-31
lines changed

README.md

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,16 @@
1-
# cusim
2-
cuda implementation of w2v and lda
1+
### How to install
2+
3+
4+
```shell
5+
# clone repo and submodules
6+
git clone git@github.com:js1010/cusim.git && cd cusim && git submodule update --init
7+
8+
# install requirements
9+
pip install -r requirements.txt
10+
11+
# generate proto
12+
python -m grpc_tools.protoc --python_out cusim/ --proto_path cusim/proto/ config.proto
13+
14+
# install
15+
python setup.py install
16+
```

cpp/include/culda.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
namespace cusim {
3434

3535
class CuLDA {
36-
public:
36+
public:
3737
CuLDA();
3838
~CuLDA();
3939
private:

cpp/include/ioutils.hpp

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,35 @@
2020
#include <iostream>
2121
#include <unordered_map>
2222

23+
#include "json11.hpp"
24+
#include "log.hpp"
25+
#include "types.hpp"
26+
2327
namespace cusim {
2428

2529
// Streaming text reader: loads options, counts words over a text file,
// builds a vocabulary and converts lines into rows of integer tokens.
class IoUtils {
2630
public:
2731
IoUtils();
2832
~IoUtils();
29-
void LoadGensimVocab(std::string filepath, int min_count);
33+
// Parses the JSON option file at opt_path; returns false if the file cannot
// be opened or is not valid JSON.
bool Init(std::string opt_path);
34+
// Counts the lines of filepath, reopens it for streaming and returns the
// line count.
int LoadStreamFile(std::string filepath);
35+
// Reads up to num_lines lines and accumulates word counts.
// Returns {lines read, number of distinct words counted so far}.
std::pair<int, int> ReadStreamForVocab(int num_lines, int num_threads);
36+
// Reads up to num_lines lines and tokenizes them into indices_ / indptr_.
// Returns {lines read, total number of tokens in this batch}.
std::pair<int, int> TokenizeStream(int num_lines, int num_threads);
37+
// Keeps the words whose count is >= min_count and dumps them to keys_path
// as an "index,key" csv file.
void GetWordVocab(int min_count, std::string keys_path);
38+
// Copies the most recently tokenized batch into the caller's buffers;
// offset is added to every indptr entry.
void GetToken(int* indices, int* indptr, int offset);
3039
private:
31-
std::vector<std::string> parse_line(std::string line);
32-
std::unordered_map<std::string, int> word_idmap_;
40+
// Splits line into tokens (forwards to ParseLineImpl).
void ParseLine(std::string line, std::vector<std::string>& line_vec);
41+
// Splits on ' ' and ',', drops '"' characters and lower-cases the rest.
void ParseLineImpl(std::string line, std::vector<std::string>& line_vec);
42+
43+
// tokens of the current batch, one inner vector per line
std::vector<std::vector<int>> indices_;
44+
// inclusive running total of tokens per line of the current batch
std::vector<int> indptr_;
45+
// guards reads from stream_fin_ and merges into word_count_
std::mutex global_lock_;
46+
// input file currently being streamed
std::ifstream stream_fin_;
47+
// options parsed by Init
json11::Json opt_;
48+
std::shared_ptr<spdlog::logger> logger_;
49+
// word -> vocabulary index, word -> number of occurrences
std::unordered_map<std::string, int> word_idmap_, word_count_;
3350
// vocabulary words in the order they are written to the keys file
std::vector<std::string> word_list_;
51+
// total lines of the loaded file, lines not yet consumed from the stream
int num_lines_, remain_lines_;
3452
}; // class IoUtils
3553

3654
} // namespace cusim

cpp/include/log.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@ class CuSimLogger {
3939
private:
4040
static int global_logging_level_;
4141
std::shared_ptr<spdlog::logger> logger_;
42-
}; // class CuHNSWLogger
42+
}; // class CuSimLogger
4343

4444
} // namespace cusim

cpp/src/culda.cu

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//
44
// This source code is licensed under the Apache 2.0 license found in the
55
// LICENSE file in the root directory of this source tree.
6-
#include "culda.cuh"
6+
#include "culda.hpp"
77

88
namespace cusim {
99

cpp/src/ioutils.cc

Lines changed: 140 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,47 +13,166 @@ IoUtils::IoUtils() {
1313

1414
IoUtils::~IoUtils() {}
1515

16-
std::vector<std::string> IoUtils::parse_line(std::string line) {
16+
bool IoUtils::Init(std::string opt_path) {
17+
std::ifstream in(opt_path.c_str());
18+
if (not in.is_open()) return false;
19+
20+
std::string str((std::istreambuf_iterator<char>(in)),
21+
std::istreambuf_iterator<char>());
22+
std::string err_cmt;
23+
auto _opt = json11::Json::parse(str, err_cmt);
24+
if (not err_cmt.empty()) return false;
25+
opt_ = _opt;
26+
CuSimLogger().set_log_level(opt_["c_log_level"].int_value());
27+
return true;
28+
}
29+
30+
// Splits `line` into tokens and stores them in `ret`.
// Currently a plain pass-through to ParseLineImpl.
void IoUtils::ParseLine(std::string line, std::vector<std::string>& ret) {
31+
ParseLineImpl(line, ret);
32+
}
33+
34+
35+
// Tokenizes one line of text.
//
// `ret` is cleared first and then receives the tokens of `line`:
//   - ' ' and ',' both act as separators,
//   - '"' characters are dropped,
//   - every other character is lower-cased and appended to the current token.
// Runs of separators (e.g. ", ") do not produce empty tokens, so the empty
// string can never end up in the vocabulary.
void IoUtils::ParseLineImpl(std::string line, std::vector<std::string>& ret) {
  ret.clear();
  std::string element;
  for (const char c : line) {
    if (c == ' ' or c == ',') {
      // close the current token; skip if nothing was collected
      if (not element.empty()) {
        ret.push_back(element);
        element.clear();
      }
    } else if (c != '"') {
      // std::tolower requires a value representable as unsigned char;
      // passing a negative char (non-ASCII byte) directly is undefined
      element += static_cast<char>(
          std::tolower(static_cast<unsigned char>(c)));
    }
  }
  // flush the trailing token (line did not end with a separator)
  if (not element.empty()) {
    ret.push_back(element);
  }
}
3351

34-
void IoUtils::LoadGensimVocab(std::string filepath, int min_count) {
35-
INFO("read gensim file to generate vocabulary: {}, min_count: {}", filepath, min_count);
36-
std::ifstream fin(filepath.c_str());
37-
std::unordered_map<std::string, int> word_count;
38-
while (not fin.eof()) {
52+
// Prepares `filepath` for streaming.
//
// Any stream that is still open is closed, the file is scanned once to
// count its lines, and it is then reopened so that later reads start at the
// first line. num_lines_ and remain_lines_ are both set to the line count,
// which is also returned.
int IoUtils::LoadStreamFile(std::string filepath) {
  INFO("read gensim file to generate vocabulary: {}", filepath);
  if (stream_fin_.is_open()) {
    stream_fin_.close();
  }

  // first pass: only count the lines
  stream_fin_.open(filepath.c_str());
  int line_total = 0;
  for (std::string buf; getline(stream_fin_, buf); ) {
    ++line_total;
  }
  stream_fin_.close();

  // reopen so the stream is positioned at the beginning again
  stream_fin_.open(filepath.c_str());
  num_lines_ = line_total;
  remain_lines_ = num_lines_;
  INFO("number of lines: {}", num_lines_);
  return line_total;
}
67+
68+
std::pair<int, int> IoUtils::TokenizeStream(int num_lines, int num_threads) {
69+
int read_lines = std::min(num_lines, remain_lines_);
70+
if (not read_lines) return {0, 0};
71+
remain_lines_ -= read_lines;
72+
indices_.clear();
73+
indices_.resize(read_lines);
74+
indptr_.resize(read_lines);
75+
std::fill(indptr_.begin(), indptr_.end(), 0);
76+
#pragma omp parallel num_threads(num_threads)
77+
{
78+
std::string line;
79+
std::vector<std::string> line_vec;
80+
#pragma omp for schedule(dynamic, 4)
81+
for (int i = 0; i < read_lines; ++i) {
82+
// get line thread-safely
83+
{
84+
std::unique_lock<std::mutex> lock(global_lock_);
85+
getline(stream_fin_, line);
86+
}
87+
88+
// seems to be bottle-neck
89+
ParseLine(line, line_vec);
90+
91+
// tokenize
92+
for (auto& word: line_vec) {
93+
if (not word_count_.count(word)) continue;
94+
indices_[i].push_back(word_count_[word]);
95+
}
96+
}
97+
}
98+
int cumsum = 0;
99+
for (int i = 0; i < read_lines; ++i) {
100+
cumsum += indices_[i].size();
101+
indptr_[i] = cumsum;
102+
}
103+
return {read_lines, indptr_[read_lines - 1]};
104+
}
105+
106+
void IoUtils::GetToken(int* indices, int* indptr, int offset) {
107+
int n = indices_.size();
108+
for (int i = 0; i < n; ++i) {
109+
int beg = i == 0? 0: indptr_[i - 1];
110+
int end = indptr_[i];
111+
for (int j = beg; j < end; ++j) {
112+
indices[j] = indices_[i][j - beg];
113+
}
114+
indptr[i] = offset + indptr_[i];
115+
}
116+
}
117+
118+
std::pair<int, int> IoUtils::ReadStreamForVocab(int num_lines, int num_threads) {
119+
int read_lines = std::min(num_lines, remain_lines_);
120+
remain_lines_ -= read_lines;
121+
#pragma omp parallel num_threads(num_threads)
122+
{
39123
std::string line;
40-
getline(fin, line);
41-
std::vector<std::string> line_vec = parse_line(line);
42-
for (auto& word: line_vec) {
43-
if (not word_count.count(word)) word_count[word] = 0;
44-
word_count[word]++;
124+
std::vector<std::string> line_vec;
125+
std::unordered_map<std::string, int> word_count;
126+
#pragma omp for schedule(dynamic, 4)
127+
for (int i = 0; i < read_lines; ++i) {
128+
// get line thread-safely
129+
{
130+
std::unique_lock<std::mutex> lock(global_lock_);
131+
getline(stream_fin_, line);
132+
}
133+
134+
// seems to be bottle-neck
135+
ParseLine(line, line_vec);
136+
137+
// update private word count
138+
for (auto& word: line_vec) {
139+
word_count[word]++;
140+
}
141+
}
142+
143+
// update word count to class variable
144+
{
145+
std::unique_lock<std::mutex> lock(global_lock_);
146+
for (auto& it: word_count) {
147+
word_count_[it.first] += it.second;
148+
}
45149
}
46150
}
47-
INFO("number of raw words: {}", word_count.size());
48-
word_idmap_.clear();
49-
word_list_.clear();
50-
for (auto& it: word_count) {
151+
if (not remain_lines_) stream_fin_.close();
152+
return {read_lines, word_count_.size()};
153+
}
154+
155+
void IoUtils::GetWordVocab(int min_count, std::string keys_path) {
156+
INFO("number of raw words: {}", word_count_.size());
157+
for (auto& it: word_count_) {
51158
if (it.second >= min_count) {
52-
word_idmap_[it.first] = vocab_.size();
159+
word_idmap_[it.first] = word_idmap_.size();
53160
word_list_.push_back(it.first);
54161
}
55162
}
56163
INFO("number of words after filtering: {}", word_list_.size());
164+
165+
// write keys to csv file
166+
std::ofstream fout(keys_path.c_str());
167+
INFO("dump keys to {}", keys_path);
168+
std::string header = "index,key\n";
169+
fout.write(header.c_str(), header.size());
170+
int n = word_list_.size();
171+
for (int i = 0; i < n; ++i) {
172+
std::string line = std::to_string(i) + ",\"" + word_list_[i] + "\"\n";
173+
fout.write(line.c_str(), line.size());
174+
}
175+
fout.close();
57176
}
58177

59178
} // namespace cusim

cpp/src/log.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111
namespace cusim {
1212
int CuSimLogger::global_logging_level_ = 2;
1313

14-
CuSimLogger::CuHNSWLogger() {
14+
// Sets the global spdlog message pattern and stores spdlog's default logger
// in logger_.
CuSimLogger::CuSimLogger() {
1515
spdlog::set_pattern("[%^%-8l%$] %Y-%m-%d %H:%M:%S %v");
1616
logger_ = spdlog::default_logger();
1717
}
1818

19-
std::shared_ptr<spdlog::logger>& CuHNSWLogger::get_logger() {
19+
// Returns a reference to the logger_ member held by this object.
std::shared_ptr<spdlog::logger>& CuSimLogger::get_logger() {
2020
return logger_;
2121
}
2222

0 commit comments

Comments
 (0)