diff --git a/client/column.go b/client/column.go new file mode 100644 index 0000000..23cb274 --- /dev/null +++ b/client/column.go @@ -0,0 +1,821 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package client + +import "fmt" + +type ColumnEncoding uint8 + +const ( + BYTE_ARRAY_COLUMN_ENCODING = ColumnEncoding(iota) + INT32_ARRAY_COLUMN_ENCODING + INT64_ARRAY_COLUMN_ENCODING + BINARY_ARRAY_COLUMN_ENCODING + RLE_COLUMN_ENCODING +) + +var encodingToDecoder = map[ColumnEncoding]ColumnDecoder{ + INT32_ARRAY_COLUMN_ENCODING: new(Int32ArrayColumnDecoder), + INT64_ARRAY_COLUMN_ENCODING: new(Int64ArrayColumnDecoder), + BYTE_ARRAY_COLUMN_ENCODING: new(ByteArrayColumnDecoder), + BINARY_ARRAY_COLUMN_ENCODING: new(BinaryArrayColumnDecoder), + RLE_COLUMN_ENCODING: new(RunLengthColumnDecoder), +} + +var byteToEncoding = map[byte]ColumnEncoding{ + 0: BYTE_ARRAY_COLUMN_ENCODING, + 1: INT32_ARRAY_COLUMN_ENCODING, + 2: INT64_ARRAY_COLUMN_ENCODING, + 3: BINARY_ARRAY_COLUMN_ENCODING, + 4: RLE_COLUMN_ENCODING, +} + +func getColumnDecoder(encoding ColumnEncoding) (ColumnDecoder, error) { + decoder, exists := encodingToDecoder[encoding] + if !exists { + return nil, fmt.Errorf("unsupported column encoding: %v", encoding) + } + return decoder, nil +} + +func getColumnEncodingByByte(b byte) (ColumnEncoding, error) { + encoding, exists := byteToEncoding[b] + if !exists { + return INT32_ARRAY_COLUMN_ENCODING, fmt.Errorf("invalid value: %v", b) + } + return encoding, nil +} + +type Column interface { + GetDataType() TSDataType + GetEncoding() ColumnEncoding + GetBoolean(position int32) (bool, error) + GetInt(position int32) (int32, error) + GetLong(position int32) (int64, error) + GetFloat(position int32) (float32, error) + GetDouble(position int32) (float64, error) + GetBinary(position int32) (*Binary, error) + GetObject(position int32) (interface{}, error) + + GetBooleans() ([]bool, error) + GetInts() ([]int32, error) + GetLongs() ([]int64, error) + GetFloats() ([]float32, error) + GetDoubles() ([]float64, error) + GetBinaries() ([]*Binary, error) + GetObjects() ([]interface{}, error) + + MayHaveNull() bool + IsNull(position int32) bool + IsNulls() []bool + + GetPositionCount() int32 +} + +type baseColumn struct { +} + +func (c *baseColumn) GetBoolean(_ int32) (bool, error) { + return false, fmt.Errorf("unsupported operation: GetBoolean") +} + +func (c *baseColumn) GetInt(_ int32) (int32, error) { + return 0, fmt.Errorf("unsupported operation: GetInt") +} + +func (c *baseColumn) GetLong(_ int32) (int64, error) { + return 0, fmt.Errorf("unsupported operation: GetLong") +} + +func (c *baseColumn) GetFloat(_ int32) (float32, error) { + return 0, fmt.Errorf("unsupported operation: GetFloat") +} + +func (c *baseColumn) GetDouble(_ int32) (float64, error) { + return 0, fmt.Errorf("unsupported operation: GetDouble") +} + +func (c *baseColumn) GetBinary(_ int32) (*Binary, error) { + return nil, fmt.Errorf("unsupported operation: GetBinary") +} + +func (c *baseColumn) GetObject(_ int32) (interface{}, error) { + return nil, fmt.Errorf("unsupported operation: GetObject") +} + +func (c *baseColumn) GetBooleans() ([]bool, error) { + return nil, fmt.Errorf("unsupported operation: GetBooleans") +} + +func (c *baseColumn) GetInts() ([]int32, error) { + return nil, fmt.Errorf("unsupported operation: GetInts") +} + +func (c *baseColumn) GetLongs() ([]int64, error) { + return nil, fmt.Errorf("unsupported operation: GetLongs") +} + +func (c *baseColumn) GetFloats() ([]float32, error) { + return nil, fmt.Errorf("unsupported operation: GetFloats") +} + +func (c *baseColumn) GetDoubles() ([]float64, error) { + return nil, fmt.Errorf("unsupported operation: GetDoubles") +} + +func (c *baseColumn) GetBinaries() ([]*Binary, error) { + return nil, fmt.Errorf("unsupported operation: GetBinaries") +} + +func (c *baseColumn) GetObjects() ([]interface{}, error) { + return nil, fmt.Errorf("unsupported operation: GetObjects") +} + +type TimeColumn struct { + baseColumn + arrayOffset int32 + positionCount int32 + values []int64 +} + +func NewTimeColumn(arrayOffset int32, positionCount int32, values []int64) (*TimeColumn, error) { + if arrayOffset < 0 { + return nil, fmt.Errorf("arrayOffset is negative") + } + if positionCount < 0 { + return nil, fmt.Errorf("arrayOffset is negative") + } + if int32(len(values))-arrayOffset < positionCount { + return nil, fmt.Errorf("values length is less than positionCount") + } + return &TimeColumn{ + arrayOffset: arrayOffset, + positionCount: positionCount, + values: values, + }, nil +} + +func (tc *TimeColumn) GetDataType() TSDataType { + return INT64 +} + +func (tc *TimeColumn) GetEncoding() ColumnEncoding { + return INT64_ARRAY_COLUMN_ENCODING +} + +func (tc *TimeColumn) GetLong(position int32) (int64, error) { + return tc.values[position+tc.arrayOffset], nil +} + +func (tc *TimeColumn) MayHaveNull() bool { + return false +} + +func (tc *TimeColumn) IsNull(_ int32) bool { + return false +} + +func (tc *TimeColumn) IsNulls() []bool { + return nil +} + +func (tc *TimeColumn) GetPositionCount() int32 { + return tc.positionCount +} + +func (tc *TimeColumn) GetStartTime() int64 { + return tc.values[tc.arrayOffset] +} + +func (tc *TimeColumn) GetEndTime() int64 { + return tc.values[tc.positionCount+tc.arrayOffset-1] +} + +func (tc *TimeColumn) GetTimes() []int64 { + return tc.values +} + +func (tc *TimeColumn) GetLongs() ([]int64, error) { + return tc.GetTimes(), nil +} + +type BinaryColumn struct { + baseColumn + arrayOffset int32 + positionCount int32 + valueIsNull []bool + values []*Binary +} + +func NewBinaryColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []*Binary) (*BinaryColumn, error) { + if arrayOffset < 0 { + return nil, fmt.Errorf("arrayOffset is negative") + } + if positionCount < 0 { + return nil, fmt.Errorf("positionCount is negative") + } + if int32(len(values))-arrayOffset < positionCount { + return nil, fmt.Errorf("values length is less than positionCount") + } + if valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount { + return nil, fmt.Errorf("isNull length is less than positionCount") + } + return &BinaryColumn{ + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values, + }, nil +} + +func (c *BinaryColumn) GetDataType() TSDataType { + return TEXT +} + +func (c *BinaryColumn) GetEncoding() ColumnEncoding { + return BINARY_ARRAY_COLUMN_ENCODING +} + +func (c *BinaryColumn) GetBinary(position int32) (*Binary, error) { + return c.values[position+c.arrayOffset], nil +} + +func (c *BinaryColumn) GetBinaries() ([]*Binary, error) { + return c.values, nil +} + +func (c *BinaryColumn) GetObject(position int32) (interface{}, error) { + return c.GetBinary(position) +} + +func (c *BinaryColumn) MayHaveNull() bool { + return c.valueIsNull != nil +} + +func (c *BinaryColumn) IsNull(position int32) bool { + return c.valueIsNull != nil && c.valueIsNull[position+c.arrayOffset] +} + +func (c *BinaryColumn) IsNulls() []bool { + if c.valueIsNull != nil { + return c.valueIsNull + } + result := make([]bool, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = false + } + return result +} + +func (c *BinaryColumn) GetPositionCount() int32 { + return c.positionCount +} + +type IntColumn struct { + baseColumn + arrayOffset int32 + positionCount int32 + valueIsNull []bool + values []int32 +} + +func NewIntColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []int32) (*IntColumn, error) { + if arrayOffset < 0 { + return nil, fmt.Errorf("arrayOffset is negative") + } + if positionCount < 0 { + return nil, fmt.Errorf("positionCount is negative") + } + if int32(len(values))-arrayOffset < positionCount { + return nil, fmt.Errorf("values length is less than positionCount") + } + if valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount { + return nil, fmt.Errorf("isNull length is less than positionCount") + } + return &IntColumn{ + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values, + }, nil +} + +func (c *IntColumn) GetDataType() TSDataType { + return INT32 +} + +func (c *IntColumn) GetEncoding() ColumnEncoding { + return INT32_ARRAY_COLUMN_ENCODING +} + +func (c *IntColumn) GetInt(position int32) (int32, error) { + return c.values[position+c.arrayOffset], nil +} + +func (c *IntColumn) GetInts() ([]int32, error) { + return c.values, nil +} + +func (c *IntColumn) GetObject(position int32) (interface{}, error) { + return c.GetInt(position) +} + +func (c *IntColumn) MayHaveNull() bool { + return c.valueIsNull != nil +} + +func (c *IntColumn) IsNull(position int32) bool { + return c.valueIsNull != nil && c.valueIsNull[position+c.arrayOffset] +} + +func (c *IntColumn) IsNulls() []bool { + if c.valueIsNull != nil { + return c.valueIsNull + } + result := make([]bool, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = false + } + return result +} + +func (c *IntColumn) GetPositionCount() int32 { + return c.positionCount +} + +type FloatColumn struct { + baseColumn + arrayOffset int32 + positionCount int32 + valueIsNull []bool + values []float32 +} + +func NewFloatColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []float32) (*FloatColumn, error) { + if arrayOffset < 0 { + return nil, fmt.Errorf("arrayOffset is negative") + } + if positionCount < 0 { + return nil, fmt.Errorf("positionCount is negative") + } + if int32(len(values))-arrayOffset < positionCount { + return nil, fmt.Errorf("values length is less than positionCount") + } + if valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount { + return nil, fmt.Errorf("isNull length is less than positionCount") + } + return &FloatColumn{ + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values, + }, nil +} + +func (c *FloatColumn) GetDataType() TSDataType { + return FLOAT +} + +func (c *FloatColumn) GetEncoding() ColumnEncoding { + return INT32_ARRAY_COLUMN_ENCODING +} + +func (c *FloatColumn) GetFloat(position int32) (float32, error) { + return c.values[position+c.arrayOffset], nil +} + +func (c *FloatColumn) GetFloats() ([]float32, error) { + return c.values, nil +} + +func (c *FloatColumn) GetObject(position int32) (interface{}, error) { + return c.GetFloat(position) +} + +func (c *FloatColumn) MayHaveNull() bool { + return c.valueIsNull != nil +} + +func (c *FloatColumn) IsNull(position int32) bool { + return c.valueIsNull != nil && c.valueIsNull[position+c.arrayOffset] +} + +func (c *FloatColumn) IsNulls() []bool { + if c.valueIsNull != nil { + return c.valueIsNull + } + result := make([]bool, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = false + } + return result +} + +func (c *FloatColumn) GetPositionCount() int32 { + return c.positionCount +} + +type LongColumn struct { + baseColumn + arrayOffset int32 + positionCount int32 + valueIsNull []bool + values []int64 +} + +func NewLongColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []int64) (*LongColumn, error) { + if arrayOffset < 0 { + return nil, fmt.Errorf("arrayOffset is negative") + } + if positionCount < 0 { + return nil, fmt.Errorf("positionCount is negative") + } + if int32(len(values))-arrayOffset < positionCount { + return nil, fmt.Errorf("values length is less than positionCount") + } + if valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount { + return nil, fmt.Errorf("isNull length is less than positionCount") + } + return &LongColumn{ + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values, + }, nil +} + +func (c *LongColumn) GetDataType() TSDataType { + return INT64 +} + +func (c *LongColumn) GetEncoding() ColumnEncoding { + return INT64_ARRAY_COLUMN_ENCODING +} + +func (c *LongColumn) GetLong(position int32) (int64, error) { + return c.values[position+c.arrayOffset], nil +} + +func (c *LongColumn) GetLongs() ([]int64, error) { + return c.values, nil +} + +func (c *LongColumn) GetObject(position int32) (interface{}, error) { + return c.GetLong(position) +} + +func (c *LongColumn) MayHaveNull() bool { + return c.valueIsNull != nil +} + +func (c *LongColumn) IsNull(position int32) bool { + return c.valueIsNull != nil && c.valueIsNull[position+c.arrayOffset] +} + +func (c *LongColumn) IsNulls() []bool { + if c.valueIsNull != nil { + return c.valueIsNull + } + result := make([]bool, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = false + } + return result +} + +func (c *LongColumn) GetPositionCount() int32 { + return c.positionCount +} + +type DoubleColumn struct { + baseColumn + arrayOffset int32 + positionCount int32 + valueIsNull []bool + values []float64 +} + +func NewDoubleColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []float64) (*DoubleColumn, error) { + if arrayOffset < 0 { + return nil, fmt.Errorf("arrayOffset is negative") + } + if positionCount < 0 { + return nil, fmt.Errorf("positionCount is negative") + } + if int32(len(values))-arrayOffset < positionCount { + return nil, fmt.Errorf("values length is less than positionCount") + } + if valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount { + return nil, fmt.Errorf("isNull length is less than positionCount") + } + return &DoubleColumn{ + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values, + }, nil +} + +func (c *DoubleColumn) GetDataType() TSDataType { + return DOUBLE +} + +func (c *DoubleColumn) GetEncoding() ColumnEncoding { + return INT64_ARRAY_COLUMN_ENCODING +} + +func (c *DoubleColumn) GetDouble(position int32) (float64, error) { + return c.values[position+c.arrayOffset], nil +} + +func (c *DoubleColumn) GetDoubles() ([]float64, error) { + return c.values, nil +} + +func (c *DoubleColumn) GetObject(position int32) (interface{}, error) { + return c.GetDouble(position) +} + +func (c *DoubleColumn) MayHaveNull() bool { + return c.valueIsNull != nil +} + +func (c *DoubleColumn) IsNull(position int32) bool { + return c.valueIsNull != nil && c.valueIsNull[position+c.arrayOffset] +} + +func (c *DoubleColumn) IsNulls() []bool { + if c.valueIsNull != nil { + return c.valueIsNull + } + result := make([]bool, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = false + } + return result +} + +func (c *DoubleColumn) GetPositionCount() int32 { + return c.positionCount +} + +type BooleanColumn struct { + baseColumn + arrayOffset int32 + positionCount int32 + valueIsNull []bool + values []bool +} + +func NewBooleanColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []bool) (*BooleanColumn, error) { + if arrayOffset < 0 { + return nil, fmt.Errorf("arrayOffset is negative") + } + if positionCount < 0 { + return nil, fmt.Errorf("positionCount is negative") + } + if int32(len(values))-arrayOffset < positionCount { + return nil, fmt.Errorf("values length is less than positionCount") + } + if valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount { + return nil, fmt.Errorf("isNull length is less than positionCount") + } + return &BooleanColumn{ + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values, + }, nil +} + +func (c *BooleanColumn) GetDataType() TSDataType { + return BOOLEAN +} + +func (c *BooleanColumn) GetEncoding() ColumnEncoding { + return BYTE_ARRAY_COLUMN_ENCODING +} + +func (c *BooleanColumn) GetBoolean(position int32) (bool, error) { + return c.values[position+c.arrayOffset], nil +} + +func (c *BooleanColumn) GetBooleans() ([]bool, error) { + return c.values, nil +} + +func (c *BooleanColumn) GetObject(position int32) (interface{}, error) { + return c.GetBoolean(position) +} + +func (c *BooleanColumn) MayHaveNull() bool { + return c.valueIsNull != nil +} + +func (c *BooleanColumn) IsNull(position int32) bool { + return c.valueIsNull != nil && c.valueIsNull[position+c.arrayOffset] +} + +func (c *BooleanColumn) IsNulls() []bool { + if c.valueIsNull != nil { + return c.valueIsNull + } + result := make([]bool, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = false + } + return result +} + +func (c *BooleanColumn) GetPositionCount() int32 { + return c.positionCount +} + +type RunLengthEncodedColumn struct { + baseColumn + value Column + positionCount int32 +} + +func NewRunLengthEncodedColumn(value Column, positionCount int32) (*RunLengthEncodedColumn, error) { + if value == nil { + return nil, fmt.Errorf("value is null") + } + if value.GetPositionCount() != 1 { + return nil, fmt.Errorf("expected value to contain a single position but has %v positions", value.GetPositionCount()) + } + if positionCount < 0 { + return nil, fmt.Errorf("positionCount is negative") + } + column := new(RunLengthEncodedColumn) + switch (value).(type) { + case *RunLengthEncodedColumn: + column.value = (value.(*RunLengthEncodedColumn)).GetValue() + default: + column.value = value + } + column.positionCount = positionCount + return column, nil +} + +func (c *RunLengthEncodedColumn) GetValue() Column { + return c.value +} + +func (c *RunLengthEncodedColumn) GetDataType() TSDataType { + return c.value.GetDataType() +} + +func (c *RunLengthEncodedColumn) GetEncoding() ColumnEncoding { + return RLE_COLUMN_ENCODING +} + +func (c *RunLengthEncodedColumn) GetBoolean(_ int32) (bool, error) { + return c.value.GetBoolean(0) +} + +func (c *RunLengthEncodedColumn) GetInt(_ int32) (int32, error) { + return c.value.GetInt(0) +} + +func (c *RunLengthEncodedColumn) GetLong(_ int32) (int64, error) { + return c.value.GetLong(0) +} + +func (c *RunLengthEncodedColumn) GetFloat(_ int32) (float32, error) { + return c.value.GetFloat(0) +} + +func (c *RunLengthEncodedColumn) GetDouble(_ int32) (float64, error) { + return c.value.GetDouble(0) +} + +func (c *RunLengthEncodedColumn) GetBinary(_ int32) (*Binary, error) { + return c.value.GetBinary(0) +} + +func (c *RunLengthEncodedColumn) GetObject(_ int32) (interface{}, error) { + return c.value.GetObject(0) +} + +func (c *RunLengthEncodedColumn) GetBooleans() ([]bool, error) { + v, err := c.value.GetBoolean(0) + if err != nil { + return nil, err + } + result := make([]bool, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = v + } + return result, err +} + +func (c *RunLengthEncodedColumn) GetInts() ([]int32, error) { + v, err := c.value.GetInt(0) + if err != nil { + return nil, err + } + result := make([]int32, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = v + } + return result, err +} + +func (c *RunLengthEncodedColumn) GetLongs() ([]int64, error) { + v, err := c.value.GetLong(0) + if err != nil { + return nil, err + } + result := make([]int64, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = v + } + return result, err +} + +func (c *RunLengthEncodedColumn) GetFloats() ([]float32, error) { + v, err := c.value.GetFloat(0) + if err != nil { + return nil, err + } + result := make([]float32, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = v + } + return result, err +} + +func (c *RunLengthEncodedColumn) GetDoubles() ([]float64, error) { + v, err := c.value.GetDouble(0) + if err != nil { + return nil, err + } + result := make([]float64, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = v + } + return result, err +} + +func (c *RunLengthEncodedColumn) GetBinaries() ([]*Binary, error) { + v, err := c.value.GetBinary(0) + if err != nil { + return nil, err + } + result := make([]*Binary, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = v + } + return result, err +} + +func (c *RunLengthEncodedColumn) GetObjects() ([]interface{}, error) { + v, err := c.value.GetObject(0) + if err != nil { + return nil, err + } + result := make([]interface{}, c.positionCount) + for i := int32(0); i < c.positionCount; i++ { + result[i] = v + } + return result, err +} + +func (c *RunLengthEncodedColumn) MayHaveNull() bool { + return c.value.MayHaveNull() +} + +func (c *RunLengthEncodedColumn) IsNull(_ int32) bool { + return c.value.IsNull(0) +} + +func (c *RunLengthEncodedColumn) IsNulls() []bool { + result := make([]bool, c.positionCount) + v := c.value.IsNull(0) + for i := int32(0); i < c.positionCount; i++ { + result[i] = v + } + return result +} + +func (c *RunLengthEncodedColumn) GetPositionCount() int32 { + return c.positionCount +} diff --git a/client/column_decoder.go b/client/column_decoder.go new file mode 100644 index 0000000..3367911 --- /dev/null +++ b/client/column_decoder.go @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package client + +import ( + "bytes" + "encoding/binary" + "fmt" +) + +type ColumnDecoder interface { + ReadTimeColumn(reader *bytes.Reader, positionCount int32) (*TimeColumn, error) + ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) +} + +func deserializeNullIndicators(reader *bytes.Reader, positionCount int32) ([]bool, error) { + b, err := reader.ReadByte() + if err != nil { + return nil, err + } + mayHaveNull := b != 0 + if !mayHaveNull { + return nil, nil + } + return deserializeBooleanArray(reader, positionCount) +} + +func deserializeBooleanArray(reader *bytes.Reader, size int32) ([]bool, error) { + packedSize := (size + 7) / 8 + packedBytes := make([]byte, packedSize) + + _, err := reader.Read(packedBytes) + if err != nil { + return nil, err + } + + // read null bits 8 at a time + output := make([]bool, size) + currentByte := 0 + fullGroups := int(size) & ^0b111 + for pos := 0; pos < fullGroups; pos += 8 { + b := packedBytes[currentByte] + currentByte++ + + output[pos+0] = (b & 0b10000000) != 0 + output[pos+1] = (b & 0b01000000) != 0 + output[pos+2] = (b & 0b00100000) != 0 + output[pos+3] = (b & 0b00010000) != 0 + output[pos+4] = (b & 0b00001000) != 0 + output[pos+5] = (b & 0b00000100) != 0 + output[pos+6] = (b & 0b00000010) != 0 + output[pos+7] = (b & 0b00000001) != 0 + } + + // read last null bits + if remaining := int(size) % 8; remaining > 0 { + b := packedBytes[len(packedBytes)-1] + mask := uint8(0b10000000) + + for pos := fullGroups; pos < int(size); pos++ { + output[pos] = (b & mask) != 0 + mask >>= 1 + } + } + + return output, nil +} + +type baseColumnDecoder struct{} + +func (_ *baseColumnDecoder) ReadTimeColumn(_ *bytes.Reader, _ int32) (*TimeColumn, error) { + return nil, fmt.Errorf("unsupported operation: ReadTimeColumn") +} + +type Int32ArrayColumnDecoder struct { + baseColumnDecoder +} + +func (decoder *Int32ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) { + // Serialized data layout: + // +---------------+-----------------+-------------+ + // | may have null | null indicators | values | + // +---------------+-----------------+-------------+ + // | byte | list[byte] | list[int32] | + // +---------------+-----------------+-------------+ + nullIndicators, err := deserializeNullIndicators(reader, positionCount) + if err != nil { + return nil, err + } + switch dataType { + case INT32, DATE: + intValues := make([]int32, positionCount) + for i := int32(0); i < positionCount; i++ { + if nullIndicators != nil && nullIndicators[i] { + continue + } + err := binary.Read(reader, binary.BigEndian, &intValues[i]) + if err != nil { + return nil, err + } + } + return NewIntColumn(0, positionCount, nullIndicators, intValues) + case FLOAT: + floatValues := make([]float32, positionCount) + for i := int32(0); i < positionCount; i++ { + if nullIndicators != nil && nullIndicators[i] { + continue + } + err := binary.Read(reader, binary.BigEndian, &floatValues[i]) + if err != nil { + return nil, err + } + } + return NewFloatColumn(0, positionCount, nullIndicators, floatValues) + } + return nil, fmt.Errorf("invalid data type: %v", dataType) +} + +type Int64ArrayColumnDecoder struct { + baseColumnDecoder +} + +func (decoder *Int64ArrayColumnDecoder) ReadTimeColumn(reader *bytes.Reader, positionCount int32) (*TimeColumn, error) { + // Serialized data layout: + // +---------------+-----------------+-------------+ + // | may have null | null indicators | values | + // +---------------+-----------------+-------------+ + // | byte | list[byte] | list[int64] | + // +---------------+-----------------+-------------+ + + nullIndicators, err := deserializeNullIndicators(reader, positionCount) + if err != nil { + return nil, err + } + if nullIndicators != nil { + return nil, fmt.Errorf("time column should not contain null values") + } + values := make([]int64, positionCount) + for i := int32(0); i < positionCount; i++ { + err = binary.Read(reader, binary.BigEndian, &values[i]) + } + return NewTimeColumn(0, positionCount, values) +} + +func (decoder *Int64ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) { + // Serialized data layout: + // +---------------+-----------------+-------------+ + // | may have null | null indicators | values | + // +---------------+-----------------+-------------+ + // | byte | list[byte] | list[int64] | + // +---------------+-----------------+-------------+ + nullIndicators, err := deserializeNullIndicators(reader, positionCount) + if err != nil { + return nil, err + } + switch dataType { + case INT64, TIMESTAMP: + values := make([]int64, positionCount) + for i := int32(0); i < positionCount; i++ { + if nullIndicators != nil && nullIndicators[i] { + continue + } + if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil { + return nil, err + } + } + return NewLongColumn(0, positionCount, nullIndicators, values) + case DOUBLE: + values := make([]float64, positionCount) + for i := int32(0); i < positionCount; i++ { + if nullIndicators != nil && nullIndicators[i] { + continue + } + if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil { + return nil, err + } + } + return NewDoubleColumn(0, positionCount, nullIndicators, values) + } + return nil, fmt.Errorf("invalid data type: %v", dataType) +} + +type ByteArrayColumnDecoder struct { + baseColumnDecoder +} + +func (decoder *ByteArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) { + // Serialized data layout: + // +---------------+-----------------+-------------+ + // | may have null | null indicators | values | + // +---------------+-----------------+-------------+ + // | byte | list[byte] | list[byte] | + // +---------------+-----------------+-------------+ + + if dataType != BOOLEAN { + return nil, fmt.Errorf("invalid data type: %v", dataType) + } + nullIndicators, err := deserializeNullIndicators(reader, positionCount) + if err != nil { + return nil, err + } + values, err := deserializeBooleanArray(reader, positionCount) + if err != nil { + return nil, err + } + return NewBooleanColumn(0, positionCount, nullIndicators, values) +} + +type BinaryArrayColumnDecoder struct { + baseColumnDecoder +} + +func (decoder *BinaryArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) { + // Serialized data layout: + // +---------------+-----------------+-------------+ + // | may have null | null indicators | values | + // +---------------+-----------------+-------------+ + // | byte | list[byte] | list[entry] | + // +---------------+-----------------+-------------+ + // + // Each entry is represented as: + // +---------------+-------+ + // | value length | value | + // +---------------+-------+ + // | int32 | bytes | + // +---------------+-------+ + + if TEXT != dataType { + return nil, fmt.Errorf("invalid data type: %v", dataType) + } + nullIndicators, err := deserializeNullIndicators(reader, positionCount) + if err != nil { + return nil, err + } + values := make([]*Binary, positionCount) + for i := int32(0); i < positionCount; i++ { + if nullIndicators != nil && nullIndicators[i] { + continue + } + var length int32 + err := binary.Read(reader, binary.BigEndian, &length) + if err != nil { + return nil, err + } + value := make([]byte, length) + _, err = reader.Read(value) + if err != nil { + return nil, err + } + values[i] = NewBinary(value) + } + return NewBinaryColumn(0, positionCount, nullIndicators, values) +} + +type RunLengthColumnDecoder struct { + baseColumnDecoder +} + +func (decoder *RunLengthColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) { + // Serialized data layout: + // +-----------+-------------------------+ + // | encoding | serialized inner column | + // +-----------+-------------------------+ + // | byte | list[byte] | + // +-----------+-------------------------+ + columnEncoding, err := deserializeColumnEncoding(reader) + if err != nil { + return nil, err + } + columnDecoder, err := getColumnDecoder(columnEncoding) + if err != nil { + return nil, err + } + column, err := columnDecoder.ReadColumn(reader, dataType, 1) + if err != nil { + return nil, err + } + return NewRunLengthEncodedColumn(column, positionCount) +} diff --git a/client/protocol.go b/client/protocol.go index edc211e..faedfad 100644 --- a/client/protocol.go +++ b/client/protocol.go @@ -19,6 +19,8 @@ package client +import "fmt" + type TSDataType int8 type TSEncoding uint8 @@ -39,6 +41,48 @@ const ( STRING TSDataType = 11 ) +var tsTypeMap = map[string]TSDataType{ + "BOOLEAN": BOOLEAN, + "INT32": INT32, + "INT64": INT64, + "FLOAT": FLOAT, + "DOUBLE": DOUBLE, + "TEXT": TEXT, + "TIMESTAMP": TIMESTAMP, + "DATE": DATE, + "BLOB": BLOB, + "STRING": STRING, +} + +var byteToTsDataType = map[byte]TSDataType{ + 0: BOOLEAN, + 1: INT32, + 2: INT64, + 3: FLOAT, + 4: DOUBLE, + 5: TEXT, + 8: TIMESTAMP, + 9: DATE, + 10: BLOB, + 11: STRING, +} + +func GetDataTypeByStr(name string) (TSDataType, error) { + dataType, exists := tsTypeMap[name] + if !exists { + return UNKNOWN, fmt.Errorf("invalid input: %v", name) + } + return dataType, nil +} + +func getDataTypeByByte(b byte) (TSDataType, error) { + dataType, exists := byteToTsDataType[b] + if !exists { + return UNKNOWN, fmt.Errorf("invalid input: %v", b) + } + return dataType, nil +} + const ( PLAIN TSEncoding = 0 DICTIONARY TSEncoding = 1 @@ -202,3 +246,7 @@ const ( CqAlreadyExist int32 = 1402 CqUpdateLastExecTimeError int32 = 1403 ) + +const ( + TimestampColumnName = "Time" +) diff --git a/client/rpcdataset.go b/client/rpcdataset.go index 11ec81a..0629d21 100644 --- a/client/rpcdataset.go +++ b/client/rpcdataset.go @@ -1,616 +1,600 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - package client import ( "context" - "encoding/binary" - "errors" "fmt" "github.com/apache/iotdb-client-go/common" - "math" - "time" - "github.com/apache/iotdb-client-go/rpc" + "strconv" + "time" ) -const ( - startIndex = 2 - flag = 0x80 -) - -var ( - errClosed error = errors.New("DataSet is Closed") - tsTypeMap map[string]TSDataType = map[string]TSDataType{ - "BOOLEAN": BOOLEAN, - "INT32": INT32, - "INT64": INT64, - "FLOAT": FLOAT, - "DOUBLE": DOUBLE, - "TEXT": TEXT, - "TIMESTAMP": TIMESTAMP, - "DATE": DATE, - "BLOB": BLOB, - "STRING": STRING, - } -) +const startIndex = int32(2) type IoTDBRpcDataSet struct { - columnCount int - sessionId int64 - queryId int64 - lastReadWasNull bool - rowsIndex int - queryDataSet *rpc.TSQueryDataSet sql string - fetchSize int32 + isClosed bool + client *rpc.IClientRPCServiceClient columnNameList []string - columnTypeList []TSDataType + columnTypeList []string columnOrdinalMap map[string]int32 columnTypeDeduplicatedList []TSDataType - currentBitmap []byte - time []byte - values [][]byte - client *rpc.IClientRPCServiceClient - emptyResultSet bool - ignoreTimeStamp bool - closed bool - timeoutMs *int64 -} + fetchSize int32 + timeout *int64 + hasCachedRecord bool + lastReadWasNull bool -func (s *IoTDBRpcDataSet) getColumnIndex(columnName string) int32 { - if s.closed { - return -1 - } - return s.columnOrdinalMap[columnName] - startIndex + columnSize int32 + + sessionId int64 + queryId int64 + statementId int64 + time int64 + ignoreTimestamp bool + // indicates that there is still more data in server side and we can call fetchResult to get more + moreData bool + + queryResult [][]byte + curTsBlock *TsBlock + queryResultSize int32 // the length of queryResult + queryResultIndex int32 // the index of bytebuffer in queryResult + tsBlockSize int32 // the size of current tsBlock + tsBlockIndex int32 // the row index in current tsBlock } -func (s *IoTDBRpcDataSet) getColumnType(columnName string) TSDataType { - if s.closed { - return UNKNOWN +func NewIoTDBRpcDataSet(sql string, columnNameList []string, columnTypeList []string, columnNameIndex map[string]int32, ignoreTimestamp bool, moreData bool, queryId int64, statementId int64, client *rpc.IClientRPCServiceClient, sessionId int64, queryResult [][]byte, fetchSize int32, timeout *int64) (rpcDataSet *IoTDBRpcDataSet, err error) { + ds := &IoTDBRpcDataSet{ + sessionId: sessionId, + statementId: statementId, + ignoreTimestamp: ignoreTimestamp, + sql: sql, + queryId: queryId, + client: client, + fetchSize: fetchSize, + timeout: timeout, + moreData: moreData, + columnSize: int32(len(columnNameList)), + columnNameList: make([]string, 0, len(columnNameList)+1), + columnTypeList: make([]string, 0, len(columnTypeList)+1), + columnOrdinalMap: make(map[string]int32), + } + if !ignoreTimestamp { + ds.columnNameList = append(ds.columnNameList, TimestampColumnName) + ds.columnTypeList = append(ds.columnTypeList, "INT64") + ds.columnOrdinalMap[TimestampColumnName] = 1 } - return s.columnTypeDeduplicatedList[s.getColumnIndex(columnName)] -} - -func (s *IoTDBRpcDataSet) isNullWithColumnName(columnName string) bool { - return s.isNull(int(s.getColumnIndex(columnName)), s.rowsIndex-1) -} + ds.columnNameList = append(ds.columnNameList, columnNameList...) + ds.columnTypeList = append(ds.columnTypeList, columnTypeList...) -func (s *IoTDBRpcDataSet) isNull(columnIndex int, rowIndex int) bool { - if s.closed { - return true - } - bitmap := s.currentBitmap[columnIndex] - shift := rowIndex % 8 - return ((flag >> shift) & (bitmap & 0xff)) == 0 -} + if columnNameIndex != nil { + deduplicatedColumnSize := getDeduplicatedColumnSize(columnNameIndex) + ds.columnTypeDeduplicatedList = make([]TSDataType, deduplicatedColumnSize) + for i, name := range columnNameList { + if _, exists := ds.columnOrdinalMap[name]; exists { + continue + } -func (s *IoTDBRpcDataSet) constructOneRow() error { - if s.closed { - return errClosed - } + index := columnNameIndex[name] + targetIndex := index + startIndex - // simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read. - s.time = s.queryDataSet.Time[:8] - s.queryDataSet.Time = s.queryDataSet.Time[8:] + valueExists := false + for _, v := range ds.columnOrdinalMap { + if v == targetIndex { + valueExists = true + break + } + } - for i := 0; i < len(s.queryDataSet.BitmapList); i++ { - bitmapBuffer := s.queryDataSet.BitmapList[i] - if s.rowsIndex%8 == 0 { - s.currentBitmap[i] = bitmapBuffer[0] - s.queryDataSet.BitmapList[i] = bitmapBuffer[1:] + if !valueExists { + if int(index) < len(ds.columnTypeDeduplicatedList) { + if ds.columnTypeDeduplicatedList[index], err = GetDataTypeByStr(columnTypeList[i]); err != nil { + return nil, err + } + } + } + ds.columnOrdinalMap[name] = targetIndex } - if !s.isNull(i, s.rowsIndex) { - valueBuffer := s.queryDataSet.ValueList[i] - dataType := s.columnTypeDeduplicatedList[i] - switch dataType { - case BOOLEAN: - s.values[i] = valueBuffer[:1] - s.queryDataSet.ValueList[i] = valueBuffer[1:] - case INT32, DATE: - s.values[i] = valueBuffer[:4] - s.queryDataSet.ValueList[i] = valueBuffer[4:] - case INT64, TIMESTAMP: - s.values[i] = valueBuffer[:8] - s.queryDataSet.ValueList[i] = valueBuffer[8:] - case FLOAT: - s.values[i] = valueBuffer[:4] - s.queryDataSet.ValueList[i] = valueBuffer[4:] - case DOUBLE: - s.values[i] = valueBuffer[:8] - s.queryDataSet.ValueList[i] = valueBuffer[8:] - case TEXT, BLOB, STRING: - length := bytesToInt32(valueBuffer[:4]) - s.values[i] = valueBuffer[4 : 4+length] - s.queryDataSet.ValueList[i] = valueBuffer[4+length:] - default: - return fmt.Errorf("unsupported data type %d", dataType) + } else { + ds.columnTypeDeduplicatedList = make([]TSDataType, 0) + index := startIndex + for i := 0; i < len(columnNameList); i++ { + name := columnNameList[i] + if _, exists := ds.columnOrdinalMap[name]; !exists { + dataType, err := GetDataTypeByStr(columnTypeList[i]) + if err != nil { + return nil, err + } + ds.columnTypeDeduplicatedList = append(ds.columnTypeDeduplicatedList, dataType) + ds.columnOrdinalMap[name] = int32(index) + index++ } } } - s.rowsIndex++ - return nil + ds.queryResult = queryResult + if queryResult != nil { + ds.queryResultSize = int32(len(queryResult)) + } else { + ds.queryResultSize = 0 + } + ds.queryResultIndex = 0 + ds.tsBlockSize = 0 + ds.tsBlockIndex = -1 + return ds, nil } -func (s *IoTDBRpcDataSet) GetTimestamp() int64 { - if s.closed { - return -1 +func getDeduplicatedColumnSize(columnNameList map[string]int32) int { + uniqueIndexes := make(map[int32]struct{}) + for _, idx := range columnNameList { + uniqueIndexes[idx] = struct{}{} } - return bytesToInt64(s.time) + return len(uniqueIndexes) } -func (s *IoTDBRpcDataSet) getText(columnName string) string { - if s.closed { - return "" +func (s *IoTDBRpcDataSet) Close() (err error) { + if s.isClosed { + return nil } - if columnName == TimestampColumnName { - return time.Unix(0, bytesToInt64(s.time)*1000000).Format(time.RFC3339) + closeRequest := &rpc.TSCloseOperationReq{ + SessionId: s.sessionId, + StatementId: &s.statementId, + QueryId: &s.queryId, } - columnIndex := s.getColumnIndex(columnName) - if columnIndex < 0 || int(columnIndex) >= len(s.values) || s.isNull(int(columnIndex), s.rowsIndex-1) { - s.lastReadWasNull = true - return "" + var status *common.TSStatus + status, err = s.client.CloseOperation(context.Background(), closeRequest) + if err == nil { + err = VerifySuccess(status) } - s.lastReadWasNull = false - return s.getString(int(columnIndex), s.columnTypeDeduplicatedList[columnIndex]) + s.client = nil + s.isClosed = true + return err } -func (s *IoTDBRpcDataSet) getString(columnIndex int, dataType TSDataType) string { - if s.closed { - return "" +func (s *IoTDBRpcDataSet) Next() (result bool, err error) { + if s.hasCachedBlock() { + s.lastReadWasNull = false + err = s.constructOneRow() + return true, err } - valueBytes := s.values[columnIndex] - switch dataType { - case BOOLEAN: - if valueBytes[0] != 0 { - return "true" + if s.hasCachedByteBuffer() { + if err = s.constructOneTsBlock(); err != nil { + return false, err } - return "false" - case INT32: - return int32ToString(bytesToInt32(valueBytes)) - case INT64, TIMESTAMP: - return int64ToString(bytesToInt64(valueBytes)) - case FLOAT: - bits := binary.BigEndian.Uint32(valueBytes) - return float32ToString(math.Float32frombits(bits)) - case DOUBLE: - bits := binary.BigEndian.Uint64(valueBytes) - return float64ToString(math.Float64frombits(bits)) - case TEXT, STRING: - return string(valueBytes) - case BLOB: - return bytesToHexString(valueBytes) - case DATE: - date, err := bytesToDate(valueBytes) + err = s.constructOneRow() + return true, err + } + + if s.moreData { + hasResultSet, err := s.fetchResults() if err != nil { - return "" + return false, err + } + if hasResultSet && s.hasCachedByteBuffer() { + if err = s.constructOneTsBlock(); err != nil { + return false, err + } + err = s.constructOneRow() + return true, err } - return date.Format("2006-01-02") - default: - return "" } + err = s.Close() + if err != nil { + return false, err + } + return false, nil } -func (s *IoTDBRpcDataSet) getValue(columnName string) interface{} { - if s.closed { - return nil +func (s *IoTDBRpcDataSet) fetchResults() (bool, error) { + if s.isClosed { + return false, fmt.Errorf("this data set is already closed") } - columnIndex := int(s.getColumnIndex(columnName)) - if s.isNull(columnIndex, s.rowsIndex-1) { - return nil + req := rpc.TSFetchResultsReq{ + SessionId: s.sessionId, + Statement: s.sql, + FetchSize: s.fetchSize, + QueryId: s.queryId, + IsAlign: true, } + req.Timeout = s.timeout - dataType := s.getColumnType(columnName) - valueBytes := s.values[columnIndex] - switch dataType { - case BOOLEAN: - return valueBytes[0] != 0 - case INT32: - return bytesToInt32(valueBytes) - case INT64, TIMESTAMP: - return bytesToInt64(valueBytes) - case FLOAT: - bits := binary.BigEndian.Uint32(valueBytes) - return math.Float32frombits(bits) - case DOUBLE: - bits := binary.BigEndian.Uint64(valueBytes) - return math.Float64frombits(bits) - case TEXT, STRING: - return string(valueBytes) - case BLOB: - return valueBytes - case DATE: - date, err := bytesToDate(valueBytes) - if err != nil { - return nil - } - return date - default: - return nil + resp, err := s.client.FetchResultsV2(context.Background(), &req) + + if err != nil { + return false, err } -} -func (s *IoTDBRpcDataSet) getRowRecord() (*RowRecord, error) { - if s.closed { - return nil, errClosed + if err = VerifySuccess(resp.Status); err != nil { + return false, err } - fields := make([]*Field, s.columnCount) - for i := 0; i < s.columnCount; i++ { - columnName := s.columnNameList[i] - field := Field{ - name: columnName, - dataType: s.getColumnType(columnName), - value: s.getValue(columnName), + if !resp.HasResultSet { + err = s.Close() + } else { + s.queryResult = resp.GetQueryResult_() + s.queryResultIndex = 0 + if s.queryResult != nil { + s.queryResultSize = int32(len(s.queryResult)) + } else { + s.queryResultSize = 0 } - fields[i] = &field + s.tsBlockSize = 0 + s.tsBlockIndex = -1 } - return &RowRecord{ - timestamp: s.GetTimestamp(), - fields: fields, - }, nil + return resp.HasResultSet, err } -func (s *IoTDBRpcDataSet) getBool(columnName string) bool { - if s.closed { - return false - } - columnIndex := s.getColumnIndex(columnName) - if !s.isNull(int(columnIndex), s.rowsIndex-1) { - return s.values[columnIndex][0] != 0 +func (s *IoTDBRpcDataSet) hasCachedBlock() bool { + return s.curTsBlock != nil && s.tsBlockIndex < s.tsBlockSize-1 +} + +func (s *IoTDBRpcDataSet) hasCachedByteBuffer() bool { + return s.queryResult != nil && s.queryResultIndex < s.queryResultSize +} + +func (s *IoTDBRpcDataSet) constructOneRow() (err error) { + s.tsBlockIndex++ + s.hasCachedRecord = true + s.time, err = s.curTsBlock.GetTimeColumn().GetLong(s.tsBlockIndex) + return err +} + +func (s *IoTDBRpcDataSet) constructOneTsBlock() (err error) { + s.lastReadWasNull = false + curTsBlockBytes := s.queryResult[s.queryResultIndex] + s.queryResultIndex = s.queryResultIndex + 1 + s.curTsBlock, err = DeserializeTsBlock(curTsBlockBytes) + if err != nil { + return err } - s.lastReadWasNull = true - return false + s.tsBlockIndex = -1 + s.tsBlockSize = s.curTsBlock.GetPositionCount() + return nil } -func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error { - if s.closed { - return errClosed +func (s *IoTDBRpcDataSet) isNullByIndex(columnIndex int32) (bool, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return false, err } + index := s.columnOrdinalMap[columnName] - startIndex + // time column will never be null + if index < 0 { + return false, nil + } + return s.isNull(index, s.tsBlockIndex), nil +} - count := s.columnCount - if count > len(dest) { - count = len(dest) +func (s *IoTDBRpcDataSet) isNullByColumnName(columnName string) bool { + index := s.columnOrdinalMap[columnName] - startIndex + // time column will never be null + if index < 0 { + return false } + return s.isNull(index, s.tsBlockIndex) +} - for i := 0; i < count; i++ { - columnName := s.columnNameList[i] - columnIndex := int(s.getColumnIndex(columnName)) - if s.isNull(columnIndex, s.rowsIndex-1) { - continue - } +func (s *IoTDBRpcDataSet) isNull(index int32, rowNum int32) bool { + return s.curTsBlock.GetColumn(index).IsNull(rowNum) +} - dataType := s.getColumnType(columnName) - d := dest[i] - valueBytes := s.values[columnIndex] - switch dataType { - case BOOLEAN: - switch t := d.(type) { - case *bool: - *t = valueBytes[0] != 0 - case *string: - if valueBytes[0] != 0 { - *t = "true" - } else { - *t = "false" - } - default: - return fmt.Errorf("dest[%d] types must be *bool or *string", i) - } +func (s *IoTDBRpcDataSet) getBooleanByIndex(columnIndex int32) (bool, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return false, err + } + return s.getBoolean(columnName) +} - case INT32: - switch t := d.(type) { - case *int32: - *t = bytesToInt32(valueBytes) - case *string: - *t = int32ToString(bytesToInt32(valueBytes)) - default: - return fmt.Errorf("dest[%d] types must be *int32 or *string", i) - } - case INT64, TIMESTAMP: - switch t := d.(type) { - case *int64: - *t = bytesToInt64(valueBytes) - case *string: - *t = int64ToString(bytesToInt64(valueBytes)) - default: - return fmt.Errorf("dest[%d] types must be *int64 or *string", i) - } - case FLOAT: - switch t := d.(type) { - case *float32: - bits := binary.BigEndian.Uint32(valueBytes) - *t = math.Float32frombits(bits) - case *string: - bits := binary.BigEndian.Uint32(valueBytes) - *t = float32ToString(math.Float32frombits(bits)) - default: - return fmt.Errorf("dest[%d] types must be *float32 or *string", i) - } - case DOUBLE: - switch t := d.(type) { - case *float64: - bits := binary.BigEndian.Uint64(valueBytes) - *t = math.Float64frombits(bits) - case *string: - bits := binary.BigEndian.Uint64(valueBytes) - *t = float64ToString(math.Float64frombits(bits)) - default: - return fmt.Errorf("dest[%d] types must be *float64 or *string", i) - } - case TEXT, STRING: - switch t := d.(type) { - case *[]byte: - *t = valueBytes - case *string: - *t = string(valueBytes) - default: - return fmt.Errorf("dest[%d] types must be *[]byte or *string", i) - } - case BLOB: - switch t := d.(type) { - case *[]byte: - *t = valueBytes - case *string: - *t = bytesToHexString(valueBytes) - default: - return fmt.Errorf("dest[%d] types must be *[]byte or *string", i) - } - case DATE: - switch t := d.(type) { - case *time.Time: - *t, _ = bytesToDate(valueBytes) - case *string: - *t = int32ToString(bytesToInt32(valueBytes)) - date, err := bytesToDate(valueBytes) - if err != nil { - *t = "" - } - *t = date.Format("2006-01-02") - default: - return fmt.Errorf("dest[%d] types must be *time.Time or *string", i) - } - default: - return nil - } +func (s *IoTDBRpcDataSet) getBoolean(columnName string) (bool, error) { + if err := s.checkRecord(); err != nil { + return false, err } - return nil + index := s.columnOrdinalMap[columnName] - startIndex + if !s.isNull(index, s.tsBlockIndex) { + s.lastReadWasNull = false + return s.curTsBlock.GetColumn(index).GetBoolean(s.tsBlockIndex) + } else { + s.lastReadWasNull = true + return false, nil + } +} + +func (s *IoTDBRpcDataSet) getDoubleByIndex(columnIndex int32) (float64, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return 0, err + } + return s.getDouble(columnName) } -func (s *IoTDBRpcDataSet) getFloat(columnName string) float32 { - if s.closed { - return 0 +func (s *IoTDBRpcDataSet) getDouble(columnName string) (float64, error) { + if err := s.checkRecord(); err != nil { + return 0, err } - columnIndex := s.getColumnIndex(columnName) - if !s.isNull(int(columnIndex), s.rowsIndex-1) { + index := s.columnOrdinalMap[columnName] - startIndex + if !s.isNull(index, s.tsBlockIndex) { s.lastReadWasNull = false - bits := binary.BigEndian.Uint32(s.values[columnIndex]) - return math.Float32frombits(bits) + return s.curTsBlock.GetColumn(index).GetDouble(s.tsBlockIndex) + } else { + s.lastReadWasNull = true + return 0, nil } - s.lastReadWasNull = true - return 0 } -func (s *IoTDBRpcDataSet) getDouble(columnName string) float64 { - if s.closed { - return 0 +func (s *IoTDBRpcDataSet) getFloatByIndex(columnIndex int32) (float32, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return 0, err } - columnIndex := s.getColumnIndex(columnName) + return s.getFloat(columnName) +} - if !s.isNull(int(columnIndex), s.rowsIndex-1) { +func (s *IoTDBRpcDataSet) getFloat(columnName string) (float32, error) { + if err := s.checkRecord(); err != nil { + return 0, err + } + index := s.columnOrdinalMap[columnName] - startIndex + if !s.isNull(index, s.tsBlockIndex) { s.lastReadWasNull = false - bits := binary.BigEndian.Uint64(s.values[columnIndex]) - return math.Float64frombits(bits) + return s.curTsBlock.GetColumn(index).GetFloat(s.tsBlockIndex) + } else { + s.lastReadWasNull = true + return 0, nil + } +} + +func (s *IoTDBRpcDataSet) getIntByIndex(columnIndex int32) (int32, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return 0, err } - s.lastReadWasNull = true - return 0 + return s.getInt(columnName) } -func (s *IoTDBRpcDataSet) getInt32(columnName string) int32 { - if s.closed { - return 0 +func (s *IoTDBRpcDataSet) getInt(columnName string) (int32, error) { + if err := s.checkRecord(); err != nil { + return 0, err } - columnIndex := s.getColumnIndex(columnName) - if !s.isNull(int(columnIndex), s.rowsIndex-1) { + index := s.columnOrdinalMap[columnName] - startIndex + if !s.isNull(index, s.tsBlockIndex) { s.lastReadWasNull = false - return bytesToInt32(s.values[columnIndex]) + dataType := s.curTsBlock.GetColumn(index).GetDataType() + if dataType == INT64 { + if v, err := s.curTsBlock.GetColumn(index).GetLong(s.tsBlockIndex); err != nil { + return 0, err + } else { + return int32(v), nil + } + } + return s.curTsBlock.GetColumn(index).GetInt(s.tsBlockIndex) + } else { + s.lastReadWasNull = true + return 0, nil } +} - s.lastReadWasNull = true - return 0 +func (s *IoTDBRpcDataSet) getLongByIndex(columnIndex int32) (int64, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return 0, err + } + return s.getLong(columnName) } -func (s *IoTDBRpcDataSet) getInt64(columnName string) int64 { - if s.closed { - return 0 +func (s *IoTDBRpcDataSet) getLong(columnName string) (int64, error) { + if err := s.checkRecord(); err != nil { + return 0, err } if columnName == TimestampColumnName { - return bytesToInt64(s.time) + s.lastReadWasNull = false + return s.curTsBlock.GetTimeByIndex(s.tsBlockIndex) + } + index := s.columnOrdinalMap[columnName] - startIndex + if !s.isNull(index, s.tsBlockIndex) { + s.lastReadWasNull = false + return s.curTsBlock.GetColumn(index).GetLong(s.tsBlockIndex) + } else { + s.lastReadWasNull = true + return 0, nil } +} - columnIndex := s.getColumnIndex(columnName) - bys := s.values[columnIndex] +func (s *IoTDBRpcDataSet) getBinaryByIndex(columnIndex int32) (*Binary, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return nil, err + } + return s.getBinary(columnName) +} - if !s.isNull(int(columnIndex), s.rowsIndex-1) { +func (s *IoTDBRpcDataSet) getBinary(columnName string) (*Binary, error) { + if err := s.checkRecord(); err != nil { + return nil, err + } + index := s.columnOrdinalMap[columnName] - startIndex + if !s.isNull(index, s.tsBlockIndex) { s.lastReadWasNull = false - return bytesToInt64(bys) + return s.curTsBlock.GetColumn(index).GetBinary(s.tsBlockIndex) + } else { + s.lastReadWasNull = true + return nil, nil } - s.lastReadWasNull = true - return 0 } -func (s *IoTDBRpcDataSet) hasCachedResults() bool { - if s.closed { - return false +func (s *IoTDBRpcDataSet) getObjectByIndex(columnIndex int32) (interface{}, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return nil, err } - return s.queryDataSet != nil && len(s.queryDataSet.Time) > 0 + return s.getObject(columnName) } -func (s *IoTDBRpcDataSet) next() (bool, error) { - if s.closed { - return false, errClosed +func (s *IoTDBRpcDataSet) getObject(columnName string) (interface{}, error) { + if err := s.checkRecord(); err != nil { + return nil, err } - - if s.hasCachedResults() { - s.constructOneRow() - return true, nil + if columnName == TimestampColumnName { + s.lastReadWasNull = false + if value, err := s.curTsBlock.GetTimeByIndex(s.tsBlockIndex); err != nil { + return nil, err + } else { + return time.Unix(value/1e3, (value%1e3)*1e6), nil + } } - if s.emptyResultSet { - return false, nil + index := s.columnOrdinalMap[columnName] - startIndex + if index < 0 || index >= int32(len(s.columnTypeDeduplicatedList)) || s.isNull(index, s.tsBlockIndex) { + s.lastReadWasNull = true + return nil, nil } + s.lastReadWasNull = false + return s.curTsBlock.GetColumn(index).GetObject(s.tsBlockIndex) +} - r, err := s.fetchResults() - if err == nil && r { - s.constructOneRow() - return true, nil +func (s *IoTDBRpcDataSet) getStringByIndex(columnIndex int32) (string, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return "", err } - return false, nil + return s.getValueByName(columnName) } -func (s *IoTDBRpcDataSet) fetchResults() (bool, error) { - if s.closed { - return false, errClosed - } - s.rowsIndex = 0 - req := rpc.TSFetchResultsReq{ - SessionId: s.sessionId, - Statement: s.sql, - FetchSize: s.fetchSize, - QueryId: s.queryId, - IsAlign: true, - Timeout: s.timeoutMs, +func (s *IoTDBRpcDataSet) getTimestampByIndex(columnIndex int32) (time.Time, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return time.Time{}, err } - resp, err := s.client.FetchResults(context.Background(), &req) + return s.getTimestamp(columnName) +} +func (s *IoTDBRpcDataSet) getTimestamp(columnName string) (time.Time, error) { + longValue, err := s.getLong(columnName) if err != nil { - return false, err + return time.Time{}, err + } + if s.lastReadWasNull { + return time.Time{}, err + } else { + return time.Unix(longValue/1e3, (longValue%1e3)*1e6), nil } +} - if err = VerifySuccess(resp.Status); err != nil { - return false, err +func (s *IoTDBRpcDataSet) GetDateByIndex(columnIndex int32) (time.Time, error) { + columnName, err := s.findColumnNameByIndex(columnIndex) + if err != nil { + return time.Time{}, err } + return s.GetDate(columnName) +} - if !resp.HasResultSet { - s.emptyResultSet = true +func (s *IoTDBRpcDataSet) GetDate(columnName string) (time.Time, error) { + intValue, err := s.getInt(columnName) + if err != nil { + return time.Time{}, err + } + if s.lastReadWasNull { + return time.Time{}, err } else { - s.queryDataSet = resp.GetQueryDataSet() + return Int32ToDate(intValue) } - return resp.HasResultSet, nil } -func (s *IoTDBRpcDataSet) IsClosed() bool { - return s.closed +func (s *IoTDBRpcDataSet) findColumn(columnName string) int32 { + return s.columnOrdinalMap[columnName] } -func (s *IoTDBRpcDataSet) Close() (err error) { - if s.IsClosed() { - return nil +func (s *IoTDBRpcDataSet) getValueByName(columnName string) (string, error) { + err := s.checkRecord() + if err != nil { + return "", err } - if s.client != nil { - closeRequest := &rpc.TSCloseOperationReq{ - SessionId: s.sessionId, - QueryId: &s.queryId, + if columnName == TimestampColumnName { + s.lastReadWasNull = false + if t, err := s.curTsBlock.GetTimeByIndex(s.tsBlockIndex); err != nil { + return "", err + } else { + return int64ToString(t), nil } + } + index := s.columnOrdinalMap[columnName] - startIndex + if index < 0 || index >= int32(len(s.columnTypeDeduplicatedList)) || s.isNull(index, s.tsBlockIndex) { + s.lastReadWasNull = true + return "", err + } + s.lastReadWasNull = false + return s.getString(index, s.columnTypeDeduplicatedList[index]) +} - var status *common.TSStatus - status, err = s.client.CloseOperation(context.Background(), closeRequest) - if err == nil { - err = VerifySuccess(status) +func (s *IoTDBRpcDataSet) getString(index int32, tsDataType TSDataType) (string, error) { + switch tsDataType { + case BOOLEAN: + if v, err := s.curTsBlock.GetColumn(index).GetBoolean(s.tsBlockIndex); err != nil { + return "", nil + } else { + return strconv.FormatBool(v), nil + } + case INT32: + if v, err := s.curTsBlock.GetColumn(index).GetInt(s.tsBlockIndex); err != nil { + return "", err + } else { + return int32ToString(v), nil + } + case INT64, TIMESTAMP: + if v, err := s.curTsBlock.GetColumn(index).GetLong(s.tsBlockIndex); err != nil { + return "", err + } else { + return int64ToString(v), nil + } + case FLOAT: + if v, err := s.curTsBlock.GetColumn(index).GetFloat(s.tsBlockIndex); err != nil { + return "", err + } else { + return float32ToString(v), nil + } + case DOUBLE: + if v, err := s.curTsBlock.GetColumn(index).GetDouble(s.tsBlockIndex); err != nil { + return "", err + } else { + return float64ToString(v), nil } + case TEXT, STRING: + if v, err := s.curTsBlock.GetColumn(index).GetBinary(s.tsBlockIndex); err != nil { + return "", err + } else { + return v.GetStringValue(), nil + } + case BLOB: + if v, err := s.curTsBlock.GetColumn(index).GetBinary(s.tsBlockIndex); err != nil { + return "", err + } else { + return bytesToHexString(v.values), nil + } + case DATE: + v, err := s.curTsBlock.GetColumn(index).GetInt(s.tsBlockIndex) + if err != nil { + return "", err + } + t, err := Int32ToDate(v) + if err != nil { + return "", err + } + return t.Format("2006-01-02"), nil } - - s.columnCount = 0 - s.sessionId = -1 - s.queryId = -1 - s.rowsIndex = -1 - s.queryDataSet = nil - s.sql = "" - s.fetchSize = 0 - s.columnNameList = nil - s.columnTypeList = nil - s.columnOrdinalMap = nil - s.columnTypeDeduplicatedList = nil - s.currentBitmap = nil - s.time = nil - s.values = nil - s.client = nil - s.emptyResultSet = true - s.closed = true - return err + return "", nil } -func NewIoTDBRpcDataSet(sql string, columnNameList []string, columnTypes []string, - columnNameIndex map[string]int32, - queryId int64, client *rpc.IClientRPCServiceClient, sessionId int64, queryDataSet *rpc.TSQueryDataSet, - ignoreTimeStamp bool, fetchSize int32, timeoutMs *int64) *IoTDBRpcDataSet { - - ds := &IoTDBRpcDataSet{ - sql: sql, - columnNameList: columnNameList, - ignoreTimeStamp: ignoreTimeStamp, - queryId: queryId, - client: client, - sessionId: sessionId, - queryDataSet: queryDataSet, - fetchSize: fetchSize, - currentBitmap: make([]byte, len(columnNameList)), - values: make([][]byte, len(columnTypes)), - columnCount: len(columnNameList), - closed: false, - timeoutMs: timeoutMs, - } - - ds.columnTypeList = make([]TSDataType, 0) - - // deduplicate and map - ds.columnOrdinalMap = make(map[string]int32) - if !ignoreTimeStamp { - ds.columnOrdinalMap[TimestampColumnName] = 1 +func (s *IoTDBRpcDataSet) findColumnNameByIndex(columnIndex int32) (string, error) { + if columnIndex <= 0 { + return "", fmt.Errorf("column index should start from 1") + } + if columnIndex > int32(len(s.columnNameList)) { + return "", fmt.Errorf("column index %d out of range %d", columnIndex, len(s.columnNameList)) } + return s.columnNameList[columnIndex-1], nil +} - if columnNameIndex != nil { - ds.columnTypeDeduplicatedList = make([]TSDataType, len(columnNameIndex)) - for i, name := range columnNameList { - columnTypeString := columnTypes[i] - columnDataType := tsTypeMap[columnTypeString] - ds.columnTypeList = append(ds.columnTypeList, columnDataType) - if _, exists := ds.columnOrdinalMap[name]; !exists { - index := columnNameIndex[name] - ds.columnOrdinalMap[name] = index + startIndex - ds.columnTypeDeduplicatedList[index] = tsTypeMap[columnTypeString] - } - } - } else { - ds.columnTypeDeduplicatedList = make([]TSDataType, ds.columnCount) - index := startIndex - for i := 0; i < len(columnNameList); i++ { - name := columnNameList[i] - dataType := tsTypeMap[columnTypes[i]] - ds.columnTypeList = append(ds.columnTypeList, dataType) - ds.columnTypeDeduplicatedList[i] = dataType - if _, exists := ds.columnOrdinalMap[name]; !exists { - ds.columnOrdinalMap[name] = int32(index) - index++ - } - } +func (s *IoTDBRpcDataSet) checkRecord() (err error) { + if s.queryResultIndex > s.queryResultSize || s.tsBlockIndex >= s.tsBlockSize || s.queryResult == nil || s.curTsBlock == nil { + err = fmt.Errorf("no record remains") } - return ds + return err } diff --git a/client/rpcdataset_test.go b/client/rpcdataset_test.go deleted file mode 100644 index 82dd3b0..0000000 --- a/client/rpcdataset_test.go +++ /dev/null @@ -1,666 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package client - -import ( - "bytes" - "reflect" - "testing" - "time" - - "github.com/apache/iotdb-client-go/rpc" -) - -func createIoTDBRpcDataSet() *IoTDBRpcDataSet { - columns := []string{ - "root.ln.device1.restart_count", - "root.ln.device1.price", - "root.ln.device1.tick_count", - "root.ln.device1.temperature", - "root.ln.device1.description", - "root.ln.device1.status", - "root.ln.device1.description_string", - "root.ln.device1.description_blob", - "root.ln.device1.date", - "root.ln.device1.ts", - } - dataTypes := []string{"INT32", "DOUBLE", "INT64", "FLOAT", "TEXT", "BOOLEAN", "STRING", "BLOB", "DATE", "TIMESTAMP"} - columnNameIndex := map[string]int32{ - "root.ln.device1.restart_count": 2, - "root.ln.device1.price": 1, - "root.ln.device1.tick_count": 5, - "root.ln.device1.temperature": 4, - "root.ln.device1.description": 0, - "root.ln.device1.status": 3, - "root.ln.device1.description_string": 6, - "root.ln.device1.description_blob": 7, - "root.ln.device1.date": 8, - "root.ln.device1.ts": 9, - } - var queyrId int64 = 1 - var sessionId int64 = 1 - var client *rpc.IClientRPCServiceClient = nil - queryDataSet := rpc.TSQueryDataSet{ - Time: []byte{0, 0, 1, 118, 76, 52, 0, 236, 0, 0, 1, 118, 76, 52, 25, 228, 0, 0, 1, 118, 76, 52, 41, 42, 0, 0, 1, 118, 76, 52, 243, 148, 0, 0, 1, 118, 76, 95, 98, 255}, - ValueList: [][]byte{ - {0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49}, - {64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205}, - {0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1}, - {1, 1, 1, 1, 1}, - {65, 65, 153, 154, 65, 65, 153, 154, 65, 65, 153, 154, 65, 65, 153, 154, 65, 65, 153, 154}, - {0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213}, - {0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49}, - {0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49}, - {1, 52, 216, 17, 1, 52, 216, 17, 1, 52, 216, 17, 1, 52, 216, 17, 1, 52, 216, 17}, - {0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213}, - }, - BitmapList: [][]byte{{248}, {248}, {248}, {248}, {248}, {248}, {248}, {248}, {248}, {248}}, - } - return NewIoTDBRpcDataSet("select * from root.ln.device1", columns, dataTypes, columnNameIndex, queyrId, client, sessionId, &queryDataSet, false, DefaultFetchSize, nil) -} - -func TestIoTDBRpcDataSet_getColumnType(t *testing.T) { - type args struct { - columnName string - } - - ds := createIoTDBRpcDataSet() - closedDataSet := createIoTDBRpcDataSet() - closedDataSet.Close() - tests := []struct { - name string - dataSet *IoTDBRpcDataSet - args args - want TSDataType - }{ - { - name: "Normal", - dataSet: ds, - args: args{ - columnName: "root.ln.device1.tick_count", - }, - want: INT64, - }, { - name: "Closed", - dataSet: closedDataSet, - args: args{ - columnName: "root.ln.device1.tick_count", - }, - want: UNKNOWN, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := tt.dataSet - if got := s.getColumnType(tt.args.columnName); got != tt.want { - t.Errorf("IoTDBRpcDataSet.getColumnType() = %v, want %v", got, tt.want) - } - s.Close() - }) - } -} - -func TestIoTDBRpcDataSet_getColumnIndex(t *testing.T) { - type args struct { - columnName string - } - closedDataSet := createIoTDBRpcDataSet() - closedDataSet.Close() - tests := []struct { - name string - dataset *IoTDBRpcDataSet - args args - want int32 - }{ - { - name: "Normal", - dataset: createIoTDBRpcDataSet(), - args: args{ - columnName: "root.ln.device1.tick_count", - }, - want: 5, - }, { - name: "Closed", - dataset: closedDataSet, - args: args{ - columnName: "root.ln.device1.tick_count", - }, - want: -1, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := tt.dataset - if got := s.getColumnIndex(tt.args.columnName); got != tt.want { - t.Errorf("IoTDBRpcDataSet.getColumnIndex() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_isNull(t *testing.T) { - type args struct { - columnIndex int - rowIndex int - } - ds := createIoTDBRpcDataSet() - ds.next() - - tests := []struct { - name string - args args - want bool - }{ - { - name: "Normal", - args: args{ - columnIndex: 0, - rowIndex: 0, - }, - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if got := s.isNull(tt.args.columnIndex, tt.args.rowIndex); got != tt.want { - t.Errorf("IoTDBRpcDataSet.isNull() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_getValue(t *testing.T) { - - type args struct { - columnName string - } - tests := []struct { - name string - args args - want interface{} - }{ - { - name: "restart_count", - args: args{ - columnName: "root.ln.device1.restart_count", - }, - want: int32(1), - }, { - name: "tick_count", - args: args{ - columnName: "root.ln.device1.tick_count", - }, - want: int64(3333333), - }, { - name: "price", - args: args{ - columnName: "root.ln.device1.price", - }, - want: float64(1988.2), - }, { - name: "temperature", - args: args{ - columnName: "root.ln.device1.temperature", - }, - want: float32(12.1), - }, { - name: "description", - args: args{ - columnName: "root.ln.device1.description", - }, - want: "Test Device 1", - }, { - name: "status", - args: args{ - columnName: "root.ln.device1.status", - }, - want: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if got := s.getValue(tt.args.columnName); !reflect.DeepEqual(got, tt.want) { - t.Errorf("IoTDBRpcDataSet.getValue() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_scan(t *testing.T) { - type args struct { - dest []interface{} - } - - type want struct { - err error - values []interface{} - } - - var restartCount int32 - var price float64 - var tickCount int64 - var temperature float32 - var description string - var status bool - - var restartCountStr string - var priceStr string - var tickCountStr string - var temperatureStr string - var descriptionStr string - var statusStr string - - var wantRestartCount int32 = 1 - var wantPrice float64 = 1988.2 - var wantTickCount int64 = 3333333 - var wantTemperature float32 = 12.1 - var wantDescription string = "Test Device 1" - var wantStatus bool = true - - var wantRestartCountStr string = "1" - var wantPriceStr string = "1988.2" - var wantTickCountStr string = "3333333" - var wantTemperatureStr string = "12.1" - var wantDescriptionStr string = "Test Device 1" - var wantStatusStr string = "true" - - tests := []struct { - name string - args args - want want - }{ - { - name: "Normal", - args: args{ - dest: []interface{}{&restartCount, &price, &tickCount, &temperature, &description, &status}, - }, - want: want{ - err: nil, - values: []interface{}{&wantRestartCount, &wantPrice, &wantTickCount, &wantTemperature, &wantDescription, &wantStatus}, - }, - }, { - name: "String", - args: args{ - dest: []interface{}{&restartCountStr, &priceStr, &tickCountStr, &temperatureStr, &descriptionStr, &statusStr}, - }, - want: want{ - err: nil, - values: []interface{}{&wantRestartCountStr, &wantPriceStr, &wantTickCountStr, &wantTemperatureStr, &wantDescriptionStr, &wantStatusStr}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if err := s.scan(tt.args.dest...); err != tt.want.err { - t.Errorf("IoTDBRpcDataSet.scan() error = %v, wantErr %v", err, tt.want.err) - } - if got := tt.args.dest; !reflect.DeepEqual(got, tt.want.values) { - t.Errorf("IoTDBRpcDataSet.scan(), dest = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_GetTimestamp(t *testing.T) { - tests := []struct { - name string - want int64 - }{ - { - name: "GetTimestamp", - want: 1607596245228, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if got := s.GetTimestamp(); got != tt.want { - t.Errorf("IoTDBRpcDataSet.GetTimestamp() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_getText(t *testing.T) { - type args struct { - columnName string - } - tests := []struct { - name string - args args - want string - }{ - { - name: "restart_count", - args: args{ - columnName: "root.ln.device1.restart_count", - }, - want: "1", - }, { - name: "price", - args: args{ - columnName: "root.ln.device1.price", - }, - want: "1988.2", - }, { - name: "tick_count", - args: args{ - columnName: "root.ln.device1.tick_count", - }, - want: "3333333", - }, { - name: "temperature", - args: args{ - columnName: "root.ln.device1.temperature", - }, - want: "12.1", - }, { - name: "description", - args: args{ - columnName: "root.ln.device1.description", - }, - want: "Test Device 1", - }, { - name: "status", - args: args{ - columnName: "root.ln.device1.status", - }, - want: "true", - }, { - name: TimestampColumnName, - args: args{ - columnName: TimestampColumnName, - }, - want: time.Unix(0, 1607596245228000000).Format(time.RFC3339), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if got := s.getText(tt.args.columnName); got != tt.want { - t.Errorf("IoTDBRpcDataSet.getText() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_getBool(t *testing.T) { - type args struct { - columnName string - } - tests := []struct { - name string - args args - want bool - }{ - { - name: "status", - args: args{ - columnName: "root.ln.device1.status", - }, - want: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if got := s.getBool(tt.args.columnName); got != tt.want { - t.Errorf("IoTDBRpcDataSet.getBool() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_getFloat(t *testing.T) { - type args struct { - columnName string - } - tests := []struct { - name string - args args - want float32 - }{ - { - name: "temperature", - args: args{ - columnName: "root.ln.device1.temperature", - }, - want: 12.1, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if got := s.getFloat(tt.args.columnName); got != tt.want { - t.Errorf("IoTDBRpcDataSet.getFloat() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_getDouble(t *testing.T) { - type args struct { - columnName string - } - tests := []struct { - name string - args args - want float64 - }{ - { - name: "price", - args: args{ - columnName: "root.ln.device1.price", - }, - want: 1988.2, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if got := s.getDouble(tt.args.columnName); got != tt.want { - t.Errorf("IoTDBRpcDataSet.getDouble() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_getInt32(t *testing.T) { - type args struct { - columnName string - } - tests := []struct { - name string - args args - want int32 - }{ - { - name: "restart_count", - args: args{ - columnName: "root.ln.device1.restart_count", - }, - want: 1, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if got := s.getInt32(tt.args.columnName); got != tt.want { - t.Errorf("IoTDBRpcDataSet.getInt32() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_getInt64(t *testing.T) { - type args struct { - columnName string - } - tests := []struct { - name string - args args - want int64 - }{ - { - name: "tick_count", - args: args{ - columnName: "root.ln.device1.tick_count", - }, - want: 3333333, - }, { - name: TimestampColumnName, - args: args{ - columnName: TimestampColumnName, - }, - want: 1607596245228, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if got := s.getInt64(tt.args.columnName); got != tt.want { - t.Errorf("IoTDBRpcDataSet.getInt64() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_getRowRecord(t *testing.T) { - tests := []struct { - name string - want *RowRecord - wantErr bool - }{ - { - name: "", - want: &RowRecord{ - timestamp: 0, - fields: []*Field{ - { - name: "root.ln.device1.restart_count", - dataType: INT32, - value: int32(1), - }, { - name: "root.ln.device1.price", - dataType: DOUBLE, - value: float64(1988.2), - }, { - name: "root.ln.device1.tick_count", - dataType: INT64, - value: int64(3333333), - }, { - name: "root.ln.device1.temperature", - dataType: FLOAT, - value: float32(12.1), - }, { - name: "root.ln.device1.description", - dataType: TEXT, - value: "Test Device 1", - }, { - name: "root.ln.device1.status", - dataType: BOOLEAN, - value: true, - }, { - name: "root.ln.device1.description_string", - dataType: STRING, - value: "Test Device 1", - }, { - name: "root.ln.device1.description_blob", - dataType: BLOB, - value: []byte("Test Device 1"), - }, { - name: "root.ln.device1.date", - dataType: DATE, - value: time.Date(2024, time.April, 1, 0, 0, 0, 0, time.UTC), - }, { - name: "root.ln.device1.ts", - dataType: TIMESTAMP, - value: int64(3333333), - }, - }, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - got, err := s.getRowRecord() - if (err != nil) != tt.wantErr { - t.Errorf("IoTDBRpcDataSet.getRowRecord() error = %v, wantErr %v", err, tt.wantErr) - return - } - - match := true - for i := 0; i < len(got.fields); i++ { - gotField := got.fields[i] - wantField := tt.want.fields[i] - if gotField.dataType != BLOB { - if gotField.dataType != wantField.dataType || gotField.name != wantField.name || gotField.value != wantField.value { - match = false - } - } else { - if gotField.dataType != wantField.dataType || gotField.name != wantField.name || !bytes.Equal(gotField.value.([]byte), wantField.value.([]byte)) { - match = false - } - } - } - if !match { - t.Errorf("IoTDBRpcDataSet.getRowRecord() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestIoTDBRpcDataSet_Close(t *testing.T) { - - tests := []struct { - name string - wantErr bool - }{ - { - name: "", - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := createIoTDBRpcDataSet() - s.next() - if err := s.Close(); (err != nil) != tt.wantErr { - t.Errorf("IoTDBRpcDataSet.Close() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} diff --git a/client/session.go b/client/session.go index ef65721..07023a0 100644 --- a/client/session.go +++ b/client/session.go @@ -423,13 +423,13 @@ func (s *Session) ExecuteStatementWithContext(ctx context.Context, sql string) ( StatementId: s.requestStatementId, FetchSize: &s.config.FetchSize, } - resp, err := s.client.ExecuteStatement(ctx, &request) + resp, err := s.client.ExecuteStatementV2(ctx, &request) if err != nil && resp == nil { if s.reconnect() { request.SessionId = s.sessionId request.StatementId = s.requestStatementId - resp, err = s.client.ExecuteStatement(ctx, &request) + resp, err = s.client.ExecuteStatementV2(ctx, &request) } } @@ -437,7 +437,7 @@ func (s *Session) ExecuteStatementWithContext(ctx context.Context, sql string) ( return nil, statusErr } - return s.genDataSet(sql, resp), err + return s.genDataSet(sql, resp) } func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) { @@ -451,13 +451,13 @@ func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err StatementId: s.requestStatementId, FetchSize: &s.config.FetchSize, } - resp, err := s.client.ExecuteStatement(context.Background(), &request) + resp, err := s.client.ExecuteStatementV2(context.Background(), &request) if err != nil && resp == nil { if s.reconnect() { request.SessionId = s.sessionId request.StatementId = s.requestStatementId - resp, err = s.client.ExecuteStatement(context.Background(), &request) + resp, err = s.client.ExecuteStatementV2(context.Background(), &request) } } @@ -467,9 +467,9 @@ func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionDataSet, error) { request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement: sql, StatementId: s.requestStatementId, FetchSize: &s.config.FetchSize, Timeout: timeoutMs} - if resp, err := s.client.ExecuteQueryStatement(context.Background(), &request); err == nil { + if resp, err := s.client.ExecuteQueryStatementV2(context.Background(), &request); err == nil { if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize) } else { return nil, statusErr } @@ -477,9 +477,9 @@ func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionD if s.reconnect() { request.SessionId = s.sessionId request.StatementId = s.requestStatementId - resp, err = s.client.ExecuteQueryStatement(context.Background(), &request) + resp, err = s.client.ExecuteQueryStatementV2(context.Background(), &request) if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize) } else { return nil, statusErr } @@ -494,18 +494,18 @@ func (s *Session) ExecuteAggregationQuery(paths []string, aggregations []common. request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths, Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs} - if resp, err := s.client.ExecuteAggregationQuery(context.Background(), &request); err == nil { + if resp, err := s.client.ExecuteAggregationQueryV2(context.Background(), &request); err == nil { if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize) } else { return nil, statusErr } } else { if s.reconnect() { request.SessionId = s.sessionId - resp, err = s.client.ExecuteAggregationQuery(context.Background(), &request) + resp, err = s.client.ExecuteAggregationQueryV2(context.Background(), &request) if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize) } else { return nil, statusErr } @@ -521,18 +521,18 @@ func (s *Session) ExecuteAggregationQueryWithLegalNodes(paths []string, aggregat request := rpc.TSAggregationQueryReq{SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths, Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs, LegalPathNodes: legalNodes} - if resp, err := s.client.ExecuteAggregationQuery(context.Background(), &request); err == nil { + if resp, err := s.client.ExecuteAggregationQueryV2(context.Background(), &request); err == nil { if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize) } else { return nil, statusErr } } else { if s.reconnect() { request.SessionId = s.sessionId - resp, err = s.client.ExecuteAggregationQuery(context.Background(), &request) + resp, err = s.client.ExecuteAggregationQueryV2(context.Background(), &request) if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize) } else { return nil, statusErr } @@ -550,7 +550,7 @@ func (s *Session) ExecuteGroupByQueryIntervalQuery(database *string, device, mea Timeout: timeoutMs, IsAligned: isAligned} if resp, err := s.client.ExecuteGroupByQueryIntervalQuery(context.Background(), &request); err == nil { if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize) } else { return nil, statusErr } @@ -559,7 +559,7 @@ func (s *Session) ExecuteGroupByQueryIntervalQuery(database *string, device, mea request.SessionId = s.sessionId resp, err = s.client.ExecuteGroupByQueryIntervalQuery(context.Background(), &request) if statusErr := VerifySuccess(resp.Status); statusErr == nil { - return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err + return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize) } else { return nil, statusErr } @@ -852,17 +852,17 @@ func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime i EndTime: endTime, StatementId: s.requestStatementId, } - resp, err := s.client.ExecuteRawDataQuery(context.Background(), &request) + resp, err := s.client.ExecuteRawDataQueryV2(context.Background(), &request) if err != nil && resp == nil { if s.reconnect() { request.SessionId = s.sessionId request.StatementId = s.requestStatementId - resp, err = s.client.ExecuteRawDataQuery(context.Background(), &request) + resp, err = s.client.ExecuteRawDataQueryV2(context.Background(), &request) } } - return s.genDataSet("", resp), err + return s.genDataSet("", resp) } func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) { @@ -872,28 +872,32 @@ func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) { StatementId: s.requestStatementId, FetchSize: &s.config.FetchSize, } - resp, err := s.client.ExecuteUpdateStatement(context.Background(), &request) + resp, err := s.client.ExecuteUpdateStatementV2(context.Background(), &request) if err != nil && resp == nil { if s.reconnect() { request.SessionId = s.sessionId request.StatementId = s.requestStatementId - resp, err = s.client.ExecuteUpdateStatement(context.Background(), &request) + resp, err = s.client.ExecuteUpdateStatementV2(context.Background(), &request) } } - return s.genDataSet(sql, resp), err + return s.genDataSet(sql, resp) } -func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) *SessionDataSet { +func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) (*SessionDataSet, error) { var queryId int64 if resp.QueryId == nil { queryId = 0 } else { queryId = *resp.QueryId } + moreData := false + if resp.MoreData != nil { + moreData = *resp.MoreData + } return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, - queryId, s.client, s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil) + queryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, nil, moreData, s.config.FetchSize) } func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool) (*rpc.TSInsertTabletsReq, error) { @@ -1028,7 +1032,7 @@ func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, error) case DATE: switch s := v.(type) { case time.Time: - date, err := dateToInt32(s) + date, err := DateToInt32(s) if err != nil { return nil, err } @@ -1236,3 +1240,7 @@ func (s *Session) reconnect() bool { } return connectedSuccess } + +func (s *Session) SetFetchSize(fetchSize int32) { + s.config.FetchSize = fetchSize +} diff --git a/client/sessiondataset.go b/client/sessiondataset.go index b1fafd4..d177a44 100644 --- a/client/sessiondataset.go +++ b/client/sessiondataset.go @@ -1,128 +1,126 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - package client import ( "github.com/apache/iotdb-client-go/rpc" -) - -const ( - TimestampColumnName = "Time" + "time" ) type SessionDataSet struct { ioTDBRpcDataSet *IoTDBRpcDataSet } -// Next prepares the next result row for reading, -// returns true on success, or false if there is no next result row or an error -// appened while preparing it. -// consulted Err should be consulted to distinguish between the two cases. -// This is not goroutine safe +func NewSessionDataSet(sql string, columnNameList []string, columnTypeList []string, columnNameIndex map[string]int32, queryId int64, statementId int64, client *rpc.IClientRPCServiceClient, sessionId int64, queryResult [][]byte, ignoreTimestamp bool, timeout *int64, moreData bool, fetchSize int32) (*SessionDataSet, error) { + rpcDataSet, err := NewIoTDBRpcDataSet(sql, columnNameList, columnTypeList, columnNameIndex, ignoreTimestamp, moreData, queryId, statementId, client, sessionId, queryResult, fetchSize, timeout) + if err != nil { + return nil, err + } + return &SessionDataSet{ioTDBRpcDataSet: rpcDataSet}, nil +} + func (s *SessionDataSet) Next() (bool, error) { - return s.ioTDBRpcDataSet.next() + return s.ioTDBRpcDataSet.Next() } -// GetText returns string value of column value on row. -// This is not goroutine safe -func (s *SessionDataSet) GetText(columnName string) string { - return s.ioTDBRpcDataSet.getText(columnName) +func (s *SessionDataSet) Close() error { + return s.ioTDBRpcDataSet.Close() } -func (s *SessionDataSet) IsNull(columnName string) bool { - return s.ioTDBRpcDataSet.isNullWithColumnName(columnName) +func (s *SessionDataSet) IsNull(columnName string) (bool, error) { + return s.ioTDBRpcDataSet.isNullByColumnName(columnName), nil } -func (s *SessionDataSet) GetBool(columnName string) bool { - return s.ioTDBRpcDataSet.getBool(columnName) +func (s *SessionDataSet) IsNullByIndex(columnIndex int32) (bool, error) { + return s.ioTDBRpcDataSet.isNullByIndex(columnIndex) } -func (s *SessionDataSet) Scan(dest ...interface{}) error { - return s.ioTDBRpcDataSet.scan(dest...) +func (s *SessionDataSet) GetBooleanByIndex(columnIndex int32) (bool, error) { + return s.ioTDBRpcDataSet.getBooleanByIndex(columnIndex) } -func (s *SessionDataSet) GetFloat(columnName string) float32 { - return s.ioTDBRpcDataSet.getFloat(columnName) +func (s *SessionDataSet) GetBoolean(columnName string) (bool, error) { + return s.ioTDBRpcDataSet.getBoolean(columnName) +} + +func (s *SessionDataSet) GetDoubleByIndex(columnIndex int32) (float64, error) { + return s.ioTDBRpcDataSet.getDoubleByIndex(columnIndex) } -func (s *SessionDataSet) GetDouble(columnName string) float64 { +func (s *SessionDataSet) GetDouble(columnName string) (float64, error) { return s.ioTDBRpcDataSet.getDouble(columnName) } -func (s *SessionDataSet) GetInt32(columnName string) int32 { - return s.ioTDBRpcDataSet.getInt32(columnName) +func (s *SessionDataSet) GetFloatByIndex(columnIndex int32) (float32, error) { + return s.ioTDBRpcDataSet.getFloatByIndex(columnIndex) } -func (s *SessionDataSet) GetInt64(columnName string) int64 { - return s.ioTDBRpcDataSet.getInt64(columnName) +func (s *SessionDataSet) GetFloat(columnName string) (float32, error) { + return s.ioTDBRpcDataSet.getFloat(columnName) } -func (s *SessionDataSet) GetTimestamp() int64 { - return s.ioTDBRpcDataSet.GetTimestamp() +func (s *SessionDataSet) GetIntByIndex(columnIndex int32) (int32, error) { + return s.ioTDBRpcDataSet.getIntByIndex(columnIndex) } -func (s *SessionDataSet) GetValue(columnName string) interface{} { - return s.ioTDBRpcDataSet.getValue(columnName) +func (s *SessionDataSet) GetInt(columnName string) (int32, error) { + return s.ioTDBRpcDataSet.getInt(columnName) } -func (s *SessionDataSet) GetRowRecord() (*RowRecord, error) { - return s.ioTDBRpcDataSet.getRowRecord() +func (s *SessionDataSet) GetLongByIndex(columnIndex int32) (int64, error) { + return s.ioTDBRpcDataSet.getLongByIndex(columnIndex) } -func (s *SessionDataSet) GetColumnCount() int { - return s.ioTDBRpcDataSet.columnCount +func (s *SessionDataSet) GetLong(columnName string) (int64, error) { + return s.ioTDBRpcDataSet.getLong(columnName) } -func (s *SessionDataSet) GetColumnDataType(columnIndex int) TSDataType { - return s.ioTDBRpcDataSet.columnTypeList[columnIndex] +func (s *SessionDataSet) GetObjectByIndex(columnIndex int32) (interface{}, error) { + return s.ioTDBRpcDataSet.getObjectByIndex(columnIndex) } -func (s *SessionDataSet) GetColumnName(columnIndex int) string { - return s.ioTDBRpcDataSet.columnNameList[columnIndex] +func (s *SessionDataSet) GetObject(columnName string) (interface{}, error) { + return s.ioTDBRpcDataSet.getObject(columnName) } -func (s *SessionDataSet) GetColumnNames() []string { - return s.ioTDBRpcDataSet.columnNameList +func (s *SessionDataSet) GetStringByIndex(columnIndex int32) (string, error) { + return s.ioTDBRpcDataSet.getStringByIndex(columnIndex) } -func (s *SessionDataSet) IsIgnoreTimeStamp() bool { - return s.ioTDBRpcDataSet.ignoreTimeStamp +func (s *SessionDataSet) GetString(columnName string) (string, error) { + return s.ioTDBRpcDataSet.getValueByName(columnName) } -func (s *SessionDataSet) IsClosed() bool { - return s.ioTDBRpcDataSet.IsClosed() +func (s *SessionDataSet) GetTimestampByIndex(columnIndex int32) (time.Time, error) { + return s.ioTDBRpcDataSet.getTimestampByIndex(columnIndex) } -func (s *SessionDataSet) Close() error { - return s.ioTDBRpcDataSet.Close() +func (s *SessionDataSet) GetTimestamp(columnName string) (time.Time, error) { + return s.ioTDBRpcDataSet.getTimestamp(columnName) } -func NewSessionDataSet(sql string, columnNameList []string, columnTypeList []string, - columnNameIndex map[string]int32, - queryId int64, client *rpc.IClientRPCServiceClient, sessionId int64, queryDataSet *rpc.TSQueryDataSet, - ignoreTimeStamp bool, fetchSize int32, timeoutMs *int64) *SessionDataSet { +func (s *SessionDataSet) GetDateByIndex(columnIndex int32) (time.Time, error) { + return s.ioTDBRpcDataSet.GetDateByIndex(columnIndex) +} - return &SessionDataSet{ - ioTDBRpcDataSet: NewIoTDBRpcDataSet(sql, columnNameList, columnTypeList, - columnNameIndex, - queryId, client, sessionId, queryDataSet, - ignoreTimeStamp, fetchSize, timeoutMs), - } +func (s *SessionDataSet) GetDate(columnName string) (time.Time, error) { + return s.ioTDBRpcDataSet.GetDate(columnName) +} + +func (s *SessionDataSet) GetBlobByIndex(columnIndex int32) (*Binary, error) { + return s.ioTDBRpcDataSet.getBinaryByIndex(columnIndex) +} + +func (s *SessionDataSet) GetBlob(columnName string) (*Binary, error) { + return s.ioTDBRpcDataSet.getBinary(columnName) +} + +func (s *SessionDataSet) FindColumn(columnName string) int32 { + return s.ioTDBRpcDataSet.findColumn(columnName) +} + +func (s *SessionDataSet) GetColumnNames() []string { + return s.ioTDBRpcDataSet.columnNameList +} + +func (s *SessionDataSet) GetColumnTypes() []string { + return s.ioTDBRpcDataSet.columnTypeList } diff --git a/client/tablet.go b/client/tablet.go index 7b62f89..2268018 100644 --- a/client/tablet.go +++ b/client/tablet.go @@ -195,7 +195,7 @@ func (t *Tablet) SetValueAt(value interface{}, columnIndex, rowIndex int) error values := t.values[columnIndex].([]int32) switch v := value.(type) { case time.Time: - val, err := dateToInt32(v) + val, err := DateToInt32(v) if err != nil { return err } @@ -241,7 +241,7 @@ func (t *Tablet) GetValueAt(columnIndex, rowIndex int) (interface{}, error) { case BLOB: return t.values[columnIndex].([][]byte)[rowIndex], nil case DATE: - return int32ToDate(t.values[columnIndex].([]int32)[rowIndex]) + return Int32ToDate(t.values[columnIndex].([]int32)[rowIndex]) default: return nil, fmt.Errorf("illegal datatype %v", schema.DataType) } diff --git a/client/tsblock.go b/client/tsblock.go new file mode 100644 index 0000000..b9365e7 --- /dev/null +++ b/client/tsblock.go @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package client + +import ( + "bytes" + "encoding/binary" + "fmt" +) + +type TsBlock struct { + timeColumn *TimeColumn + valueColumns []Column + positionCount int32 +} + +func NewTsBlock(positionCount int32, timeColumn *TimeColumn, valueColumns ...Column) (*TsBlock, error) { + if valueColumns == nil { + return nil, fmt.Errorf("blocks is null") + } + return &TsBlock{ + timeColumn: timeColumn, + valueColumns: valueColumns, + positionCount: positionCount, + }, nil +} + +func DeserializeTsBlock(data []byte) (*TsBlock, error) { + // Serialized tsblock: + // +-------------+---------------+---------+------------+-----------+----------+ + // | val col cnt | val col types | pos cnt | encodings | time col | val col | + // +-------------+---------------+---------+------------+-----------+----------+ + // | int32 | list[byte] | int32 | list[byte] | bytes | bytes | + // +-------------+---------------+---------+------------+-----------+----------+ + + reader := bytes.NewReader(data) + // value column count + var valueColumnCount int32 + if err := binary.Read(reader, binary.BigEndian, &valueColumnCount); err != nil { + return nil, err + } + + // value column data types + valueColumnDataTypes := make([]TSDataType, valueColumnCount) + for i := int32(0); i < valueColumnCount; i++ { + dataType, err := deserializeDataType(reader) + if err != nil { + return nil, err + } + valueColumnDataTypes[i] = dataType + } + + // position count + var positionCount int32 + if err := binary.Read(reader, binary.BigEndian, &positionCount); err != nil { + return nil, err + } + + // column encodings + columnEncodings := make([]ColumnEncoding, valueColumnCount+1) + for i := int32(0); i < valueColumnCount+1; i++ { + columnEncoding, err := deserializeColumnEncoding(reader) + if err != nil { + return nil, err + } + columnEncodings[i] = columnEncoding + } + + // time column + timeColumnDecoder, err := getColumnDecoder(columnEncodings[0]) + if err != nil { + return nil, err + } + timeColumn, err := timeColumnDecoder.ReadTimeColumn(reader, positionCount) + if err != nil { + return nil, err + } + + // value columns + valueColumns := make([]Column, valueColumnCount) + for i := int32(0); i < valueColumnCount; i++ { + valueColumnDecoder, err := getColumnDecoder(columnEncodings[i+1]) + if err != nil { + return nil, err + } + valueColumn, err := valueColumnDecoder.ReadColumn(reader, valueColumnDataTypes[i], positionCount) + if err != nil { + return nil, err + } + valueColumns[i] = valueColumn + } + return NewTsBlock(positionCount, timeColumn, valueColumns...) +} + +func deserializeDataType(reader *bytes.Reader) (TSDataType, error) { + b, err := reader.ReadByte() + if err != nil { + return UNKNOWN, err + } + return getDataTypeByByte(b) +} + +func deserializeColumnEncoding(reader *bytes.Reader) (ColumnEncoding, error) { + b, err := reader.ReadByte() + if err != nil { + return RLE_COLUMN_ENCODING, err + } + return getColumnEncodingByByte(b) +} + +func (t *TsBlock) GetPositionCount() int32 { + return t.positionCount +} + +func (t *TsBlock) GetStartTime() int64 { + return t.timeColumn.GetStartTime() +} + +func (t *TsBlock) GetEndTime() int64 { + return t.timeColumn.GetEndTime() +} + +func (t *TsBlock) IsEmpty() bool { + return t.positionCount == 0 +} + +func (t *TsBlock) GetTimeByIndex(index int32) (int64, error) { + return t.timeColumn.GetLong(index) +} + +func (t *TsBlock) GetValueColumnCount() int32 { + return int32(len(t.valueColumns)) +} + +func (t *TsBlock) GetTimeColumn() Column { + return t.timeColumn +} + +func (t *TsBlock) GetValueColumns() *[]Column { + return &t.valueColumns +} + +func (t *TsBlock) GetColumn(columnIndex int32) Column { + return t.valueColumns[columnIndex] +} diff --git a/client/utils.go b/client/utils.go index 692e9a5..41cc783 100644 --- a/client/utils.go +++ b/client/utils.go @@ -81,7 +81,7 @@ func bytesToHexString(input []byte) string { return hexString } -func dateToInt32(localDate time.Time) (int32, error) { +func DateToInt32(localDate time.Time) (int32, error) { if localDate.IsZero() { return 0, errors.New("date expression is null or empty") } @@ -96,7 +96,7 @@ func dateToInt32(localDate time.Time) (int32, error) { return int32(result), nil } -func int32ToDate(val int32) (time.Time, error) { +func Int32ToDate(val int32) (time.Time, error) { date := int(val) year := date / 10000 month := (date / 100) % 100 @@ -112,7 +112,7 @@ func int32ToDate(val int32) (time.Time, error) { } func bytesToDate(bys []byte) (time.Time, error) { - return int32ToDate(bytesToInt32(bys)) + return Int32ToDate(bytesToInt32(bys)) } func verifySuccesses(statuses []*common.TSStatus) error { @@ -149,3 +149,19 @@ func VerifySuccess(status *common.TSStatus) error { } return nil } + +type Binary struct { + values []byte +} + +func NewBinary(v []byte) *Binary { + return &Binary{v} +} + +func (b *Binary) GetStringValue() string { + return string(b.values) +} + +func (b *Binary) GetValues() []byte { + return b.values +} diff --git a/example/session_example.go b/example/session_example.go index 32026c1..1b31514 100644 --- a/example/session_example.go +++ b/example/session_example.go @@ -162,41 +162,22 @@ func connectCluster() { } func printDevice1(sds *client.SessionDataSet) { - showTimestamp := !sds.IsIgnoreTimeStamp() - if showTimestamp { - fmt.Print("Time\t\t\t\t") - } - for _, columnName := range sds.GetColumnNames() { fmt.Printf("%s\t", columnName) } fmt.Println() for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { - if showTimestamp { - fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName)) - } - - var restartCount int32 - var price float64 - var tickCount int64 - var temperature float32 - var description string - var status bool - - // All of iotdb datatypes can be scan into string variables - // var restartCount string - // var price string - // var tickCount string - // var temperature string - // var description string - // var status string - - if err := sds.Scan(&restartCount, &tickCount, &price, &temperature, &description, &status); err != nil { - log.Fatal(err) - } + timestamp, _ := sds.GetStringByIndex(1) + restartCount, _ := sds.GetIntByIndex(2) + tickCount, _ := sds.GetLongByIndex(3) + price, _ := sds.GetDoubleByIndex(4) + temperature, _ := sds.GetFloatByIndex(5) + description, _ := sds.GetStringByIndex(6) + status, _ := sds.GetBooleanByIndex(7) whitespace := "\t\t" + fmt.Printf("%s\t", timestamp) fmt.Printf("%v%s", restartCount, whitespace) fmt.Printf("%v%s", price, whitespace) fmt.Printf("%v%s", tickCount, whitespace) @@ -209,35 +190,34 @@ func printDevice1(sds *client.SessionDataSet) { } func printDataSet0(sessionDataSet *client.SessionDataSet) { - showTimestamp := !sessionDataSet.IsIgnoreTimeStamp() - if showTimestamp { - fmt.Print("Time\t\t\t\t") - } - - for i := 0; i < sessionDataSet.GetColumnCount(); i++ { - fmt.Printf("%s\t", sessionDataSet.GetColumnName(i)) + columns := sessionDataSet.GetColumnNames() + for _, columnName := range columns { + fmt.Printf("%s\t", columnName) } fmt.Println() for next, err := sessionDataSet.Next(); err == nil && next; next, err = sessionDataSet.Next() { - if showTimestamp { - fmt.Printf("%s\t", sessionDataSet.GetText(client.TimestampColumnName)) - } - for i := 0; i < sessionDataSet.GetColumnCount(); i++ { - columnName := sessionDataSet.GetColumnName(i) - switch sessionDataSet.GetColumnDataType(i) { + for i, columnName := range columns { + dataType, _ := client.GetDataTypeByStr(sessionDataSet.GetColumnTypes()[i]) + switch dataType { case client.BOOLEAN: - fmt.Print(sessionDataSet.GetBool(columnName)) + value, _ := sessionDataSet.GetBoolean(columnName) + fmt.Print(value) case client.INT32: - fmt.Print(sessionDataSet.GetInt32(columnName)) + value, _ := sessionDataSet.GetInt(columnName) + fmt.Print(value) case client.INT64, client.TIMESTAMP: - fmt.Print(sessionDataSet.GetInt64(columnName)) + value, _ := sessionDataSet.GetLong(columnName) + fmt.Print(value) case client.FLOAT: - fmt.Print(sessionDataSet.GetFloat(columnName)) + value, _ := sessionDataSet.GetFloat(columnName) + fmt.Print(value) case client.DOUBLE: - fmt.Print(sessionDataSet.GetDouble(columnName)) + value, _ := sessionDataSet.GetDouble(columnName) + fmt.Print(value) case client.TEXT, client.STRING, client.BLOB, client.DATE: - fmt.Print(sessionDataSet.GetText(columnName)) + value, _ := sessionDataSet.GetString(columnName) + fmt.Print(value) default: } fmt.Print("\t\t") @@ -247,58 +227,46 @@ func printDataSet0(sessionDataSet *client.SessionDataSet) { } func printDataSet1(sds *client.SessionDataSet) { - showTimestamp := !sds.IsIgnoreTimeStamp() - if showTimestamp { - fmt.Print("Time\t\t\t\t") - } - - for i := 0; i < sds.GetColumnCount(); i++ { - fmt.Printf("%s\t", sds.GetColumnName(i)) + columnNames := sds.GetColumnNames() + for _, value := range columnNames { + fmt.Printf("%s\t", value) } fmt.Println() for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { - if showTimestamp { - fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName)) - } - for i := 0; i < sds.GetColumnCount(); i++ { - columnName := sds.GetColumnName(i) - v := sds.GetValue(columnName) - if v == nil { - v = "null" + for _, columnName := range columnNames { + isNull, _ := sds.IsNull(columnName) + + if isNull { + fmt.Printf("%v\t\t", "null") + } else { + v, _ := sds.GetString(columnName) + fmt.Printf("%v\t\t", v) } - fmt.Printf("%v\t\t", v) } fmt.Println() } } func printDataSet2(sds *client.SessionDataSet) { - showTimestamp := !sds.IsIgnoreTimeStamp() - if showTimestamp { - fmt.Print("Time\t\t\t\t") - } - - for i := 0; i < sds.GetColumnCount(); i++ { - fmt.Printf("%s\t", sds.GetColumnName(i)) + columnNames := sds.GetColumnNames() + for _, value := range columnNames { + fmt.Printf("%s\t", value) } fmt.Println() for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { - if showTimestamp { - fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName)) - } + for i := int32(0); i < int32(len(columnNames)); i++ { + isNull, _ := sds.IsNullByIndex(i) - if record, err := sds.GetRowRecord(); err == nil { - for _, field := range record.GetFields() { - v := field.GetValue() - if field.IsNull() { - v = "null" - } + if isNull { + fmt.Printf("%v\t\t", "null") + } else { + v, _ := sds.GetStringByIndex(i) fmt.Printf("%v\t\t", v) } - fmt.Println() } + fmt.Println() } } @@ -641,7 +609,7 @@ func executeAggregationQueryStatementWithLegalNodes(paths []string, aggregations } func executeRawDataQuery() { - session.ExecuteUpdateStatement("insert into root.ln.wf02.wt02(time,s5) values(1,true)") + session.ExecuteNonQueryStatement("insert into root.ln.wf02.wt02(time,s5) values(1,true)") var ( paths = []string{"root.ln.wf02.wt02.s5"} startTime int64 = 1 diff --git a/example/session_pool/session_pool_example.go b/example/session_pool/session_pool_example.go index 459393b..6f52ffe 100644 --- a/example/session_pool/session_pool_example.go +++ b/example/session_pool/session_pool_example.go @@ -26,7 +26,6 @@ import ( "log" "math/rand" "strings" - "sync" "time" "github.com/apache/iotdb-client-go/client" @@ -55,18 +54,6 @@ func main() { sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) defer sessionPool.Close() - var wg sync.WaitGroup - for i := 0; i < 10000; i++ { - var j = i - wg.Add(1) - go func() { - defer wg.Done() - setStorageGroup(fmt.Sprintf("root.ln-%d", j)) - deleteStorageGroup(fmt.Sprintf("root.ln-%d", j)) - - }() - - } //useNodeUrls() setStorageGroup("root.ln1") setStorageGroup("root.ln2") @@ -136,7 +123,6 @@ func main() { insertAlignedTablets() deleteTimeseries("root.ln.device1.*") executeQueryStatement("show timeseries root.**") - wg.Wait() } @@ -604,7 +590,7 @@ func executeRawDataQuery() { log.Print(err) return } - session.ExecuteUpdateStatement("insert into root.ln.wf02.wt02(time,s5) values(1,true)") + session.ExecuteNonQueryStatement("insert into root.ln.wf02.wt02(time,s5) values(1,true)") var ( paths []string = []string{"root.ln.wf02.wt02.s5"} startTime int64 = 1 @@ -631,41 +617,22 @@ func executeBatchStatement() { } func printDevice1(sds *client.SessionDataSet) { - showTimestamp := !sds.IsIgnoreTimeStamp() - if showTimestamp { - fmt.Print("Time\t\t\t\t") - } - for _, columnName := range sds.GetColumnNames() { fmt.Printf("%s\t", columnName) } fmt.Println() for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { - if showTimestamp { - fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName)) - } - - var restartCount int32 - var price float64 - var tickCount int64 - var temperature float32 - var description string - var status bool - - // All of iotdb datatypes can be scan into string variables - // var restartCount string - // var price string - // var tickCount string - // var temperature string - // var description string - // var status string - - if err := sds.Scan(&restartCount, &tickCount, &price, &temperature, &description, &status); err != nil { - log.Fatal(err) - } + timestamp, _ := sds.GetStringByIndex(1) + restartCount, _ := sds.GetIntByIndex(2) + tickCount, _ := sds.GetLongByIndex(3) + price, _ := sds.GetDoubleByIndex(4) + temperature, _ := sds.GetFloatByIndex(5) + description, _ := sds.GetStringByIndex(6) + status, _ := sds.GetBooleanByIndex(7) whitespace := "\t\t" + fmt.Printf("%s\t", timestamp) fmt.Printf("%v%s", restartCount, whitespace) fmt.Printf("%v%s", price, whitespace) fmt.Printf("%v%s", tickCount, whitespace) @@ -678,35 +645,34 @@ func printDevice1(sds *client.SessionDataSet) { } func printDataSet0(sessionDataSet *client.SessionDataSet) { - showTimestamp := !sessionDataSet.IsIgnoreTimeStamp() - if showTimestamp { - fmt.Print("Time\t\t\t\t") - } - - for i := 0; i < sessionDataSet.GetColumnCount(); i++ { - fmt.Printf("%s\t", sessionDataSet.GetColumnName(i)) + columns := sessionDataSet.GetColumnNames() + for _, columnName := range columns { + fmt.Printf("%s\t", columnName) } fmt.Println() for next, err := sessionDataSet.Next(); err == nil && next; next, err = sessionDataSet.Next() { - if showTimestamp { - fmt.Printf("%s\t", sessionDataSet.GetText(client.TimestampColumnName)) - } - for i := 0; i < sessionDataSet.GetColumnCount(); i++ { - columnName := sessionDataSet.GetColumnName(i) - switch sessionDataSet.GetColumnDataType(i) { + for i, columnName := range columns { + dataType, _ := client.GetDataTypeByStr(sessionDataSet.GetColumnTypes()[i]) + switch dataType { case client.BOOLEAN: - fmt.Print(sessionDataSet.GetBool(columnName)) + value, _ := sessionDataSet.GetBoolean(columnName) + fmt.Print(value) case client.INT32: - fmt.Print(sessionDataSet.GetInt32(columnName)) - case client.INT64: - fmt.Print(sessionDataSet.GetInt64(columnName)) + value, _ := sessionDataSet.GetInt(columnName) + fmt.Print(value) + case client.INT64, client.TIMESTAMP: + value, _ := sessionDataSet.GetLong(columnName) + fmt.Print(value) case client.FLOAT: - fmt.Print(sessionDataSet.GetFloat(columnName)) + value, _ := sessionDataSet.GetFloat(columnName) + fmt.Print(value) case client.DOUBLE: - fmt.Print(sessionDataSet.GetDouble(columnName)) - case client.TEXT: - fmt.Print(sessionDataSet.GetText(columnName)) + value, _ := sessionDataSet.GetDouble(columnName) + fmt.Print(value) + case client.TEXT, client.STRING, client.BLOB, client.DATE: + value, _ := sessionDataSet.GetString(columnName) + fmt.Print(value) default: } fmt.Print("\t\t") @@ -716,58 +682,46 @@ func printDataSet0(sessionDataSet *client.SessionDataSet) { } func printDataSet1(sds *client.SessionDataSet) { - showTimestamp := !sds.IsIgnoreTimeStamp() - if showTimestamp { - fmt.Print("Time\t\t\t\t") - } - - for i := 0; i < sds.GetColumnCount(); i++ { - fmt.Printf("%s\t", sds.GetColumnName(i)) + columnNames := sds.GetColumnNames() + for _, value := range columnNames { + fmt.Printf("%s\t", value) } fmt.Println() for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { - if showTimestamp { - fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName)) - } - for i := 0; i < sds.GetColumnCount(); i++ { - columnName := sds.GetColumnName(i) - v := sds.GetValue(columnName) - if v == nil { - v = "null" + for _, columnName := range columnNames { + isNull, _ := sds.IsNull(columnName) + + if isNull { + fmt.Printf("%v\t\t", "null") + } else { + v, _ := sds.GetString(columnName) + fmt.Printf("%v\t\t", v) } - fmt.Printf("%v\t\t", v) } fmt.Println() } } func printDataSet2(sds *client.SessionDataSet) { - showTimestamp := !sds.IsIgnoreTimeStamp() - if showTimestamp { - fmt.Print("Time\t\t\t\t") - } - - for i := 0; i < sds.GetColumnCount(); i++ { - fmt.Printf("%s\t", sds.GetColumnName(i)) + columnNames := sds.GetColumnNames() + for _, value := range columnNames { + fmt.Printf("%s\t", value) } fmt.Println() for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { - if showTimestamp { - fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName)) - } + for i := int32(0); i < int32(len(columnNames)); i++ { + isNull, _ := sds.IsNullByIndex(i) - if record, err := sds.GetRowRecord(); err == nil { - for _, field := range record.GetFields() { - v := field.GetValue() - if field.IsNull() { - v = "null" - } + if isNull { + fmt.Printf("%v\t\t", "null") + } else { + v, _ := sds.GetStringByIndex(i) fmt.Printf("%v\t\t", v) } - fmt.Println() } + fmt.Println() } } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 16eaf77..faf204e 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -77,6 +77,11 @@ func (s *e2eTestSuite) checkError(status *common.TSStatus, err error) { } } +func (s *e2eTestSuite) Test_NonQuery() { + _, err := s.session.ExecuteStatement("flush") + s.Require().NoError(err) +} + func (s *e2eTestSuite) Test_WrongURL() { clusterConfig := client.ClusterConfig{ NodeUrls: strings.Split("iotdb1:6667", ","), @@ -102,8 +107,8 @@ func (s *e2eTestSuite) Test_CreateTimeseries() { assert.NoError(err) defer ds.Close() assert.True(ds.Next()) - var timeseries string - assert.NoError(ds.Scan(×eries)) + timeseries, err := ds.GetStringByIndex(1) + assert.NoError(err) assert.Equal(timeseries, "root.tsg1.dev1.status") } @@ -135,8 +140,8 @@ func (s *e2eTestSuite) Test_CreateAlignedTimeseries() { assert.NoError(err) defer ds.Close() assert.True(ds.Next()) - var timeseries string - assert.NoError(ds.Scan(×eries)) + timeseries, err := ds.GetStringByIndex(1) + assert.NoError(err) assert.Equal(timeseries, fullPath) } } @@ -156,8 +161,8 @@ func (s *e2eTestSuite) Test_InsertRecords() { assert.NoError(err) defer ds.Close() assert.True(ds.Next()) - var status string - assert.NoError(ds.Scan(&status)) + status, err := ds.GetString("root.tsg1.dev1.status") + assert.NoError(err) assert.Equal(status, "Working") } @@ -176,9 +181,9 @@ func (s *e2eTestSuite) Test_InsertAlignedRecord() { assert.NoError(err) defer ds.Close() assert.True(ds.Next()) - var status string - assert.NoError(ds.Scan(&status)) - assert.Equal(status, "Working") + status, err := ds.GetString("root.tsg2.dev1.status") + assert.NoError(err) + assert.Equal("Working", status) } func (s *e2eTestSuite) Test_InsertAlignedRecords() { @@ -195,8 +200,8 @@ func (s *e2eTestSuite) Test_InsertAlignedRecords() { assert.NoError(err) defer ds.Close() assert.True(ds.Next()) - var temperature string - assert.NoError(ds.Scan(&temperature)) + temperature, err := ds.GetString("root.al1.dev3.temperature") + assert.NoError(err) assert.Equal(temperature, "44") } @@ -227,10 +232,11 @@ func (s *e2eTestSuite) Test_InsertAlignedRecordsOfOneDevice() { assert.NoError(err) defer ds.Close() assert.True(ds.Next()) - var status string - assert.NoError(ds.Scan(&status)) - assert.Equal(status, "2024-04-01") + date, err := ds.GetString("root.al1.dev4.date") + assert.NoError(err) + assert.Equal("2024-04-01", date) } + func (s *e2eTestSuite) Test_InsertAlignedTablet() { var timeseries = []string{"root.ln.device1.**"} s.session.DeleteTimeseries(timeseries) @@ -247,8 +253,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTablet() { assert.NoError(err) defer ds.Close() assert.True(ds.Next()) - var status string - assert.NoError(ds.Scan(&status)) + status, err := ds.GetStringByIndex(1) + assert.NoError(err) assert.Equal(status, "12") s.session.DeleteStorageGroup("root.ln.**") } @@ -269,8 +275,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTabletWithNilValue() { assert.NoError(err) defer ds.Close() assert.True(ds.Next()) - var status string - assert.NoError(ds.Scan(&status)) + status, err := ds.GetStringByIndex(1) + assert.NoError(err) assert.Equal(status, "12") s.session.DeleteStorageGroup("root.ln.**") } @@ -389,12 +395,265 @@ func (s *e2eTestSuite) Test_InsertAlignedTablets() { assert.NoError(err) defer ds.Close() assert.True(ds.Next()) - var status string - assert.NoError(ds.Scan(&status)) + status, err := ds.GetStringByIndex(1) + assert.NoError(err) assert.Equal(status, "8") s.session.DeleteStorageGroup("root.ln.**") } +func (s *e2eTestSuite) Test_FetchMoreData() { + var timeseries = []string{"root.ln.device1.**"} + s.session.SetFetchSize(1000) + s.session.DeleteTimeseries(timeseries) + writeCount := 10000 + tablet1, err := createTablet(writeCount) + if err != nil { + log.Fatal(err) + } + + tablets := []*client.Tablet{tablet1} + s.checkError(s.session.InsertAlignedTablets(tablets, false)) + + ds, err := s.session.ExecuteQueryStatement("select * from root.ln.device1", nil) + count := 0 + for { + if hasNext, err := ds.Next(); err != nil || !hasNext { + break + } + count += 1 + } + s.Assert().Equal(writeCount, count) + s.session.DeleteStorageGroup("root.ln.**") +} + +func (s *e2eTestSuite) Test_QueryAllDataType() { + measurementSchemas := []*client.MeasurementSchema{ + { + Measurement: "s0", + DataType: client.BOOLEAN, + }, + { + Measurement: "s1", + DataType: client.INT32, + }, + { + Measurement: "s2", + DataType: client.INT64, + }, + { + Measurement: "s3", + DataType: client.FLOAT, + }, + { + Measurement: "s4", + DataType: client.DOUBLE, + }, + { + Measurement: "s5", + DataType: client.TEXT, + }, + { + Measurement: "s6", + DataType: client.TIMESTAMP, + }, + { + Measurement: "s7", + DataType: client.DATE, + }, + { + Measurement: "s8", + DataType: client.BLOB, + }, + { + Measurement: "s9", + DataType: client.STRING, + }, + } + tablet, err := client.NewTablet("root.tsg1.d1", measurementSchemas, 100) + s.NoError(err) + tablet.SetTimestamp(1, 0) + tablet.SetValueAt(true, 0, 0) + tablet.SetValueAt(int32(1), 1, 0) + tablet.SetValueAt(int64(1), 2, 0) + tablet.SetValueAt(float32(1), 3, 0) + tablet.SetValueAt(float64(1), 4, 0) + tablet.SetValueAt("text", 5, 0) + tablet.SetValueAt(int64(1), 6, 0) + expectedDate, _ := client.Int32ToDate(20250326) + tablet.SetValueAt(expectedDate, 7, 0) + tablet.SetValueAt([]byte{1}, 8, 0) + tablet.SetValueAt("string", 9, 0) + tablet.RowSize = 1 + + r, err := s.session.InsertAlignedTablet(tablet, true) + s.checkError(r, err) + + sessionDataSet, err := s.session.ExecuteQueryStatement("select s0, s1, s2, s3, s4, s5, s6, s7, s8, s9 from root.tsg1.d1 limit 1", nil) + for { + if hasNext, err := sessionDataSet.Next(); err != nil || !hasNext { + break + } + for _, columnName := range sessionDataSet.GetColumnNames() { + isNull, err := sessionDataSet.IsNull(columnName) + s.NoError(err) + s.False(isNull) + } + timeValue, err := sessionDataSet.GetLongByIndex(1) + s.NoError(err) + s.Equal(int64(1), timeValue) + boolValue, err := sessionDataSet.GetBooleanByIndex(2) + s.NoError(err) + s.Equal(true, boolValue) + + intValue, err := sessionDataSet.GetIntByIndex(3) + s.NoError(err) + s.Equal(int32(1), intValue) + + longValue, err := sessionDataSet.GetLongByIndex(4) + s.NoError(err) + s.Equal(int64(1), longValue) + + floatValue, err := sessionDataSet.GetFloatByIndex(5) + s.NoError(err) + s.Equal(float32(1), floatValue) + + doubleValue, err := sessionDataSet.GetDoubleByIndex(6) + s.NoError(err) + s.Equal(float64(1), doubleValue) + + textValue, err := sessionDataSet.GetStringByIndex(7) + s.NoError(err) + s.Equal("text", textValue) + + timestampValue, err := sessionDataSet.GetTimestampByIndex(8) + s.NoError(err) + s.Equal(time.Unix(0, 1e6), timestampValue) + + dateValue, err := sessionDataSet.GetDateByIndex(9) + s.NoError(err) + s.Equal(expectedDate, dateValue) + + blobValue, err := sessionDataSet.GetBlobByIndex(10) + s.NoError(err) + s.Equal([]byte{1}, blobValue.GetValues()) + + stringValue, err := sessionDataSet.GetStringByIndex(11) + s.NoError(err) + s.Equal("string", stringValue) + } + sessionDataSet.Close() + + sessionDataSet, err = s.session.ExecuteQueryStatement("select s0, s1, s2, s3, s4, s5, s6, s7, s8, s9 from root.tsg1.d1 limit 1", nil) + for { + if hasNext, err := sessionDataSet.Next(); err != nil || !hasNext { + break + } + for _, columnName := range sessionDataSet.GetColumnNames() { + isNull, err := sessionDataSet.IsNull(columnName) + s.NoError(err) + s.False(isNull) + } + timeValue, err := sessionDataSet.GetLong("Time") + s.NoError(err) + s.Equal(int64(1), timeValue) + + boolValue, err := sessionDataSet.GetBoolean("root.tsg1.d1.s0") + s.NoError(err) + s.Equal(true, boolValue) + + intValue, err := sessionDataSet.GetInt("root.tsg1.d1.s1") + s.NoError(err) + s.Equal(int32(1), intValue) + + longValue, err := sessionDataSet.GetLong("root.tsg1.d1.s2") + s.NoError(err) + s.Equal(int64(1), longValue) + + floatValue, err := sessionDataSet.GetFloat("root.tsg1.d1.s3") + s.NoError(err) + s.Equal(float32(1), floatValue) + + doubleValue, err := sessionDataSet.GetDouble("root.tsg1.d1.s4") + s.NoError(err) + s.Equal(float64(1), doubleValue) + + textValue, err := sessionDataSet.GetString("root.tsg1.d1.s5") + s.NoError(err) + s.Equal("text", textValue) + + timestampValue, err := sessionDataSet.GetTimestamp("root.tsg1.d1.s6") + s.NoError(err) + s.Equal(time.Unix(0, 1e6), timestampValue) + + dateValue, err := sessionDataSet.GetDate("root.tsg1.d1.s7") + s.NoError(err) + s.Equal(expectedDate, dateValue) + + blobValue, err := sessionDataSet.GetBlob("root.tsg1.d1.s8") + s.NoError(err) + s.Equal([]byte{1}, blobValue.GetValues()) + + stringValue, err := sessionDataSet.GetString("root.tsg1.d1.s9") + s.NoError(err) + s.Equal("string", stringValue) + } + sessionDataSet.Close() + + sessionDataSet, err = s.session.ExecuteQueryStatement("select * from root.tsg1.d1 limit 1", nil) + for { + if hasNext, err := sessionDataSet.Next(); err != nil || !hasNext { + break + } + for _, columnName := range sessionDataSet.GetColumnNames() { + isNull, err := sessionDataSet.IsNull(columnName) + s.NoError(err) + s.False(isNull) + } + timeValue, err := sessionDataSet.GetObject("Time") + s.NoError(err) + s.Equal(time.Unix(0, 1*1e6), timeValue) + + boolValue, err := sessionDataSet.GetObject("root.tsg1.d1.s0") + s.NoError(err) + s.Equal(true, boolValue) + + intValue, err := sessionDataSet.GetObject("root.tsg1.d1.s1") + s.NoError(err) + s.Equal(int32(1), intValue) + + longValue, err := sessionDataSet.GetObject("root.tsg1.d1.s2") + s.NoError(err) + s.Equal(int64(1), longValue) + + floatValue, err := sessionDataSet.GetObject("root.tsg1.d1.s3") + s.NoError(err) + s.Equal(float32(1), floatValue) + + doubleValue, err := sessionDataSet.GetObject("root.tsg1.d1.s4") + s.NoError(err) + s.Equal(float64(1), doubleValue) + + textValue, err := sessionDataSet.GetObject("root.tsg1.d1.s5") + s.NoError(err) + s.Equal("text", textValue.(*client.Binary).GetStringValue()) + + timestampValue, err := sessionDataSet.GetObject("root.tsg1.d1.s6") + s.NoError(err) + s.Equal(int64(1), timestampValue) + + dateValue, err := sessionDataSet.GetObject("root.tsg1.d1.s7") + s.NoError(err) + s.Equal(int32(20250326), dateValue) + + blobValue, err := sessionDataSet.GetObject("root.tsg1.d1.s8") + s.NoError(err) + s.Equal([]byte{1}, blobValue.(*client.Binary).GetValues()) + + stringValue, err := sessionDataSet.GetObject("root.tsg1.d1.s9") + s.NoError(err) + s.Equal("string", stringValue.(*client.Binary).GetStringValue()) + } +} func (s *e2eTestSuite) Test_InvalidSQL() { _, err := s.session.ExecuteStatementWithContext(context.Background(), "select1 from device") assert := s.Require()