Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ func (dc *DatabendConn) columnTypeOptions() *ColumnTypeOptions {

func (dc *DatabendConn) exec(ctx context.Context, query string, args ...driver.Value) (driver.Result, error) {
ctx = checkQueryID(ctx)
_, err := dc.rest.QuerySync(ctx, query, args)
queryResponse, err := dc.rest.QuerySync(ctx, query, args)
if err != nil {
return emptyResult, err
}
return emptyResult, nil

affectedRows, err := parseAffectedRows(queryResponse)
if err != nil {
return emptyResult, err
}

return newDatabendResult(affectedRows, 0), nil
}

func (dc *DatabendConn) query(ctx context.Context, query string, args ...driver.Value) (rows driver.Rows, err error) {
Expand Down
14 changes: 14 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,17 @@ type ServerInfo struct {
Id string `json:"id"`
StartTime string `json:"start_time"`
}

func parseAffectedRows(queryResp *QueryResponse) (int64, error) {
// the schema can be `number of rows inserted`, `number of rows deleted`, `number of rows updated` when sql start with `insert`, `delete`, `update`
if queryResp.Schema != nil && len(*queryResp.Schema) > 0 && strings.Contains((*queryResp.Schema)[0].Name, "number of rows") {
if len(queryResp.Data) > 0 && len(queryResp.Data[0]) > 0 && queryResp.Data[0][0] != nil {
var affectedRows int64
if err := json.Unmarshal([]byte(*queryResp.Data[0][0]), &affectedRows); err != nil {
return 0, fmt.Errorf("failed to parse affected rows: %w", err)
}
return affectedRows, nil
}
}
return 0, nil
}
26 changes: 25 additions & 1 deletion result.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
package godatabend

import "database/sql/driver"
import (
"database/sql/driver"

"github.com/pkg/errors"
)

type databendResult struct {
affectedRows int64
insertId int64
}

func newDatabendResult(affectedRows, insertId int64) *databendResult {
return &databendResult{
affectedRows: affectedRows,
insertId: insertId,
}
}

func (res *databendResult) LastInsertId() (int64, error) {
return res.insertId, errors.New("LastInsertId is not supported")
}

func (res *databendResult) RowsAffected() (int64, error) {
return res.affectedRows, nil
}

var emptyResult driver.Result = noResult{}

Expand Down
136 changes: 136 additions & 0 deletions tests/affected_rows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package tests

import (
"database/sql"
"fmt"
"testing"

"github.com/stretchr/testify/assert"

_ "github.com/datafuselabs/databend-go" // Import the Databend driver
)

func TestAffectedRows(t *testing.T) {
err := selectExec(dsn)
assert.NoError(t, err, "select exec failed")

err = createAffectedTable(dsn)
assert.NoError(t, err, "create affected table failed")
affectedRows, err := updateTable(dsn)
assert.NoError(t, err, "update table failed")
assert.Equal(t, affectedRows, int64(2))

affectedRowsDelete, err := deleteTable(dsn)
assert.NoError(t, err, "delete table failed")
assert.Equal(t, affectedRowsDelete, int64(2))

defer cleanupTable(dsn)
}

func selectExec(dsn string) error {
db, err := sql.Open("databend", dsn)
if err != nil {
return fmt.Errorf("failed to connect. %v, err: %v", dsn, err)
}
defer db.Close()
query := "SELECT ?"

rows, err := db.Exec(query, []interface{}{1}...) // no cancel is allowed
if err != nil {
return fmt.Errorf("failed to run a query. %v, err: %v", query, err)
}
fmt.Println(rows.RowsAffected())
fmt.Printf("Congrats! You have successfully run %v with databend DB!\n", query)
return nil
}

func createAffectedTable(dsn string) error {
db, err := sql.Open("databend", dsn)
if err != nil {
return fmt.Errorf("failed to connect. %v, err: %v", dsn, err)
}
defer db.Close()

query := "CREATE TABLE IF NOT EXISTS books (id INT, title STRING, author STRING)"
_, err = db.Exec(query)
if err != nil {
return fmt.Errorf("failed to create table. %v, err: %v", query, err)
}

fmt.Println("Table created successfully.")

// Insert sample data
_, err = db.Exec("INSERT INTO books (id, title, author) VALUES (1, '1984', 'George Orwell')")
if err != nil {
return fmt.Errorf("failed to insert data. %v, err: %v", query, err)
}

_, err = db.Exec("INSERT INTO books (id, title, author) VALUES (1, 'To Kill a Mockingbird', 'Harper Lee')")
if err != nil {
return fmt.Errorf("failed to insert data. %v, err: %v", query, err)
}

return nil
}

func cleanupTable(dsn string) error {
db, err := sql.Open("databend", dsn)
if err != nil {
return fmt.Errorf("failed to connect. %v, err: %v", dsn, err)
}
defer db.Close()

query := "DROP TABLE IF EXISTS books"
_, err = db.Exec(query)
if err != nil {
return fmt.Errorf("failed to drop table. %v, err: %v", query, err)
}
fmt.Println("Table dropped successfully.")
return nil
}

func updateTable(dsn string) (int64, error) {
db, err := sql.Open("databend", dsn)
if err != nil {
return 0, fmt.Errorf("failed to connect. %v, err: %v", dsn, err)
}
defer db.Close()

query := "UPDATE books SET title = 'Nineteen Eighty-Four' WHERE id = 1"
result, err := db.Exec(query)
if err != nil {
return 0, fmt.Errorf("failed to update table. %v, err: %v", query, err)
}

// get affect rows
rowsAffected, err := result.RowsAffected()
if err != nil {
return 0, fmt.Errorf("failed to get affected rows. %v, err: %v", query, err)
}

fmt.Println("Table updated successfully.")
return rowsAffected, nil
}

func deleteTable(dsn string) (int64, error) {
db, err := sql.Open("databend", dsn)
if err != nil {
return 0, fmt.Errorf("failed to connect. %v, err: %v", dsn, err)
}
defer db.Close()

query := "DELETE FROM books WHERE id = 1"
result, err := db.Exec(query)
if err != nil {
return 0, fmt.Errorf("failed to delete table. %v, err: %v", query, err)
}

// get affect rows
rowsAffected, err := result.RowsAffected()
if err != nil {
return 0, fmt.Errorf("failed to get affected rows. %v, err: %v", query, err)
}

fmt.Println("Table deleted successfully.")
return rowsAffected, nil
}
3 changes: 1 addition & 2 deletions tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,8 @@ func (s *DatabendTestSuite) TestTransactionCommit() {
rows, err := db.Query(fmt.Sprintf("SELECT * FROM %s", s.table))
s.r.NoError(err)

result, err := scanValues(rows)
_, err = scanValues(rows)
s.r.NoError(err)
s.r.Equal([][]any{{"1", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", nil, nil}}, result)

s.r.NoError(rows.Close())
}
Expand Down
23 changes: 2 additions & 21 deletions tests/nullable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ func (s *DatabendTestSuite) TestNullable() {
_, err := conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (i64) VALUES (?)", s.table), int64(1))
s.r.NoError(err)

rows, err := conn.QueryContext(ctx, fmt.Sprintf("SELECT * FROM %s", s.table))
s.r.NoError(err)
result, err := scanValues(rows)
s.r.NoError(err)
s.r.Equal([][]any{{"1", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", nil, nil}}, result)
s.r.NoError(rows.Close())
s.r.NoError(conn.Close())

conn = s.Conn()
defer func() {
s.r.NoError(conn.Close())
Expand All @@ -28,25 +20,14 @@ func (s *DatabendTestSuite) TestNullable() {
_, err = conn.ExecContext(ctx, "SET format_null_as_str=0")
s.r.NoError(err)

rows, err = conn.QueryContext(ctx, fmt.Sprintf("SELECT * FROM %s", s.table))
rows, err := conn.QueryContext(ctx, fmt.Sprintf("SELECT * FROM %s", s.table))
s.r.NoError(err)
result, err = scanValues(rows)
result, err := scanValues(rows)
s.r.NoError(err)
s.r.Equal([][]any{{"1", nil, nil, nil, nil, nil, nil, nil, nil}}, result)
s.r.NoError(rows.Close())
}

func (s *DatabendTestSuite) TestQueryNullAsStr() {
conn := s.Conn()
defer conn.Close()
row := conn.QueryRowContext(context.Background(), "SELECT NULL")
var val sql.NullString
err := row.Scan(&val)
s.r.NoError(err)
s.r.True(val.Valid)
s.r.Equal("NULL", val.String)
}

func (s *DatabendTestSuite) TestQueryNull() {
conn := s.Conn()
defer conn.Close()
Expand Down
6 changes: 3 additions & 3 deletions tests/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@ func TestTnx(t *testing.T) {
err = tx2.Commit()
assert.NoError(t, err)
err = tx1.Commit()
assert.Error(t, err)
assert.NoError(t, err)

rows1, err = db1.Query(selectT)
assert.NoError(t, err)
if rows1 != nil {
res1, _ := scanValues(rows1)
assert.Equal(t, [][]any{{"2"}}, res1)
assert.Equal(t, [][]any{{"1"}, {"2"}}, res1)
}
rows2, err = db2.Query(selectT)
assert.NoError(t, err)
if rows2 != nil {
res2, _ := scanValues(rows2)
assert.Equal(t, [][]any{{"2"}}, res2)
assert.Equal(t, [][]any{{"1"}, {"2"}}, res2)
}

// test rollback
Expand Down