Skip to content

Commit 2ecc5b9

Browse files
committed
Create writer and batch
1 parent eca1644 commit 2ecc5b9

File tree

9 files changed

+70
-182
lines changed

9 files changed

+70
-182
lines changed

batch.go

Lines changed: 3 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package cassandra
33
import (
44
"fmt"
55
"reflect"
6-
"strings"
76
)
87

98
func InterfaceSlice(slice interface{}) ([]interface{}, error) {
@@ -24,13 +23,13 @@ func ToArrayIndex(value reflect.Value, indices []int) []int {
2423
}
2524
return indices
2625
}
27-
func BuildToInsertBatch(table string, models interface{}, options...*Schema) ([]Statement, error) {
26+
func BuildToInsertBatch(table string, models interface{}, options ...*Schema) ([]Statement, error) {
2827
return BuildToInsertBatchWithVersion(table, models, -1, false, options...)
2928
}
30-
func BuildToInsertOrUpdateBatch(table string, models interface{}, orUpdate bool, options...*Schema) ([]Statement, error) {
29+
func BuildToInsertOrUpdateBatch(table string, models interface{}, orUpdate bool, options ...*Schema) ([]Statement, error) {
3130
return BuildToInsertBatchWithVersion(table, models, -1, orUpdate, options...)
3231
}
33-
func BuildToInsertBatchWithVersion(table string, models interface{}, versionIndex int, orUpdate bool, options...*Schema) ([]Statement, error) {
32+
func BuildToInsertBatchWithVersion(table string, models interface{}, versionIndex int, orUpdate bool, options ...*Schema) ([]Statement, error) {
3433
s := reflect.Indirect(reflect.ValueOf(models))
3534
if s.Kind() != reflect.Slice {
3635
return nil, fmt.Errorf("models is not a slice")
@@ -87,74 +86,3 @@ func BuildToUpdateBatchWithVersion(table string, models interface{}, versionInde
8786
}
8887
return stmts, nil
8988
}
90-
func BuildToSaveBatch(table string, models interface{}, options ...*Schema) (string, []interface{}, error) {
91-
s := reflect.Indirect(reflect.ValueOf(models))
92-
if s.Kind() != reflect.Slice {
93-
return "", nil, fmt.Errorf("models must be a slice")
94-
}
95-
slen := s.Len()
96-
if slen <= 0 {
97-
return "", nil, nil
98-
}
99-
buildParam := BuildParam
100-
var cols []*FieldDB
101-
// var schema map[string]FieldDB
102-
if len(options) > 0 && options[0] != nil {
103-
cols = options[0].Columns
104-
// schema = options[0].Fields
105-
} else {
106-
first := s.Index(0).Interface()
107-
modelType := reflect.TypeOf(first)
108-
m := CreateSchema(modelType)
109-
cols = m.Columns
110-
}
111-
placeholders := make([]string, 0)
112-
args := make([]interface{}, 0)
113-
i := 1
114-
icols := make([]string, 0)
115-
for _, fdb := range cols {
116-
if fdb.Insert {
117-
icols = append(icols, fdb.Column)
118-
}
119-
}
120-
for j := 0; j < slen; j++ {
121-
model := s.Index(j).Interface()
122-
mv := reflect.ValueOf(model)
123-
values := make([]string, 0)
124-
for _, fdb := range cols {
125-
if fdb.Insert {
126-
f := mv.Field(fdb.Index)
127-
fieldValue := f.Interface()
128-
isNil := false
129-
if f.Kind() == reflect.Ptr {
130-
if reflect.ValueOf(fieldValue).IsNil() {
131-
isNil = true
132-
} else {
133-
fieldValue = reflect.Indirect(reflect.ValueOf(fieldValue)).Interface()
134-
}
135-
}
136-
if isNil {
137-
icols = append(icols, fdb.Column)
138-
values = append(values, "null")
139-
} else {
140-
v, ok := GetDBValue(fieldValue, fdb.Scale)
141-
if ok {
142-
values = append(values, v)
143-
} else {
144-
values = append(values, buildParam(i))
145-
i = i + 1
146-
args = append(args, fieldValue)
147-
}
148-
}
149-
}
150-
}
151-
x := "(" + strings.Join(values, ",") + ")"
152-
placeholders = append(placeholders, x)
153-
}
154-
query := fmt.Sprintf(fmt.Sprintf("insert into %s (%s) values %s",
155-
table,
156-
strings.Join(icols, ","),
157-
strings.Join(placeholders, ","),
158-
))
159-
return query, args, nil
160-
}
Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,34 @@
1-
package cassandra
1+
package batch
22

33
import (
44
"context"
5-
"github.com/gocql/gocql"
65
"reflect"
6+
7+
c "github.com/core-go/cassandra"
8+
"github.com/gocql/gocql"
79
)
810

911
type BatchInserter struct {
1012
db *gocql.ClusterConfig
1113
tableName string
1214
Map func(ctx context.Context, model interface{}) (interface{}, error)
1315
VersionIndex int
14-
Schema *Schema
16+
Schema *c.Schema
1517
}
18+
1619
func NewBatchInserter(db *gocql.ClusterConfig, tableName string, modelType reflect.Type, options ...func(context.Context, interface{}) (interface{}, error)) *BatchInserter {
1720
var mp func(context.Context, interface{}) (interface{}, error)
1821
if len(options) > 0 && options[0] != nil {
1922
mp = options[0]
2023
}
2124
return NewBatchInserterWithVersion(db, tableName, modelType, mp)
2225
}
23-
func NewBatchInserterWithVersion(db *gocql.ClusterConfig, tableName string, modelType reflect.Type, mp func(context.Context, interface{}) (interface{}, error), options...int) *BatchInserter {
26+
func NewBatchInserterWithVersion(db *gocql.ClusterConfig, tableName string, modelType reflect.Type, mp func(context.Context, interface{}) (interface{}, error), options ...int) *BatchInserter {
2427
versionIndex := -1
2528
if len(options) > 0 && options[0] >= 0 {
2629
versionIndex = options[0]
2730
}
28-
schema := CreateSchema(modelType)
31+
schema := c.CreateSchema(modelType)
2932
return &BatchInserter{db: db, tableName: tableName, Schema: schema, VersionIndex: versionIndex, Map: mp}
3033
}
3134
func (w *BatchInserter) Write(ctx context.Context, models interface{}) ([]int, []int, error) {
@@ -34,11 +37,11 @@ func (w *BatchInserter) Write(ctx context.Context, models interface{}) ([]int, [
3437
var models2 interface{}
3538
var er0 error
3639
if w.Map != nil {
37-
models2, er0 = MapModels(ctx, models, w.Map)
40+
models2, er0 = c.MapModels(ctx, models, w.Map)
3841
if er0 != nil {
3942
s0 := reflect.ValueOf(models2)
40-
_, er0b := InterfaceSlice(models2)
41-
failIndices = ToArrayIndex(s0, failIndices)
43+
_, er0b := c.InterfaceSlice(models2)
44+
failIndices = c.ToArrayIndex(s0, failIndices)
4245
return successIndices, failIndices, er0b
4346
}
4447
} else {
@@ -49,15 +52,15 @@ func (w *BatchInserter) Write(ctx context.Context, models interface{}) ([]int, [
4952
return successIndices, failIndices, er0
5053
}
5154
defer session.Close()
52-
_, err := InsertBatchWithVersion(ctx, session, w.tableName, models2, w.VersionIndex, w.Schema)
55+
_, err := c.InsertBatchWithVersion(ctx, session, w.tableName, models2, w.VersionIndex, w.Schema)
5356
s := reflect.ValueOf(models)
5457
if err == nil {
5558
// Return full success
56-
successIndices = ToArrayIndex(s, successIndices)
59+
successIndices = c.ToArrayIndex(s, successIndices)
5760
return successIndices, failIndices, err
5861
} else {
5962
// Return full fail
60-
failIndices = ToArrayIndex(s, failIndices)
63+
failIndices = c.ToArrayIndex(s, failIndices)
6164
}
6265
return successIndices, failIndices, err
6366
}
Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,34 @@
1-
package cassandra
1+
package batch
22

33
import (
44
"context"
5-
"github.com/gocql/gocql"
65
"reflect"
6+
7+
c "github.com/core-go/cassandra"
8+
"github.com/gocql/gocql"
79
)
810

911
type BatchUpdater struct {
1012
db *gocql.ClusterConfig
1113
tableName string
1214
Map func(ctx context.Context, model interface{}) (interface{}, error)
1315
VersionIndex int
14-
Schema *Schema
16+
Schema *c.Schema
1517
}
18+
1619
func NewBatchUpdater(session *gocql.ClusterConfig, tableName string, modelType reflect.Type, options ...func(context.Context, interface{}) (interface{}, error)) *BatchUpdater {
1720
var mp func(context.Context, interface{}) (interface{}, error)
1821
if len(options) > 0 && options[0] != nil {
1922
mp = options[0]
2023
}
2124
return NewBatchUpdaterWithVersion(session, tableName, modelType, mp)
2225
}
23-
func NewBatchUpdaterWithVersion(session *gocql.ClusterConfig, tableName string, modelType reflect.Type, mp func(context.Context, interface{}) (interface{}, error), options...int) *BatchUpdater {
26+
func NewBatchUpdaterWithVersion(session *gocql.ClusterConfig, tableName string, modelType reflect.Type, mp func(context.Context, interface{}) (interface{}, error), options ...int) *BatchUpdater {
2427
versionIndex := -1
2528
if len(options) > 0 && options[0] >= 0 {
2629
versionIndex = options[0]
2730
}
28-
schema := CreateSchema(modelType)
31+
schema := c.CreateSchema(modelType)
2932
return &BatchUpdater{db: session, tableName: tableName, Schema: schema, VersionIndex: versionIndex, Map: mp}
3033
}
3134
func (w *BatchUpdater) Write(ctx context.Context, models interface{}) ([]int, []int, error) {
@@ -34,11 +37,11 @@ func (w *BatchUpdater) Write(ctx context.Context, models interface{}) ([]int, []
3437
var models2 interface{}
3538
var er0 error
3639
if w.Map != nil {
37-
models2, er0 = MapModels(ctx, models, w.Map)
40+
models2, er0 = c.MapModels(ctx, models, w.Map)
3841
if er0 != nil {
3942
s0 := reflect.ValueOf(models2)
40-
_, er0b := InterfaceSlice(models2)
41-
failIndices = ToArrayIndex(s0, failIndices)
43+
_, er0b := c.InterfaceSlice(models2)
44+
failIndices = c.ToArrayIndex(s0, failIndices)
4245
return successIndices, failIndices, er0b
4346
}
4447
} else {
@@ -49,15 +52,15 @@ func (w *BatchUpdater) Write(ctx context.Context, models interface{}) ([]int, []
4952
return successIndices, failIndices, er0
5053
}
5154
defer session.Close()
52-
_, err := UpdateBatchWithVersion(ctx, session, w.tableName, models2, w.VersionIndex, w.Schema)
55+
_, err := c.UpdateBatchWithVersion(ctx, session, w.tableName, models2, w.VersionIndex, w.Schema)
5356
s := reflect.ValueOf(models)
5457
if err == nil {
5558
// Return full success
56-
successIndices = ToArrayIndex(s, successIndices)
59+
successIndices = c.ToArrayIndex(s, successIndices)
5760
return successIndices, failIndices, err
5861
} else {
5962
// Return full fail
60-
failIndices = ToArrayIndex(s, failIndices)
63+
failIndices = c.ToArrayIndex(s, failIndices)
6164
}
6265
return successIndices, failIndices, err
6366
}
Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,34 @@
1-
package cassandra
1+
package batch
22

33
import (
44
"context"
5-
"github.com/gocql/gocql"
65
"reflect"
6+
7+
c "github.com/core-go/cassandra"
8+
"github.com/gocql/gocql"
79
)
810

911
type BatchWriter struct {
1012
db *gocql.ClusterConfig
1113
tableName string
1214
Map func(ctx context.Context, model interface{}) (interface{}, error)
1315
VersionIndex int
14-
Schema *Schema
16+
Schema *c.Schema
1517
}
18+
1619
func NewBatchWriter(session *gocql.ClusterConfig, tableName string, modelType reflect.Type, options ...func(context.Context, interface{}) (interface{}, error)) *BatchWriter {
1720
var mp func(context.Context, interface{}) (interface{}, error)
1821
if len(options) > 0 && options[0] != nil {
1922
mp = options[0]
2023
}
2124
return NewBatchWriterWithVersion(session, tableName, modelType, mp)
2225
}
23-
func NewBatchWriterWithVersion(session *gocql.ClusterConfig, tableName string, modelType reflect.Type, mp func(context.Context, interface{}) (interface{}, error), options...int) *BatchWriter {
26+
func NewBatchWriterWithVersion(session *gocql.ClusterConfig, tableName string, modelType reflect.Type, mp func(context.Context, interface{}) (interface{}, error), options ...int) *BatchWriter {
2427
versionIndex := -1
2528
if len(options) > 0 && options[0] >= 0 {
2629
versionIndex = options[0]
2730
}
28-
schema := CreateSchema(modelType)
31+
schema := c.CreateSchema(modelType)
2932
return &BatchWriter{db: session, tableName: tableName, Schema: schema, VersionIndex: versionIndex, Map: mp}
3033
}
3134
func (w *BatchWriter) Write(ctx context.Context, models interface{}) ([]int, []int, error) {
@@ -34,11 +37,11 @@ func (w *BatchWriter) Write(ctx context.Context, models interface{}) ([]int, []i
3437
var models2 interface{}
3538
var er0 error
3639
if w.Map != nil {
37-
models2, er0 = MapModels(ctx, models, w.Map)
40+
models2, er0 = c.MapModels(ctx, models, w.Map)
3841
if er0 != nil {
3942
s0 := reflect.ValueOf(models2)
40-
_, er0b := InterfaceSlice(models2)
41-
failIndices = ToArrayIndex(s0, failIndices)
43+
_, er0b := c.InterfaceSlice(models2)
44+
failIndices = c.ToArrayIndex(s0, failIndices)
4245
return successIndices, failIndices, er0b
4346
}
4447
} else {
@@ -49,15 +52,15 @@ func (w *BatchWriter) Write(ctx context.Context, models interface{}) ([]int, []i
4952
return successIndices, failIndices, er0
5053
}
5154
defer session.Close()
52-
_, err := SaveBatch(ctx, session, w.tableName, models2, w.Schema)
55+
_, err := c.SaveBatch(ctx, session, w.tableName, models2, w.Schema)
5356
s := reflect.ValueOf(models)
5457
if err == nil {
5558
// Return full success
56-
successIndices = ToArrayIndex(s, successIndices)
59+
successIndices = c.ToArrayIndex(s, successIndices)
5760
return successIndices, failIndices, err
5861
} else {
5962
// Return full fail
60-
failIndices = ToArrayIndex(s, failIndices)
63+
failIndices = c.ToArrayIndex(s, failIndices)
6164
}
6265
return successIndices, failIndices, err
6366
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package cassandra
1+
package health
22

33
import (
44
"context"

stream_writer.go

Lines changed: 0 additions & 55 deletions
This file was deleted.

0 commit comments

Comments
 (0)