Dave Parfitt edited this page Oct 3, 2013 · 23 revisions

General Notes

Note: I left out all supervisors in the links to source.

Ranch

Basho's repo: https://github.com/basho/ranch/tree/0.4.0-p1

A socket acceptor pool for TCP protocols.

As of 1.3/1.4/2.0, we are pinned at Ranch 0.4.0-p1 to support older versions of Erlang.

Service Manager

Open sourced in Riak 1.3

Listens on a single TCP port and negotiates which protocol to start on each new connection. Ranch is used to create a connection pool and accept new socket connections. When a connection is accepted, the client sends a hello containing its revision and capabilities, and the server replies in kind. The client then sends the service it wishes to use and the versions of that service it supports. The server finds the highest major version that both sides have in common. If there is no major version in common, the connection fails. Minor versions do not need to match. On success, the server sends the major version, the client minor version, and the host minor version to the client. After that, the registered module:function/5 is called and control of the socket is passed to it.
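The version selection described above can be sketched in a few lines of Python. This is only an illustration, not the riak_core_service_mgr code: the function name `negotiate`, the (major, minor) tuple representation, and the rule of keeping the highest minor when one side lists a major version more than once are all assumptions.

```python
from typing import Dict, List, Optional, Tuple

Version = Tuple[int, int]  # (major, minor); representation is an assumption


def _highest_minor_per_major(versions: List[Version]) -> Dict[int, int]:
    """Map each major version to the highest minor listed for it."""
    result: Dict[int, int] = {}
    for major, minor in versions:
        result[major] = max(minor, result.get(major, minor))
    return result


def negotiate(client_versions: List[Version],
              host_versions: List[Version]) -> Optional[Tuple[int, int, int]]:
    """Return (major, client_minor, host_minor) for the highest major
    version both sides support, or None if no major version is shared."""
    client = _highest_minor_per_major(client_versions)
    host = _highest_minor_per_major(host_versions)
    common = client.keys() & host.keys()
    if not common:
        return None  # no major version in common: the connection fails
    major = max(common)
    # Minor versions do not need to match; both are reported back.
    return (major, client[major], host[major])
```

For example, a client offering versions 1.0 and 2.1 against a host offering 2.3 and 3.0 would settle on major version 2, with client minor 1 and host minor 3.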

Source

https://github.com/basho/riak_core/blob/develop/src/riak_core_service_mgr.erl

Connection Manager

  • connection manager (1.3 vs 1.4+)

Open sourced in Riak 1.3

When a connection request comes in:

  • call the locator service to get the list of {transport, {address, port}} endpoints
  • create a linked helper process that calls riak_core_connection (just once) on the next available endpoint (blacklisted ones are ignored; they will get picked up if a repeat is necessary)
  • on connection, the helper transfers control of the socket back to the connection manager, casts a success message to it, and exits normally
    • on success, the connection manager increments the successful-connect counter and resets the backoff timeout for that endpoint
    • on failure (error, timeout, etc.), the helper casts a failure message back to the connection manager. The connection manager marks the {Transport, {Address, Port}} as blacklisted, increments the failure counter, and starts a timer for the backoff time (and updates the backoff for next time). It then looks for the next non-blacklisted endpoint in the connection request's list and launches a new connection; if the list is empty, it calls the locator service again to get a new list. If all endpoints are blacklisted, it uses a send_after message to wake up and retry (perhaps with a backoff time too).
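The blacklist and backoff bookkeeping in the steps above might look roughly like the following Python sketch. It is not the riak_core_connection_mgr implementation: the class names, the initial and maximum backoff values, and the doubling rule are assumptions made for illustration (the text only says the backoff is updated for next time). The clock is passed in explicitly so the behaviour is easy to follow.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

Endpoint = Tuple[str, Tuple[str, int]]  # (transport, (address, port))


@dataclass
class EndpointState:
    successes: int = 0
    failures: int = 0
    backoff: float = 1.0            # seconds until the endpoint may be retried
    blacklisted_until: float = 0.0  # 0.0 means "not blacklisted"


class ConnectionTracker:
    INITIAL_BACKOFF = 1.0  # assumed value
    MAX_BACKOFF = 60.0     # assumed value

    def __init__(self) -> None:
        self.states: Dict[Endpoint, EndpointState] = {}

    def _state(self, ep: Endpoint) -> EndpointState:
        return self.states.setdefault(
            ep, EndpointState(backoff=self.INITIAL_BACKOFF))

    def next_endpoint(self, candidates: List[Endpoint],
                      now: float) -> Optional[Endpoint]:
        """First endpoint that is not currently blacklisted, else None."""
        for ep in candidates:
            if self._state(ep).blacklisted_until <= now:
                return ep
        return None  # caller would schedule a wake-up and retry later

    def on_success(self, ep: Endpoint) -> None:
        st = self._state(ep)
        st.successes += 1
        st.backoff = self.INITIAL_BACKOFF  # reset backoff on success
        st.blacklisted_until = 0.0

    def on_failure(self, ep: Endpoint, now: float) -> None:
        st = self._state(ep)
        st.failures += 1
        st.blacklisted_until = now + st.backoff
        # Assumed update rule: double the backoff, capped at MAX_BACKOFF.
        st.backoff = min(st.backoff * 2, self.MAX_BACKOFF)
```

When `next_endpoint` returns None, every candidate is blacklisted; in the design described above that is the point where a delayed wake-up message would be scheduled.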

Source

(Riak 1.3.2+) https://github.com/basho/riak_repl/blob/1.3/src/riak_core_connection.erl https://github.com/basho/riak_repl/blob/1.3/src/riak_core_connection_mgr.erl https://github.com/basho/riak_repl/blob/1.3/src/riak_core_connection_mgr_stats.erl

(Riak 1.4.0+) https://github.com/basho/riak_core/blob/develop/src/riak_core_connection.erl https://github.com/basho/riak_core/blob/develop/src/riak_core_connection_mgr.erl https://github.com/basho/riak_core/blob/develop/src/riak_core_connection_mgr_stats.erl

Cluster Manager

  • closed source

A cluster manager runs on every node. It registers a service via the riak_core_service_mgr with protocol 'cluster_mgr'. The service will either answer queries (if it's the leader), or forward them to the leader (if it's not the leader).

Every cluster manager instance (one per node in the cluster) is told who the leader is when there is a leader change. An outside agent is responsible for determining which instance of cluster manager is the leader. For example, the riak_repl2_leader server is probably a good place to do this from. Call set_leader_node(node(), pid()).

If I'm the leader, I answer local gen_server:call requests from non-leader cluster managers. I also establish out-bound connections to any IP address added via add_remote_cluster(ip_addr()), in order to resolve the name of the remote cluster and to collect any additional member addresses of that cluster. I keep a database of members per named cluster.

If I am not the leader, I proxy all requests to the actual leader because I probably don't have the latest information. I don't make outbound connections either.

The local cluster's members list is supplied by the members_fun registered through the register_member_fun() API call. The cluster manager will call the registered function to get a list of the local cluster members; that function should return a list of {IP,Port} tuples in order from least "busy" to most "busy". Busy is probably proportional to the number of connections a node has for replication or handoff. The cluster manager will then hand out the full list to remote cluster managers when asked for its members, except that each time it hands out the list, it rotates the list so that the first ("least busy") entry is moved to the end and all others move up toward the front of the list. This helps balance the load when the local connection manager asks the cluster manager for a list of IPs to connect to for a single connection request. Thus, successive calls from the connection manager will appear to round-robin through the last known list of IPs from the remote cluster. The remote clusters are occasionally polled to get a fresh list, which will also help balance the connection load on them.
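A minimal Python sketch of the rotation described above. The class and method names are hypothetical, and it assumes the list is handed out in its current order and then rotated for the next caller; the text does not say which of the two happens first.

```python
from typing import List, Tuple

Member = Tuple[str, int]  # (ip, port)


class MemberList:
    """Hands out a member list and rotates it by one after each request."""

    def __init__(self, members: List[Member]) -> None:
        # Expected order: least busy first, most busy last.
        self._members = list(members)

    def hand_out(self) -> List[Member]:
        current = list(self._members)
        if self._members:
            # Move the first ("least busy") entry to the end.
            self._members = self._members[1:] + self._members[:1]
        return current
```

Successive calls on a three-member list return the members starting at the first, then the second, then the third entry, which gives the round-robin effect mentioned above.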

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_core_cluster_mgr.erl https://github.com/basho/riak_repl/blob/develop/src/riak_core_cluster_conn.erl

Realtime Replication

A Riak postcommit hook that replicates objects to a sink cluster. The put on the sink cluster disables all postcommit hooks for the object being written.

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rt.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtsource_conn.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtsink_conn.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtsink_helper.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtsource_helper.erl

  • realtime
  • rt_cascading

Real time queue (RTQ)

1) Realtime Queue Overflow

  • if the size of the queue exceeds rtq_max_bytes, drop the oldest objects in the queue until the queue is < rtq_max_bytes

  • rtq_max_bytes documented here
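The overflow rule can be illustrated with a small byte-bounded queue in Python. This is a sketch only: the class name is hypothetical, and the size of an object is approximated by the length of its bytes, which is an assumption about how the real queue accounts for size.

```python
from collections import deque
from typing import Deque


class BoundedQueue:
    """FIFO queue that drops its oldest entries when it grows too large."""

    def __init__(self, max_bytes: int) -> None:
        self.max_bytes = max_bytes
        self.items: Deque[bytes] = deque()
        self.size = 0     # total bytes currently queued
        self.dropped = 0  # number of objects dropped due to overflow

    def push(self, obj: bytes) -> None:
        self.items.append(obj)
        self.size += len(obj)
        if self.size > self.max_bytes:
            # Drop oldest entries until the queue is below the limit again.
            while self.items and self.size >= self.max_bytes:
                oldest = self.items.popleft()
                self.size -= len(oldest)
                self.dropped += 1
```

Dropping from the head keeps the newest writes, on the assumption that older objects will be reconciled later by fullsync.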

2) Realtime Queue Overload

Periodically check whether objects are being sent to the realtime queue too quickly. If the queue process's Erlang message queue grows beyond rtq_overload_threshold (default: 2000 messages), incoming objects are dropped to prevent overloading the source node, and the drops are logged every rtq_drop_report_interval (default: 5000 ms). The queue leaves the overload state once the message queue shrinks to rtq_overload_recover (default: 1000 messages) or fewer.

  • if rtq state is overloaded and the Erlang message queue <= rtq_overload_recover
    • recovered from overloaded condition
  • if rtq is NOT overloaded and the Erlang message queue > rtq_overload_threshold
    • set rtq state to overload, drop objects until recovery
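The two rules above form a simple hysteresis: the queue enters the overload state above one threshold and leaves it only at or below a lower one. A minimal Python sketch follows, with a hypothetical class name and with the message queue length passed in by the caller. The default values reuse the numbers quoted for rtq_overload_threshold and rtq_overload_recover.

```python
class OverloadGuard:
    """Tracks the overloaded/normal state from the message queue length."""

    def __init__(self, threshold: int = 2000, recover: int = 1000) -> None:
        self.threshold = threshold  # enter overload above this length
        self.recover = recover      # leave overload at or below this length
        self.overloaded = False
        self.dropped = 0

    def admit(self, mailbox_len: int) -> bool:
        """Return True if an incoming object should be queued,
        False if it should be dropped."""
        if self.overloaded and mailbox_len <= self.recover:
            self.overloaded = False   # recovered from overloaded condition
        elif not self.overloaded and mailbox_len > self.threshold:
            self.overloaded = True    # drop objects until recovery
        if self.overloaded:
            self.dropped += 1
            return False
        return True
```

Using two thresholds instead of one avoids flapping between states when the message queue length hovers around a single limit.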

3) multiple sinks sharing the same queue

Multiple sink clusters configured on a source cluster share the same realtime queue.

4) rtq shutdown

During graceful shutdown via riak stop, a node that has a realtime queue size > 0 will distribute all objects in its queue to other nodes in the same cluster before shutting down.

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtq.erl

RT Frame

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtframe.erl

RTQ Overload Counter

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtq_overload_counter.erl

RTQ Proxy

A proxy process that handles realtime messages received while the riak_repl application is shutting down and after it has shut down. This allows us to avoid dropping realtime messages around shutdown events.

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtq_proxy.erl

RT Cascading

Docs

Source

Repl hooks

Docs

Special thanks to @cv

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl_util.erl#L195 https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rt.erl#L140

Fullsync

  • proxy_get (v2 vs v3)

Fullsync Coordinator

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_fssink.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_fssource.erl

Keylist sync strategy

Not using merkle trees :-(

Source

AAE sync strategy

Docs

Currently a Technology Preview.

Built using existing AAE trees. If AAE isn't enabled on both the source and sink clusters, fall back to the keylist strategy.

Source

Riak CS proxy_get

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_pg.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_pg_block_provider.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_pg_block_requester.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_pg_proxy.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl_pb_get.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl_cs.erl

gen_leader

Source

  • riak_repl2_leader
  • gen_leader

Replication stats

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl_console.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl_wm_stats.erl

Command line interface

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl_console.erl

Per bucket

Source

http://docs.basho.com/riakee/latest/cookbooks/Multi-Data-Center-Replication-Per-Bucket/
