Skip to content

Commit 68103f5

Browse files
committed
Replace Rows with QueryResult
1 parent 76c2b9a commit 68103f5

File tree

12 files changed

+80
-28
lines changed

12 files changed

+80
-28
lines changed

app/server/datasource/rdbms/clickhouse/connection_http.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type connectionHTTP struct {
5454
tableName string
5555
}
5656

57-
func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
57+
func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (*rdbms_utils.QueryResult, error) {
5858
c.queryLogger.Dump(params.QueryText, params.QueryArgs.Values()...)
5959

6060
out, err := c.DB.QueryContext(params.Ctx, params.QueryText, rewriteQueryArgs(params.QueryArgs.Values())...)
@@ -72,7 +72,10 @@ func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Row
7272
return nil, fmt.Errorf("rows err: %w", err)
7373
}
7474

75-
return &rows{Rows: out}, nil
75+
rows := &rows{Rows: out}
76+
return &rdbms_utils.QueryResult{
77+
Rows: rows,
78+
}, nil
7679
}
7780

7881
func (c *connectionHTTP) DataSourceInstance() *api_common.TGenericDataSourceInstance {

app/server/datasource/rdbms/clickhouse/connection_native.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type connectionNative struct {
5454
tableName string
5555
}
5656

57-
func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
57+
func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (*rdbms_utils.QueryResult, error) {
5858
c.queryLogger.Dump(params.QueryText, params.QueryArgs.Values()...)
5959

6060
out, err := c.Conn.Query(params.Ctx, params.QueryText, rewriteQueryArgs(params.QueryArgs.Values())...)
@@ -72,7 +72,10 @@ func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (rdbms_utils.R
7272
return nil, fmt.Errorf("rows err: %w", err)
7373
}
7474

75-
return &rowsNative{Rows: out}, nil
75+
rows := &rowsNative{Rows: out}
76+
return &rdbms_utils.QueryResult{
77+
Rows: rows,
78+
}, nil
7679
}
7780

7881
func (c *connectionNative) DataSourceInstance() *api_common.TGenericDataSourceInstance {

app/server/datasource/rdbms/datasource.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,15 +226,15 @@ func (ds *dataSourceImpl) doReadSplitSingleConn(
226226
sink paging.Sink[any],
227227
conn rdbms_utils.Connection,
228228
) (int64, error) {
229-
var rows rdbms_utils.Rows
229+
var queryResult *rdbms_utils.QueryResult
230230

231231
err := ds.retrierSet.Query.Run(
232232
ctx,
233233
logger,
234234
func() error {
235235
var queryErr error
236236

237-
if rows, queryErr = conn.Query(&query.QueryParams); queryErr != nil {
237+
if queryResult, queryErr = conn.Query(&query.QueryParams); queryErr != nil {
238238
return fmt.Errorf("query error: %w", queryErr)
239239
}
240240

@@ -246,8 +246,9 @@ func (ds *dataSourceImpl) doReadSplitSingleConn(
246246
return 0, fmt.Errorf("query: %w", err)
247247
}
248248

249-
defer common.LogCloserError(logger, rows, "close rows")
249+
defer common.LogCloserError(logger, queryResult, "close query result")
250250

251+
rows := queryResult.Rows
251252
transformer, err := rows.MakeTransformer(query.YdbColumns, ds.converterCollection)
252253
if err != nil {
253254
return 0, fmt.Errorf("make transformer: %w", err)

app/server/datasource/rdbms/ms_sql_server/connection.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,17 @@ func (c *Connection) TableName() string {
3232
return c.tableName
3333
}
3434

35-
func (c *Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
35+
func (c *Connection) Query(params *rdbms_utils.QueryParams) (*rdbms_utils.QueryResult, error) {
3636
c.queryLogger.Dump(params.QueryText, params.QueryArgs.Values()...)
3737

3838
out, err := c.db.QueryContext(params.Ctx, params.QueryText, params.QueryArgs.Values()...)
39+
if err != nil {
40+
return nil, err
41+
}
3942

40-
return rows{out}, err
43+
return &rdbms_utils.QueryResult{
44+
Rows: rows{out},
45+
}, nil
4146
}
4247

4348
func (c *Connection) Logger() *zap.Logger {

app/server/datasource/rdbms/mysql/connection.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func transformArgs(src *rdbms_utils.QueryArgs) []any {
4242
return dst
4343
}
4444

45-
func (c *connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
45+
func (c *connection) Query(params *rdbms_utils.QueryParams) (*rdbms_utils.QueryResult, error) {
4646
c.queryLogger.Dump(params.QueryText, params.QueryArgs.Values()...)
4747

4848
results := make(chan rowData, c.cfg.ResultChanCapacity)
@@ -62,7 +62,7 @@ func (c *connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, e
6262

6363
stmt, err := c.conn.Prepare(params.QueryText)
6464
if err != nil {
65-
return r, fmt.Errorf("mysql: failed to prepare query: %w", err)
65+
return &rdbms_utils.QueryResult{Rows: r}, fmt.Errorf("mysql: failed to prepare query: %w", err)
6666
}
6767

6868
go func() {
@@ -105,7 +105,9 @@ func (c *connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, e
105105
)
106106
}()
107107

108-
return r, nil
108+
return &rdbms_utils.QueryResult{
109+
Rows: r,
110+
}, nil
109111
}
110112

111113
func (c *connection) Logger() *zap.Logger {

app/server/datasource/rdbms/oracle/connection.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func (c *connection) Close() error {
2525
return c.conn.Close()
2626
}
2727

28-
func (c *connection) Query(queryParams *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
28+
func (c *connection) Query(queryParams *rdbms_utils.QueryParams) (*rdbms_utils.QueryResult, error) {
2929
c.queryLogger.Dump(queryParams.QueryText, queryParams.QueryArgs.Values()...)
3030

3131
valueArgs := make([]driver.NamedValue, queryParams.QueryArgs.Count())
@@ -46,7 +46,9 @@ func (c *connection) Query(queryParams *rdbms_utils.QueryParams) (rdbms_utils.Ro
4646

4747
rows := newRows(out)
4848

49-
return rows, nil
49+
return &rdbms_utils.QueryResult{
50+
Rows: rows,
51+
}, nil
5052
}
5153

5254
func (c *connection) DataSourceInstance() *api_common.TGenericDataSourceInstance {

app/server/datasource/rdbms/postgresql/connection_manager.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,17 @@ func (c *connection) Close() error {
5757
return c.Conn.Close(context.TODO())
5858
}
5959

60-
func (c *connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
60+
func (c *connection) Query(params *rdbms_utils.QueryParams) (*rdbms_utils.QueryResult, error) {
6161
c.queryLogger.Dump(params.QueryText, params.QueryArgs.Values()...)
6262

6363
out, err := c.Conn.Query(params.Ctx, params.QueryText, params.QueryArgs.Values()...)
6464
if err != nil {
6565
return nil, fmt.Errorf("query error: %w", err)
6666
}
6767

68-
return rows{Rows: out}, nil
68+
return &rdbms_utils.QueryResult{
69+
Rows: rows{Rows: out},
70+
}, nil
6971
}
7072

7173
func (c *connection) DataSourceInstance() *api_common.TGenericDataSourceInstance {

app/server/datasource/rdbms/utils/interface.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,28 @@ type QueryResult struct {
4949
Columns Columns
5050
}
5151

52+
// Close implements io.Closer interface
53+
func (qr *QueryResult) Close() error {
54+
var rowsErr, columnsErr error
55+
56+
if qr.Rows != nil {
57+
rowsErr = qr.Rows.Close()
58+
}
59+
60+
if qr.Columns != nil {
61+
columnsErr = qr.Columns.Close()
62+
}
63+
64+
if rowsErr != nil {
65+
return rowsErr
66+
}
67+
68+
return columnsErr
69+
}
70+
5271
type Connection interface {
5372
// Query runs a query on a specific connection.
54-
Query(params *QueryParams) (Rows, error)
73+
Query(params *QueryParams) (*QueryResult, error)
5574
// DataSourceInstance comprehensively describing the target of the connection
5675
DataSourceInstance() *api_common.TGenericDataSourceInstance
5776
// The name of a table that will be read via this connection.

app/server/datasource/rdbms/utils/mock.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,19 @@ type ConnectionMock struct {
2121
mock.Mock
2222
}
2323

24-
func (m *ConnectionMock) Query(params *QueryParams) (Rows, error) {
24+
func (m *ConnectionMock) Query(params *QueryParams) (*QueryResult, error) {
2525
called := []any{params.QueryText}
2626
called = append(called, params.QueryArgs.Values()...)
2727
args := m.Called(called...)
2828

29-
return args.Get(0).(Rows), args.Error(1)
29+
rows := args.Get(0)
30+
if rows == nil {
31+
return nil, args.Error(1)
32+
}
33+
34+
return &QueryResult{
35+
Rows: rows.(Rows),
36+
}, args.Error(1)
3037
}
3138

3239
func (m *ConnectionMock) Close() error {

app/server/datasource/rdbms/utils/schema_provider.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ func (f *defaultSchemaProvider) GetSchema(
3333
QueryArgs: args,
3434
}
3535

36-
rows, err := conn.Query(queryParams)
36+
queryResult, err := conn.Query(queryParams)
3737
if err != nil {
3838
return nil, fmt.Errorf("query builder error: %w", err)
3939
}
4040

41-
defer func() { common.LogCloserError(logger, rows, "close rows") }()
41+
defer func() { common.LogCloserError(logger, queryResult, "close query result") }()
4242

4343
var (
4444
columnName *string
@@ -47,6 +47,7 @@ func (f *defaultSchemaProvider) GetSchema(
4747

4848
sb := NewSchemaBuilder(f.typeMapper, request.TypeMappingSettings)
4949

50+
rows := queryResult.Rows
5051
for rows.Next() {
5152
if err = rows.Scan(&columnName, &typeName); err != nil {
5253
return nil, fmt.Errorf("rows scan: %w", err)

0 commit comments

Comments
 (0)