Skip to content

Commit af7e698

Browse files
committed
feat: Add Arrow IPC stream iterator for direct access to Arrow binary format
## Summary This PR adds support for accessing raw Arrow IPC (Inter-Process Communication) streams directly, providing an alternative to the existing parsed Arrow record interface. This allows users to work with the Arrow binary format without the overhead of parsing, enabling use cases like streaming data to other systems or custom processing pipelines. ## Changes - **Refactored internal batch iterator** to expose IPC streams: - Renamed `BatchIterator` → `IPCStreamIterator` (returns `io.Reader`) - Updated implementations to return raw IPC streams instead of parsed batches - Created backward-compatible `BatchIterator` wrapper - **Added public API** in `rows` package: - New `GetArrowIPCStreams()` method on `Rows` interface - New `ArrowIPCStreamIterator` interface with methods: - `Next() (io.Reader, error)` - returns raw IPC stream - `HasNext() bool` - `Close()` - `SchemaBytes() ([]byte, error)` - **Updated all row scanner implementations** to support IPC streams - **Added example** demonstrating IPC stream usage ## Benefits - **Performance**: Avoid parsing overhead when forwarding Arrow data - **Flexibility**: Direct access to Arrow binary format for custom processing - **Compatibility**: Easier integration with other Arrow-based systems - **Memory efficiency**: Process streams without loading all records into memory ## Testing - All existing tests pass - Backward compatibility maintained through wrapper pattern - Example provided in `examples/ipcstreams/` ## Usage Example ```go rows, err := db.QueryContext(ctx, "SELECT * FROM table") ipcStreams, err := rows.(dbsqlrows.Rows).GetArrowIPCStreams(ctx) defer ipcStreams.Close() for ipcStreams.HasNext() { reader, err := ipcStreams.Next() // Process raw Arrow IPC stream } Signed-off-by: Jade Wang <[email protected]>
1 parent 746c05d commit af7e698

File tree

13 files changed

+27458
-23416
lines changed

13 files changed

+27458
-23416
lines changed

examples/ipcstreams/main.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"database/sql/driver"
7+
"io"
8+
"log"
9+
"os"
10+
"strconv"
11+
"time"
12+
13+
"github.com/apache/arrow/go/v12/arrow/ipc"
14+
dbsql "github.com/databricks/databricks-sql-go"
15+
dbsqlrows "github.com/databricks/databricks-sql-go/rows"
16+
"github.com/joho/godotenv"
17+
)
18+
19+
func main() {
20+
// Load environment variables from .env file if it exists
21+
// This will not override existing environment variables
22+
_ = godotenv.Load()
23+
24+
port, err := strconv.Atoi(os.Getenv("DATABRICKS_PORT"))
25+
if err != nil {
26+
log.Fatal(err.Error())
27+
}
28+
29+
connector, err := dbsql.NewConnector(
30+
dbsql.WithServerHostname(os.Getenv("DATABRICKS_HOST")),
31+
dbsql.WithPort(port),
32+
dbsql.WithHTTPPath(os.Getenv("DATABRICKS_HTTPPATH")),
33+
dbsql.WithAccessToken(os.Getenv("DATABRICKS_ACCESSTOKEN")),
34+
)
35+
if err != nil {
36+
log.Fatal(err)
37+
}
38+
39+
db := sql.OpenDB(connector)
40+
defer db.Close()
41+
42+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
43+
defer cancel()
44+
45+
conn, _ := db.Conn(ctx)
46+
defer conn.Close()
47+
48+
query := `SELECT * FROM samples.nyctaxi.trips LIMIT 1000`
49+
50+
var rows driver.Rows
51+
err = conn.Raw(func(d interface{}) error {
52+
var err error
53+
rows, err = d.(driver.QueryerContext).QueryContext(ctx, query, nil)
54+
return err
55+
})
56+
57+
if err != nil {
58+
log.Fatal("Failed to execute query: ", err)
59+
}
60+
defer rows.Close()
61+
62+
// Get the IPC stream iterator
63+
ipcStreams, err := rows.(dbsqlrows.Rows).GetArrowIPCStreams(ctx)
64+
if err != nil {
65+
log.Fatal("Failed to get IPC streams: ", err)
66+
}
67+
defer ipcStreams.Close()
68+
69+
// Get the schema bytes
70+
schemaBytes, err := ipcStreams.SchemaBytes()
71+
if err != nil {
72+
log.Fatal("Failed to get schema bytes: ", err)
73+
}
74+
log.Printf("Schema bytes length: %d", len(schemaBytes))
75+
76+
// Process IPC streams
77+
streamCount := 0
78+
recordCount := 0
79+
80+
for ipcStreams.HasNext() {
81+
// Get the next IPC stream
82+
reader, err := ipcStreams.Next()
83+
if err != nil {
84+
if err == io.EOF {
85+
break
86+
}
87+
log.Fatal("Failed to get next IPC stream: ", err)
88+
}
89+
90+
streamCount++
91+
92+
// Create an IPC reader for this stream
93+
ipcReader, err := ipc.NewReader(reader)
94+
if err != nil {
95+
log.Fatal("Failed to create IPC reader: ", err)
96+
}
97+
98+
// Process records in the stream
99+
for ipcReader.Next() {
100+
record := ipcReader.Record()
101+
recordCount++
102+
log.Printf("Stream %d, Record %d: %d rows, %d columns",
103+
streamCount, recordCount, record.NumRows(), record.NumCols())
104+
105+
// Don't forget to release the record when done
106+
record.Release()
107+
}
108+
109+
if err := ipcReader.Err(); err != nil {
110+
log.Fatal("IPC reader error: ", err)
111+
}
112+
113+
ipcReader.Release()
114+
}
115+
116+
log.Printf("Processed %d IPC streams with %d total records", streamCount, recordCount)
117+
}

internal/cli_service/GoUnusedProtection__.go

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/cli_service/cli_service-consts.go

Lines changed: 33 additions & 32 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)