Skip to content

Commit 6890ee8

Browse files
jadewang-db (Jade Wang), claude, and lidavidm (David Li)
authored
fix(go): resolve intermittent EOF errors fetching large results (#192) (#194)
## 🥞 Stacked PR Use this [link](https://github.com/adbc-drivers/databricks/pull/194/files) to review incremental changes. - [**stack/githud_192**](#194) [[Files changed](https://github.com/adbc-drivers/databricks/pull/194/files)] --------- Fix intermittent "arrow/ipc: could not read continuation indicator: EOF" errors when fetching large result sets (30M+ rows) from Databricks. Root cause: The code called SchemaBytes() before any data fetch. In databricks-sql-go, when query results are large (no "direct results" in the response), the schema is populated lazily during the first Next() call. Calling SchemaBytes() before Next() returned empty bytes, causing the Arrow IPC reader to fail with EOF. The fix changes the initialization order to: 1. First call loadNextReader() which triggers data fetch 2. Get schema from the loaded IPC reader (which always has schema) 3. Fall back to SchemaBytes() only for empty result sets Fixes: #192 Co-Authored-By: Claude (databricks-claude-opus-4-5) <noreply@anthropic.com> --------- Co-authored-by: Jade Wang <jade.wang+data@databricks.com> Co-authored-by: Claude (databricks-claude-opus-4-5) <noreply@anthropic.com> Co-authored-by: David Li <li.davidm96@gmail.com>
1 parent 7fb7f83 commit 6890ee8

File tree

3 files changed

+91
-38
lines changed

3 files changed

+91
-38
lines changed

go/database.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"database/sql"
2828
"errors"
2929
"fmt"
30+
"net"
3031
"strconv"
3132
"strings"
3233
"time"
@@ -148,6 +149,10 @@ func (d *databaseImpl) resolveConnectionOptions() ([]dbsql.ConnOption, error) {
148149
}
149150

150151
// TLS/SSL handling
152+
// Configure a custom transport with proper timeout settings when custom
153+
// TLS config is needed. These settings match the defaults from
154+
// databricks-sql-go's PooledTransport to ensure reliable connections
155+
// for large result set downloads.
151156
if d.sslCertPool != nil || d.sslInsecure {
152157
tlsConfig := &tls.Config{
153158
MinVersion: tls.VersionTLS12,
@@ -162,7 +167,19 @@ func (d *databaseImpl) resolveConnectionOptions() ([]dbsql.ConnOption, error) {
162167
}
163168

164169
transport := &http.Transport{
165-
TLSClientConfig: tlsConfig,
170+
Proxy: http.ProxyFromEnvironment,
171+
DialContext: (&net.Dialer{
172+
Timeout: 30 * time.Second,
173+
KeepAlive: 30 * time.Second,
174+
}).DialContext,
175+
TLSClientConfig: tlsConfig,
176+
ForceAttemptHTTP2: true,
177+
MaxIdleConns: 100,
178+
IdleConnTimeout: 180 * time.Second,
179+
TLSHandshakeTimeout: 10 * time.Second,
180+
ExpectContinueTimeout: 1 * time.Second,
181+
MaxIdleConnsPerHost: 10,
182+
MaxConnsPerHost: 100,
166183
}
167184
opts = append(opts, dbsql.WithTransport(transport))
168185
}

go/driver_test.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/adbc-drivers/driverbase-go/validation"
3434
"github.com/apache/arrow-adbc/go/adbc"
3535
"github.com/apache/arrow-go/v18/arrow"
36+
"github.com/apache/arrow-go/v18/arrow/array"
3637
"github.com/apache/arrow-go/v18/arrow/memory"
3738
"github.com/stretchr/testify/assert"
3839
"github.com/stretchr/testify/require"
@@ -867,6 +868,7 @@ func (suite *DatabricksTests) TestDecimalTypes() {
867868
}
868869

869870
func (suite *DatabricksTests) TestMultiBatch() {
871+
// Regression test for issue reported directly to us
870872
query := fmt.Sprintf("CREATE OR REPLACE TABLE `%s`.`%s`.`test_multi_batch` (founder STRING, born STRING)", suite.Quirks.catalogName, suite.Quirks.schemaName)
871873
suite.Require().NoError(suite.stmt.SetSqlQuery(query))
872874
_, err := suite.stmt.ExecuteUpdate(suite.ctx)
@@ -877,15 +879,16 @@ func (suite *DatabricksTests) TestMultiBatch() {
877879
born string
878880
}
879881

880-
for _, r := range []row{
882+
rows := []row{
881883
{"Ali Ghodsi", "Iran"},
882884
{"Ion Stoica", "Romania"},
883885
{"Matei Zaharia", "Romania"},
884886
{"Patrick Wendell", "USA"},
885887
{"Reynold Xin", "China"},
886888
{"Andy Konwinski", "USA"},
887889
{"Arsalan Tavakoli-Shiraji", "Iran"},
888-
} {
890+
}
891+
for _, r := range rows {
889892
query = fmt.Sprintf("INSERT INTO `%s`.`%s`.`test_multi_batch` VALUES ('%s', '%s')", suite.Quirks.catalogName, suite.Quirks.schemaName, r.founder, r.born)
890893
suite.Require().NoError(suite.stmt.SetSqlQuery(query))
891894
_, err = suite.stmt.ExecuteUpdate(suite.ctx)
@@ -898,14 +901,30 @@ func (suite *DatabricksTests) TestMultiBatch() {
898901
suite.Require().NoError(err)
899902
defer rdr.Release()
900903

901-
// Databricks appears to put each into its own batch
902-
suite.True(rdr.Next())
903-
suite.True(rdr.Next())
904-
suite.True(rdr.Next())
905-
suite.True(rdr.Next())
906-
suite.True(rdr.Next())
907-
suite.True(rdr.Next())
908-
suite.True(rdr.Next())
909-
suite.False(rdr.Next())
904+
// This used to only return one row.
905+
rowCount := 0
906+
seen := map[string]string{}
907+
for rdr.Next() {
908+
batch := rdr.RecordBatch()
909+
rowCount += int(batch.NumRows())
910+
911+
founder := batch.Column(0).(*array.String)
912+
born := batch.Column(1).(*array.String)
913+
914+
for i := range int(batch.NumRows()) {
915+
seen[founder.Value(i)] = born.Value(i)
916+
}
917+
}
910918
suite.NoError(rdr.Err())
919+
suite.Equal(len(rows), rowCount)
920+
921+
suite.EqualValues(map[string]string{
922+
"Ali Ghodsi": "Iran",
923+
"Ion Stoica": "Romania",
924+
"Matei Zaharia": "Romania",
925+
"Patrick Wendell": "USA",
926+
"Reynold Xin": "China",
927+
"Andy Konwinski": "USA",
928+
"Arsalan Tavakoli-Shiraji": "Iran",
929+
}, seen)
911930
}

go/ipc_reader_adapter.go

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -69,47 +69,64 @@ func newIPCReaderAdapter(ctx context.Context, rows driver.Rows) (array.RecordRea
6969
}
7070
}
7171

72-
schema_bytes, err := ipcIterator.SchemaBytes()
73-
if err != nil {
74-
return nil, adbc.Error{
75-
Code: adbc.StatusInternal,
76-
Msg: fmt.Sprintf("failed to get schema bytes: %v", err),
77-
}
72+
adapter := &ipcReaderAdapter{
73+
rows: rows,
74+
refCount: 1,
75+
ipcIterator: ipcIterator,
7876
}
7977

80-
// Read schema from bytes
81-
reader, err := ipc.NewReader(bytes.NewReader(schema_bytes))
82-
if err != nil {
78+
// Load the first IPC stream to get the schema.
79+
// Note: SchemaBytes() may return empty bytes if no direct results were
80+
// returned with the query response. The schema is populated lazily
81+
// during the first data fetch in databricks-sql-go. By loading the
82+
// first reader, we ensure the schema is available.
83+
err = adapter.loadNextReader()
84+
if err != nil && err != io.EOF {
8385
return nil, adbc.Error{
8486
Code: adbc.StatusInternal,
85-
Msg: fmt.Sprintf("failed to get schema reader: %v", err),
87+
Msg: fmt.Sprintf("failed to initialize IPC reader: %v", err),
8688
}
8789
}
88-
defer reader.Release()
8990

90-
schema := reader.Schema()
91-
if schema == nil {
92-
return nil, adbc.Error{
93-
Code: adbc.StatusInternal,
94-
Msg: "schema is nil",
91+
// Get schema from the first reader, or fall back to SchemaBytes() if
92+
// the result set is empty (no readers available)
93+
if adapter.currentReader != nil {
94+
adapter.schema = adapter.currentReader.Schema()
95+
} else {
96+
// Empty result set - try to get schema from SchemaBytes()
97+
schema_bytes, err := ipcIterator.SchemaBytes()
98+
if err != nil {
99+
return nil, adbc.Error{
100+
Code: adbc.StatusInternal,
101+
Msg: fmt.Sprintf("failed to get schema bytes: %v", err),
102+
}
95103
}
96-
}
97104

98-
adapter := &ipcReaderAdapter{
99-
rows: rows,
100-
refCount: 1,
101-
ipcIterator: ipcIterator,
102-
schema: schema,
105+
if len(schema_bytes) == 0 {
106+
return nil, adbc.Error{
107+
Code: adbc.StatusInternal,
108+
Msg: "schema bytes are empty and no data available",
109+
}
110+
}
111+
112+
reader, err := ipc.NewReader(bytes.NewReader(schema_bytes))
113+
if err != nil {
114+
return nil, adbc.Error{
115+
Code: adbc.StatusInternal,
116+
Msg: fmt.Sprintf("failed to read schema: %v", err),
117+
}
118+
}
119+
adapter.schema = reader.Schema()
120+
reader.Release()
103121
}
104122

105-
// Initialize the first reader
106-
err = adapter.loadNextReader()
107-
if err != nil && err != io.EOF {
123+
if adapter.schema == nil {
108124
return nil, adbc.Error{
109125
Code: adbc.StatusInternal,
110-
Msg: fmt.Sprintf("failed to initialize IPC reader: %v", err),
126+
Msg: "schema is nil",
111127
}
112128
}
129+
113130
return adapter, nil
114131
}
115132

0 commit comments

Comments (0)