Skip to content

Commit 1f7f71d

Browse files
Bug fix: The HasNext() function for Go driver arrow record iterator returning EOF for empty result set. (#186)
Bug fix: The HasNext() function for Go driver arrow record iterator returning EOF for empty result set. Essentially, the HasNext() logic was assuming there were records in the result set in the case that the original return from executing the statement didn't specify, or if there were no direct results. Updated arrowRecordIterator.HasNext() to check state of underlying iterators. This handles the case of direct results that contain no records. Updated resultPageIterator.HasNext() to try fetching a result page if necessary to determine if there are more records. Added tests for empty result sets with direct results enabled and disabled. Internal ticket: [ES-975398] [ES-975398]: https://databricks.atlassian.net/browse/ES-975398?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
2 parents 750c8a0 + 5f9163e commit 1f7f71d

File tree

5 files changed

+288
-7
lines changed

5 files changed

+288
-7
lines changed

internal/rows/arrowbased/arrowRecordIterator.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ func (ri *arrowRecordIterator) Next() (arrow.Record, error) {
6161

6262
// Indicate whether there are any more records available
6363
func (ri *arrowRecordIterator) HasNext() bool {
64+
ri.checkFinished()
6465
return !ri.isFinished
6566
}
6667

@@ -83,7 +84,10 @@ func (ri *arrowRecordIterator) Close() {
8384
}
8485

8586
func (ri *arrowRecordIterator) checkFinished() {
86-
finished := !ri.currentBatch.HasNext() && !ri.batchIterator.HasNext() && !ri.resultPageIterator.HasNext()
87+
finished := ri.isFinished ||
88+
((ri.currentBatch == nil || !ri.currentBatch.HasNext()) &&
89+
(ri.batchIterator == nil || !ri.batchIterator.HasNext()) &&
90+
(ri.resultPageIterator == nil || !ri.resultPageIterator.HasNext()))
8791

8892
if finished {
8993
// Reached end of result set so Close

internal/rows/rows_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,52 @@ func TestGetArrowBatches(t *testing.T) {
920920
assert.Equal(t, fetchResp3.Results.ArrowBatches[1].RowCount, r6.NumRows())
921921
r6.Release()
922922
})
923+
924+
t.Run("with empty result set, no direct results", func(t *testing.T) {
925+
fetchResp1 := cli_service.TFetchResultsResp{}
926+
loadTestData(t, "zeroRows/zeroRowsFetchResult.json", &fetchResp1)
927+
928+
client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1})
929+
cfg := config.WithDefaults()
930+
rows, err := NewRows("connId", "corrId", nil, client, cfg, nil)
931+
assert.Nil(t, err)
932+
933+
rows2, ok := rows.(dbsqlrows.Rows)
934+
assert.True(t, ok)
935+
936+
rs, err2 := rows2.GetArrowBatches(context.Background())
937+
assert.Nil(t, err2)
938+
939+
hasNext := rs.HasNext()
940+
assert.False(t, hasNext)
941+
r7, err2 := rs.Next()
942+
assert.Nil(t, r7)
943+
assert.ErrorContains(t, err2, io.EOF.Error())
944+
945+
})
946+
947+
t.Run("with empty result set, direct results", func(t *testing.T) {
948+
executeStatementResp := cli_service.TExecuteStatementResp{}
949+
loadTestData(t, "zeroRows/zeroRowsDirectResults.json", &executeStatementResp)
950+
executeStatementResp.DirectResults.ResultSet.Results.ArrowBatches = []*cli_service.TSparkArrowBatch{}
951+
952+
client := getSimpleClient([]cli_service.TFetchResultsResp{})
953+
cfg := config.WithDefaults()
954+
rows, err := NewRows("connId", "corrId", nil, client, cfg, executeStatementResp.DirectResults)
955+
assert.Nil(t, err)
956+
957+
rows2, ok := rows.(dbsqlrows.Rows)
958+
assert.True(t, ok)
959+
960+
rs, err2 := rows2.GetArrowBatches(context.Background())
961+
assert.Nil(t, err2)
962+
963+
hasNext := rs.HasNext()
964+
assert.False(t, hasNext)
965+
r7, err2 := rs.Next()
966+
assert.Nil(t, r7)
967+
assert.ErrorContains(t, err2, io.EOF.Error())
968+
})
923969
}
924970

