Skip to content

Commit 941d6d3

Browse files
committed
test
1 parent 2dfd958 commit 941d6d3

File tree

8 files changed

+190
-76
lines changed

8 files changed

+190
-76
lines changed

flex/engines/graph_db/app/kafka_wal_ingester_app.cc

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace gs {
2222
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
2323

2424
struct WalIngester {
25+
constexpr static size_t BUFFSIZ = 4096;
2526
GraphDBSession& session_;
2627
timestamp_t begin_;
2728
timestamp_t end_;
@@ -31,30 +32,32 @@ struct WalIngester {
3132
std::vector<uint8_t> states_;
3233

3334
void resize() {
34-
size_t n = data_.size();
35-
std::vector<std::string> new_data(n + 4096);
36-
std::vector<uint8_t> new_states(n + 4096, 0);
37-
size_t idx = (ingested_plus_one_ - begin_) % n;
38-
for (size_t i = 0; i < n; ++i) {
35+
size_t origin_len = data_.size();
36+
std::vector<std::string> new_data(origin_len + BUFFSIZ);
37+
std::vector<uint8_t> new_states(origin_len + BUFFSIZ, 0);
38+
size_t idx = (ingested_plus_one_ - begin_) % origin_len;
39+
for (size_t i = 0; i < origin_len; ++i) {
3940
new_data[i] = data_[idx];
4041
new_states[i] = states_[idx];
4142
if (states_[idx]) {
4243
end_ = ingested_plus_one_ + i + 1;
4344
}
4445
++idx;
45-
idx %= n;
46+
idx %= origin_len;
4647
}
4748
data_ = std::move(new_data);
4849
states_ = std::move(new_states);
4950
begin_ = ingested_plus_one_;
5051
}
52+
53+
timestamp_t last_ingested() const { return ingested_plus_one_ - 1; }
5154
WalIngester(GraphDBSession& session, timestamp_t cur)
5255
: session_(session),
5356
begin_(cur),
5457
end_(cur),
5558
ingested_plus_one_(cur),
56-
data_(4096),
57-
states_(4096, 0) {}
59+
data_(BUFFSIZ),
60+
states_(BUFFSIZ, 0) {}
5861

5962
bool empty() const { return ingested_plus_one_ == end_; }
6063

@@ -77,7 +80,8 @@ struct WalIngester {
7780
}
7881

7982
void ingest() {
80-
size_t idx = (ingested_plus_one_ - begin_) % data_.size();
83+
size_t len = data_.size();
84+
size_t idx = (ingested_plus_one_ - begin_) % len;
8185
bool flag = false;
8286
while (states_[idx] == 2 || states_[idx] == 1) {
8387
if (states_[idx] == 1) {
@@ -86,7 +90,7 @@ struct WalIngester {
8690
states_[idx] = 0;
8791
++ingested_plus_one_;
8892
++idx;
89-
idx %= data_.size();
93+
idx %= len;
9094
flag = true;
9195
}
9296
if (flag) {
@@ -187,7 +191,7 @@ bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
187191
timestamp_t cur_ts = graph.db().get_last_ingested_wal_ts() + 1;
188192
gs::WalIngester ingester(graph, cur_ts);
189193
gs::KafkaWalConsumer consumer(ingester, config, topic_name, 1);
190-
while (graph.db().kafka_wal_ingester_state()) {
194+
while (!force_stop_.load()) {
191195
consumer.poll();
192196
ingester.ingest();
193197
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@@ -196,8 +200,16 @@ bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
196200
consumer.poll();
197201
ingester.ingest();
198202
}
203+
int64_t ts = ingester.last_ingested();
204+
ouput.put_long(ts);
205+
return true;
206+
}
207+
208+
bool KafkaWalIngesterApp::terminal() {
209+
force_stop_.store(true);
199210
return true;
200211
}
212+
201213
AppWrapper KafkaWalIngesterAppFactory::CreateApp(const GraphDB& db) {
202214
return AppWrapper(new KafkaWalIngesterApp(), NULL);
203215
}

flex/engines/graph_db/app/kafka_wal_ingester_app.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ namespace gs {
2424
// Ingest wal from kafka
2525
class KafkaWalIngesterApp : public WriteAppBase {
2626
public:
27-
KafkaWalIngesterApp() {}
27+
KafkaWalIngesterApp() : force_stop_(false) {}
2828

2929
AppType type() const override { return AppType::kBuiltIn; }
3030

3131
bool Query(GraphDBSession& graph, Decoder& input, Encoder& output) override;
32+
33+
bool terminal() override;
34+
std::atomic<bool> force_stop_{false};
3235
};
3336

3437
class KafkaWalIngesterAppFactory : public AppFactoryBase {

flex/engines/graph_db/database/graph_db.cc

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ struct SessionLocalContext {
5858

5959
GraphDB::GraphDB()
6060
: monitor_thread_running_(false),
61-
kafka_wal_ingester_thread_running_(false),
6261
last_ingested_wal_ts_(0),
6362
compact_thread_running_(false) {}
6463
GraphDB::~GraphDB() {
@@ -282,54 +281,6 @@ void GraphDB::Close() {
282281
std::fill(app_factories_.begin(), app_factories_.end(), nullptr);
283282
}
284283

285-
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
286-
bool GraphDB::kafka_wal_ingester_state() const {
287-
return kafka_wal_ingester_thread_running_.load();
288-
}
289-
290-
void GraphDB::start_kafka_wal_ingester(const cppkafka::Configuration& config,
291-
const std::string& topic_name) {
292-
if (kafka_wal_ingester_thread_running_) {
293-
kafka_wal_ingester_thread_running_ = false;
294-
if (kafka_wal_ingester_thread_.joinable()) {
295-
kafka_wal_ingester_thread_.join();
296-
}
297-
}
298-
kafka_wal_ingester_thread_running_ = true;
299-
300-
kafka_wal_ingester_thread_ = std::thread([&]() {
301-
std::vector<char> buffer;
302-
gs::Encoder encoder(buffer);
303-
encoder.put_string("topic_name");
304-
encoder.put_string(topic_name);
305-
if (config.has_property("metadata.broker.list")) {
306-
encoder.put_string("metadata.broker.list");
307-
encoder.put_string(config.get("metadata.broker.list"));
308-
}
309-
if (config.has_property("group.id")) {
310-
encoder.put_string("group.id");
311-
encoder.put_string(config.get("group.id"));
312-
}
313-
if (config.has_property("enable.auto.commit")) {
314-
encoder.put_string("enable.auto.commit");
315-
encoder.put_string(config.get("enable.auto.commit"));
316-
}
317-
if (config.has_property("auto.offset.reset")) {
318-
encoder.put_string("auto.offset.reset");
319-
encoder.put_string(config.get("auto.offset.reset"));
320-
}
321-
gs::Decoder decoder(buffer.data(), buffer.size());
322-
KafkaWalIngesterApp().Query(GetSession(0), decoder, encoder);
323-
});
324-
}
325-
326-
void GraphDB::stop_kafka_wal_ingester() {
327-
kafka_wal_ingester_thread_running_ = false;
328-
kafka_wal_ingester_thread_.join();
329-
}
330-
331-
#endif
332-
333284
ReadTransaction GraphDB::GetReadTransaction(int thread_id) {
334285
return contexts_[thread_id].session.GetReadTransaction();
335286
}

flex/engines/graph_db/database/graph_db.h

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@
3333
#include "flex/storages/rt_mutable_graph/loader/loader_factory.h"
3434
#include "flex/storages/rt_mutable_graph/loading_config.h"
3535
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"
36-
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
37-
#include "cppkafka/cppkafka.h"
38-
#endif
3936

4037
namespace gs {
4138

@@ -172,15 +169,6 @@ class GraphDB {
172169

173170
inline const GraphDBConfig& config() const { return config_; }
174171

175-
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
176-
bool kafka_wal_ingester_state() const;
177-
178-
void start_kafka_wal_ingester(const cppkafka::Configuration& config,
179-
const std::string& topic_name);
180-
181-
void stop_kafka_wal_ingester();
182-
#endif
183-
184172
uint64_t get_last_ingested_wal_ts() const { return last_ingested_wal_ts_; }
185173
void set_last_ingested_wal_ts(uint64_t ts) { last_ingested_wal_ts_ = ts; }
186174

@@ -219,8 +207,7 @@ class GraphDB {
219207
std::thread monitor_thread_;
220208
bool monitor_thread_running_;
221209

222-
std::thread kafka_wal_ingester_thread_;
223-
std::atomic<bool> kafka_wal_ingester_thread_running_;
210+
std::unique_ptr<WalSession> wal_session_;
224211
uint64_t last_ingested_wal_ts_;
225212

226213
timestamp_t last_compaction_ts_;
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
#include "flex/engines/graph_db/database/wal/kafka_wal_session.h"
16+
#include "flex/engines/graph_db/app/kafka_wal_ingester_app.h"
17+
#include "flex/engines/graph_db/database/graph_db.h"
18+
#include "flex/engines/graph_db/database/graph_db_session.h"
19+
20+
namespace gs {
21+
bool KafkaWalSession::open(GraphDBSession& session,
22+
const std::string& wal_uri) {
23+
ingester_ = std::make_unique<KafkaWalIngesterApp>();
24+
std::vector<char> buf;
25+
gs::Encoder encoder(buf);
26+
27+
const std::string prefix = "kafka://";
28+
if (wal_uri.find(prefix) != 0) {
29+
LOG(ERROR) << "Invalid uri: " << wal_uri;
30+
return false;
31+
}
32+
33+
std::string hosts_part = wal_uri.substr(prefix.length());
34+
size_t query_pos = hosts_part.find('/');
35+
std::string hosts;
36+
std::string query;
37+
cppkafka::Configuration config;
38+
if (query_pos != std::string::npos) {
39+
hosts = hosts_part.substr(0, query_pos);
40+
query = hosts_part.substr(query_pos + 1);
41+
} else {
42+
LOG(ERROR) << "Invalid uri: " << wal_uri;
43+
return false;
44+
}
45+
std::string kafka_brokers = hosts;
46+
encoder.put_string("metadata.broker.list");
47+
encoder.put_string(kafka_brokers);
48+
size_t top_pos = query.find('?');
49+
std::string topic;
50+
if (top_pos != std::string::npos) {
51+
topic = query.substr(0, top_pos);
52+
query = query.substr(top_pos + 1);
53+
} else {
54+
topic = query;
55+
}
56+
encoder.put_string("topic_name");
57+
encoder.put_string(topic);
58+
std::istringstream query_stream(query);
59+
std::string pair;
60+
while (std::getline(query_stream, pair, '&')) {
61+
size_t eq_pos = pair.find('=');
62+
if (eq_pos != std::string::npos) {
63+
std::string key = pair.substr(0, eq_pos);
64+
std::string value = pair.substr(eq_pos + 1);
65+
encoder.put_string(key);
66+
encoder.put_string(value);
67+
}
68+
}
69+
encoder.put_string("enable.auto.commit");
70+
encoder.put_string("false");
71+
Decoder decoder(buf.data(), buf.size());
72+
73+
ingester_thread_ = std::thread([&]() {
74+
std::vector<char> buf;
75+
gs::Encoder encoder(buf);
76+
ingester_->Query(session, decoder, encoder);
77+
gs::Decoder output(buf.data(), buf.size());
78+
session.db().set_last_ingested_wal_ts(output.get_long());
79+
});
80+
return true;
81+
}
82+
83+
bool KafkaWalSession::close() {
84+
if (ingester_) {
85+
ingester_->terminal();
86+
}
87+
if (ingester_thread_.joinable()) {
88+
ingester_thread_.join();
89+
}
90+
return true;
91+
}
92+
93+
} // namespace gs
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
#ifndef FLEX_ENGINES_GRAPH_DB_DATABASE_WAL_KAFKA_WAL_SESSION_H_
17+
#define FLEX_ENGINES_GRAPH_DB_DATABASE_WAL_KAFKA_WAL_SESSION_H_
18+
19+
#include <cppkafka/cppkafka.h>
20+
#include <vector>
21+
22+
#include "flex/engines/graph_db/app/kafka_wal_ingester_app.h"
23+
#include "flex/engines/graph_db/database/wal/wal_session.h"
24+
25+
namespace gs {
26+
class KafkaWalSession : public WalSession {
27+
public:
28+
bool open(gs::GraphDBSession& graph, const std::string& wal_uri) override;
29+
30+
bool close() override;
31+
32+
private:
33+
timestamp_t last_ts_;
34+
std::unique_ptr<KafkaWalIngesterApp> ingester_;
35+
std::thread ingester_thread_;
36+
};
37+
} // namespace gs
38+
39+
#endif // FLEX_ENGINES_GRAPH_DB_DATABASE_WAL_KAFKA_WAL_SESSION_H_
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
#ifndef FLEX_ENGINES_GRAPH_DB_DATABASE_WAL_WAL_SESSION_H_
17+
#define FLEX_ENGINES_GRAPH_DB_DATABASE_WAL_WAL_SESSION_H_
18+
#include "flex/engines/graph_db/database/graph_db_session.h"
19+
20+
namespace gs {
21+
class WalSession {
22+
public:
23+
virtual bool open(gs::GraphDBSession& graph, const std::string& wal_uri) = 0;
24+
25+
virtual bool close() = 0;
26+
};
27+
}; // namespace gs
28+
29+
#endif // FLEX_ENGINES_GRAPH_DB_DATABASE_WAL_WAL_SESSION_H_

flex/tests/hqps/kafka_wal_ingester_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ int main(int argc, char** argv) {
9898
}
9999
LOG(INFO) << db.GetReadTransaction(0).GetVertexNum(0);
100100

101-
std::this_thread::sleep_for(std::chrono::seconds(3));
101+
std::this_thread::sleep_for(std::chrono::seconds(4));
102102
{
103103
auto txn = db2.GetReadTransaction(0);
104104
CHECK(txn.GetVertexNum(0) == 195) << "Vertex num: " << txn.GetVertexNum(0);

0 commit comments

Comments
 (0)