Skip to content

Commit 15cd232

Browse files
Close optimization for Rows
When a query returns direct results it is possible that all the query result rows are contained in the direct response. In this case the server can automatically close the operation. This is indicated by a non-null CloseOperation member in the TSparkDirectResults structure. In this case calls to Rows.Close don't require any interaction with the server as the operation is already closed. - added a NewRows function and updated connection to use it when creating Rows - added logic in NewRows to check for an already closed operation - updated Rows.Close so we don't make a server round trip when not necessary Signed-off-by: Raymond Cypher <[email protected]>
1 parent 5883857 commit 15cd232

File tree

3 files changed

+72
-26
lines changed

3 files changed

+72
-26
lines changed

connection.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -140,22 +140,9 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
140140
// hold on to the operation handle
141141
opHandle := exStmtResp.OperationHandle
142142

143-
rows := rows{
144-
connId: c.id,
145-
correlationId: corrId,
146-
client: c.client,
147-
opHandle: opHandle,
148-
pageSize: int64(c.cfg.MaxRows),
149-
location: c.cfg.Location,
150-
}
151-
152-
if exStmtResp.DirectResults != nil {
153-
// return results
154-
rows.fetchResults = exStmtResp.DirectResults.ResultSet
155-
rows.fetchResultsMetadata = exStmtResp.DirectResults.ResultSetMetadata
143+
rows := NewRows(c.id, corrId, c.client, opHandle, int64(c.cfg.MaxRows), c.cfg.Location, exStmtResp.DirectResults)
156144

157-
}
158-
return &rows, nil
145+
return rows, nil
159146

160147
}
161148

rows.go

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type rows struct {
2828
fetchResultsMetadata *cli_service.TGetResultSetMetadataResp
2929
nextRowIndex int64
3030
nextRowNumber int64
31+
closed bool
3132
}
3233

3334
var _ driver.Rows = (*rows)(nil)
@@ -41,6 +42,27 @@ var errRowsNoSchemaAvailable = "no schema in result set metadata response"
4142
var errRowsNoClient = "instance of Rows missing client"
4243
var errRowsNilRows = "nil Rows instance"
4344

45+
func NewRows(connID string, corrId string, client cli_service.TCLIService, opHandle *cli_service.TOperationHandle, pageSize int64, location *time.Location, directResults *cli_service.TSparkDirectResults) driver.Rows {
46+
r := &rows{
47+
connId: connID,
48+
correlationId: corrId,
49+
client: client,
50+
opHandle: opHandle,
51+
pageSize: pageSize,
52+
location: location,
53+
}
54+
55+
if directResults != nil {
56+
r.fetchResults = directResults.ResultSet
57+
r.fetchResultsMetadata = directResults.ResultSetMetadata
58+
if directResults.CloseOperation != nil {
59+
r.closed = true
60+
}
61+
}
62+
63+
return r
64+
}
65+
4466
// Columns returns the names of the columns. The number of
4567
// columns of the result is inferred from the length of the
4668
// slice. If a particular column name isn't known, an empty
@@ -72,20 +94,23 @@ func (r *rows) Columns() []string {
7294

7395
// Close closes the rows iterator.
7496
func (r *rows) Close() error {
75-
err := isValidRows(r)
76-
if err != nil {
77-
return err
78-
}
97+
if !r.closed {
98+
err := isValidRows(r)
99+
if err != nil {
100+
return err
101+
}
79102

80-
req := cli_service.TCloseOperationReq{
81-
OperationHandle: r.opHandle,
82-
}
83-
ctx := driverctx.NewContextWithCorrelationId(driverctx.NewContextWithConnId(context.Background(), r.connId), r.correlationId)
103+
req := cli_service.TCloseOperationReq{
104+
OperationHandle: r.opHandle,
105+
}
106+
ctx := driverctx.NewContextWithCorrelationId(driverctx.NewContextWithConnId(context.Background(), r.connId), r.correlationId)
84107

85-
_, err1 := r.client.CloseOperation(ctx, &req)
86-
if err1 != nil {
87-
return err1
108+
_, err1 := r.client.CloseOperation(ctx, &req)
109+
if err1 != nil {
110+
return err1
111+
}
88112
}
113+
89114
return nil
90115
}
91116

rows_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,40 @@ func TestColumnTypeDatabaseTypeName(t *testing.T) {
799799
assert.Equal(t, expectedScanTypes, scanTypes)
800800
}
801801

802+
func TestRowsCloseOptimization(t *testing.T) {
803+
t.Parallel()
804+
805+
var closeCount int
806+
client := &client.TestClient{
807+
FnCloseOperation: func(ctx context.Context, req *cli_service.TCloseOperationReq) (_r *cli_service.TCloseOperationResp, _err error) {
808+
closeCount++
809+
return nil, nil
810+
},
811+
}
812+
813+
rowSet := NewRows("", "", client, &cli_service.TOperationHandle{}, 1, nil, nil)
814+
815+
// rowSet has no direct results calling Close should result in call to client to close operation
816+
err := rowSet.Close()
817+
assert.Nil(t, err, "rows.Close should not throw an error")
818+
assert.Equal(t, 1, closeCount)
819+
820+
// rowSet has direct results, but operation was not closed so it should call client to close operation
821+
closeCount = 0
822+
rowSet = NewRows("", "", client, &cli_service.TOperationHandle{}, 1, nil, &cli_service.TSparkDirectResults{})
823+
err = rowSet.Close()
824+
assert.Nil(t, err, "rows.Close should not throw an error")
825+
assert.Equal(t, 1, closeCount)
826+
827+
// rowSet has direct results which include a close operation response. rowSet should be marked as closed
828+
// and calling Close should not call into the client.
829+
closeCount = 0
830+
rowSet = NewRows("", "", client, &cli_service.TOperationHandle{}, 1, nil, &cli_service.TSparkDirectResults{CloseOperation: &cli_service.TCloseOperationResp{}})
831+
err = rowSet.Close()
832+
assert.Nil(t, err, "rows.Close should not throw an error")
833+
assert.Equal(t, 0, closeCount)
834+
}
835+
802836
type rowTestPagingResult struct {
803837
getMetadataCount int
804838
fetchResultsCount int

0 commit comments

Comments
 (0)