Skip to content

Commit 1e7056f

Browse files
committed
[Refactor] Store the DCP conn handler in its own variable
ep-engine used to store the "conn handler" object which represents the DCP object in the "engine-specific" part of the cookie, but that field is also used by other commands for different things (a sync-write would use it to keep the cas; compaction use it to store a setting that it is running etc). If any of these commands would be received on a DCP connection they would clear the engine-specific part in the cookie when they complete and none of the methods in the DcpIface would work. This patch change that logic to store the conn handler in the Connection object. Change-Id: I9850b1b30881868a9ba1731cea387f1014368fb9 Reviewed-on: http://review.couchbase.org/c/kv_engine/+/140956 Tested-by: Build Bot <[email protected]> Reviewed-by: Paolo Cocchi <[email protected]>
1 parent 465f125 commit 1e7056f

File tree

13 files changed

+219
-13
lines changed

13 files changed

+219
-13
lines changed

auditd/tests/testauditd.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ static bool ready = false;
4747

4848
class AuditMockServerCookieApi : public ServerCookieIface {
4949
public:
50+
void setDcpConnHandler(gsl::not_null<const void*> cookie,
51+
DcpConnHandlerIface* handler) override {
52+
throw std::runtime_error("Not implemented");
53+
}
54+
DcpConnHandlerIface* getDcpConnHandler(
55+
gsl::not_null<const void*> cookie) override {
56+
throw std::runtime_error("Not implemented");
57+
}
5058
void store_engine_specific(gsl::not_null<const void*> cookie,
5159
void* engine_data) override {
5260
throw std::runtime_error("Not implemented");

daemon/connection.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,14 @@ class Connection : public DcpMessageProducersIface {
553553
*/
554554
bool processServerEvents();
555555

556+
DcpConnHandlerIface* getDcpConnHandlerIface() const {
557+
return dcpConnHandlerIface.load(std::memory_order_acquire);
558+
}
559+
560+
void setDcpConnHandlerIface(DcpConnHandlerIface* handler) {
561+
dcpConnHandlerIface.store(handler, std::memory_order_release);
562+
}
563+
556564
/**
557565
* Set the name of the connected agent
558566
*/
@@ -869,6 +877,9 @@ class Connection : public DcpMessageProducersIface {
869877
/** Name of the local socket if known */
870878
const std::string sockname;
871879

880+
/// The stored DCP Connection Interface
881+
std::atomic<DcpConnHandlerIface*> dcpConnHandlerIface{nullptr};
882+
872883
/**
873884
* The connections' priority.
874885
* atomic to allow read (from DCP stats) without acquiring any

daemon/server_api.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,16 @@ struct ServerDocumentApi : public ServerDocumentIface {
118118
};
119119

120120
struct ServerCookieApi : public ServerCookieIface {
121+
void setDcpConnHandler(gsl::not_null<const void*> cookie,
122+
DcpConnHandlerIface* handler) override {
123+
getCookie(cookie).getConnection().setDcpConnHandlerIface(handler);
124+
}
125+
126+
DcpConnHandlerIface* getDcpConnHandler(
127+
gsl::not_null<const void*> cookie) override {
128+
return getCookie(cookie).getConnection().getDcpConnHandlerIface();
129+
}
130+
121131
void store_engine_specific(gsl::not_null<const void*> void_cookie,
122132
void* engine_data) override {
123133
const auto* cc = reinterpret_cast<const Cookie*>(void_cookie.get());

engines/ep/src/connhandler.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ struct ConnCounter {
7575
size_t conn_queueItemOnDisk;
7676
};
7777

78-
class ConnHandler {
78+
class ConnHandler : public DcpConnHandlerIface {
7979
public:
8080
/// The maximum length of a DCP stat name
8181
static constexpr size_t MaxDcpStatNameLength = 47;
@@ -92,7 +92,7 @@ class ConnHandler {
9292
const void* c,
9393
std::string name);
9494

95-
virtual ~ConnHandler();
95+
~ConnHandler() override;
9696

9797
virtual ENGINE_ERROR_CODE addStream(uint32_t opaque,
9898
Vbid vbucket,

engines/ep/src/ep_engine.cc

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1986,6 +1986,18 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
19861986
return serverApi->cookie->release(cookie);
19871987
}
19881988

1989+
void EventuallyPersistentEngine::setDcpConnHandler(
1990+
const void* cookie, DcpConnHandlerIface* handler) {
1991+
NonBucketAllocationGuard guard;
1992+
serverApi->cookie->setDcpConnHandler(cookie, handler);
1993+
}
1994+
1995+
DcpConnHandlerIface* EventuallyPersistentEngine::getDcpConnHandler(
1996+
const void* cookie) {
1997+
NonBucketAllocationGuard guard;
1998+
return serverApi->cookie->getDcpConnHandler(cookie);
1999+
}
2000+
19892001
void EventuallyPersistentEngine::storeEngineSpecific(const void* cookie,
19902002
void* engine_data) {
19912003
NonBucketAllocationGuard guard;
@@ -6139,14 +6151,14 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(
61396151
(void) seqno;
61406152
std::string connName{stream_name};
61416153

6142-
if (getEngineSpecific(cookie) != nullptr) {
6154+
auto* handler = getConnHandler(cookie);
6155+
if (handler) {
61436156
EP_LOG_WARN(
6144-
"Cannot open DCP connection as another"
6145-
" connection exists on the same socket");
6157+
"Cannot open DCP connection as another connection exists on "
6158+
"the same socket");
61466159
return ENGINE_DISCONNECT;
61476160
}
61486161

6149-
ConnHandler *handler = nullptr;
61506162
if (flags & (cb::mcbp::request::DcpOpenPayload::Producer |
61516163
cb::mcbp::request::DcpOpenPayload::Notifier)) {
61526164
if (flags & cb::mcbp::request::DcpOpenPayload::PiTR) {
@@ -6202,8 +6214,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(
62026214
"cannot be reserved");
62036215
return ENGINE_DISCONNECT;
62046216
}
6205-
6206-
storeEngineSpecific(cookie, handler);
6217+
setDcpConnHandler(cookie, handler);
62076218

62086219
return ENGINE_SUCCESS;
62096220
}
@@ -6222,15 +6233,22 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpAddStream(const void* cookie,
62226233

62236234
ConnHandler* EventuallyPersistentEngine::getConnHandler(const void* cookie,
62246235
bool logNonExistent) {
6225-
void* specific = getEngineSpecific(cookie);
6226-
auto* handler = reinterpret_cast<ConnHandler*>(specific);
6227-
if (!handler && logNonExistent) {
6236+
auto* iface = getDcpConnHandler(cookie);
6237+
if (iface) {
6238+
auto* handler = dynamic_cast<ConnHandler*>(iface);
6239+
if (handler) {
6240+
return handler;
6241+
}
6242+
}
6243+
6244+
if (logNonExistent) {
62286245
auto li = serverApi->cookie->get_log_info(cookie);
62296246
EP_LOG_WARN("{}: Invalid streaming connection: cookie:{}",
62306247
li.first,
62316248
cb::to_hex(uint64_t(cookie)));
62326249
}
6233-
return handler;
6250+
6251+
return nullptr;
62346252
}
62356253

62366254
void EventuallyPersistentEngine::handleDisconnect(const void *cookie) {

engines/ep/src/ep_engine.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,9 @@ class EventuallyPersistentEngine : public EngineIface, public DcpIface {
533533
ENGINE_ERROR_CODE reserveCookie(const void *cookie);
534534
ENGINE_ERROR_CODE releaseCookie(const void *cookie);
535535

536+
void setDcpConnHandler(const void* cookie, DcpConnHandlerIface* handler);
537+
DcpConnHandlerIface* getDcpConnHandler(const void* cookie);
538+
536539
void storeEngineSpecific(const void* cookie, void* engine_data);
537540

538541
void* getEngineSpecific(const void* cookie);

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,14 @@ class WrappedServerCookieIface : public ServerCookieIface {
8080
~WrappedServerCookieIface() override {
8181
get_mock_server_api()->cookie = wrapped;
8282
}
83-
83+
void setDcpConnHandler(gsl::not_null<const void*> cookie,
84+
DcpConnHandlerIface* handler) override {
85+
wrapped->setDcpConnHandler(cookie, handler);
86+
}
87+
DcpConnHandlerIface* getDcpConnHandler(
88+
gsl::not_null<const void*> cookie) override {
89+
return wrapped->getDcpConnHandler(cookie);
90+
}
8491
void store_engine_specific(gsl::not_null<const void*> cookie,
8592
void* engine_data) override {
8693
wrapped->store_engine_specific(cookie, engine_data);

include/memcached/dcp.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ enum class id : uint32_t;
4141
enum class version : uint8_t;
4242
} // namespace mcbp::systemevent
4343

44+
class DcpConnHandlerIface {
45+
public:
46+
virtual ~DcpConnHandlerIface() = default;
47+
};
48+
4449
/**
4550
* The message producers are used by the engine's DCP producer
4651
* to add messages into the DCP stream. Please look at the full

include/memcached/server_cookie_iface.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,35 @@ namespace cb::mcbp {
3131
class Request;
3232
} // namespace cb::mcbp
3333

34+
class DcpConnHandlerIface;
35+
3436
/**
3537
* Commands to operate on a specific cookie.
3638
*/
3739
struct ServerCookieIface {
3840
virtual ~ServerCookieIface() = default;
3941

42+
/**
43+
* Set the DCP connection handler to be used for the connection the
44+
* provided cookie belongs to.
45+
*
46+
* @param cookie The cookie provided by the core for the operation
47+
* @param handler The new handler (may be nullptr to clear the handler)
48+
*/
49+
virtual void setDcpConnHandler(gsl::not_null<const void*> cookie,
50+
DcpConnHandlerIface* handler) = 0;
51+
52+
/**
53+
* Get the DCP connection handler for the connection the provided
54+
* cookie belongs to
55+
*
56+
* @param cookie The cookie provided by the core for the operation
57+
* @return The handler stored for the connection (may be nullptr if
58+
* none is specified)
59+
*/
60+
virtual DcpConnHandlerIface* getDcpConnHandler(
61+
gsl::not_null<const void*> cookie) = 0;
62+
4063
/**
4164
* Store engine-specific session data on the given cookie.
4265
*

programs/engine_testapp/mock_cookie.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include <mutex>
2727
#include <string>
2828

29+
class DcpConnHandlerIface;
30+
2931
struct MockCookie : cb::tracing::Traceable {
3032
/**
3133
* Create a new cookie which isn't bound to an engine. This cookie won't
@@ -59,6 +61,7 @@ struct MockCookie : cb::tracing::Traceable {
5961
uint64_t num_processed_notifications{};
6062
std::string authenticatedUser{"nobody"};
6163
in_port_t parent_port{666};
64+
DcpConnHandlerIface* connHandlerIface = nullptr;
6265

6366
void validate() const;
6467

0 commit comments

Comments
 (0)