Skip to content

Commit 033fcb8

Browse files
authored
Remove global endPointList to avoid cluster init error
1 parent 52e3fe5 commit 033fcb8

File tree

2 files changed

+30
-16
lines changed

2 files changed

+30
-16
lines changed

client/session.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ package client
2121

2222
import (
2323
"bytes"
24-
"container/list"
2524
"context"
2625
"encoding/binary"
2726
"errors"
@@ -77,15 +76,14 @@ type Session struct {
7776
trans thrift.TTransport
7877
requestStatementId int64
7978
protocolFactory thrift.TProtocolFactory
79+
endPointList []endPoint
8080
}
8181

8282
type endPoint struct {
8383
Host string
8484
Port string
8585
}
8686

87-
var endPointList = list.New()
88-
8987
func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) error {
9088
if s.config.FetchSize <= 0 {
9189
s.config.FetchSize = DefaultFetchSize
@@ -1140,11 +1138,14 @@ func NewSession(config *Config) Session {
11401138
}
11411139

11421140
func newSessionWithSpecifiedSqlDialect(config *Config) Session {
1143-
endPoint := endPoint{}
1144-
endPoint.Host = config.Host
1145-
endPoint.Port = config.Port
1146-
endPointList.PushBack(endPoint)
1147-
return Session{config: config}
1141+
endPointList := []endPoint{{
1142+
Host: config.Host,
1143+
Port: config.Port,
1144+
}}
1145+
return Session{
1146+
config: config,
1147+
endPointList: endPointList,
1148+
}
11481149
}
11491150

11501151
func NewClusterSession(clusterConfig *ClusterConfig) (Session, error) {
@@ -1154,15 +1155,17 @@ func NewClusterSession(clusterConfig *ClusterConfig) (Session, error) {
11541155

11551156
func newClusterSessionWithSqlDialect(clusterConfig *ClusterConfig) (Session, error) {
11561157
session := Session{}
1157-
node := endPoint{}
1158+
session.endPointList = make([]endPoint, len(clusterConfig.NodeUrls))
11581159
for i := 0; i < len(clusterConfig.NodeUrls); i++ {
1160+
node := endPoint{}
11591161
node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
11601162
node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
1161-
endPointList.PushBack(node)
1163+
session.endPointList[i] = node
11621164
}
11631165
var err error
1164-
for e := endPointList.Front(); e != nil; e = e.Next() {
1165-
session.trans = thrift.NewTSocketConf(net.JoinHostPort(e.Value.(endPoint).Host, e.Value.(endPoint).Port), &thrift.TConfiguration{
1166+
for i := range session.endPointList {
1167+
ep := session.endPointList[i]
1168+
session.trans = thrift.NewTSocketConf(net.JoinHostPort(ep.Host, ep.Port), &thrift.TConfiguration{
11661169
ConnectTimeout: time.Duration(0), // Use 0 for no timeout
11671170
})
11681171
// session.trans = thrift.NewTFramedTransport(session.trans) // deprecated
@@ -1173,7 +1176,7 @@ func newClusterSessionWithSqlDialect(clusterConfig *ClusterConfig) (Session, err
11731176
if err != nil {
11741177
log.Println(err)
11751178
} else {
1176-
session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
1179+
session.config = getConfig(ep.Host, ep.Port,
11771180
clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax, clusterConfig.Database, clusterConfig.sqlDialect)
11781181
break
11791182
}
@@ -1250,13 +1253,14 @@ func (s *Session) reconnect() bool {
12501253
var connectedSuccess = false
12511254

12521255
for i := 0; i < s.config.ConnectRetryMax; i++ {
1253-
for e := endPointList.Front(); e != nil; e = e.Next() {
1254-
err = s.initClusterConn(e.Value.(endPoint))
1256+
for i := range s.endPointList {
1257+
ep := s.endPointList[i]
1258+
err = s.initClusterConn(ep)
12551259
if err == nil {
12561260
connectedSuccess = true
12571261
break
12581262
} else {
1259-
log.Println("Connection refused:", e.Value.(endPoint))
1263+
log.Println("Connection refused:", ep)
12601264
}
12611265
}
12621266
if connectedSuccess {

test/e2e/e2e_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,16 @@ func (s *e2eTestSuite) checkError(status *common.TSStatus, err error) {
7575
}
7676
}
7777

78+
func (s *e2eTestSuite) Test_WrongURL() {
79+
clusterConfig := client.ClusterConfig{
80+
NodeUrls: strings.Split("iotdb1:6667", ","),
81+
UserName: "root",
82+
Password: "root",
83+
}
84+
_, err := client.NewClusterSession(&clusterConfig)
85+
s.Require().Error(err)
86+
}
87+
7888
func (s *e2eTestSuite) Test_CreateTimeseries() {
7989
var (
8090
path = "root.tsg1.dev1.status"

0 commit comments

Comments
 (0)