@@ -601,6 +601,60 @@ func (s *Session) ExecuteAggregationQueryWithLegalNodes(paths []string, aggregat
601601 }
602602}
603603
604+ func (s * Session ) ExecuteGroupByQueryIntervalQuery (database * string , device , measurement string , aggregationType common.TAggregationType ,
605+ dataType int32 , startTime * int64 , endTime * int64 , interval * int64 , timeoutMs * int64 , isAligned * bool ) (* SessionDataSet , error ) {
606+
607+ request := rpc.TSGroupByQueryIntervalReq {SessionId : s .sessionId , StatementId : s .requestStatementId ,
608+ Database : database , Device : device , Measurement : measurement , AggregationType : aggregationType , DataType : dataType ,
609+ StartTime : startTime , EndTime : endTime , Interval : interval , FetchSize : & s .config .FetchSize ,
610+ Timeout : timeoutMs , IsAligned : isAligned }
611+ if resp , err := s .client .ExecuteGroupByQueryIntervalQuery (context .Background (), & request ); err == nil {
612+ if statusErr := VerifySuccess (resp .Status ); statusErr == nil {
613+ return NewSessionDataSet ("" , resp .Columns , resp .DataTypeList , resp .ColumnNameIndexMap , * resp .QueryId , s .requestStatementId , s .client , s .sessionId , resp .QueryResult_ , resp .IgnoreTimeStamp != nil && * resp .IgnoreTimeStamp , timeoutMs , * resp .MoreData , s .config .FetchSize , s .config .TimeZone , s .timeFactor , resp .GetColumnIndex2TsBlockColumnIndexList ())
614+ } else {
615+ return nil , statusErr
616+ }
617+ } else {
618+ if s .reconnect () {
619+ request .SessionId = s .sessionId
620+ resp , err = s .client .ExecuteGroupByQueryIntervalQuery (context .Background (), & request )
621+ if statusErr := VerifySuccess (resp .Status ); statusErr == nil {
622+ return NewSessionDataSet ("" , resp .Columns , resp .DataTypeList , resp .ColumnNameIndexMap , * resp .QueryId , s .requestStatementId , s .client , s .sessionId , resp .QueryResult_ , resp .IgnoreTimeStamp != nil && * resp .IgnoreTimeStamp , timeoutMs , * resp .MoreData , s .config .FetchSize , s .config .TimeZone , s .timeFactor , resp .GetColumnIndex2TsBlockColumnIndexList ())
623+ } else {
624+ return nil , statusErr
625+ }
626+ }
627+ return nil , err
628+ }
629+ }
630+
631+ func (s * Session ) ExecuteFastLastDataQueryForOnePrefixPath (prefixes []string , timeoutMs * int64 ) (* SessionDataSet , error ) {
632+ request := rpc.TSFastLastDataQueryForOnePrefixPathReq {
633+ SessionId : s .sessionId ,
634+ StatementId : s .requestStatementId ,
635+ Prefixes : prefixes ,
636+ Timeout : timeoutMs ,
637+ }
638+ if resp , err := s .client .ExecuteFastLastDataQueryForOnePrefixPath (context .Background (), & request ); err == nil {
639+ if statusErr := VerifySuccess (resp .Status ); statusErr == nil {
640+ return NewSessionDataSet ("" , resp .Columns , resp .DataTypeList , resp .ColumnNameIndexMap , * resp .QueryId , s .requestStatementId , s .client , s .sessionId , resp .QueryResult_ , resp .IgnoreTimeStamp != nil && * resp .IgnoreTimeStamp , timeoutMs , * resp .MoreData , s .config .FetchSize , s .config .TimeZone , s .timeFactor , resp .GetColumnIndex2TsBlockColumnIndexList ())
641+ } else {
642+ return nil , statusErr
643+ }
644+ } else {
645+ if s .reconnect () {
646+ request .SessionId = s .sessionId
647+ resp , err = s .client .ExecuteFastLastDataQueryForOnePrefixPath (context .Background (), & request )
648+ if statusErr := VerifySuccess (resp .Status ); statusErr == nil {
649+ return NewSessionDataSet ("" , resp .Columns , resp .DataTypeList , resp .ColumnNameIndexMap , * resp .QueryId , s .requestStatementId , s .client , s .sessionId , resp .QueryResult_ , resp .IgnoreTimeStamp != nil && * resp .IgnoreTimeStamp , timeoutMs , * resp .MoreData , s .config .FetchSize , s .config .TimeZone , s .timeFactor , resp .GetColumnIndex2TsBlockColumnIndexList ())
650+ } else {
651+ return nil , statusErr
652+ }
653+ }
654+ return nil , err
655+ }
656+ }
657+
604658func (s * Session ) genTSInsertRecordReq (deviceId string , time int64 ,
605659 measurements []string ,
606660 types []TSDataType ,
0 commit comments