Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 27 additions & 30 deletions compiler/semantic/op.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package semantic

import (
"context"
"errors"
"fmt"
"net/url"
Expand Down Expand Up @@ -101,8 +100,12 @@ func (t *translator) semFromEntity(entity ast.FromEntity, alias *ast.TableAlias,
}
op, def := t.semFromName(entity, entity.Text, args)
if op, ok := op.(*sem.FileScan); ok {
if cols, ok := t.fileScanColumns(op); ok {
schema := schema(&staticSchema{def, cols})
if rec := super.TypeRecordOf(op.Type); rec != nil {
var columns []string
for _, f := range rec.Fields {
columns = append(columns, f.Name)
}
schema := schema(&staticSchema{def, columns})
seq, schema, err := derefSchemaWithAlias(op, schema, alias, sem.Seq{op})
if err != nil {
t.error(alias, err)
Expand Down Expand Up @@ -145,33 +148,6 @@ func (t *translator) fromCTE(entity ast.FromEntity, c *ast.SQLCTE, alias *ast.Ta
return seq, schema
}

// XXX this should find a type from the schema rather than the columns
func (t *translator) fileScanColumns(op *sem.FileScan) ([]string, bool) {
if op.Format != "parquet" {
return nil, false
}
uri, err := storage.ParseURI(op.Path)
if err != nil {
return nil, false
}
ctx, cancel := context.WithCancel(t.ctx)
defer cancel()
sr, err := t.env.Engine().Get(ctx, uri)
if err != nil {
return nil, false
}
defer sr.Close()
op.Type = parquetio.Type(t.sctx, sr)
if op.Type == nil {
return nil, false
}
var cols []string
for _, f := range op.Type.(*super.TypeRecord).Fields {
cols = append(cols, f.Name)
}
return cols, true
}

func (t *translator) semFromExpr(entity *ast.ExprEntity, args []ast.OpArg, seq sem.Seq) (sem.Seq, string) {
expr := t.semExpr(entity.Expr)
val, ok := t.maybeEval(expr)
Expand Down Expand Up @@ -260,13 +236,34 @@ func (t *translator) semFile(n ast.Node, name string, args []ast.OpArg) sem.Op {
if format == "" {
format = sio.FormatFromPath(name)
}
typ, err := t.fileType(name, format)
if err != nil {
t.error(n, err)
}
return &sem.FileScan{
Node: n,
Type: typ,
Path: name,
Format: format,
}
}

func (t *translator) fileType(path, format string) (super.Type, error) {
if format != "parquet" {
return nil, nil
}
uri, err := storage.ParseURI(path)
if err != nil {
return nil, err
}
r, err := t.env.Engine().Get(t.ctx, uri)
if err != nil {
return nil, err
}
defer r.Close()
return parquetio.Type(t.sctx, r), nil
}

func (t *translator) semFromFileGlob(globLoc ast.Node, pattern string, args []ast.OpArg) sem.Op {
names, err := filepath.Glob(pattern)
if err != nil {
Expand Down