Skip to content

Commit 358335b

Browse files
authored
Fix Row lifetime for IPC stream (#92)
* Fix Row lifetime for IPC stream * Rename row variable for clarity on its purpose
1 parent 07af6d4 commit 358335b

File tree

2 files changed

+17
-16
lines changed

2 files changed

+17
-16
lines changed

go/adbc/driver/databricks/ipc_reader_adapter.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package databricks
2020
import (
2121
"bytes"
2222
"context"
23+
"database/sql/driver"
2324
"fmt"
2425
"io"
2526
"sync/atomic"
@@ -38,12 +39,13 @@ type rowsWithIPCStream interface {
3839

3940
// ipcReaderAdapter uses the new IPC stream interface for Arrow access
4041
type ipcReaderAdapter struct {
41-
ipcIterator dbsqlrows.ArrowIPCStreamIterator
42-
currentReader *ipc.Reader
43-
currentRecord arrow.RecordBatch
44-
schema *arrow.Schema
45-
closed bool
46-
refCount int64
42+
dbsqlRowsHandle dbsqlrows.Rows // Row from which we obtain the iterator - their lifetimes are tied together
43+
ipcIterator dbsqlrows.ArrowIPCStreamIterator
44+
currentReader *ipc.Reader
45+
currentRecord arrow.RecordBatch
46+
schema *arrow.Schema
47+
closed bool
48+
refCount int64
4749
}
4850

4951
// newIPCReaderAdapter creates a RecordReader using direct IPC stream access
@@ -86,9 +88,10 @@ func newIPCReaderAdapter(ctx context.Context, rows dbsqlrows.Rows) (array.Record
8688
}
8789

8890
adapter := &ipcReaderAdapter{
89-
refCount: 1,
90-
ipcIterator: ipcIterator,
91-
schema: schema,
91+
refCount: 1,
92+
dbsqlRowsHandle: rows,
93+
ipcIterator: ipcIterator,
94+
schema: schema,
9295
}
9396

9497
// Initialize the first reader
@@ -202,6 +205,11 @@ func (r *ipcReaderAdapter) Release() {
202205
}
203206

204207
r.ipcIterator.Close()
208+
209+
if r.dbsqlRowsHandle != nil {
210+
r.dbsqlRowsHandle.(driver.Rows).Close()
211+
r.dbsqlRowsHandle = nil
212+
}
205213
}
206214
}
207215

go/adbc/driver/databricks/statement.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"context"
2222
"database/sql"
2323
"database/sql/driver"
24-
"errors"
2524
"fmt"
2625

2726
"github.com/apache/arrow-adbc/go/adbc"
@@ -132,12 +131,6 @@ func (s *statementImpl) ExecuteQuery(ctx context.Context) (array.RecordReader, i
132131
}
133132
}
134133

135-
defer func() {
136-
if closeErr := driverRows.Close(); closeErr != nil {
137-
err = errors.Join(err, closeErr)
138-
}
139-
}()
140-
141134
// Convert to databricks rows interface to get Arrow batches
142135
databricksRows, ok := driverRows.(dbsqlrows.Rows)
143136
if !ok {

0 commit comments

Comments
 (0)