forked from scylladb/gocql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpolicies_integration_test.go
More file actions
154 lines (133 loc) · 4.52 KB
/
policies_integration_test.go
File metadata and controls
154 lines (133 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
//go:build integration
// +build integration
package gocql
import (
"context"
"fmt"
"testing"
"time"
)
// TestDCValidationTokenAware verifies that session creation fails when a
// token-aware policy wraps a DC-aware fallback configured with a DC name
// that does not exist in the cluster.
func TestDCValidationTokenAware(t *testing.T) {
	cfg := createCluster()
	cfg.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(DCAwareRoundRobinPolicy("WRONG_DC"))
	if _, err := cfg.CreateSession(); err == nil {
		t.Fatal("createSession was expected to fail with wrong DC name provided.")
	}
}
// TestDCValidationDCAware verifies that session creation fails when the
// DC-aware round-robin policy is configured with a nonexistent DC name.
func TestDCValidationDCAware(t *testing.T) {
	cfg := createCluster()
	cfg.PoolConfig.HostSelectionPolicy = DCAwareRoundRobinPolicy("WRONG_DC")
	if _, err := cfg.CreateSession(); err == nil {
		t.Fatal("createSession was expected to fail with wrong DC name provided.")
	}
}
// TestDCValidationRackAware verifies that session creation fails when the
// rack-aware round-robin policy is configured with a nonexistent DC name.
func TestDCValidationRackAware(t *testing.T) {
	cfg := createCluster()
	cfg.PoolConfig.HostSelectionPolicy = RackAwareRoundRobinPolicy("WRONG_DC", "RACK")
	if _, err := cfg.CreateSession(); err == nil {
		t.Fatal("createSession was expected to fail with wrong DC name provided.")
	}
}
// TestTokenAwareHostPolicy checks that a token-aware policy ends up with
// fully populated metadata after session startup, both with and without a
// keyspace configured on the cluster.
func TestTokenAwareHostPolicy(t *testing.T) {
	t.Run("keyspace", func(t *testing.T) {
		const keyspace = "tokenaware_init_test"
		createKeyspace(t, createCluster(), keyspace, false)

		hsp := TokenAwareHostPolicy(RoundRobinHostPolicy())
		cfg := createCluster()
		cfg.Keyspace = keyspace
		cfg.PoolConfig.HostSelectionPolicy = hsp
		testIfPolicyInitializedProperly(t, cfg, hsp.(*tokenAwareHostPolicy))
	})
	t.Run("no-keyspace", func(t *testing.T) {
		hsp := TokenAwareHostPolicy(RoundRobinHostPolicy())
		cfg := createCluster()
		cfg.PoolConfig.HostSelectionPolicy = hsp
		testIfPolicyInitializedProperly(t, cfg, hsp.(*tokenAwareHostPolicy))
	})
}
// testIfPolicyInitializedProperly creates a session from cluster and asserts
// that the token-aware policy's metadata — token ring tokens, hosts,
// partitioner and (when a keyspace is set) the keyspace's replica map — was
// populated during session initialization.
func testIfPolicyInitializedProperly(t *testing.T, cluster *ClusterConfig, policy *tokenAwareHostPolicy) {
	_, err := cluster.CreateSession()
	if err != nil {
		// Wrap with %w and pass the error value itself; the original passed a
		// non-constant (typo'd) string as the Fatalf format, a go vet printf
		// violation that would also mangle any '%' in the error text.
		t.Fatal(fmt.Errorf("failed to create session: %w", err))
	}
	md := policy.getMetadataReadOnly()
	if md == nil {
		t.Fatal("tokenAwareHostPolicy has no metadata")
	}
	if len(md.tokenRing.tokens) == 0 {
		t.Fatal("tokenAwareHostPolicy metadata has no tokens")
	}
	if len(md.tokenRing.hosts) == 0 {
		t.Fatal("tokenAwareHostPolicy metadata has no hosts")
	}
	if md.tokenRing.partitioner == nil {
		t.Fatal("tokenAwareHostPolicy metadata has no partitioner")
	}
	if cluster.Keyspace != "" {
		if len(md.replicas[cluster.Keyspace]) == 0 {
			t.Fatal("tokenAwareHostPolicy metadata has no replicas in target keyspace")
		}
	}
}
// TestNoHangAllHostsDown ensures that when all hosts are down, query
// execution does not hang, across a variety of host selection policies
// (including a nil policy, i.e. the driver default).
func TestNoHangAllHostsDown(t *testing.T) {
	cluster := createCluster()
	session := createSessionFromCluster(cluster, t)
	hosts := session.GetHosts()
	dc := hosts[0].DataCenter()
	rack := hosts[0].Rack()
	session.Close()

	policies := []HostSelectionPolicy{
		DCAwareRoundRobinPolicy(dc),
		DCAwareRoundRobinPolicy(dc, HostPolicyOptionDisableDCFailover),
		TokenAwareHostPolicy(DCAwareRoundRobinPolicy(dc)),
		TokenAwareHostPolicy(DCAwareRoundRobinPolicy(dc, HostPolicyOptionDisableDCFailover)),
		RackAwareRoundRobinPolicy(dc, rack),
		RackAwareRoundRobinPolicy(dc, rack, HostPolicyOptionDisableDCFailover),
		TokenAwareHostPolicy(RackAwareRoundRobinPolicy(dc, rack)),
		TokenAwareHostPolicy(RackAwareRoundRobinPolicy(dc, rack, HostPolicyOptionDisableDCFailover)),
		nil,
	}

	for _, policy := range policies {
		// Each iteration runs in its own function so that deferred cleanups
		// (session close, context cancel) fire per policy instead of piling
		// up until the whole test returns (defer-in-loop leak in the
		// original version).
		func() {
			cluster := createCluster()
			cluster.PoolConfig.HostSelectionPolicy = policy
			session := createSessionFromCluster(cluster, t)
			defer session.Close()

			hosts := session.GetHosts()
			// Simulate an outage: mark every host's pool as down. The
			// original discarded the lookup's second return value and would
			// nil-panic on a missing pool; fail loudly instead.
			for _, host := range hosts {
				pool, ok := session.pool.getPoolByHostID(host.HostID())
				if !ok {
					t.Fatalf("no connection pool for host %v", host.HostID())
				}
				pool.host.setState(NodeDown)
				if policy != nil {
					policy.AddHost(host)
				}
			}

			// expectNoHang runs a trivial query under a 12s deadline; if the
			// deadline expires (ctx.Err() != nil) the query hung.
			expectNoHang := func() {
				ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
				defer cancel()
				_ = session.Query("SELECT host_id FROM system.local").WithContext(ctx).Exec()
				if ctx.Err() != nil {
					t.Errorf("policy %T should be no hangups when all hosts are down", policy)
				}
			}

			expectNoHang()

			// Remove all hosts except the first one and re-check.
			if policy != nil {
				for _, host := range hosts[1:] {
					policy.RemoveHost(host)
				}
			}
			expectNoHang()
		}()
	}
}