1515
1616-include (" ns_common.hrl" ).
1717
18+ -ifdef (TEST ).
19+ -include_lib (" eunit/include/eunit.hrl" ).
20+ -endif .
21+
1822% % gen_server callbacks
1923-export ([init /1 , handle_call /3 , handle_cast /2 ,
2024 handle_info /2 , terminate /2 , code_change /3 ]).
2125
22- -export ([start_link /3 , start_link /6 ,
26+ -export ([start_link /3 , start_link /7 ,
2327 get_partitions /1 ,
2428 setup_replication /2 , setup_replication /3 ,
2529 takeover /2 , takeover /3 ,
2933
3034-record (state , {proxies ,
3135 consumer_conn :: pid (),
32- connection_name :: nonempty_string (),
36+ % % Connection names will be the same unless abbreviated
37+ % % ones are needed.
38+ producer_connection_name :: nonempty_string (),
39+ consumer_connection_name :: nonempty_string (),
3340 producer_node :: node (),
3441 bucket :: bucket_name ()}).
3542
3643-define (VBUCKET_POLL_INTERVAL , 100 ).
3744-define (SHUT_CONSUMER_TIMEOUT , ? get_timeout (dcp_shut_consumer , 60000 )).
45+ -define (MAX_CONNECTION_NAME , 200 ).
3846
39- init ({ConsumerNode , ProducerNode , Bucket , ConnName , RepFeatures }) ->
47+ init ({ConsumerNode , ProducerNode , Bucket , ProducerConnName ,
48+ ConsumerConnName , RepFeatures }) ->
4049 process_flag (trap_exit , true ),
4150
42- {ok , ConsumerConn } = dcp_consumer_conn :start_link (ConnName , ConsumerNode ,
43- Bucket , RepFeatures ),
44- {ok , ProducerConn } = dcp_producer_conn :start_link (ConnName , ProducerNode ,
45- Bucket , RepFeatures ),
51+ {ok , ConsumerConn } = dcp_consumer_conn :start_link (ProducerConnName ,
52+ ConsumerNode , Bucket ,
53+ RepFeatures ),
54+ {ok , ProducerConn } = dcp_producer_conn :start_link (ConsumerConnName ,
55+ ProducerNode , Bucket ,
56+ RepFeatures ),
4657
4758 Proxies = dcp_proxy :connect_proxies (ConsumerConn , ProducerConn ),
4859
49- ? log_debug (" initiated new dcp replication with consumer side: ~p and producer side: ~p " , [ConsumerConn , ProducerConn ]),
60+ ? log_debug (" initiated new dcp replication with consumer side: ~p (~p ) "
61+ " and producer side: ~p (~p )" ,
62+ [ConsumerConn , ConsumerConnName ,
63+ ProducerConn , ProducerConnName ]),
5064
51- master_activity_events :note_dcp_replicator_start (Bucket , ConnName ,
52- ProducerNode , ConsumerConn , ProducerConn ),
65+ master_activity_events :note_dcp_replicator_start (Bucket , ProducerConnName ,
66+ ProducerNode , ConsumerConn ,
67+ ProducerConn ),
5368
5469 {ok , # state {
5570 proxies = Proxies ,
5671 consumer_conn = ConsumerConn ,
57- connection_name = ConnName ,
72+ producer_connection_name = ProducerConnName ,
73+ consumer_connection_name = ConsumerConnName ,
5874 producer_node = ProducerNode ,
5975 bucket = Bucket
6076 }}.
6177
62- start_link (Name , ConsumerNode , ProducerNode , Bucket , ConnName , RepFeatures ) ->
78+ start_link (Name , ConsumerNode , ProducerNode , Bucket , ProducerConnName ,
79+ ConsumerConnName , RepFeatures ) ->
6380 % % We (and ep-engine actually) depend on this naming.
64- true = lists :prefix (" replication:" , ConnName ),
81+ true = lists :prefix (" replication:" , ProducerConnName ),
82+ true = lists :prefix (" replication:" , ConsumerConnName ),
6583
6684 Args0 = [? MODULE , {ConsumerNode ,
67- ProducerNode , Bucket , ConnName , RepFeatures }, []],
85+ ProducerNode , Bucket , ProducerConnName ,
86+ ConsumerConnName , RepFeatures }, []],
6887 Args = case Name of
6988 undefined ->
7089 Args0 ;
@@ -75,9 +94,13 @@ start_link(Name, ConsumerNode, ProducerNode, Bucket, ConnName, RepFeatures) ->
7594
7695start_link (ProducerNode , Bucket , RepFeatures ) ->
7796 ConsumerNode = node (),
78- ConnName = get_connection_name (ConsumerNode , ProducerNode , Bucket ),
97+ ProducerConnName = get_connection_name (ConsumerNode , ProducerNode , Bucket ,
98+ consumer_optional ),
99+ ConsumerConnName = get_connection_name (ConsumerNode , ProducerNode , Bucket ,
100+ producer_optional ),
79101 start_link (server_name (ProducerNode , Bucket ),
80- ConsumerNode , ProducerNode , Bucket , ConnName , RepFeatures ).
102+ ConsumerNode , ProducerNode , Bucket , ProducerConnName ,
103+ ConsumerConnName , RepFeatures ).
81104
82105server_name (ProducerNode , Bucket ) ->
83106 list_to_atom (? MODULE_STRING " -" ++ Bucket ++ " -" ++ atom_to_list (ProducerNode )).
@@ -158,7 +181,7 @@ wait_for_data_move(Nodes, Bucket, Partition) ->
158181wait_for_data_move_loop ([], _ , _ , _DoneLimit ) ->
159182 ok ;
160183wait_for_data_move_loop ([Node | Rest ], Bucket , Partition , DoneLimit ) ->
161- Connection = get_connection_name (Node , node (), Bucket ),
184+ Connection = get_connection_name (Node , node (), Bucket , producer_optional ),
162185 case wait_for_data_move_on_one_node (0 , Connection ,
163186 Bucket , Partition , DoneLimit ) of
164187 ok ->
@@ -209,11 +232,39 @@ check_move_done(_Estimate, _DoneLimit) ->
209232-spec get_docs_estimate (bucket_name (), vbucket_id (), node ()) ->
210233 {ok , {non_neg_integer (), non_neg_integer (), binary ()}}.
211234get_docs_estimate (Bucket , Partition , ConsumerNode ) ->
212- Connection = get_connection_name (ConsumerNode , node (), Bucket ),
235+ Connection = get_connection_name (ConsumerNode , node (), Bucket ,
236+ producer_optional ),
213237 ns_memcached :get_dcp_docs_estimate (Bucket , Partition , Connection ).
214238
215- get_connection_name (ConsumerNode , ProducerNode , Bucket ) ->
216- " replication:" ++ atom_to_list (ProducerNode ) ++ " ->" ++ atom_to_list (ConsumerNode ) ++ " :" ++ Bucket .
239+ get_connection_name (ConsumerNode , ProducerNode , Bucket , LocalNode ) ->
240+ ConsumerNodeList = atom_to_list (ConsumerNode ),
241+ ProducerNodeList = atom_to_list (ProducerNode ),
242+ CName = " replication:" ++ ProducerNodeList ++ " ->" ++ ConsumerNodeList ++
243+ " :" ++ Bucket ,
244+
245+ case length (CName ) =< ? MAX_CONNECTION_NAME of
246+ true ->
247+ CName ;
248+ false ->
249+ % % Used an abbreviated connection name by not including the
250+ % % specified node and limiting the length of the bucket name.
251+ % % Include a hash of the full connection name to guarantee
252+ % % uniqueness.
253+ Hash = binary_to_list (base64 :encode (crypto :hash (sha , CName ))),
254+ {CNode , PNode } =
255+ case LocalNode of
256+ consumer_optional ->
257+ {" " , string :slice (ProducerNodeList , 0 , 90 )};
258+ producer_optional ->
259+ {string :slice (ConsumerNodeList , 0 , 90 ), " " }
260+ end ,
261+ Bkt = string :slice (Bucket , 0 , 60 ),
262+
263+ CName2 = " replication:" ++ PNode ++ " ->" ++ CNode ++ " :" ++ Bkt ++
264+ " :" ++ Hash ,
265+ true = length (CName2 ) =< ? MAX_CONNECTION_NAME ,
266+ CName2
267+ end .
217268
218269get_connections (Bucket ) ->
219270 {ok , Connections } =
@@ -273,3 +324,43 @@ maybe_shut_consumer(Reason, Consumer) ->
273324 false ->
274325 ok
275326 end .
327+
328+ -ifdef (TEST ).
329+ get_connection_name_test () ->
330+
331+ % % Connection name fits into the maximum allowed
332+
333+ NodeA = 'nodeA.eng.couchbase.com' ,
334+ NodeB = 'nodeB.eng.couchbase.com' ,
335+ BucketAB = " bucket1" ,
336+ ConnAB = get_connection_name (NodeA , NodeB , BucketAB , undefined ),
337+ ? assertEqual (" replication:nodeB.eng.couchbase.com->"
338+ " nodeA.eng.couchbase.com:bucket1" , ConnAB ),
339+ ? assertEqual (true , length (ConnAB ) =< ? MAX_CONNECTION_NAME ),
340+
341+ % % Test where the connection name, using the pre-NEO method, won't
342+ % % fit into the maximum allowed.
343+
344+ 345+ " couchbase-new-pxxxxxxx.svc" ,
346+ 347+ " couchbase-new-pxxxxxxx.svc" ,
348+ Bucket12 = " com.yyyyyy.digital.ms.shoppingcart.shoppingcart" ,
349+ Conn12l = get_connection_name (list_to_atom (Node1 ), list_to_atom (Node2 ),
350+ Bucket12 , consumer_optional ),
351+ ? assertEqual (
" replication:[email protected] " 352+ " couchbase-cluster.couchbase-new-pxxxxxxx.svc->:"
353+ " com.yyyyyy.digital.ms.shoppingcart.shoppingcart:"
354+ " /Jh4NdC6VwWeFlb9os/VBnSgpvg=" , Conn12l ),
355+ ? assertEqual (true , length (Conn12l ) =< ? MAX_CONNECTION_NAME ),
356+
357+ Conn12p = get_connection_name (list_to_atom (Node1 ), list_to_atom (Node2 ),
358+ Bucket12 , producer_optional ),
359+ ? assertEqual (
" replication:->[email protected] " 360+ " couchbase-cluster.couchbase-new-pxxxxxxx.svc:"
361+ " com.yyyyyy.digital.ms.shoppingcart.shoppingcart:"
362+ " /Jh4NdC6VwWeFlb9os/VBnSgpvg=" ,
363+ Conn12p ),
364+ ? assertEqual (true , length (Conn12p ) =< ? MAX_CONNECTION_NAME ).
365+
366+ -endif .
0 commit comments