Skip to content

Commit 553bcc5

Browse files
authored
fix: there is no errors when sql is incorrect (#123)
1 parent 033fcb8 commit 553bcc5

File tree

2 files changed

+74
-37
lines changed

2 files changed

+74
-37
lines changed

client/session.go

Lines changed: 65 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ import (
2525
"encoding/binary"
2626
"errors"
2727
"fmt"
28-
"github.com/apache/iotdb-client-go/common"
2928
"log"
3029
"net"
3130
"reflect"
3231
"sort"
3332
"strings"
3433
"time"
3534

35+
"github.com/apache/iotdb-client-go/common"
36+
3637
"github.com/apache/iotdb-client-go/rpc"
3738
"github.com/apache/thrift/lib/go/thrift"
3839
)
@@ -103,7 +104,7 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
103104
ConnectTimeout: time.Duration(connectionTimeoutInMs) * time.Millisecond, // Use 0 for no timeout
104105
})
105106
// s.trans = thrift.NewTFramedTransport(s.trans) // deprecated
106-
var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
107+
tmp_conf := thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
107108
s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
108109
if !s.trans.IsOpen() {
109110
err = s.trans.Open()
@@ -115,8 +116,10 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
115116
iprot := s.protocolFactory.GetProtocol(s.trans)
116117
oprot := s.protocolFactory.GetProtocol(s.trans)
117118
s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
118-
req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
119-
Password: &s.config.Password}
119+
req := rpc.TSOpenSessionReq{
120+
ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
121+
Password: &s.config.Password,
122+
}
120123
req.Configuration = make(map[string]string)
121124
req.Configuration["sql_dialect"] = s.config.sqlDialect
122125
if s.config.Version == "" {
@@ -165,8 +168,10 @@ func (s *Session) OpenCluster(enableRPCCompression bool) error {
165168
iprot := s.protocolFactory.GetProtocol(s.trans)
166169
oprot := s.protocolFactory.GetProtocol(s.trans)
167170
s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
168-
req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
169-
Password: &s.config.Password}
171+
req := rpc.TSOpenSessionReq{
172+
ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
173+
Password: &s.config.Password,
174+
}
170175
req.Configuration = make(map[string]string)
171176
req.Configuration["sql_dialect"] = s.config.sqlDialect
172177
if s.config.Version == "" {
@@ -267,8 +272,10 @@ func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *common.TSSt
267272
*error: correctness of operation
268273
*/
269274
func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r *common.TSStatus, err error) {
270-
request := rpc.TSCreateTimeseriesReq{SessionId: s.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding),
271-
Compressor: int32(compressor), Attributes: attributes, Tags: tags}
275+
request := rpc.TSCreateTimeseriesReq{
276+
SessionId: s.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding),
277+
Compressor: int32(compressor), Attributes: attributes, Tags: tags,
278+
}
272279
status, err := s.client.CreateTimeseries(context.Background(), &request)
273280
if err != nil && status == nil {
274281
if s.reconnect() {
@@ -352,8 +359,10 @@ func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType,
352359
destCompressions[i] = int32(e)
353360
}
354361

355-
request := rpc.TSCreateMultiTimeseriesReq{SessionId: s.sessionId, Paths: paths, DataTypes: destTypes,
356-
Encodings: destEncodings, Compressors: destCompressions}
362+
request := rpc.TSCreateMultiTimeseriesReq{
363+
SessionId: s.sessionId, Paths: paths, DataTypes: destTypes,
364+
Encodings: destEncodings, Compressors: destCompressions,
365+
}
357366
r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
358367

359368
if err != nil && r == nil {
@@ -415,8 +424,10 @@ func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r
415424
*error: correctness of operation
416425
*/
417426
func (s *Session) InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r *common.TSStatus, err error) {
418-
request := rpc.TSInsertStringRecordReq{SessionId: s.sessionId, PrefixPath: deviceId, Measurements: measurements,
419-
Values: values, Timestamp: timestamp}
427+
request := rpc.TSInsertStringRecordReq{
428+
SessionId: s.sessionId, PrefixPath: deviceId, Measurements: measurements,
429+
Values: values, Timestamp: timestamp,
430+
}
420431
r, err = s.client.InsertStringRecord(context.Background(), &request)
421432
if err != nil && r == nil {
422433
if s.reconnect() {
@@ -442,26 +453,33 @@ func (s *Session) SetTimeZone(timeZone string) (r *common.TSStatus, err error) {
442453
return r, err
443454
}
444455

445-
func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
456+
func (s *Session) ExecuteStatementWithContext(ctx context.Context, sql string) (*SessionDataSet, error) {
446457
request := rpc.TSExecuteStatementReq{
447458
SessionId: s.sessionId,
448459
Statement: sql,
449460
StatementId: s.requestStatementId,
450461
FetchSize: &s.config.FetchSize,
451462
}
452-
resp, err := s.client.ExecuteStatement(context.Background(), &request)
463+
resp, err := s.client.ExecuteStatement(ctx, &request)
453464

454465
if err != nil && resp == nil {
455466
if s.reconnect() {
456467
request.SessionId = s.sessionId
457468
request.StatementId = s.requestStatementId
458-
resp, err = s.client.ExecuteStatement(context.Background(), &request)
469+
resp, err = s.client.ExecuteStatement(ctx, &request)
459470
}
460471
}
472+
if statusErr := VerifySuccess(resp.Status); statusErr != nil {
473+
return nil, statusErr
474+
}
461475

462476
return s.genDataSet(sql, resp), err
463477
}
464478

479+
func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
480+
return s.ExecuteStatementWithContext(context.Background(), sql)
481+
}
482+
465483
func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) {
466484
request := rpc.TSExecuteStatementReq{
467485
SessionId: s.sessionId,
@@ -490,8 +508,10 @@ func (s *Session) changeDatabase(database string) {
490508
}
491509

492510
func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionDataSet, error) {
493-
request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: sql, StatementId: s.requestStatementId,
494-
FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
511+
request := rpc.TSExecuteStatementReq{
512+
SessionId: s.sessionId, Statement: sql, StatementId: s.requestStatementId,
513+
FetchSize: &s.config.FetchSize, Timeout: timeoutMs,
514+
}
495515
if resp, err := s.client.ExecuteQueryStatement(context.Background(), &request); err == nil {
496516
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
497517
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
@@ -515,10 +535,12 @@ func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionD
515535

516536
func (s *Session) ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType,
517537
startTime *int64, endTime *int64, interval *int64,
518-
timeoutMs *int64) (*SessionDataSet, error) {
519-
520-
request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths,
521-
Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
538+
timeoutMs *int64,
539+
) (*SessionDataSet, error) {
540+
request := rpc.TSAggregationQueryReq{
541+
SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths,
542+
Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs,
543+
}
522544
if resp, err := s.client.ExecuteAggregationQuery(context.Background(), &request); err == nil {
523545
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
524546
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
@@ -541,11 +563,13 @@ func (s *Session) ExecuteAggregationQuery(paths []string, aggregations []common.
541563

542564
func (s *Session) ExecuteAggregationQueryWithLegalNodes(paths []string, aggregations []common.TAggregationType,
543565
startTime *int64, endTime *int64, interval *int64,
544-
timeoutMs *int64, legalNodes *bool) (*SessionDataSet, error) {
545-
546-
request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths,
566+
timeoutMs *int64, legalNodes *bool,
567+
) (*SessionDataSet, error) {
568+
request := rpc.TSAggregationQueryReq{
569+
SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths,
547570
Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize,
548-
Timeout: timeoutMs, LegalPathNodes: legalNodes}
571+
Timeout: timeoutMs, LegalPathNodes: legalNodes,
572+
}
549573
if resp, err := s.client.ExecuteAggregationQuery(context.Background(), &request); err == nil {
550574
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
551575
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
@@ -570,7 +594,8 @@ func (s *Session) genTSInsertRecordReq(deviceId string, time int64,
570594
measurements []string,
571595
types []TSDataType,
572596
values []interface{},
573-
isAligned bool) (*rpc.TSInsertRecordReq, error) {
597+
isAligned bool,
598+
) (*rpc.TSInsertRecordReq, error) {
574599
request := &rpc.TSInsertRecordReq{}
575600
request.SessionId = s.sessionId
576601
request.PrefixPath = deviceId
@@ -709,7 +734,7 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []
709734
return nil, err
710735
}
711736
}
712-
var isAligned = true
737+
isAligned := true
713738
request := &rpc.TSInsertRecordsOfOneDeviceReq{
714739
SessionId: s.sessionId,
715740
PrefixPath: deviceId,
@@ -744,7 +769,8 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []
744769
*
745770
*/
746771
func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
747-
timestamps []int64) (r *common.TSStatus, err error) {
772+
timestamps []int64,
773+
) (r *common.TSStatus, err error) {
748774
request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, false)
749775
if err != nil {
750776
return nil, err
@@ -761,7 +787,8 @@ func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dat
761787
}
762788

763789
func (s *Session) InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
764-
timestamps []int64) (r *common.TSStatus, err error) {
790+
timestamps []int64,
791+
) (r *common.TSStatus, err error) {
765792
request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, true)
766793
if err != nil {
767794
return nil, err
@@ -932,7 +959,8 @@ func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool) (*rpc.T
932959
}
933960

934961
func (s *Session) genInsertRecordsReq(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
935-
timestamps []int64, isAligned bool) (*rpc.TSInsertRecordsReq, error) {
962+
timestamps []int64, isAligned bool,
963+
) (*rpc.TSInsertRecordsReq, error) {
936964
length := len(deviceIds)
937965
if length != len(timestamps) || length != len(measurements) || length != len(values) {
938966
return nil, errLength
@@ -1169,7 +1197,7 @@ func newClusterSessionWithSqlDialect(clusterConfig *ClusterConfig) (Session, err
11691197
ConnectTimeout: time.Duration(0), // Use 0 for no timeout
11701198
})
11711199
// session.trans = thrift.NewTFramedTransport(session.trans) // deprecated
1172-
var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
1200+
tmp_conf := thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
11731201
session.trans = thrift.NewTFramedTransportConf(session.trans, &tmp_conf)
11741202
if !session.trans.IsOpen() {
11751203
err = session.trans.Open()
@@ -1196,7 +1224,7 @@ func (s *Session) initClusterConn(node endPoint) error {
11961224
})
11971225
if err == nil {
11981226
// s.trans = thrift.NewTFramedTransport(s.trans) // deprecated
1199-
var tmp_conf = thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
1227+
tmp_conf := thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
12001228
s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
12011229
if !s.trans.IsOpen() {
12021230
err = s.trans.Open()
@@ -1221,8 +1249,10 @@ func (s *Session) initClusterConn(node endPoint) error {
12211249
iprot := s.protocolFactory.GetProtocol(s.trans)
12221250
oprot := s.protocolFactory.GetProtocol(s.trans)
12231251
s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
1224-
req := rpc.TSOpenSessionReq{ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
1225-
Password: &s.config.Password}
1252+
req := rpc.TSOpenSessionReq{
1253+
ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
1254+
Password: &s.config.Password,
1255+
}
12261256

12271257
resp, err := s.client.OpenSession(context.Background(), &req)
12281258
if err != nil {
@@ -1231,7 +1261,6 @@ func (s *Session) initClusterConn(node endPoint) error {
12311261
s.sessionId = resp.GetSessionId()
12321262
s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId)
12331263
return err
1234-
12351264
}
12361265

12371266
func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string, connectRetryMax int, database string, sqlDialect string) *Config {
@@ -1250,7 +1279,7 @@ func getConfig(host string, port string, userName string, passWord string, fetch
12501279

12511280
func (s *Session) reconnect() bool {
12521281
var err error
1253-
var connectedSuccess = false
1282+
connectedSuccess := false
12541283

12551284
for i := 0; i < s.config.ConnectRetryMax; i++ {
12561285
for i := range s.endPointList {

test/e2e/e2e_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@
2020
package e2e
2121

2222
import (
23+
"context"
2324
"fmt"
24-
"github.com/apache/iotdb-client-go/common"
2525
"log"
2626
"math/rand"
2727
"strings"
2828
"testing"
2929
"time"
3030

31+
"github.com/apache/iotdb-client-go/common"
32+
3133
"github.com/apache/iotdb-client-go/client"
3234
"github.com/stretchr/testify/suite"
3335
)
@@ -392,3 +394,9 @@ func (s *e2eTestSuite) Test_InsertAlignedTablets() {
392394
assert.Equal(status, "8")
393395
s.session.DeleteStorageGroup("root.ln.**")
394396
}
397+
398+
func (s *e2eTestSuite) Test_InvalidSQL() {
399+
_, err := s.session.ExecuteStatementWithContext(context.Background(), "select1 from device")
400+
assert := s.Require()
401+
assert.Error(err)
402+
}

0 commit comments

Comments
 (0)