4 changes: 2 additions & 2 deletions arrow/flight/flightsql/example/sqlite_server.go
@@ -354,7 +354,7 @@ func (s *SQLiteFlightSQLServer) DoGetTables(ctx context.Context, cmd flightsql.G
     }
 
     schema := rdr.Schema()
-    go flight.StreamChunksFromReader(rdr, ch)
+    go flight.StreamChunksFromReader(ctx, rdr, ch)
     return schema, ch, nil
 }
 
@@ -485,7 +485,7 @@ func doGetQuery(ctx context.Context, mem memory.Allocator, db dbQueryCtx, query
     }
 
     ch := make(chan flight.StreamChunk)
-    go flight.StreamChunksFromReader(rdr, ch)
+    go flight.StreamChunksFromReader(ctx, rdr, ch)
     return schema, ch, nil
 }
 
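Both example call sites change the same way: the request context the handler already receives is now threaded into the goroutine that populates the channel. The caller-side contract is that StreamChunksFromReader owns the reader (releasing it when finished) and closes the channel, so a consumer only drains chunks and cancels the context to bail out early. A minimal sketch of that contract — the helper name drainChunks is hypothetical, and the import paths assume the arrow-go v18 module layout this repository uses:

package example

import (
    "context"

    "github.com/apache/arrow-go/v18/arrow/array"
    "github.com/apache/arrow-go/v18/arrow/flight"
)

// drainChunks consumes every chunk produced from rdr. Cancelling the
// context on early return is what keeps the producer goroutine from
// blocking forever on its unbuffered channel send.
func drainChunks(parent context.Context, rdr array.RecordReader) error {
    ctx, cancel := context.WithCancel(parent)
    defer cancel() // unblocks the producer if we return before draining

    ch := make(chan flight.StreamChunk)
    go flight.StreamChunksFromReader(ctx, rdr, ch) // releases rdr, closes ch

    for chunk := range ch {
        if chunk.Err != nil {
            return chunk.Err
        }
        // ... process chunk.Data here ...
        chunk.Data.Release()
    }
    return nil
}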
31 changes: 18 additions & 13 deletions arrow/flight/flightsql/server.go
@@ -381,7 +381,7 @@ func (b *BaseServer) GetFlightInfoSqlInfo(_ context.Context, _ GetSqlInfo, desc
 }
 
 // DoGetSqlInfo returns a flight stream containing the list of sqlinfo results
-func (b *BaseServer) DoGetSqlInfo(_ context.Context, cmd GetSqlInfo) (*arrow.Schema, <-chan flight.StreamChunk, error) {
+func (b *BaseServer) DoGetSqlInfo(ctx context.Context, cmd GetSqlInfo) (*arrow.Schema, <-chan flight.StreamChunk, error) {
     if b.Alloc == nil {
         b.Alloc = memory.DefaultAllocator
     }
@@ -430,7 +430,7 @@ func (b *BaseServer) DoGetSqlInfo(_ context.Context, cmd GetSqlInfo) (*arrow.Sch
     }
 
     // StreamChunksFromReader will call release on the reader when done
-    go flight.StreamChunksFromReader(rdr, ch)
+    go flight.StreamChunksFromReader(ctx, rdr, ch)
     return schema_ref.SqlInfo, ch, nil
 }
 
@@ -927,19 +927,24 @@ func (f *flightSqlServer) DoGet(request *flight.Ticket, stream flight.FlightServ
     wr := flight.NewRecordWriter(stream, ipc.WithSchema(sc))
     defer wr.Close()
 
-    for chunk := range cc {
-        if chunk.Err != nil {
-            return chunk.Err
-        }
-
-        wr.SetFlightDescriptor(chunk.Desc)
-        if err = wr.WriteWithAppMetadata(chunk.Data, chunk.AppMetadata); err != nil {
-            return err
+    for {
+        select {
+        case <-stream.Context().Done():
+            return stream.Context().Err()
+        case chunk, ok := <-cc:
+            if !ok {
+                return nil
+            }
+            if chunk.Err != nil {
+                return chunk.Err
+            }
+            wr.SetFlightDescriptor(chunk.Desc)
+            if err := wr.WriteWithAppMetadata(chunk.Data, chunk.AppMetadata); err != nil {
+                return err
+            }
+            chunk.Data.Release()
         }
-        chunk.Data.Release()
     }
-
-    return err
 }
 
 type putMetadataWriter struct {
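The DoGet rewrite is the consumer-side half of the change. `for chunk := range cc` had a single wakeup path, a channel receive, so a client that disconnected mid-stream left the handler blocked until the producer happened to send again; the explicit select adds stream.Context().Done() as a second wakeup, and the `ok` flag distinguishes a closed channel (clean end of stream, return nil) from a delivered chunk. It also retires the trailing `return err`, which the old control flow could only ever reach with a nil error. The pattern, distilled into a standalone helper — hypothetical, for illustration only; the PR inlines it in DoGet, with the write callback standing in for wr.SetFlightDescriptor plus wr.WriteWithAppMetadata:

package example

import (
    "context"

    "github.com/apache/arrow-go/v18/arrow/flight"
)

// forwardChunks drains src until the producer closes it, but gives up as
// soon as ctx is cancelled so a gone-away client cannot wedge the loop.
func forwardChunks(ctx context.Context, src <-chan flight.StreamChunk, write func(flight.StreamChunk) error) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case chunk, ok := <-src:
            if !ok {
                return nil // channel closed: clean end of stream
            }
            if chunk.Err != nil {
                return chunk.Err
            }
            if err := write(chunk); err != nil {
                return err
            }
            chunk.Data.Release()
        }
    }
}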
23 changes: 19 additions & 4 deletions arrow/flight/record_batch_reader.go
@@ -18,6 +18,7 @@ package flight

 import (
     "bytes"
+    "context"
     "errors"
     "fmt"
     "io"
@@ -212,24 +213,38 @@ type haserr interface {

 // StreamChunksFromReader is a convenience function to populate a channel
 // from a record reader. It is intended to be run using a separate goroutine
-// by calling `go flight.StreamChunksFromReader(rdr, ch)`.
+// by calling `go flight.StreamChunksFromReader(ctx, rdr, ch)`.
 //
 // If the record reader panics, an error chunk will get sent on the channel.
 //
 // This will close the channel and release the reader when it completes.
-func StreamChunksFromReader(rdr array.RecordReader, ch chan<- StreamChunk) {
+func StreamChunksFromReader(ctx context.Context, rdr array.RecordReader, ch chan<- StreamChunk) {
     defer close(ch)
     defer func() {
         if err := recover(); err != nil {
-            ch <- StreamChunk{Err: utils.FormatRecoveredError("panic while reading", err)}
+            select {
+            case ch <- StreamChunk{Err: utils.FormatRecoveredError("panic while reading", err)}:
+            case <-ctx.Done():
+            }
         }
     }()
 
     defer rdr.Release()
     for rdr.Next() {
+        select {
+        case <-ctx.Done():
+            return
+        default:
+        }
+
         rec := rdr.RecordBatch()
         rec.Retain()
-        ch <- StreamChunk{Data: rec}
+        select {
+        case ch <- StreamChunk{Data: rec}:
+        case <-ctx.Done():
+            rec.Release()
+            return
+        }
     }
 
     if e, ok := rdr.(haserr); ok {
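The producer now has three cancellation points: a non-blocking check before decoding the next batch, a select around the data send that releases the freshly retained record if the consumer is gone, and a select around the panic-recovery send so even the error path cannot block. What this buys the caller, as a sketch — stopEarly is a hypothetical name, reader construction is elided, and the import paths again assume the arrow-go v18 layout:

package example

import (
    "context"

    "github.com/apache/arrow-go/v18/arrow/array"
    "github.com/apache/arrow-go/v18/arrow/flight"
)

// stopEarly takes a single chunk and then cancels. Before this change the
// goroutine could block forever on its next unbuffered send; now it sees
// ctx.Done(), releases the in-flight record and the reader, and closes ch.
func stopEarly(rdr array.RecordReader) {
    ctx, cancel := context.WithCancel(context.Background())
    ch := make(chan flight.StreamChunk)
    go flight.StreamChunksFromReader(ctx, rdr, ch)

    if chunk, ok := <-ch; ok && chunk.Data != nil {
        chunk.Data.Release() // consume one chunk, then give up
    }
    cancel()

    for chunk := range ch { // drain whatever was already in flight
        if chunk.Data != nil {
            chunk.Data.Release()
        }
    }
}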