Skip to content

Commit 4488f73

Browse files
Close optimization for Rows (#64)
When a query returns direct results it is possible that all the query result rows are contained therein. In this case the server can automatically close the operation. This is indicated by a non-null CloseOperation member in the TSparkDirectResults structure. In this case calls to Rows.Close don't require any interaction with the server as the operation is already closed. - added a NewRows function and updated connection to use it when creating Rows - added logic in NewRows to check for an already closed operation - updated Rows.Close so we don't make a server round trip when not necessary - added a unit test for the close optimization logic
2 parents cc35e61 + 15cd232 commit 4488f73

File tree

3 files changed

+72
-26
lines changed

3 files changed

+72
-26
lines changed

connection.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -140,22 +140,9 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
140140
// hold on to the operation handle
141141
opHandle := exStmtResp.OperationHandle
142142

143-
rows := rows{
144-
connId: c.id,
145-
correlationId: corrId,
146-
client: c.client,
147-
opHandle: opHandle,
148-
pageSize: int64(c.cfg.MaxRows),
149-
location: c.cfg.Location,
150-
}
151-
152-
if exStmtResp.DirectResults != nil {
153-
// return results
154-
rows.fetchResults = exStmtResp.DirectResults.ResultSet
155-
rows.fetchResultsMetadata = exStmtResp.DirectResults.ResultSetMetadata
143+
rows := NewRows(c.id, corrId, c.client, opHandle, int64(c.cfg.MaxRows), c.cfg.Location, exStmtResp.DirectResults)
156144

157-
}
158-
return &rows, nil
145+
return rows, nil
159146

160147
}
161148

rows.go

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type rows struct {
2828
fetchResultsMetadata *cli_service.TGetResultSetMetadataResp
2929
nextRowIndex int64
3030
nextRowNumber int64
31+
closed bool
3132
}
3233

3334
var _ driver.Rows = (*rows)(nil)
@@ -41,6 +42,27 @@ var errRowsNoSchemaAvailable = "no schema in result set metadata response"
4142
var errRowsNoClient = "instance of Rows missing client"
4243
var errRowsNilRows = "nil Rows instance"
4344

45+
func NewRows(connID string, corrId string, client cli_service.TCLIService, opHandle *cli_service.TOperationHandle, pageSize int64, location *time.Location, directResults *cli_service.TSparkDirectResults) driver.Rows {
46+
r := &rows{
47+
connId: connID,
48+
correlationId: corrId,
49+
client: client,
50+
opHandle: opHandle,
51+
pageSize: pageSize,
52+
location: location,
53+
}
54+
55+
if directResults != nil {
56+
r.fetchResults = directResults.ResultSet
57+
r.fetchResultsMetadata = directResults.ResultSetMetadata
58+
if directResults.CloseOperation != nil {
59+
r.closed = true
60+
}
61+
}
62+
63+
return r
64+
}
65+
4466
// Columns returns the names of the columns. The number of
4567
// columns of the result is inferred from the length of the
4668
// slice. If a particular column name isn't known, an empty
@@ -72,20 +94,23 @@ func (r *rows) Columns() []string {
7294

7395
// Close closes the rows iterator.
7496
func (r *rows) Close() error {
75-
err := isValidRows(r)
76-
if err != nil {
77-
return err
78-
}
97+
if !r.closed {
98+
err := isValidRows(r)
99+
if err != nil {
100+
return err
101+
}
79102

80-
req := cli_service.TCloseOperationReq{
81-
OperationHandle: r.opHandle,
82-
}
83-
ctx := driverctx.NewContextWithCorrelationId(driverctx.NewContextWithConnId(context.Background(), r.connId), r.correlationId)
103+
req := cli_service.TCloseOperationReq{
104+
OperationHandle: r.opHandle,
105+
}
106+
ctx := driverctx.NewContextWithCorrelationId(driverctx.NewContextWithConnId(context.Background(), r.connId), r.correlationId)
84107

85-
_, err1 := r.client.CloseOperation(ctx, &req)
86-
if err1 != nil {
87-
return err1
108+
_, err1 := r.client.CloseOperation(ctx, &req)
109+
if err1 != nil {
110+
return err1
111+
}
88112
}
113+
89114
return nil
90115
}
91116

rows_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,40 @@ func TestColumnTypeDatabaseTypeName(t *testing.T) {
799799
assert.Equal(t, expectedScanTypes, scanTypes)
800800
}
801801

802+
func TestRowsCloseOptimization(t *testing.T) {
803+
t.Parallel()
804+
805+
var closeCount int
806+
client := &client.TestClient{
807+
FnCloseOperation: func(ctx context.Context, req *cli_service.TCloseOperationReq) (_r *cli_service.TCloseOperationResp, _err error) {
808+
closeCount++
809+
return nil, nil
810+
},
811+
}
812+
813+
rowSet := NewRows("", "", client, &cli_service.TOperationHandle{}, 1, nil, nil)
814+
815+
// rowSet has no direct results calling Close should result in call to client to close operation
816+
err := rowSet.Close()
817+
assert.Nil(t, err, "rows.Close should not throw an error")
818+
assert.Equal(t, 1, closeCount)
819+
820+
// rowSet has direct results, but operation was not closed so it should call client to close operation
821+
closeCount = 0
822+
rowSet = NewRows("", "", client, &cli_service.TOperationHandle{}, 1, nil, &cli_service.TSparkDirectResults{})
823+
err = rowSet.Close()
824+
assert.Nil(t, err, "rows.Close should not throw an error")
825+
assert.Equal(t, 1, closeCount)
826+
827+
// rowSet has direct results which include a close operation response. rowSet should be marked as closed
828+
// and calling Close should not call into the client.
829+
closeCount = 0
830+
rowSet = NewRows("", "", client, &cli_service.TOperationHandle{}, 1, nil, &cli_service.TSparkDirectResults{CloseOperation: &cli_service.TCloseOperationResp{}})
831+
err = rowSet.Close()
832+
assert.Nil(t, err, "rows.Close should not throw an error")
833+
assert.Equal(t, 0, closeCount)
834+
}
835+
802836
type rowTestPagingResult struct {
803837
getMetadataCount int
804838
fetchResultsCount int

0 commit comments

Comments
 (0)