Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Cluster struct {
metricsPolicy iatomic.TypedVal[*MetricsPolicy]

// Hints for best node for a partition
partitionWriteMap iatomic.TypedVal[partitionMap] //partitionMap
partitionWriteMap iatomic.TypedVal[*partitionMap] //partitionMap

clientPolicy ClientPolicy
infoPolicy InfoPolicy
Expand Down Expand Up @@ -130,7 +130,8 @@ func NewCluster(policy *ClientPolicy, hosts []*Host) (*Cluster, Error) {
supportsPartitionQuery: *iatomic.NewBool(false),
}

newCluster.partitionWriteMap.Set(make(partitionMap))
// Initialize partitionWriteMap with a new partitionMap
newCluster.partitionWriteMap.Set(newPartitionMap())

// setup auth info for cluster
if policy.RequiresAuthentication() {
Expand Down Expand Up @@ -277,7 +278,7 @@ func (clstr *Cluster) tend() Error {
})
}

var partMap iatomic.Guard[partitionMap]
var partMap iatomic.Guard[*partitionMap]

// find the first host that connects
seq.ParDo(peers.peers(), func(_peer *peer) {
Expand Down Expand Up @@ -307,8 +308,10 @@ func (clstr *Cluster) tend() Error {
// Create new node.
node := clstr.createNode(&nv)
peers.addNode(nv.name, node)
partMap.InitDoVal(clstr.getPartitions().clone, func(partMap partitionMap) {
node.refreshPartitions(peers, partMap, true)
partMap.InitDoVal(func() *partitionMap {
return clstr.getPartitions().clone()
}, func(pm *partitionMap) {
node.refreshPartitions(peers, pm, true)
})
return seq.Break
})
Expand All @@ -317,8 +320,10 @@ func (clstr *Cluster) tend() Error {
// Refresh partition map when necessary.
seq.ParDo(nodes, func(node *Node) {
if node.partitionChanged.Get() {
partMap.InitDoVal(clstr.getPartitions().clone, func(partMap partitionMap) {
node.refreshPartitions(peers, partMap, false)
partMap.InitDoVal(func() *partitionMap {
return clstr.getPartitions().clone()
}, func(pm *partitionMap) {
node.refreshPartitions(peers, pm, false)
})
}
})
Expand Down Expand Up @@ -487,15 +492,15 @@ func (clstr *Cluster) findAlias(alias *Host) *Node {
return clstr.aliases.Get(*alias)
}

func (clstr *Cluster) setPartitions(partMap partitionMap) {
func (clstr *Cluster) setPartitions(partMap *partitionMap) {
if err := partMap.validate(); err != nil {
logger.Logger.Error("Partition map error: %s.", err.Error())
}

clstr.partitionWriteMap.Set(partMap)
}

func (clstr *Cluster) getPartitions() partitionMap {
func (clstr *Cluster) getPartitions() *partitionMap {
return clstr.partitionWriteMap.Get()
}

Expand Down Expand Up @@ -650,18 +655,21 @@ func (clstr *Cluster) findNodesToRemove(refreshCount int) []*Node {

func (clstr *Cluster) findNodeInPartitionMap(filter *Node) bool {
partMap := clstr.getPartitions()
found := false

for _, partitions := range partMap {
partMap.iterate(func(ns string, partitions *Partitions) bool {
for _, nodeArray := range partitions.Replicas {
for _, node := range nodeArray {
// Use reference equality for performance.
if node == filter {
return true
found = true
return false // Stop iteration
}
}
}
}
return false
return true // Continue to next namespace
})

return found
}

func (clstr *Cluster) updateClusterFeatures() {
Expand Down
2 changes: 1 addition & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (nd *Node) refreshPeers(peers *peers) {
peers.refreshCount.IncrementAndGet()
}

func (nd *Node) refreshPartitions(peers *peers, partitions partitionMap, freshlyAdded bool) {
func (nd *Node) refreshPartitions(peers *peers, partitions *partitionMap, freshlyAdded bool) {
// Do not refresh peers when node connection has already failed during this cluster tend iteration.
// Also, avoid "split cluster" case where this node thinks it's a 1-node cluster.
// Unchecked, such a node can dominate the partition map and cause all other
Expand Down
24 changes: 12 additions & 12 deletions partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func NewPartitionForReplicaPolicy(namespace string, replica ReplicaPolicy) *Part
func PartitionForWrite(cluster *Cluster, policy *BasePolicy, key *Key) (*Partition, Error) {
// Must copy hashmap reference for copy on write semantics to work.
pmap := cluster.getPartitions()
partitions := pmap[key.namespace]
partitions, exists := pmap.get(key.namespace)

if partitions == nil {
return nil, newInvalidNamespaceError(key.namespace, len(pmap))
if !exists {
return nil, newInvalidNamespaceError(key.namespace, pmap.len())
}

return NewPartition(partitions, key, policy.ReplicaPolicy, nil, false), nil
Expand All @@ -71,10 +71,10 @@ func PartitionForWrite(cluster *Cluster, policy *BasePolicy, key *Key) (*Partiti
func PartitionForRead(cluster *Cluster, policy *BasePolicy, key *Key) (*Partition, Error) {
// Must copy hashmap reference for copy on write semantics to work.
pmap := cluster.getPartitions()
partitions := pmap[key.namespace]
partitions, exists := pmap.get(key.namespace)

if partitions == nil {
return nil, newInvalidNamespaceError(key.namespace, len(pmap))
if !exists {
return nil, newInvalidNamespaceError(key.namespace, pmap.len())
}

var replica ReplicaPolicy
Expand Down Expand Up @@ -125,10 +125,10 @@ func GetReplicaPolicySC(policy *BasePolicy) ReplicaPolicy {
func GetNodeBatchRead(cluster *Cluster, key *Key, replica ReplicaPolicy, replicaSC ReplicaPolicy, prevNode *Node, sequence int, sequenceSC int) (*Node, Error) {
// Must copy hashmap reference for copy on write semantics to work.
pmap := cluster.getPartitions()
partitions := pmap[key.namespace]
partitions, exists := pmap.get(key.namespace)

if partitions == nil {
return nil, newInvalidNamespaceError(key.namespace, len(pmap))
if !exists {
return nil, newInvalidNamespaceError(key.namespace, pmap.len())
}

if partitions.SCMode {
Expand All @@ -145,10 +145,10 @@ func GetNodeBatchRead(cluster *Cluster, key *Key, replica ReplicaPolicy, replica
func GetNodeBatchWrite(cluster *Cluster, key *Key, replica ReplicaPolicy, prevNode *Node, sequence int) (*Node, Error) {
// Must copy hashmap reference for copy on write semantics to work.
pmap := cluster.getPartitions()
partitions := pmap[key.namespace]
partitions, exists := pmap.get(key.namespace)

if partitions == nil {
return nil, newInvalidNamespaceError(key.namespace, len(pmap))
if !exists {
return nil, newInvalidNamespaceError(key.namespace, pmap.len())
}

p := NewPartition(partitions, key, replica, prevNode, false)
Expand Down
12 changes: 6 additions & 6 deletions partition_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var partitionMapLock sync.Mutex

// Parse node's master (and optionally prole) partitions.
type partitionParser struct {
pmap partitionMap
pmap *partitionMap
buffer []byte
partitionCount int
generation int
Expand All @@ -46,7 +46,7 @@ type partitionParser struct {
regimeError bool
}

func newPartitionParser(node *Node, partitions partitionMap, partitionCount int) (*partitionParser, Error) {
func newPartitionParser(node *Node, partitions *partitionMap, partitionCount int) (*partitionParser, Error) {
newPartitionParser := &partitionParser{
partitionCount: partitionCount,
}
Expand Down Expand Up @@ -169,17 +169,17 @@ func (pp *partitionParser) parseReplicasAll(node *Node, command string) Error {
return newErrorAndWrap(err, types.PARSE_ERROR, "Failed to find replica count value")
}

partitions := pp.pmap[namespace]
if partitions == nil {
partitions, exists := pp.pmap.get(namespace)
if !exists {
// Create new replica array.
partitions = newPartitions(pp.partitionCount, replicaCount, regime != 0)
pp.pmap[namespace] = partitions
pp.pmap.set(namespace, partitions)
} else if len(partitions.Replicas) != replicaCount {
// Ensure replicaArray is correct size.
logger.Logger.Info("Namespace `%s` replication factor changed from `%d` to `%d` ", namespace, len(partitions.Replicas), replicaCount)

partitions.setReplicaCount(replicaCount) //= clonePartitions(partitions, replicaCount)
pp.pmap[namespace] = partitions
pp.pmap.set(namespace, partitions)
}

// Parse partition bitmaps.
Expand Down
4 changes: 2 additions & 2 deletions partition_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ func (pt *partitionTracker) assignPartitionsToNodes(cluster *Cluster, namespace
list := make([]*nodePartitions, 0, pt.nodeCapacity)

pMap := cluster.getPartitions()
parts := pMap[namespace]
parts, exists := pMap.get(namespace)

if parts == nil {
if !exists {
return nil, newError(types.INVALID_NAMESPACE, fmt.Sprintf("Invalid Partition Map for namespace `%s` in Partition Scan", namespace))
}

Expand Down
88 changes: 67 additions & 21 deletions partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"fmt"
"strconv"
"sync"

"github.com/aerospike/aerospike-client-go/v7/types"
)
Expand Down Expand Up @@ -76,18 +77,60 @@ func (p *Partitions) clone() *Partitions {
}
}

/*
// partitionMap is a thread-safe map that stores partition information for different namespaces.
// It uses a sync.Map internally to provide concurrent read/write access without explicit locking.
// The keys are namespace names (strings), and the values are pointers to Partitions structs.
// This structure allows for efficient, concurrent operations on partition data across multiple goroutines.
//
// Usage:
// - Use get(key) to retrieve partition data for a namespace
// - Use set(key, value) to update or add partition data for a namespace
// - Use iterate(func) to iterate over all namespace-partition pairs safely
// - Use delete(key) to remove partition data for a namespace
// - Use len() to get the number of namespaces in the map
type partitionMap struct {
m sync.Map
}

func newPartitionMap() *partitionMap {
return &partitionMap{}
}

func (pm *partitionMap) get(key string) (*Partitions, bool) {
value, ok := pm.m.Load(key)
if !ok {
return nil, false
}
return value.(*Partitions), true
}

func (pm *partitionMap) set(key string, value *Partitions) {
pm.m.Store(key, value)
}

partitionMap
func (pm *partitionMap) delete(key string) {
pm.m.Delete(key)
}

*/
// len reports the number of namespaces currently stored in the map.
// sync.Map exposes no size accessor, so entries are counted via Range.
func (pm *partitionMap) len() int {
	count := 0
	pm.m.Range(func(_, _ interface{}) bool {
		count++
		return true
	})
	return count
}

type partitionMap map[string]*Partitions
func (pm *partitionMap) iterate(f func(key string, value *Partitions) bool) {
pm.m.Range(func(key, value interface{}) bool {
return f(key.(string), value.(*Partitions))
})
}

// cleanup removes all the references stored in the lists
// to help the GC identify the unused pointers.
func (pm partitionMap) cleanup() {
for ns, partitions := range pm {
func (pm *partitionMap) cleanup() {
pm.iterate(func(ns string, partitions *Partitions) bool {
for i := range partitions.Replicas {
for j := range partitions.Replicas[i] {
partitions.Replicas[i][j] = nil
Expand All @@ -98,24 +141,25 @@ func (pm partitionMap) cleanup() {
partitions.Replicas = nil
partitions.regimes = nil

delete(pm, ns)
}
pm.delete(ns)
return true
})
}

// String implements stringer interface for partitionMap
func (pm partitionMap) clone() partitionMap {
// Make deep copy of map.
pmap := make(partitionMap, len(pm))
for ns := range pm {
pmap[ns] = pm[ns].clone()
}
return pmap
func (pm *partitionMap) clone() *partitionMap {
newPm := newPartitionMap()
pm.iterate(func(ns string, partitions *Partitions) bool {
newPm.set(ns, partitions.clone())
return true
})
return newPm
}

// String implements stringer interface for partitionMap
func (pm partitionMap) String() string {
func (pm *partitionMap) String() string {
res := bytes.Buffer{}
for ns, partitions := range pm {
pm.iterate(func(ns string, partitions *Partitions) bool {
res.WriteString("-----------------------------------------------------------------------\n")
res.WriteString("Namespace: " + ns + "\n")
res.WriteString(fmt.Sprintf("Regimes: %v\n", partitions.regimes))
Expand All @@ -138,18 +182,19 @@ func (pm partitionMap) String() string {
}
res.WriteString("\n")
}
}
return true
})
res.WriteString("\n")
return res.String()
}

// naively validates the partition map
func (pm partitionMap) validate() Error {
func (pm *partitionMap) validate() Error {
masterNodePartitionNotDefined := map[string][]int{}
replicaNodePartitionNotDefined := map[string][]int{}
var errs Error

for nsName, partition := range pm {
pm.iterate(func(nsName string, partition *Partitions) bool {
if len(partition.regimes) != _PARTITIONS {
errs = chainErrors(newError(types.COMMON_ERROR, fmt.Sprintf("Wrong number of regimes for namespace `%s`. Must be %d, but found %d.", nsName, _PARTITIONS, len(partition.regimes))), errs)
}
Expand All @@ -169,7 +214,8 @@ func (pm partitionMap) validate() Error {
}
}
}
}
return true
})

if errs != nil || len(masterNodePartitionNotDefined) > 0 || len(replicaNodePartitionNotDefined) > 0 {
for nsName, partitionList := range masterNodePartitionNotDefined {
Expand Down