Skip to content

Commit f707f16

Browse files
committed
Create a new rawbatch iterator
1 parent 12801b3 commit f707f16

File tree

9 files changed

+612
-154
lines changed

9 files changed

+612
-154
lines changed

internal/cli_service/cli_service.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/rows/arrowbased/arrowRecordIterator.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,11 +168,17 @@ func (ri *arrowRecordIterator) getBatchIterator() error {
168168
// Create a new batch iterator from a page of the result set
169169
func (ri *arrowRecordIterator) newBatchIterator(fr *cli_service.TFetchResultsResp) (BatchIterator, error) {
170170
rowSet := fr.Results
171+
var rawBi RawBatchIterator
172+
var err error
171173
if len(rowSet.ResultLinks) > 0 {
172-
return NewCloudBatchIterator(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg)
174+
rawBi, err = NewCloudRawBatchIterator(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg)
173175
} else {
174-
return NewLocalBatchIterator(ri.ctx, rowSet.ArrowBatches, rowSet.StartRowOffset, ri.arrowSchemaBytes, &ri.cfg)
176+
rawBi, err = NewLocalRawBatchIterator(ri.ctx, rowSet.ArrowBatches, rowSet.StartRowOffset, ri.arrowSchemaBytes, &ri.cfg)
175177
}
178+
if err != nil {
179+
return nil, err
180+
}
181+
return NewBatchIterator(rawBi, ri.arrowSchemaBytes, &ri.cfg), nil
176182
}
177183

178184
// Return the schema of the records.

internal/rows/arrowbased/arrowRows.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,18 +113,22 @@ func NewArrowRowScanner(resultSetMetadata *cli_service.TGetResultSetMetadataResp
113113
}
114114

115115
var bi BatchIterator
116-
var err2 dbsqlerr.DBError
117116
if len(rowSet.ResultLinks) > 0 {
118117
logger.Debug().Msgf("Initialize CloudFetch loader, row set start offset: %d, file list:", rowSet.StartRowOffset)
119118
for _, resultLink := range rowSet.ResultLinks {
120119
logger.Debug().Msgf("- start row offset: %d, row count: %d", resultLink.StartRowOffset, resultLink.RowCount)
121120
}
122-
bi, err2 = NewCloudBatchIterator(context.Background(), rowSet.ResultLinks, rowSet.StartRowOffset, cfg)
121+
rawBi, err2 := NewCloudRawBatchIterator(context.Background(), rowSet.ResultLinks, rowSet.StartRowOffset, cfg)
122+
if err2 != nil {
123+
return nil, err2
124+
}
125+
bi = NewBatchIterator(rawBi, schemaBytes, cfg)
123126
} else {
124-
bi, err2 = NewLocalBatchIterator(context.Background(), rowSet.ArrowBatches, rowSet.StartRowOffset, schemaBytes, cfg)
125-
}
126-
if err2 != nil {
127-
return nil, err2
127+
rawBi, err2 := NewLocalRawBatchIterator(context.Background(), rowSet.ArrowBatches, rowSet.StartRowOffset, schemaBytes, cfg)
128+
if err2 != nil {
129+
return nil, err2
130+
}
131+
bi = NewBatchIterator(rawBi, schemaBytes, cfg)
128132
}
129133

130134
var location *time.Location = time.UTC

internal/rows/arrowbased/arrowRows_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,10 +1041,10 @@ func TestArrowRowScanner(t *testing.T) {
10411041
ars := d.(*arrowRowScanner)
10421042
assert.Equal(t, int64(53940), ars.NRows())
10431043

1044-
bi, ok := ars.batchIterator.(*localBatchIterator)
1045-
assert.True(t, ok)
1044+
// TODO: Update test to work with new architecture
1045+
// The batchIterator is now wrapped, so we can't cast to localBatchIterator directly
10461046
fbi := &batchIteratorWrapper{
1047-
bi: bi,
1047+
bi: ars.batchIterator,
10481048
}
10491049

10501050
ars.batchIterator = fbi

0 commit comments

Comments
 (0)