44 "bytes"
55 "context"
66 "io"
7-
7+
88 "github.com/databricks/databricks-sql-go/internal/cli_service"
99 "github.com/databricks/databricks-sql-go/internal/config"
1010 "github.com/databricks/databricks-sql-go/internal/rows/rowscanner"
@@ -14,13 +14,13 @@ import (
1414
1515// ipcStreamIterator provides access to raw Arrow IPC streams without deserialization
1616type ipcStreamIterator struct {
17- ctx context.Context
17+ ctx context.Context
1818 resultPageIterator rowscanner.ResultPageIterator
19- currentBatches []* cli_service.TSparkArrowBatch
20- currentIndex int
21- arrowSchemaBytes []byte
22- useLz4 bool
23- hasMorePages bool
19+ currentBatches []* cli_service.TSparkArrowBatch
20+ currentIndex int
21+ arrowSchemaBytes []byte
22+ useLz4 bool
23+ hasMorePages bool
2424}
2525
2626// NewIPCStreamIterator creates an iterator that returns raw IPC streams
@@ -35,12 +35,12 @@ func NewIPCStreamIterator(
3535 if cfg != nil {
3636 useLz4 = cfg .UseLz4Compression
3737 }
38-
38+
3939 var batches []* cli_service.TSparkArrowBatch
4040 if initialRowSet != nil {
4141 batches = initialRowSet .ArrowBatches
4242 }
43-
43+
4444 return & ipcStreamIterator {
4545 ctx : ctx ,
4646 resultPageIterator : resultPageIterator ,
@@ -59,38 +59,38 @@ func (it *ipcStreamIterator) NextIPCStream() (io.Reader, error) {
5959 if ! it .hasMorePages || it .resultPageIterator == nil {
6060 return nil , io .EOF
6161 }
62-
62+
6363 // Fetch next page
6464 fetchResult , err := it .resultPageIterator .Next ()
6565 if err != nil {
6666 return nil , err
6767 }
68-
68+
6969 if fetchResult == nil || fetchResult .Results == nil || fetchResult .Results .ArrowBatches == nil {
7070 return nil , io .EOF
7171 }
72-
72+
7373 it .currentBatches = fetchResult .Results .ArrowBatches
7474 it .currentIndex = 0
7575 it .hasMorePages = it .resultPageIterator .HasNext ()
76-
76+
7777 // If no batches in this page, recurse to try next page
7878 if len (it .currentBatches ) == 0 {
7979 return it .NextIPCStream ()
8080 }
8181 }
82-
82+
8383 batch := it .currentBatches [it .currentIndex ]
8484 it .currentIndex ++
85-
85+
8686 // Create reader for the batch data
8787 var batchReader io.Reader = bytes .NewReader (batch .Batch )
88-
88+
8989 // Handle LZ4 decompression if needed
9090 if it .useLz4 {
9191 batchReader = lz4 .NewReader (batchReader )
9292 }
93-
93+
9494 // Combine schema and batch data into a complete IPC stream
9595 // Arrow IPC format expects: [Schema][Batch1][Batch2]...
9696 return io .MultiReader (
@@ -112,4 +112,5 @@ func (it *ipcStreamIterator) Close() {
112112// GetSchemaBytes returns the Arrow schema in IPC format
113113func (it * ipcStreamIterator ) GetSchemaBytes () ([]byte , error ) {
114114 return it .arrowSchemaBytes , nil
115- }
115+ }
116+
0 commit comments