Databus 2.0 Client Load Balancing Design
This page outlines the functionality, design, and some implementation details of the 'client load balancing' feature of the Databus V2 client library.
- The Databus V2 client already provides the ability to specify filters that are applied to the Databus V2 stream at the server (server-side filtering). Currently the client is required to configure the partition settings statically.
- The assumption here is that the client nodes are symmetric, i.e., any node can handle any partition(s).
- The intended functionality is to:
- dynamically assign partition numbers to the client nodes on start-up
- handle the failure of a node by reassigning the partitions handled by the failed node(s) to the active participants
- handle the addition of client nodes with a minimum number of reassignments
- delay the assignment of partitions until a condition is met, such as a minimum number of participants having joined the cluster
- share the checkpoints of partitions across nodes
- Ability to dynamically create the cluster, if none exists.
- Ability to specify the number of partitions or tokens handled by the cluster.
- Ability to detect the addition and removal of client nodes from the cluster.
- Ability to reassign partitions (with a minimum number of reassignments) on changes to cluster membership.
- Ability to delay the initial assignment until a quorum of client nodes has joined the cluster.
- Ability for checkpoints to be accessed by any node (this constitutes shared state amongst the client nodes).
- Ability for the client code to be notified on events such as 'startProcessing(partition_i)' and 'stopProcessing(partition_i)'.
- Ability for the client code to alter the 'server-side filter' configuration on the fly (may not be required if a connection is created per partition; see the filter sketch after this list).
- Ability for a client node to identify and access shared state per partition.
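To make the per-partition server-side filtering requirement concrete, below is a minimal sketch of how a partition number could be translated into a mod-based key filter. The class and field names (PartitionFilters, FilterSpec) are placeholders for illustration only, not the actual Databus filter configuration API.

    import java.util.Collections;
    import java.util.Set;

    // Illustrative sketch only: partition p of numBuckets keeps the events whose key
    // satisfies hash(key) % numBuckets == p. FilterSpec is a placeholder value class,
    // not the real Databus server-side filter configuration.
    public class PartitionFilters {
      public static final class FilterSpec {
        public final int numBuckets;       // total partitions/tokens in the cluster
        public final Set<Integer> buckets; // buckets this client node should consume

        public FilterSpec(int numBuckets, Set<Integer> buckets) {
          this.numBuckets = numBuckets;
          this.buckets = buckets;
        }
      }

      // Builds the filter for a single dynamically assigned partition.
      public static FilterSpec createFilter(int numBuckets, int partition) {
        return new FilterSpec(numBuckets, Collections.singleton(partition));
      }
    }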
Client library awareness of the relay cluster (if one exists)
Handled by the Helix 'auto-rebalance' mode: dynamic creation of the cluster, consistent-hashing based reassignment, and the ability to specify the cluster. (What is not known here is whether the 'delay initial assignment' behavior is supported.)
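A minimal sketch of what dynamic cluster creation in Helix auto-rebalance mode could look like is shown below. It assumes the Apache Helix admin API (ZKHelixAdmin, addResource, FULL_AUTO rebalance mode); the exact method signatures and rebalance-mode constants depend on the Helix version, and the cluster/resource names and partition count are illustrative.

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.IdealState.RebalanceMode;

    // Sketch: create the client cluster in Helix auto-rebalance (FULL_AUTO) mode so that
    // the partitions of the resource are spread across the live client nodes.
    public class ClientClusterBootstrap {
      public static void createIfAbsent(String zkAddr, String clusterName,
                                        String resourceName, int numPartitions) {
        HelixAdmin admin = new ZKHelixAdmin(zkAddr);
        if (!admin.getClusters().contains(clusterName)) {
          admin.addCluster(clusterName);
          // Each partition is a unit of assignment; OnlineOffline is the simplest state model.
          admin.addResource(clusterName, resourceName, numPartitions,
                            "OnlineOffline", RebalanceMode.FULL_AUTO.toString());
          // One replica per partition: exactly one client node processes each partition.
          admin.rebalance(clusterName, resourceName, 1);
        }
      }
    }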
- The client configuration will specify the number of tokens or partitions and the cluster name (a configuration sketch follows this list).
- The partition assignment is done at the level of registrations, i.e., a pair of (source, filter) -> consumers.
- A new connection is made for each partition; this connection will read its checkpoint from a shared location.
- The server-side filtering configuration can be left empty initially; it is set dynamically when reacting to a partition assignment.
- The client library will use the cluster manager API.
- How is the connection started/paused? Wait until at least one partition is assigned; the 'sourcesConnection' start should be invoked from the callbacks.
- Open Question: it is possible that the checkpoint will be managed by the application. Otherwise, checkpoints will have to be partition-aware and shared, which means a checkpoint per partition will be written to shared storage.
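For reference, the settings mentioned in the list above (cluster name, number of tokens/partitions, quorum, shared state location) could be grouped into a configuration holder along the lines of the hypothetical sketch below; the class and field names are illustrative and not the actual Databus configuration schema.

    // Hypothetical configuration holder for the client load-balancing settings; all names
    // here are placeholders, not the real Databus client configuration.
    public class ClientClusterConfig {
      private String clusterName;       // name of the cluster to create/join
      private int numPartitions;        // number of tokens/partitions handled by the cluster
      private int quorum;               // minimum nodes before the initial assignment is made
      private String sharedStateZkAddr; // location of the shared checkpoint/partition state

      public String getClusterName() { return clusterName; }
      public void setClusterName(String v) { clusterName = v; }
      public int getNumPartitions() { return numPartitions; }
      public void setNumPartitions(int v) { numPartitions = v; }
      public int getQuorum() { return quorum; }
      public void setQuorum(int v) { quorum = v; }
      public String getSharedStateZkAddr() { return sharedStateZkAddr; }
      public void setSharedStateZkAddr(String v) { sharedStateZkAddr = v; }
    }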
in start()
  if (!clusterManager.exists(clusterName)) {
    clusterManager.create(clusterName, AUTO_REBALANCE, n, quorum); //no callback is invoked until quorum nodes have joined the cluster
  }
  Cluster cluster = clusterManager.get(clusterName);
  // Now the cluster state model's callback functions should be set appropriately.
  // The callback implements onPartitionAssignment(n) and onPartitionRemoval(n) as well as
  // onPartitionAssignment(list of n) and onPartitionRemoval(list of n).
  // In addition, the client should allow external callbacks to be set.
  client.setExternalCallback(externalCallback);
  // In our case the internal callback also contains a handle to reg - the registration object.
  intCallback = new InternalCallback(externalCallback, client);
  cluster.setOnPartitionAssignment(intCallback); //this callback can be an internal/external (client code) object
  cluster.setOnPartitionRemoval(intCallback);

  //internalCallback dynamically creates/stops the registrations
  internalCallback.onPartitionAssignment(list_of_partitions) {
    sources = _client.getSources();
    foreach p in list_of_partitions {
      reg = _client.register(sources, createFilter(p), _client.getConsumers(sources));
      _internalPartitionTable.add(p, reg);
      //a connection object is made for each reg
      reg.createSourcesConnection(_client.getSourceMap());
      externalCallback.onPartitionAssignment(p, reg, reg.getSourcesConnection());
    }
  }

  internalCallback.onPartitionRemoval(list_of_partitions) {
    foreach p in list_of_partitions {
      reg = _internalPartitionTable.remove(p);
      //the connection created for this registration is stopped via the external callback
      externalCallback.onPartitionRemoval(p, reg, reg.getSourcesConnection());
    }
  }

  // The default external callback could implement something basic: treat p as the final
  // server-side partition number, record the new partition, and start/stop the connection.
  // Note that the externalCallback API receives (partition, registration, sourcesConnection).
  externalCallback.onPartitionAssignment(p, reg, sourcesConnection) {
    sourcesConnection.start();
  }
  externalCallback.onPartitionRemoval(p, reg, sourcesConnection) {
    sourcesConnection.stop();
  }

  cluster.join(reg.getId()); //join with the id of the registration; is this unique across machines?
in shutdown()
  cluster.leave(reg.getId());

in dispatcher
  foreach p in sourceConn.getFilter().getPartitions() {
    storeCheckpoint(checkpoint, p);
    // The CheckpointManager will have a reference to 'cluster' and will call
    // cluster.setPropertyStore(p + regular_checkpoint_name, checkpoint);
  }
end
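The per-partition checkpoint storage in the dispatcher step above could be backed by the cluster's shared property store. Below is a sketch assuming a Helix/ZooKeeper-backed property store; the path layout, the serialization of the checkpoint as a string, and the exact package names (which vary across Helix versions) are assumptions.

    import org.apache.helix.AccessOption;
    import org.apache.helix.HelixManager;
    import org.apache.helix.ZNRecord;
    import org.apache.helix.store.zk.ZkHelixPropertyStore;

    // Sketch: store one checkpoint per partition in shared storage so that any client node
    // picking up the partition can resume from it.
    public class SharedCheckpointStore {
      private final ZkHelixPropertyStore<ZNRecord> store;

      public SharedCheckpointStore(HelixManager manager) {
        this.store = manager.getHelixPropertyStore();
      }

      // Persists the serialized checkpoint for one partition.
      public void storeCheckpoint(int partition, String serializedCheckpoint) {
        ZNRecord record = new ZNRecord("checkpoint_" + partition);
        record.setSimpleField("checkpoint", serializedCheckpoint);
        store.set("/checkpoints/partition_" + partition, record, AccessOption.PERSISTENT);
      }

      // Reads the checkpoint for a partition; returns null if none has been written yet.
      public String readCheckpoint(int partition) {
        ZNRecord record = store.get("/checkpoints/partition_" + partition, null, AccessOption.PERSISTENT);
        return record == null ? null : record.getSimpleField("checkpoint");
      }
    }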
Consumer factories may be required to instantiate a new consumer for every connection (see the sketch below); the rest of the callback API is unchanged.
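One possible shape for such a consumer factory is sketched below; the factory interface itself is an assumption of this sketch (DatabusCombinedConsumer is the existing combined consumer interface, but its package path is assumed here).

    import com.linkedin.databus.client.pub.DatabusCombinedConsumer;

    // Hypothetical factory: since a separate registration/connection is created per assigned
    // partition, the library asks the factory for a fresh consumer instance per partition
    // instead of sharing a single consumer object across connections.
    public interface DatabusConsumerFactory {
      // Creates the consumer that will process events for the given partition.
      DatabusCombinedConsumer createConsumer(int partition);
    }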
- V2 and V3 have near-identical registration semantics
- No consumer-level API changes
- The partition is an artifact of the subscription at the client; it does not figure in the event stream
- Proliferation of connections, due to the absence of grouping of partitions
- The client code is aware of registration objects
- An external callback API for partition assignments has to be set (see the sketch below)
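For clarity, the external partition-callback API used in the pseudocode above could look like the sketch below; the interface name and the use of Object for the registration and connection handles are placeholders, not the finalized client API.

    // Placeholder sketch of the external callback set via client.setExternalCallback(...).
    public interface ExternalPartitionCallback {
      // Invoked after the library has created a registration/connection for the partition.
      void onPartitionAssignment(int partition, Object registration, Object sourcesConnection);

      // Invoked before the library tears down the partition's registration/connection.
      void onPartitionRemoval(int partition, Object registration, Object sourcesConnection);
    }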