@@ -3510,27 +3510,76 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doDurabilityMonitorStats(
35103510 return ENGINE_SUCCESS;
35113511}
35123512
3513+ class DcpStatsFilter {
3514+ public:
3515+ explicit DcpStatsFilter (cb::const_char_buffer value) {
3516+ if (!value.empty ()) {
3517+ try {
3518+ auto attributes = nlohmann::json::parse (value);
3519+ auto filter = attributes.find (" filter" );
3520+ if (filter != attributes.end ()) {
3521+ auto iter = filter->find (" user" );
3522+ if (iter != filter->end ()) {
3523+ user.reset (cb::tagUserData (iter->get <std::string>()));
3524+ }
3525+ iter = filter->find (" port" );
3526+ if (iter != filter->end ()) {
3527+ port.reset (iter->get <in_port_t >());
3528+ }
3529+ }
3530+ } catch (const std::exception& e) {
3531+ EP_LOG_ERR (
3532+ " Failed to decode provided DCP filter: {}. Filter:{}" ,
3533+ e.what (),
3534+ value);
3535+ }
3536+ }
3537+ }
3538+
3539+ bool include (const std::shared_ptr<ConnHandler>& tc) {
3540+ if ((user && *user != tc->getAuthenticatedUser ()) ||
3541+ (port && *port != tc->getConnectedPort ())) {
3542+ // Connection should not be part of this output
3543+ return false ;
3544+ }
3545+
3546+ return true ;
3547+ }
3548+
3549+ protected:
3550+ boost::optional<std::string> user;
3551+ boost::optional<in_port_t > port;
3552+ };
3553+
35133554/* *
35143555 * Function object to send stats for a single dcp connection.
35153556 */
35163557struct ConnStatBuilder {
3517- ConnStatBuilder (const void * c, const AddStatFn& as, ConnCounter& tc)
3518- : cookie(c), add_stat(as), aggregator(tc) {
3558+ ConnStatBuilder (const void * c,
3559+ AddStatFn as,
3560+ DcpStatsFilter filter,
3561+ ConnCounter& tc)
3562+ : cookie(c),
3563+ add_stat (std::move(as)),
3564+ filter(std::move(filter)),
3565+ aggregator(tc) {
35193566 }
35203567
35213568 void operator ()(std::shared_ptr<ConnHandler> tc) {
35223569 ++aggregator.totalConns ;
3523- tc->addStats (add_stat, cookie);
3524-
3525- auto tp = std::dynamic_pointer_cast<DcpProducer>(tc);
3526- if (tp) {
3527- ++aggregator.totalProducers ;
3528- tp->aggregateQueueStats (aggregator);
3570+ if (filter.include (tc)) {
3571+ tc->addStats (add_stat, cookie);
3572+ auto tp = std::dynamic_pointer_cast<DcpProducer>(tc);
3573+ if (tp) {
3574+ ++aggregator.totalProducers ;
3575+ tp->aggregateQueueStats (aggregator);
3576+ }
35293577 }
35303578 }
35313579
35323580 const void *cookie;
35333581 AddStatFn add_stat;
3582+ DcpStatsFilter filter;
35343583 ConnCounter& aggregator;
35353584};
35363585
@@ -3670,9 +3719,12 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doConnAggStats(
36703719}
36713720
36723721ENGINE_ERROR_CODE EventuallyPersistentEngine::doDcpStats (
3673- const void * cookie, const AddStatFn& add_stat) {
3722+ const void * cookie,
3723+ const AddStatFn& add_stat,
3724+ cb::const_char_buffer value) {
36743725 ConnCounter aggregator;
3675- ConnStatBuilder dcpVisitor (cookie, add_stat, aggregator);
3726+ ConnStatBuilder dcpVisitor (
3727+ cookie, add_stat, DcpStatsFilter{value}, aggregator);
36763728 dcpConnMap_->each (dcpVisitor);
36773729
36783730 add_casted_stat (" ep_dcp_count" , aggregator.totalConns , add_stat, cookie);
@@ -4343,7 +4395,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::getStats(
43434395 return doConnAggStats (cookie, add_stat, key.data () + 7 , key.size () - 7 );
43444396 }
43454397 if (key == " dcp" _ccb) {
4346- return doDcpStats (cookie, add_stat);
4398+ return doDcpStats (cookie, add_stat, value );
43474399 }
43484400 if (key == " eviction" _ccb) {
43494401 return doEvictionStats (cookie, add_stat);
0 commit comments