Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,6 @@ func (btc *BigtableClient) mutateRow(ctx context.Context, tableName, rowKey stri
}

tbl := client.Open(tableName)
if timestamp == 0 {
timestamp = bigtable.Timestamp(bigtable.Now().Time().UnixMicro())
}

var mutationCount = 0
// Delete column families
Expand Down Expand Up @@ -1059,7 +1056,7 @@ func (btc *BigtableClient) PrepareStatement(ctx context.Context, query rh.QueryM
}
preparedStatement, err := client.PrepareStatement(ctx, query.Query, paramTypes)
if err != nil {
btc.Logger.Error("Failed to prepare statement", zap.String("query", query.Query), zap.Error(err))
btc.Logger.Error("Failed to prepare statement", zap.String("query", query.Query), zap.Any("params", paramTypes), zap.Error(err))
return nil, fmt.Errorf("failed to prepare statement: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"cloud.google.com/go/bigtable"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/utilities"
"github.com/datastax/go-cassandra-native-protocol/datatype"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
Expand All @@ -45,9 +46,10 @@ import (
// - time.Duration: The total elapsed time for the operation.
// - error: Error if the select statement execution fails.
func (btc *BigtableClient) SelectStatement(ctx context.Context, query rh.QueryMetadata) (*message.RowsResult, time.Time, error) {
btc.Logger.Debug("preparing select statement", zap.String("query", query.Query))
preparedStmt, err := btc.PrepareStatement(ctx, query)
if err != nil {
btc.Logger.Error("Failed to prepare statement", zap.String("query", query.Query), zap.Error(err))
btc.Logger.Error("Failed to prepare statement", zap.String("query", query.Query), zap.Any("params", query.ParamValues), zap.Error(err))
return nil, time.Now(), fmt.Errorf("failed to prepare statement: %w", err)
}
return btc.ExecutePreparedStatement(ctx, query, preparedStmt)
Expand All @@ -69,6 +71,8 @@ func (btc *BigtableClient) ExecutePreparedStatement(ctx context.Context, query r
var columnMetadata []*message.ColumnMetadata
var processingErr error

btc.Logger.Debug("binding select statement", zap.String("query", query.Query), zap.Any("params", query.Params))

boundStmt, err := preparedStmt.Bind(query.Params)
if err != nil {
btc.Logger.Error("Failed to bind parameters", zap.Any("params", query.Params), zap.Error(err))
Expand Down Expand Up @@ -209,7 +213,7 @@ func (btc *BigtableClient) convertResultRowToMap(resultRow bigtable.ResultRow, q
case float32:
rowMap[colName] = float64(v)
case time.Time:
encoded, _ := proxycore.EncodeType(datatype.Timestamp, primitive.ProtocolVersion4, v.UnixMicro())
encoded, _ := proxycore.EncodeType(datatype.Timestamp, primitive.ProtocolVersion4, utilities.TimeToBigtableBigInt(v))
rowMap[colName] = encoded
case nil:
rowMap[colName] = nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"slices"
"time"

"cloud.google.com/go/bigtable"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
Expand Down Expand Up @@ -128,7 +129,9 @@ func createBigtableRowKeyField(col types.CreateColumn, intRowKeyEncoding types.I
switch col.TypeInfo.DataType() {
case datatype.Varchar:
return bigtable.StructField{FieldName: col.Name, FieldType: bigtable.StringType{Encoding: bigtable.StringUtf8BytesEncoding{}}}, nil
case datatype.Int, datatype.Bigint, datatype.Timestamp:
case datatype.Timestamp:
return bigtable.StructField{FieldName: col.Name, FieldType: bigtable.TimestampType{Encoding: bigtable.TimestampUnixMicrosInt64Encoding{UnixMicrosInt64Encoding: bigtable.Int64OrderedCodeBytesEncoding{}}}}, nil
case datatype.Int, datatype.Bigint:
switch intRowKeyEncoding {
case types.OrderedCodeEncoding:
return bigtable.StructField{FieldName: col.Name, FieldType: bigtable.Int64Type{Encoding: bigtable.Int64OrderedCodeBytesEncoding{}}}, nil
Expand All @@ -149,6 +152,8 @@ func inferSQLType(value interface{}) (bigtable.SQLType, error) {
return bigtable.StringSQLType{}, nil
case []byte:
return bigtable.BytesSQLType{}, nil
case time.Time:
return bigtable.TimestampSQLType{}, nil
case int, int8, int16, int32, int64:
return bigtable.Int64SQLType{}, nil
case float32:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"time"

"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/third_party/datastax/proxycore"
"github.com/datastax/go-cassandra-native-protocol/datatype"
"github.com/datastax/go-cassandra-native-protocol/primitive"
Expand All @@ -36,7 +37,7 @@ import (
// Returns:
// - An interface{} containing the decoded collection as a slice or map of the appropriate type.
// - An error if any decoding step fails.
func DecodeCollection(dt datatype.DataType, version primitive.ProtocolVersion, encoded []byte) (interface{}, error) {
func DecodeCollection(dt datatype.DataType, encoded []byte, qctx *types.QueryContext) (interface{}, error) {
reader := bytes.NewReader(encoded)
var length int32

Expand All @@ -49,13 +50,13 @@ func DecodeCollection(dt datatype.DataType, version primitive.ProtocolVersion, e
switch dt.GetDataTypeCode() {
case primitive.DataTypeCodeList:
listType := dt.(datatype.ListType)
return decodeListOrSet(listType.GetElementType(), version, reader, length)
return decodeListOrSet(listType.GetElementType(), qctx, reader, length)
case primitive.DataTypeCodeSet:
setType := dt.(datatype.SetType)
return decodeListOrSet(setType.GetElementType(), version, reader, length)
return decodeListOrSet(setType.GetElementType(), qctx, reader, length)
case primitive.DataTypeCodeMap:
mapType := dt.(datatype.MapType)
return decodeMap(mapType.GetValueType(), version, reader, mapType.GetKeyType(), length)
return decodeMap(mapType.GetValueType(), qctx, reader, mapType.GetKeyType(), length)
default:
return nil, fmt.Errorf("unsupported collection type: %v", dt.GetDataTypeCode())
}
Expand All @@ -71,7 +72,7 @@ func DecodeCollection(dt datatype.DataType, version primitive.ProtocolVersion, e
// Returns:
// - An interface{} containing the decoded elements as a slice of the appropriate type.
// - An error if any decoding step fails.
func decodeListOrSet(elementType datatype.DataType, version primitive.ProtocolVersion, reader *bytes.Reader, length int32) (interface{}, error) {
func decodeListOrSet(elementType datatype.DataType, qctx *types.QueryContext, reader *bytes.Reader, length int32) (interface{}, error) {
decodedElements := make([]interface{}, length)
for i := int32(0); i < length; i++ {
var elementLength int32
Expand All @@ -84,7 +85,7 @@ func decodeListOrSet(elementType datatype.DataType, version primitive.ProtocolVe
if err != nil {
return nil, err
}
decodedValue, err := proxycore.DecodeType(elementType, version, elementValue)
decodedValue, err := proxycore.DecodeType(elementType, qctx.ProtocolV, elementValue)
if err != nil {
return nil, err
}
Expand All @@ -104,7 +105,7 @@ func decodeListOrSet(elementType datatype.DataType, version primitive.ProtocolVe
// Returns:
// - An interface{} containing the decoded map as a map[interface{}]interface{} with keys and values of the appropriate types.
// - An error if any decoding step fails.
func decodeMap(valueType datatype.DataType, version primitive.ProtocolVersion, reader *bytes.Reader, keyType datatype.DataType, length int32) (interface{}, error) {
func decodeMap(valueType datatype.DataType, qctx *types.QueryContext, reader *bytes.Reader, keyType datatype.DataType, length int32) (interface{}, error) {
decodedMap := make(map[interface{}]interface{}, length)
for i := int32(0); i < length; i++ {
var keyLength int32
Expand All @@ -117,7 +118,7 @@ func decodeMap(valueType datatype.DataType, version primitive.ProtocolVersion, r
if err != nil {
return nil, err
}
decodedKey, err := proxycore.DecodeType(keyType, version, keyValue)
decodedKey, err := proxycore.DecodeType(keyType, qctx.ProtocolV, keyValue)
if err != nil {
return nil, err
}
Expand All @@ -132,7 +133,7 @@ func decodeMap(valueType datatype.DataType, version primitive.ProtocolVersion, r
if err != nil {
return nil, err
}
decodedValue, err := proxycore.DecodeType(valueType, version, value)
decodedValue, err := proxycore.DecodeType(valueType, qctx.ProtocolV, value)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
"github.com/datastax/go-cassandra-native-protocol/datatype"
"github.com/datastax/go-cassandra-native-protocol/primitive"
)
Expand All @@ -32,6 +33,7 @@ const (
)

func TestDecodeCollection(t *testing.T) {
qctx := types.NewQueryContext(time.Now().UTC(), primitive.ProtocolVersion4)
type args struct {
dt datatype.DataType
version primitive.ProtocolVersion
Expand Down Expand Up @@ -153,7 +155,7 @@ func TestDecodeCollection(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := DecodeCollection(tt.args.dt, tt.args.version, tt.args.encoded)
got, err := DecodeCollection(tt.args.dt, tt.args.encoded, qctx)
if (err != nil) != tt.wantErr {
t.Errorf("DecodeCollection() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -166,6 +168,7 @@ func TestDecodeCollection(t *testing.T) {
}

func TestDecodeListOrSet(t *testing.T) {
qctx := types.NewQueryContext(time.Now().UTC(), primitive.ProtocolVersion4)
type args struct {
elementType datatype.DataType
version primitive.ProtocolVersion
Expand Down Expand Up @@ -404,7 +407,7 @@ func TestDecodeListOrSet(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := decodeListOrSet(tt.args.elementType, tt.args.version, tt.args.reader, tt.args.length)
got, err := decodeListOrSet(tt.args.elementType, qctx, tt.args.reader, tt.args.length)
if (err != nil) != tt.wantErr {
t.Errorf("decodeListOrSet() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -1094,6 +1097,8 @@ func TestConvertToTypedMap(t *testing.T) {
}

func TestDecodeMap(t *testing.T) {
qctx := types.NewQueryContext(time.Now().UTC(), primitive.ProtocolVersion4)

type args struct {
valueType datatype.DataType
version primitive.ProtocolVersion
Expand Down Expand Up @@ -1254,7 +1259,7 @@ func TestDecodeMap(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := decodeMap(tt.args.valueType, tt.args.version, tt.args.reader, tt.args.keyType, tt.args.length)
got, err := decodeMap(tt.args.valueType, qctx, tt.args.reader, tt.args.keyType, tt.args.length)
if (err != nil) != tt.wantErr {
t.Errorf("decodeMap() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,26 +92,26 @@ func (s ScalarType) IsFrozen() bool {

// Pre-defined constants for common scalar types for convenience.
var (
TypeAscii CqlDataType = ScalarType{name: "ascii", code: ASCII, dt: datatype.Varchar}
TypeVarchar CqlDataType = ScalarType{name: "varchar", code: VARCHAR, dt: datatype.Varchar}
TypeBigint CqlDataType = ScalarType{name: "bigint", code: BIGINT, dt: datatype.Bigint}
TypeBlob CqlDataType = ScalarType{name: "blob", code: BLOB, dt: datatype.Blob}
TypeBoolean CqlDataType = ScalarType{name: "boolean", code: BOOLEAN, dt: datatype.Boolean}
TypeCounter CqlDataType = ScalarType{name: "counter", code: COUNTER, dt: datatype.Counter}
TypeDate CqlDataType = ScalarType{name: "date", code: DATE, dt: datatype.Date}
TypeDecimal CqlDataType = ScalarType{name: "decimal", code: DECIMAL, dt: datatype.Decimal}
TypeDouble CqlDataType = ScalarType{name: "double", code: DOUBLE, dt: datatype.Double}
TypeFloat CqlDataType = ScalarType{name: "float", code: FLOAT, dt: datatype.Float}
TypeInet CqlDataType = ScalarType{name: "inet", code: INET, dt: datatype.Inet}
TypeInt CqlDataType = ScalarType{name: "int", code: INT, dt: datatype.Int}
TypeSmallint CqlDataType = ScalarType{name: "smallint", code: SMALLINT, dt: datatype.Smallint}
TypeText CqlDataType = ScalarType{name: "text", code: TEXT, dt: datatype.Varchar}
TypeTime CqlDataType = ScalarType{name: "time", code: TIME, dt: datatype.Time}
TypeTimestamp CqlDataType = ScalarType{name: "timestamp", code: TIMESTAMP, dt: datatype.Timestamp}
TypeTimeuuid CqlDataType = ScalarType{name: "timeuuid", code: TIMEUUID, dt: datatype.Timeuuid}
TypeTinyint CqlDataType = ScalarType{name: "tinyint", code: TINYINT, dt: datatype.Tinyint}
TypeUuid CqlDataType = ScalarType{name: "uuid", code: UUID, dt: datatype.Uuid}
TypeVarint CqlDataType = ScalarType{name: "varint", code: VARINT, dt: datatype.Varint}
TypeAscii = ScalarType{name: "ascii", code: ASCII, dt: datatype.Varchar}
TypeVarchar = ScalarType{name: "varchar", code: VARCHAR, dt: datatype.Varchar}
TypeBigint = ScalarType{name: "bigint", code: BIGINT, dt: datatype.Bigint}
TypeBlob = ScalarType{name: "blob", code: BLOB, dt: datatype.Blob}
TypeBoolean = ScalarType{name: "boolean", code: BOOLEAN, dt: datatype.Boolean}
TypeCounter = ScalarType{name: "counter", code: COUNTER, dt: datatype.Counter}
TypeDate = ScalarType{name: "date", code: DATE, dt: datatype.Date}
TypeDecimal = ScalarType{name: "decimal", code: DECIMAL, dt: datatype.Decimal}
TypeDouble = ScalarType{name: "double", code: DOUBLE, dt: datatype.Double}
TypeFloat = ScalarType{name: "float", code: FLOAT, dt: datatype.Float}
TypeInet = ScalarType{name: "inet", code: INET, dt: datatype.Inet}
TypeInt = ScalarType{name: "int", code: INT, dt: datatype.Int}
TypeSmallint = ScalarType{name: "smallint", code: SMALLINT, dt: datatype.Smallint}
TypeText = ScalarType{name: "text", code: TEXT, dt: datatype.Varchar}
TypeTime = ScalarType{name: "time", code: TIME, dt: datatype.Time}
TypeTimestamp = ScalarType{name: "timestamp", code: TIMESTAMP, dt: datatype.Timestamp}
TypeTimeuuid = ScalarType{name: "timeuuid", code: TIMEUUID, dt: datatype.Timeuuid}
TypeTinyint = ScalarType{name: "tinyint", code: TINYINT, dt: datatype.Tinyint}
TypeUuid = ScalarType{name: "uuid", code: UUID, dt: datatype.Uuid}
TypeVarint = ScalarType{name: "varint", code: VARINT, dt: datatype.Varint}
)

type MapType struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package types

import (
"time"

"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
)

type Column struct {
Expand All @@ -30,6 +33,15 @@ type Column struct {
Metadata message.ColumnMetadata
}

type QueryContext struct {
Now time.Time
ProtocolV primitive.ProtocolVersion
}

func NewQueryContext(now time.Time, protocolV primitive.ProtocolVersion) *QueryContext {
return &QueryContext{Now: now.UTC(), ProtocolV: protocolV}
}

type CreateColumn struct {
Name string
Index int32
Expand Down
47 changes: 25 additions & 22 deletions cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,42 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/stretchr/testify v1.10.0
github.com/tj/assert v0.0.3
go.opentelemetry.io/contrib/detectors/gcp v1.34.0
go.opentelemetry.io/otel v1.35.0
go.opentelemetry.io/contrib/detectors/gcp v1.36.0
go.opentelemetry.io/otel v1.37.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0
go.opentelemetry.io/otel/metric v1.35.0
go.opentelemetry.io/otel/sdk v1.35.0
go.opentelemetry.io/otel/sdk/metric v1.35.0
go.opentelemetry.io/otel/trace v1.35.0
go.opentelemetry.io/otel/metric v1.37.0
go.opentelemetry.io/otel/sdk v1.37.0
go.opentelemetry.io/otel/sdk/metric v1.37.0
go.opentelemetry.io/otel/trace v1.37.0
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.26.0
google.golang.org/api v0.228.0
google.golang.org/grpc v1.71.1
google.golang.org/grpc v1.76.0
gopkg.in/yaml.v2 v2.4.0
)

require (
cel.dev/expr v0.23.0 // indirect
cel.dev/expr v0.24.0 // indirect
cloud.google.com/go v0.120.0 // indirect
cloud.google.com/go/auth v0.15.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/iam v1.5.0 // indirect
cloud.google.com/go/longrunning v0.6.6 // indirect
cloud.google.com/go/monitoring v1.24.1 // indirect
github.com/BurntSushi/toml v1.5.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect
github.com/go-jose/go-jose/v4 v4.1.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/zeebo/errs v1.4.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
Expand All @@ -55,34 +58,34 @@ require (

require (
cloud.google.com/go/bigtable v1.36.0
cloud.google.com/go/compute/metadata v0.6.0 // indirect
cloud.google.com/go/compute/metadata v0.7.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20250326154945-ae57f3c0d45f // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/net v0.38.0 // indirect
golang.org/x/oauth2 v0.28.0 // indirect
golang.org/x/sync v0.12.0 // indirect
golang.org/x/sys v0.31.0
golang.org/x/text v0.23.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.34.0
golang.org/x/text v0.27.0 // indirect
golang.org/x/time v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20250404141209-ee84b53bf3d0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250404141209-ee84b53bf3d0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250404141209-ee84b53bf3d0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading