Jon Meredith edited this page Sep 3, 2014 · 3 revisions

WORK IN PROGRESS

RiakNet

Goals:

  • Common layer for establishing streaming connections.

  • Protocol, version and capability negotiation.

  • Reuse performance pattern for realtime code (block/non-block coprocesses).

  • Fault detection - heartbeat for 'dead' remote processes

  • Connection to logical resources - other nodes in the same cluster, nodes in remote clusters.

  • Connection retry/backoff for unreachable remotes.

  • NAT traversal.

  • Encryption/protocol independence.

  • Per-connection statistics (perhaps integrated with heartbeat)

  • Aggregated statistics - how much realtime, fullsync, handoff etc

  • No erlang terms used until after negotiation/establishment so that non-Erlang libraries could be easily written.

  • Realtime, Fullsync and Proxyget should be rewritten on top of it, as well as possibly handoff/repair.
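The retry/backoff goal above could be met with something as small as a capped exponential delay. The following is only a sketch under assumptions: the module name `backoff_sketch`, the function `delay/3` and its parameters are hypothetical and not part of any existing RiakNet code.

```erlang
-module(backoff_sketch).
-export([delay/3]).

%% Hypothetical helper, not an existing riak_net API.
%% Delay in milliseconds before reconnect attempt number Attempt (0-based).
%% The delay doubles from BaseMs on every failure and is capped at MaxMs.
delay(Attempt, BaseMs, MaxMs)
  when is_integer(Attempt), Attempt >= 0,
       is_integer(BaseMs), BaseMs > 0,
       is_integer(MaxMs), MaxMs >= BaseMs ->
    %% Clamp the shift so the intermediate integer stays small.
    Shift = min(Attempt, 32),
    min(MaxMs, BaseMs bsl Shift).
```

A real implementation would probably add random jitter so that many nodes do not retry an unreachable remote in lockstep.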

Future:

  • Message passing layer to build global strong consistency on

Outbound connections

Connection manager

Inbound connections

Locators

Locators provide a service for looking up remote services and converting them to a {Transport, Address} pair. They may take time to execute and can either reply immediately or provide a unique reference they will answer on in the future.

% Register locator function - if Pid provided, monitor and
% remove locator on death, otherwise live forever.
% Last registration wins, previous are discarded
riak_net:register_locator(Location, LocatorFun)
riak_net:register_locator(Location, LocatorFun, Pid)

% Remove locator
riak_net:unregister_locator(Location)

% Locator function called to find connection information.  May
% reply immediately, or return a reference it will call
% riak_net:location with later
LocatorFun(Location, Name, Opts) -> {reply, Transport, Address} | {noreply, Ref} | {error, Reason}

riak_net:location(Ref, Transport, Address) -> ok | {error, Reason}
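To make the registration rules concrete, here is a minimal sketch of the bookkeeping behind the calls above, written as pure functions over a map. Everything in it is an assumption for illustration: the module `locator_registry_sketch` and the functions `new/0`, `add/3`, `remove/2` and `locate/4` are hypothetical, and the Pid monitoring described in the comments above is left out.

```erlang
-module(locator_registry_sketch).
-export([new/0, add/3, remove/2, locate/4]).

%% Hypothetical registry: a map of Location -> LocatorFun.
new() ->
    #{}.

%% Last registration wins: maps:put/3 overwrites any previous fun.
add(Location, LocatorFun, Registry) when is_function(LocatorFun, 3) ->
    maps:put(Location, LocatorFun, Registry).

remove(Location, Registry) ->
    maps:remove(Location, Registry).

%% Call the registered locator and pass through one of the three
%% documented result shapes; anything else is reported as an error.
locate(Location, Name, Opts, Registry) ->
    case maps:find(Location, Registry) of
        {ok, LocatorFun} ->
            case LocatorFun(Location, Name, Opts) of
                {reply, _Transport, _Address} = Reply -> Reply;
                {noreply, Ref} when is_reference(Ref) -> {noreply, Ref};
                {error, _Reason} = Error -> Error;
                Other -> {error, {bad_locator_result, Other}}
            end;
        error ->
            {error, {no_locator, Location}}
    end.
```

In a real service the map would live in a process that also monitors the registering Pid and handles the later `riak_net:location/3` answer for `{noreply, Ref}` results.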

For local applications like handoff, expect to register something as simple as {localnode, Node} and have the locator return the best Transport/Address. The locator could be made aware, through configuration, of which network interface to pick.

Future uses could include finding the nodes responsible for a portion of the keyspace, {keyspace, RingSize, Index, Nval}, or responsible for a vnode, {vnode, RingSize, Index}.

Locators need a robust way to re-register after crashes; this was a major source of problems in 1.3/1.4.

Cluster manager

Each local cluster needs enough information to connect to the remote cluster. This is currently stored in the ring; it should be stored in cluster or global metadata.

For simple MDC connections where both clusters can see one another, a direct IP/port can be determined the same way as a local node.

The cluster manager is responsible for

  • Updating the local bootstrap IP address for establishing connections.
  • Providing Transport/Address pairs for each MDC connection as well as any other needed information (like certificates)
  • Balancing the MDC workload across the cluster with the background manager.

Perhaps

  • Track when fullsync connection is active between both sides if bidir, for status reporting/preventing duplicate work.

SSL

SSL is currently enabled/disabled for all connections. It should be more granular, by remote cluster.

NAT connections

Connections may have to traverse firewalls with NAT enabled. Repl2/3 stored a NAT map for converting local IP addresses to remote IP addresses.

It is not clear whether that makes more sense than specifying the inbound transport/address by node name and applying it to all addresses.

NAT is currently applied based on the IP address that is expecting to make the connection. That functionality is duplicated in a few places.
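One way to stop duplicating the NAT logic would be a single translation function that every caller goes through. The sketch below assumes a hypothetical map format, a list of `{Internal, External}` pairs where each side is either an IP string or an `{IP, Port}` pair; this is not claimed to be the format Repl2/3 used, and the module name `nat_map_sketch` is made up.

```erlang
-module(nat_map_sketch).
-export([apply_nat/2]).

%% Hypothetical NAT translation. An exact {IP, Port} entry is preferred;
%% otherwise an IP-only entry is used and the port is kept. Addresses
%% with no entry are returned unchanged.
apply_nat({IP, Port}, NatMap) ->
    case lookup({IP, Port}, NatMap) of
        {ok, Mapped} ->
            external(Mapped, Port);
        none ->
            case lookup(IP, NatMap) of
                {ok, Mapped} -> external(Mapped, Port);
                none -> {IP, Port}
            end
    end.

lookup(Key, NatMap) ->
    case lists:keyfind(Key, 1, NatMap) of
        {Key, Mapped} -> {ok, Mapped};
        false -> none
    end.

%% The external side may carry its own port or reuse the original one.
external({ExtIP, ExtPort}, _Port) when is_integer(ExtPort) ->
    {ExtIP, ExtPort};
external(ExtIP, Port) ->
    {ExtIP, Port}.
```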

Outbound-only clusters

Some clusters can only make outbound connections. This is why proxy get has such a perverse setup.

For clusters that can only make outbound connections, the cluster manager would need to establish a connection to the desired endpoint and reverse the normal source/sink as done previously.

This needs some kind of protocol message saying that the connection is for a particular connection request, plus the ability to change which protocol is being run.

Connection negotiation

Streaming connections

Provide the abstraction used in realtime and make it generally available.

  • source_conn - controlling process
  • source_helper - sending process
  • sink_conn - controlling process
  • sink_helper - sending process

callbacks

  • msg encoding/decoding (testable for round trips): {Proto, Msg} -> Bin and {Proto, Bin} -> Msg
  • source next request (blocking pull)
  • source send request
  • sink handle request
  • source handle response
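The encoding/decoding callback is the easiest of these to illustrate, because it can be tested as a pure round trip. The sketch below invents a wire format purely for illustration (a 4-byte length prefix, a 1-byte message tag, and only two message types); the module name `framing_sketch`, the tags and the `{1,0}` protocol version are all assumptions. It uses no Erlang term encoding on the wire, in line with the goal of allowing non-Erlang peers.

```erlang
-module(framing_sketch).
-export([encode/2, decode/2]).

%% Hypothetical message tags.
-define(ACK, 1).
-define(HB, 2).

%% {Proto, Msg} -> Bin: length-prefixed frame for protocol {1,0}.
encode({1,0}, Msg) ->
    Payload = encode_payload(Msg),
    <<(byte_size(Payload)):32/unsigned-big, Payload/binary>>.

encode_payload({ack, Seq}) when is_integer(Seq), Seq >= 0 ->
    <<?ACK, Seq:64/unsigned-big>>;
encode_payload(heartbeat) ->
    <<?HB>>.

%% {Proto, Bin} -> Msg: returns {ok, Msg, Rest} once a whole frame is
%% buffered, or {more, Buffer} so the caller can keep the bytes as the
%% continuation for the next TCP packet.
decode({1,0}, <<Len:32/unsigned-big, Payload:Len/binary, Rest/binary>>) ->
    {ok, decode_payload(Payload), Rest};
decode({1,0}, Buffer) when is_binary(Buffer) ->
    {more, Buffer}.

decode_payload(<<?ACK, Seq:64/unsigned-big>>) -> {ack, Seq};
decode_payload(<<?HB>>) -> heartbeat.
```

The `{more, Buffer}` result plays the same role as the `cont` field in the existing connection state records.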

protocol negotiation

heartbeat/rtt mechanism

  • sink is responsible for transmitting data within agreed time (maybe a message)
  • source sends local timestamp to remote, remote adds timestamp and sends back. Calculate RTT and drift.
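The RTT and drift calculation from the timestamp exchange could look roughly like this. It is a sketch under the assumption that the one-way delay is the same in both directions, so "drift" here is really an estimate of the clock offset between the two nodes; the module and function names are hypothetical.

```erlang
-module(hb_rtt_sketch).
-export([rtt_and_drift/3]).

%% All three timestamps are integers in the same unit (e.g. microseconds).
%%   SentLocal   - source clock when the heartbeat was sent
%%   RemoteStamp - remote clock when the heartbeat was echoed
%%   RecvLocal   - source clock when the echo arrived
%% Returns {RTT, Drift}, where Drift is how far the remote clock is
%% ahead of the local one, assuming symmetric network delay.
rtt_and_drift(SentLocal, RemoteStamp, RecvLocal)
  when is_integer(SentLocal), is_integer(RemoteStamp),
       is_integer(RecvLocal), RecvLocal >= SentLocal ->
    RTT = RecvLocal - SentLocal,
    Drift = RemoteStamp - (SentLocal + RTT div 2),
    {RTT, Drift}.
```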

Is there really any distinction between source/sink connections at this layer? Should you still just break into controlling/sender processes, then specialize into source/sink?

Take current rtsource_conn

-record(state, {remote,           % remote name : CONNECTION
                address,          % {IP, Port} : CONNECTION
                connection_ref,   % reference handed out by connection manager
                transport,        % transport module : CONNECTION
                socket,           % socket to use with transport : CONNECTION
                peername,         % cached when socket becomes active : CONNECTION
                proto,            % protocol version negotiated : CONNECTION
                ver,              % wire format negotiated : REALTIME?
                helper_pid,       % riak_repl2_rtsource_helper pid : CONNECTION
                hb_interval,      % seconds to send new heartbeat after last : CONNECTION
                hb_interval_tref, % : CONNECTION
                hb_timeout,       % seconds to wait for heartbeat after send : CONNECTION
                hb_timeout_tref,  % heartbeat timeout timer reference : CONNECTION
                hb_sent_q,        % queue of heartbeats now() that were sent : CONNECTION
                hb_rtt,           % RTT in milliseconds for last completed heartbeat : CONNECTION
                cont = <<>>}).    % continuation from previous TCP buffer : CONNECTION

-record(state, {remote,          % remote site name : CONNECTION
                transport,       % erlang module to use for transport : CONNECTION
                socket,          % socket to pass to transport : CONNECTION
                proto,           % protocol version negotiated : CONNECTION
                deliver_fun,     % Deliver function : REALTIME
                sent_seq,        % last sequence sent : REALTIME
                v1_offset = 0,   % : REALTIME
                v1_seq_map = [], % : REALTIME
                objects = 0}).   % number of objects sent - really number of pulls as could be multiobj : REALTIME

Take current rtsink_conn

-record(state, {remote,                 %% Remote site name : CONNECTION
                transport,              %% Module for sending : CONNECTION
                socket,                 %% Socket : CONNECTION
                proto,                  %% Protocol version negotiated : CONNECTION
                peername,               %% peername of socket : CONNECTION
                ver,                    %% wire format agreed with rt source : REALTIME
                max_pending,            %% Maximum number of operations : REALTIME
                active = true,          %% If socket is set active : CONNECTION
                deactivated = 0,        %% Count of times deactivated : CONNECTION
                source_drops = 0,       %% Count of upstream drops : REALTIME
                helper,                 %% Helper PID : CONNECTION
                hb_last,                %% os:timestamp last heartbeat message received : CONNECTION
                seq_ref,                %% Sequence reference for completed/acked : REALTIME
                expect_seq = undefined, %% Next expected sequence number : REALTIME
                acked_seq = undefined,  %% Last sequence number acknowledged : REALTIME
                completed = [],         %% Completed sequence numbers that need to be sent : REALTIME
                cont = <<>>,            %% Continuation from previous TCP buffer : CONNECTION
                bt_drops,               %% drops due to bucket type mis-matches : REALTIME
                bt_interval,            %% how often (in ms) to report bt_drops : REALTIME
                bt_timer                %% timer reference for interval : REALTIME
               }).

-record(state, {parent                  %% Parent process : CONNECTION
               }).

How would it really work?

Realtime knows it needs to establish an outbound connection to a particular cluster, {cluster, <>, [realtime]}, and which versions it can support.

Needs to register with RTQ once connection is established

Callbacks

riak_repl2_rtsource:start_link(Remote) ->
    gen_stream:start_link(?MODULE,
        [{location, {cluster, Remote}},
         #proto{name=realtime,
                version=[{3,0},{2,0},{1,0}],
                framing=riak_repl4_rtframing,
                capability=[lz4]}
        ]).
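Given a version list like [{3,0},{2,0},{1,0}] and a capability list like [lz4] on each side, negotiation has to settle on one version and a shared capability set. The sketch below uses the simplest rule that could work, the highest `{Major, Minor}` pair that both sides list exactly, plus the intersection of capabilities. The module `negotiate_sketch` and its functions are hypothetical, and a real protocol might instead match on major version only.

```erlang
-module(negotiate_sketch).
-export([choose_version/2, choose_capabilities/2]).

%% Pick the highest version present in both lists.
%% Versions are {Major, Minor} tuples, so term ordering gives
%% {3,0} > {2,1} > {2,0} > {1,0}.
choose_version(Local, Remote) ->
    case [V || V <- Local, lists:member(V, Remote)] of
        [] -> {error, no_common_version};
        Common -> {ok, lists:max(Common)}
    end.

%% Keep only the capabilities both sides advertise, in local order.
choose_capabilities(Local, Remote) ->
    [C || C <- Local, lists:member(C, Remote)].
```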

rtsource:

init(Args) -> {ok, #state{}}.

established(ConnInfo, State) -> % re-register with RTQ

handle_msg({ack, Seq}, State) -> % ack RTQ

helper_init() -> % rtq:pull_req() {send, Msg}

helper_info()

start_link(Module, Location, Proto, Args, Opts)
rtt(Pid) -> {ok, RTT} % last measurement of app-level RTT
hb(Pid) -> % needed?
established(Pid) -> true | {false, Reasons}
status(Pid)
stream_status(Pid)
socket_status(Pid)
cast(Pid, Msg)
call(Pid, Msg)

  init(Args) -> {ok, State}
  init_helper(Args) -> {ok, HelperState} / loop() ?
  established(ConnInfo, State)
  retry(Failure, State)
  status(State) -> {ok, List}
  handle_cast(Msg, State)
  handle_call(Msg, From, State) %% Maybe send something, maybe return something
    -> {ok, State} | {send, Msg, State}
     -> list of {send, Msg}, {reply, Answer}, {state, Blah},
                {helper, Msg}
  handle_msg(Msg, State) -> {ok, State} | {send, Msg, State} | {send_encoded
  helper_msg(Msg, HelperState) -> {ok, HelperState}
  

Returns

  {send, Msg, State} -
  {send_encoded, Bin, State}
  {disconnect, State}
  {
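To show how the generic layer might act on these return values, here is a small sketch of a dispatcher. It is hypothetical throughout: `stream_result_sketch`, `apply_result/2` and the context map with `encode` and `send` funs are illustration only, and it covers just the return shapes listed so far.

```erlang
-module(stream_result_sketch).
-export([apply_result/2]).

%% Ctx is a map with two funs supplied by the connection process:
%%   encode :: fun((Msg) -> binary())   e.g. the framing module
%%   send   :: fun((binary()) -> ok)    e.g. a wrapper around the socket
%% Returns {continue, State} to keep the connection, {stop, State} to close.
apply_result({ok, State}, _Ctx) ->
    {continue, State};
apply_result({send, Msg, State}, #{encode := Encode, send := Send}) ->
    ok = Send(Encode(Msg)),
    {continue, State};
apply_result({send_encoded, Bin, State}, #{send := Send}) when is_binary(Bin) ->
    %% Already encoded by the callback, so skip the framing step.
    ok = Send(Bin),
    {continue, State};
apply_result({disconnect, State}, _Ctx) ->
    {stop, State}.
```

Keeping the socket behind a `send` fun means the same dispatcher can be exercised in tests without a real transport.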

Post-refactor State

  • RiakNet component usable by all of core.

  • Realtime replication simplified

    Plumtree overlay for all clusters in the domain. Realtime puts will need to be able to answer whether messages have already been seen from other sources; they should be able to compare based on vclock. Perhaps modify the put FSM to return extra status on whether a put descends.

    Queue simplified to just track objects that need sending. Can we detect drops with at-least-once delivery?

    Would you still have a consumer for each remote, and skip delivery if Plumtree said not to send? Would you have a single consumer? Who worries about that?

    multiple vnodes -> multi-consumer queues -> realtime connections

    postcommit/vnode hook -> add to realtime queue

    realtime source becomes framing+pull+ack

  • Proxy-get simplified

    Use riak_net to simplify connection logic. Callback module for requesting/responding. What about cascading: how do you request which cluster it comes from, and what if you don't have a direct connection to it? Can Plumtree handle that? Does it need the same service as global SC?

  • Fullsync

    Make a peer-to-peer difference exchanging connection

    Make a diff-sending connection

    Make a diff-requesting connection

    ... or should you mux?