925971
type rowTestPagingResult struct {

internal/rows/rowscanner/resultPageIterator.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,46 @@ type resultPageIterator struct {
9898
correlationId string
9999

100100
logger *dbsqllog.DBSQLLogger
101+
102+
// In some cases we don't know whether there are any records until we fetch
103+
// the first result page. So our behaviour is to fetch a result page as necessary
104+
// before Next() is called.
105+
nextResultPage *cli_service.TFetchResultsResp
106+
107+
// Hold on to errors so they can be returned by Next()
108+
err error
101109
}
102110

103111
var _ ResultPageIterator = (*resultPageIterator)(nil)
104112

105113
// Returns true if there are more pages in the result set.
106-
func (rpf *resultPageIterator) HasNext() bool { return !rpf.isFinished }
114+
func (rpf *resultPageIterator) HasNext() bool {
115+
if rpf.isFinished && rpf.nextResultPage == nil {
116+
// There are no more pages to load and there isn't an already fetched
117+
// page waiting to retrieved by Next()
118+
rpf.err = io.EOF
119+
return false
120+
}
121+
122+
// If there isn't an already fetched result page try to fetch one now
123+
if rpf.nextResultPage == nil {
124+
nrp, err := rpf.getNextPage()
125+
if err != nil {
126+
rpf.Close()
127+
rpf.isFinished = true
128+
rpf.err = err
129+
return false
130+
}
131+
132+
rpf.err = nil
133+
rpf.nextResultPage = nrp
134+
if !nrp.GetHasMoreRows() {
135+
rpf.Close()
136+
}
137+
}
138+
139+
return rpf.nextResultPage != nil
140+
}
107141

108142
// Returns the next page of the result set. io.EOF will be returned if there are
109143
// no more pages.
@@ -113,7 +147,18 @@ func (rpf *resultPageIterator) Next() (*cli_service.TFetchResultsResp, error) {
113147
return nil, dbsqlerrint.NewDriverError(context.Background(), errRowsNilResultPageFetcher, nil)
114148
}
115149

150+
if !rpf.HasNext() && rpf.nextResultPage == nil {
151+
return nil, rpf.err
152+
}
153+
154+
nrp := rpf.nextResultPage
155+
rpf.nextResultPage = nil
156+
return nrp, rpf.err
157+
}
158+
159+
func (rpf *resultPageIterator) getNextPage() (*cli_service.TFetchResultsResp, error) {
116160
if rpf.isFinished {
161+
// no more result pages to fetch
117162
return nil, io.EOF
118163
}
119164

@@ -168,14 +213,16 @@ func (rpf *resultPageIterator) Close() (err error) {
168213
// need to do that now
169214
if !rpf.closedOnServer {
170215
rpf.closedOnServer = true
216+
if rpf.client != nil {
217+
req := cli_service.TCloseOperationReq{
218+
OperationHandle: rpf.opHandle,
219+
}
171220

172-
req := cli_service.TCloseOperationReq{
173-
OperationHandle: rpf.opHandle,
221+
_, err = rpf.client.CloseOperation(context.Background(), &req)
222+
return err
174223
}
175-
176-
_, err = rpf.client.CloseOperation(context.Background(), &req)
177-
return err
178224
}
225+
179226
return
180227
}
181228

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
{
2+
"status": {
3+
"statusCode": "SUCCESS_STATUS"
4+
},
5+
"operationHandle": {
6+
"operationId": {
7+
"guid": "Ae6w6uAWEI+vWuB+fjGvOA==",
8+
"secret": "M41SnYJyRuuEgstBlGaDnQ==",
9+
"executionVersion": 0
10+
},
11+
"operationType": "EXECUTE_STATEMENT",
12+
"hasResultSet": true
13+
},
14+
"directResults": {
15+
"operationStatus": {
16+
"status": {
17+
"statusCode": "SUCCESS_STATUS"
18+
},
19+
"operationState": "FINISHED_STATE",
20+
"operationStarted": 1705023332453,
21+
"operationCompleted": 1705023332599,
22+
"idempotencyType": "IDEMPOTENT",
23+
"statementTimeout": 172800
24+
},
25+
"resultSetMetadata": {
26+
"status": {
27+
"statusCode": "SUCCESS_STATUS"
28+
},
29+
"schema": {
30+
"columns": [
31+
{
32+
"columnName": "id",
33+
"typeDesc": {
34+
"types": [
35+
{
36+
"primitiveEntry": {
37+
"type": "INT_TYPE"
38+
}
39+
}
40+
]
41+
},
42+
"position": 1,
43+
"comment": ""
44+
},
45+
{
46+
"columnName": "deleted",
47+
"typeDesc": {
48+
"types": [
49+
{
50+
"primitiveEntry": {
51+
"type": "BOOLEAN_TYPE"
52+
}
53+
}
54+
]
55+
},
56+
"position": 2,
57+
"comment": ""
58+
},
59+
{
60+
"columnName": "name",
61+
"typeDesc": {
62+
"types": [
63+
{
64+
"primitiveEntry": {
65+
"type": "STRING_TYPE"
66+
}
67+
}
68+
]
69+
},
70+
"position": 3,
71+
"comment": ""
72+
}
73+
]
74+
},
75+
"resultFormat": "ARROW_BASED_SET",
76+
"lz4Compressed": false,
77+
"arrowSchema": "/////2ACAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAMAAABsAQAArAAAAAQAAACy/v//FAAAAIgAAACIAAAAAAAFAYQAAAACAAAAQAAAAAQAAABo/v//CAAAABQAAAAIAAAAInN0cmluZyIAAAAAFwAAAFNwYXJrOkRhdGFUeXBlOkpzb25UeXBlAKD+//8IAAAAEAAAAAYAAABTVFJJTkcAABYAAABTcGFyazpEYXRhVHlwZTpTcWxOYW1lAAAAAAAAXP///wQAAABuYW1lAAAAAFb///8UAAAAiAAAAIwAAAAAAAYBiAAAAAIAAABAAAAABAAAAAz///8IAAAAFAAAAAkAAAAiYm9vbGVhbiIAAAAXAAAAU3Bhcms6RGF0YVR5cGU6SnNvblR5cGUARP///wgAAAAQAAAABwAAAEJPT0xFQU4AFgAAAFNwYXJrOkRhdGFUeXBlOlNxbE5hbWUAAAAAAAAEAAQABAAAAAcAAABkZWxldGVkAAAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAACMAAAAlAAAAAAAAgGYAAAAAgAAAEgAAAAEAAAAyP///wgAAAAUAAAACQAAACJpbnRlZ2VyIgAAABcAAABTcGFyazpEYXRhVHlwZTpKc29uVHlwZQAIAAwACAAEAAgAAAAIAAAADAAAAAMAAABJTlQAFgAAAFNwYXJrOkRhdGFUeXBlOlNxbE5hbWUAAAAAAAAIAAwACAAHAAgAAAAAAAABIAAAAAIAAABpZAAAAAAAAA==",
78+
"cacheLookupResult": "LOCAL_CACHE_HIT",
79+
"uncompressedBytes": 0,
80+
"compressedBytes": 0,
81+
"isStagingOperation": false,
82+
"reasonForNoCloudFetch": "CLOUD_FETCH_SUPPORT",
83+
"cacheLookupLatency": 4,
84+
"remoteResultCacheEnabled": true,
85+
"isServerless": true,
86+
"truncatedByThriftLimit": false
87+
},
88+
"resultSet": {
89+
"status": {
90+
"statusCode": "SUCCESS_STATUS"
91+
},
92+
"hasMoreRows": false,
93+
"results": {
94+
"startRowOffset": 0,
95+
"rows": []
96+
}
97+
},
98+
"closeOperation": {
99+
"status": {
100+
"statusCode": "SUCCESS_STATUS"
101+
}
102+
}
103+
},
104+
"executionRejected": false,
105+
"maxClusterCapacity": 10,
106+
"queryCost": 0.5,
107+
"currentClusterLoad": 1,
108+
"idempotencyType": "IDEMPOTENT",
109+
"remoteResultCacheEnabled": true,
110+
"isServerless": true
111+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
{
2+
"status": {
3+
"statusCode": "SUCCESS_STATUS"
4+
},
5+
"hasMoreRows": false,
6+
"results": {
7+
"startRowOffset": 0,
8+
"rows": []
9+
},
10+
"resultSetMetadata": {
11+
"status": {
12+
"statusCode": "SUCCESS_STATUS"
13+
},
14+
"schema": {
15+
"columns": [
16+
{
17+
"columnName": "id",
18+
"typeDesc": {
19+
"types": [
20+
{
21+
"primitiveEntry": {
22+
"type": "INT_TYPE"
23+
}
24+
}
25+
]
26+
},
27+
"position": 1,
28+
"comment": ""
29+
},
30+
{
31+
"columnName": "deleted",
32+
"typeDesc": {
33+
"types": [
34+
{
35+
"primitiveEntry": {
36+
"type": "BOOLEAN_TYPE"
37+
}
38+
}
39+
]
40+
},
41+
"position": 2,
42+
"comment": ""
43+
},
44+
{
45+
"columnName": "name",
46+
"typeDesc": {
47+
"types": [
48+
{
49+
"primitiveEntry": {
50+
"type": "STRING_TYPE"
51+
}
52+
}
53+
]
54+
},
55+
"position": 3,
56+
"comment": ""
57+
}
58+
]
59+
},
60+
"resultFormat": "ARROW_BASED_SET",
61+
"lz4Compressed": false,
62+
"arrowSchema": "/////2ACAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAMAAABsAQAArAAAAAQAAACy/v//FAAAAIgAAACIAAAAAAAFAYQAAAACAAAAQAAAAAQAAABo/v//CAAAABQAAAAIAAAAInN0cmluZyIAAAAAFwAAAFNwYXJrOkRhdGFUeXBlOkpzb25UeXBlAKD+//8IAAAAEAAAAAYAAABTVFJJTkcAABYAAABTcGFyazpEYXRhVHlwZTpTcWxOYW1lAAAAAAAAXP///wQAAABuYW1lAAAAAFb///8UAAAAiAAAAIwAAAAAAAYBiAAAAAIAAABAAAAABAAAAAz///8IAAAAFAAAAAkAAAAiYm9vbGVhbiIAAAAXAAAAU3Bhcms6RGF0YVR5cGU6SnNvblR5cGUARP///wgAAAAQAAAABwAAAEJPT0xFQU4AFgAAAFNwYXJrOkRhdGFUeXBlOlNxbE5hbWUAAAAAAAAEAAQABAAAAAcAAABkZWxldGVkAAAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAACMAAAAlAAAAAAAAgGYAAAAAgAAAEgAAAAEAAAAyP///wgAAAAUAAAACQAAACJpbnRlZ2VyIgAAABcAAABTcGFyazpEYXRhVHlwZTpKc29uVHlwZQAIAAwACAAEAAgAAAAIAAAADAAAAAMAAABJTlQAFgAAAFNwYXJrOkRhdGFUeXBlOlNxbE5hbWUAAAAAAAAIAAwACAAHAAgAAAAAAAABIAAAAAIAAABpZAAAAAAAAA==",
63+
"cacheLookupResult": "LOCAL_CACHE_HIT",
64+
"uncompressedBytes": 0,
65+
"compressedBytes": 0,
66+
"isStagingOperation": false,
67+
"reasonForNoCloudFetch": "CLOUD_FETCH_SUPPORT",
68+
"cacheLookupLatency": 2,
69+
"remoteResultCacheEnabled": true,
70+
"isServerless": true,
71+
"truncatedByThriftLimit": false
72+
}
73+
}

0 commit comments

Comments
 (0)