Skip to content

Commit bb230c7

Browse files
acelyc111neverchanje
authored andcommitted
proxy: do some refactor (#378)
1 parent e05e1c1 commit bb230c7

File tree

7 files changed

+746
-809
lines changed

7 files changed

+746
-809
lines changed

src/geo/test/geo_test.cpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ TEST_F(geo_client_test, set_and_del)
9797
double got_lng_degrees;
9898
ret = geo_client()->get(test_hash_key, test_sort_key, got_lat_degrees, got_lng_degrees);
9999
ASSERT_EQ(ret, pegasus::PERR_OK);
100-
ASSERT_NEAR(expect_lat_degrees, got_lat_degrees, 1e-6);
101-
ASSERT_NEAR(expect_lng_degrees, got_lng_degrees, 1e-6);
100+
ASSERT_DOUBLE_EQ(expect_lat_degrees, got_lat_degrees);
101+
ASSERT_DOUBLE_EQ(expect_lng_degrees, got_lng_degrees);
102102

103103
// search the inserted data
104104
{
@@ -264,7 +264,7 @@ TEST_F(geo_client_test, same_point_diff_hash_key)
264264
ASSERT_EQ(ret, pegasus::PERR_OK);
265265
ASSERT_EQ(result.size(), 2);
266266
for (auto &r : result) {
267-
ASSERT_NEAR(r.distance, 0.0, 1e-6);
267+
ASSERT_DOUBLE_EQ(r.distance, 0.0);
268268
ASSERT_TRUE(r.hash_key == test_hash_key + "1" || r.hash_key == test_hash_key + "2")
269269
<< r.hash_key;
270270
ASSERT_EQ(r.sort_key, test_sort_key);
@@ -279,7 +279,7 @@ TEST_F(geo_client_test, same_point_diff_hash_key)
279279
ASSERT_EQ(ret, pegasus::PERR_OK);
280280
ASSERT_EQ(result.size(), 2);
281281
for (auto &r : result) {
282-
ASSERT_NEAR(r.distance, 0.0, 1e-6);
282+
ASSERT_DOUBLE_EQ(r.distance, 0.0);
283283
ASSERT_TRUE(r.hash_key == test_hash_key + "1" || r.hash_key == test_hash_key + "2")
284284
<< r.hash_key;
285285
ASSERT_EQ(r.sort_key, test_sort_key);
@@ -331,7 +331,7 @@ TEST_F(geo_client_test, same_point_diff_sort_key)
331331
ASSERT_EQ(ret, pegasus::PERR_OK);
332332
ASSERT_EQ(result.size(), 2);
333333
for (auto &r : result) {
334-
ASSERT_NEAR(r.distance, 0.0, 1e-6);
334+
ASSERT_DOUBLE_EQ(r.distance, 0.0);
335335
ASSERT_EQ(r.hash_key, test_hash_key);
336336
ASSERT_TRUE(r.sort_key == test_sort_key + "1" || r.sort_key == test_sort_key + "2")
337337
<< r.sort_key;
@@ -346,7 +346,7 @@ TEST_F(geo_client_test, same_point_diff_sort_key)
346346
ASSERT_EQ(ret, pegasus::PERR_OK);
347347
ASSERT_EQ(result.size(), 2);
348348
for (auto &r : result) {
349-
ASSERT_NEAR(r.distance, 0.0, 1e-6);
349+
ASSERT_DOUBLE_EQ(r.distance, 0.0);
350350
ASSERT_EQ(r.hash_key, test_hash_key);
351351
ASSERT_TRUE(r.sort_key == test_sort_key + "1" || r.sort_key == test_sort_key + "2")
352352
<< r.sort_key;
@@ -516,12 +516,12 @@ TEST_F(geo_client_test, distance)
516516
int ret = _geo_client->distance(
517517
"test_hash_key1", "test_sort_key1", "test_hash_key2", "test_sort_key2", 2000, distance);
518518
ASSERT_EQ(ret, pegasus::PERR_OK);
519-
ASSERT_NEAR(distance, 1000 * S2Earth::RadiusKm() * M_PI / 4, 1e-6);
519+
ASSERT_DOUBLE_EQ(distance, 1000 * S2Earth::RadiusKm() * M_PI / 4);
520520

521521
ret = _geo_client->distance(
522522
"test_hash_key1", "test_sort_key1", "test_hash_key1", "test_sort_key1", 2000, distance);
523523
ASSERT_EQ(ret, pegasus::PERR_OK);
524-
ASSERT_NEAR(distance, 0.0, 1e-6);
524+
ASSERT_DOUBLE_EQ(distance, 0.0);
525525
}
526526

527527
TEST_F(geo_client_test, large_cap)
@@ -541,7 +541,7 @@ TEST_F(geo_client_test, large_cap)
541541
std::to_string(latlng.lng().degrees()) + "|" +
542542
std::to_string(latlng.lat().degrees()) + "|123.456|456.789|0|-1";
543543

544-
int ret = _geo_client->set(id, "", value, 1000);
544+
int ret = _geo_client->set(id, "", value, 5000);
545545
ASSERT_EQ(ret, pegasus::PERR_OK);
546546
}
547547

@@ -565,7 +565,7 @@ TEST_F(geo_client_test, large_cap)
565565
double distance = 0.0;
566566
ret = _geo_client->distance("0", "", r.hash_key, r.sort_key, 2000, distance);
567567
ASSERT_EQ(ret, pegasus::PERR_OK);
568-
ASSERT_NEAR(distance, r.distance, 1e-6);
568+
ASSERT_DOUBLE_EQ(distance, r.distance);
569569

570570
last = r;
571571
}
@@ -599,7 +599,7 @@ TEST_F(geo_client_test, large_cap)
599599
ret = _geo_client->distance(
600600
test_hash_key, test_sort_key, r.hash_key, r.sort_key, 2000, distance);
601601
ASSERT_EQ(ret, pegasus::PERR_OK);
602-
ASSERT_NEAR(distance, r.distance, 1e-6);
602+
ASSERT_DOUBLE_EQ(distance, r.distance);
603603

604604
last = r;
605605
}

src/redis_protocol/proxy_lib/proxy_layer.cpp

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -42,67 +42,67 @@ proxy_stub::proxy_stub(const proxy_session::factory &f,
4242
void proxy_stub::on_rpc_request(dsn::message_ex *request)
4343
{
4444
::dsn::rpc_address source = request->header->from_address;
45-
std::shared_ptr<proxy_session> ps;
45+
std::shared_ptr<proxy_session> session;
4646
{
4747
::dsn::zauto_read_lock l(_lock);
4848
auto it = _sessions.find(source);
4949
if (it != _sessions.end()) {
50-
ps = it->second;
50+
session = it->second;
5151
}
5252
}
53-
if (nullptr == ps) {
53+
if (nullptr == session) {
5454
::dsn::zauto_write_lock l(_lock);
5555
auto it = _sessions.find(source);
5656
if (it != _sessions.end()) {
57-
ps = it->second;
57+
session = it->second;
5858
} else {
59-
ps = _factory(this, request);
60-
_sessions.emplace(source, ps);
59+
session = _factory(this, request);
60+
_sessions.emplace(source, session);
6161
}
6262
}
6363

64-
ps->on_recv_request(request);
64+
session->on_recv_request(request);
6565
}
6666

6767
void proxy_stub::on_recv_remove_session_request(dsn::message_ex *request)
6868
{
6969
::dsn::rpc_address source = request->header->from_address;
70-
std::shared_ptr<proxy_session> ps = remove_session(source);
71-
if (ps != nullptr) {
72-
ps->on_remove_session();
73-
}
70+
remove_session(source);
7471
}
7572

76-
std::shared_ptr<proxy_session> proxy_stub::remove_session(dsn::rpc_address remote_address)
73+
void proxy_stub::remove_session(dsn::rpc_address remote_address)
7774
{
78-
::dsn::zauto_write_lock l(_lock);
79-
auto iter = _sessions.find(remote_address);
80-
if (iter == _sessions.end()) {
81-
dwarn("%s has been removed from proxy stub", remote_address.to_string());
82-
return nullptr;
75+
std::shared_ptr<proxy_session> session;
76+
{
77+
::dsn::zauto_write_lock l(_lock);
78+
auto iter = _sessions.find(remote_address);
79+
if (iter == _sessions.end()) {
80+
dwarn("%s has been removed from proxy stub", remote_address.to_string());
81+
return;
82+
}
83+
ddebug("remove %s from proxy stub", remote_address.to_string());
84+
session = std::move(iter->second);
85+
_sessions.erase(iter);
8386
}
84-
ddebug("remove %s from proxy stub", remote_address.to_string());
85-
std::shared_ptr<proxy_session> result = std::move(iter->second);
86-
_sessions.erase(iter);
87-
return result;
87+
session->on_remove_session();
8888
}
8989

9090
proxy_session::proxy_session(proxy_stub *op, dsn::message_ex *first_msg)
91-
: stub(op), is_session_reset(false), backup_one_request(first_msg)
91+
: _stub(op), _is_session_reset(false), _backup_one_request(first_msg)
9292
{
9393
dassert(first_msg != nullptr, "null msg when create session");
94-
backup_one_request->add_ref();
94+
_backup_one_request->add_ref();
9595

96-
remote_address = backup_one_request->header->from_address;
97-
dassert(remote_address.type() == HOST_TYPE_IPV4,
96+
_remote_address = _backup_one_request->header->from_address;
97+
dassert(_remote_address.type() == HOST_TYPE_IPV4,
9898
"invalid rpc_address type, type = %d",
99-
(int)remote_address.type());
99+
(int)_remote_address.type());
100100
}
101101

102102
proxy_session::~proxy_session()
103103
{
104-
backup_one_request->release_ref();
105-
ddebug("proxy session %s destroyed", remote_address.to_string());
104+
_backup_one_request->release_ref();
105+
ddebug("proxy session %s destroyed", _remote_address.to_string());
106106
}
107107

108108
void proxy_session::on_recv_request(dsn::message_ex *msg)
@@ -117,16 +117,16 @@ void proxy_session::on_recv_request(dsn::message_ex *msg)
117117
// "parse" with a lock. a subclass may implement a lock inside parse if necessary
118118
if (!parse(msg)) {
119119
derror("%s: got invalid message, try to remove proxy session from proxy stub",
120-
remote_address.to_string());
121-
stub->remove_session(remote_address);
120+
_remote_address.to_string());
121+
_stub->remove_session(_remote_address);
122122

123-
derror("close the rpc session %s", remote_address.to_string());
124-
((dsn::message_ex *)backup_one_request)->io_session->close();
123+
derror("close the rpc session %s", _remote_address.to_string());
124+
((dsn::message_ex *)_backup_one_request)->io_session->close();
125125
}
126126
}
127127

128-
void proxy_session::on_remove_session() { is_session_reset.store(true); }
128+
void proxy_session::on_remove_session() { _is_session_reset.store(true); }
129129

130-
dsn::message_ex *proxy_session::create_response() { return backup_one_request->create_response(); }
130+
dsn::message_ex *proxy_session::create_response() { return _backup_one_request->create_response(); }
131131
} // namespace proxy
132132
} // namespace pegasus

src/redis_protocol/proxy_lib/proxy_layer.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,19 @@ class proxy_session : public std::enable_shared_from_this<proxy_session>
3939
void on_remove_session();
4040

4141
protected:
42-
// return if parse ok
42+
// return true if parse ok
4343
virtual bool parse(dsn::message_ex *msg) = 0;
4444
dsn::message_ex *create_response();
4545

4646
protected:
47-
proxy_stub *stub;
48-
std::atomic_bool is_session_reset;
47+
proxy_stub *_stub;
48+
std::atomic_bool _is_session_reset;
4949

5050
// when get message from raw parser, request & response of "dsn::message_ex*" are not in couple.
5151
// we need to backup one request to create a response struct.
52-
dsn::message_ex *backup_one_request;
52+
dsn::message_ex *_backup_one_request;
5353
// the client address for which this session served
54-
dsn::rpc_address remote_address;
54+
dsn::rpc_address _remote_address;
5555
};
5656

5757
class proxy_stub : public ::dsn::serverlet<proxy_stub>
@@ -77,7 +77,7 @@ class proxy_stub : public ::dsn::serverlet<proxy_stub>
7777
this->unregister_rpc_handler(RPC_CALL_RAW_MESSAGE);
7878
this->unregister_rpc_handler(RPC_CALL_RAW_SESSION_DISCONNECT);
7979
}
80-
std::shared_ptr<proxy_session> remove_session(dsn::rpc_address remote_address);
80+
void remove_session(dsn::rpc_address remote_address);
8181

8282
private:
8383
void on_rpc_request(dsn::message_ex *request);

0 commit comments

Comments
 (0)