Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion compiler/semantic/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Analyze(ctx context.Context, p *parser.AST, env *exec.Environment, extInput
seq.Prepend(&sem.NullScan{})
}
}
newChecker(t).check(t.reporter, seq)
t.checker.check(t.reporter, seq)
if err := t.Error(); err != nil {
return nil, err
}
Expand All @@ -55,6 +55,7 @@ type translator struct {
reporter
ctx context.Context
resolver *resolver
checker *checker
opStack []*ast.OpDecl
cteStack []*ast.SQLCTE
env *exec.Environment
Expand All @@ -70,6 +71,7 @@ func newTranslator(ctx context.Context, r reporter, env *exec.Environment) *tran
scope: NewScope(nil),
sctx: super.NewContext(),
}
t.checker = newChecker(t)
t.resolver = newResolver(t)
return t
}
Expand Down
6 changes: 6 additions & 0 deletions compiler/semantic/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,12 @@ func (c *checker) recordElems(typ super.Type, elems []sem.RecordElem) super.Type
for _, elem := range elems {
switch elem := elem.(type) {
case *sem.SpreadElem:
elemType := c.expr(typ, elem.Expr)
if hasUnknown(elemType) {
// If we're spreading an unknown type into this record, we don't
// know the result at all. Return unknown for the whole thing.
return c.unknown
}
fuser.fuse(c.expr(typ, elem.Expr))
case *sem.FieldElem:
column := super.Field{Name: elem.Name, Type: c.expr(typ, elem.Value)}
Expand Down
76 changes: 76 additions & 0 deletions compiler/semantic/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"strconv"
"strings"

"github.com/brimdata/super"
"github.com/brimdata/super/compiler/ast"
"github.com/brimdata/super/compiler/semantic/sem"
"github.com/brimdata/super/pkg/field"
"github.com/brimdata/super/sup"
)

type schema interface {
Expand Down Expand Up @@ -51,11 +53,40 @@ type subquerySchema struct {
inner schema
}

type pipeSchema struct {
name string
typ super.Type
record *super.TypeRecord
}

func newPipeSchema(name string, typ super.Type) *pipeSchema {
return &pipeSchema{
name: name,
typ: typ,
record: recordOf(typ),
}
}

func recordOf(typ super.Type) *super.TypeRecord {
switch typ := super.TypeUnder(typ).(type) {
case *super.TypeRecord:
return typ
case *super.TypeUnion:
for _, typ := range typ.Types {
if typ := recordOf(typ); typ != nil {
return typ
}
}
}
return nil
}

func (s *staticSchema) Name() string { return s.name }
func (d *dynamicSchema) Name() string { return d.name }
func (*selectSchema) Name() string { return "" }
func (*joinSchema) Name() string { return "" }
func (*subquerySchema) Name() string { return "" }
func (p *pipeSchema) Name() string { return p.name }

func badSchema() schema {
return &dynamicSchema{}
Expand Down Expand Up @@ -133,6 +164,13 @@ func (s *subquerySchema) resolveTable(table string) (schema, field.Path, error)
return sch, path, err
}

func (p *pipeSchema) resolveTable(table string) (schema, field.Path, error) {
if strings.EqualFold(p.name, table) {
return p, nil, nil
}
return nil, nil, nil
}

func (d *dynamicSchema) resolveColumn(col string) (field.Path, bool, error) {
if d.name != "" && d.name == col {
// Special case the dynamic schema referencing a table alias without
Expand Down Expand Up @@ -228,6 +266,15 @@ func (s *subquerySchema) resolveColumn(col string) (field.Path, bool, error) {
return nil, false, fmt.Errorf("column %q not found", col)
}

func (p *pipeSchema) resolveColumn(col string) (field.Path, bool, error) {
if p.record != nil && slices.ContainsFunc(p.record.Fields, func(f super.Field) bool {
return f.Name == col
}) {
return field.Path{col}, false, nil
}
return nil, false, fmt.Errorf("column %q: does not exist", col)
}

func (*dynamicSchema) resolveOrdinal(n ast.Node, col int) (sem.Expr, error) {
if col <= 0 {
return nil, fmt.Errorf("position %d is not in select list", col)
Expand Down Expand Up @@ -273,6 +320,13 @@ func (s *subquerySchema) resolveOrdinal(ast.Node, int) (sem.Expr, error) {
return nil, errors.New("ordinal column selection in subquery not supported")
}

func (p *pipeSchema) resolveOrdinal(n ast.Node, col int) (sem.Expr, error) {
if p.record == nil || col <= 0 || col > len(p.record.Fields) {
return nil, fmt.Errorf("position %d is not in select list", col)
}
return sem.NewThis(n, []string{p.record.Fields[col-1].Name}), nil
}

func appendExprToPath(path string, e sem.Expr) sem.Expr {
switch e := e.(type) {
case *sem.ThisExpr:
Expand Down Expand Up @@ -353,6 +407,13 @@ func (s *subquerySchema) deref(n ast.Node, name string) (sem.Expr, schema) {
panic(name)
}

func (p *pipeSchema) deref(n ast.Node, name string) (sem.Expr, schema) {
if name != "" {
p = &pipeSchema{name: name, typ: p.typ, record: p.record}
}
return nil, p
}

func (d *dynamicSchema) this(n ast.Node, path []string) sem.Expr {
return sem.NewThis(n, path)
}
Expand All @@ -375,6 +436,10 @@ func (s *subquerySchema) this(n ast.Node, path []string) sem.Expr {
panic("TBD")
}

func (p *pipeSchema) this(n ast.Node, path []string) sem.Expr {
return sem.NewThis(n, path)
}

func (d *dynamicSchema) tableOnly(n ast.Node, name string, path []string) (sem.Expr, error) {
if d.name != name {
return nil, fmt.Errorf("no such table %q", name)
Expand Down Expand Up @@ -412,6 +477,13 @@ func (s *subquerySchema) tableOnly(n ast.Node, name string, path []string) (sem.
return nil, fmt.Errorf("no such table %q", name) //XXX
}

func (p *pipeSchema) tableOnly(n ast.Node, name string, path []string) (sem.Expr, error) {
if p.name == name {
return p.this(n, path), nil
}
return nil, fmt.Errorf("no such table %q", name)
}

func (s *staticSchema) String() string {
return fmt.Sprintf("static <%s>: %s", s.name, strings.Join(s.columns, ", "))
}
Expand All @@ -432,6 +504,10 @@ func (s *subquerySchema) String() string {
return fmt.Sprintf("subquery:\n outer: %s\n inner: %s", s.outer, s.inner)
}

func (p *pipeSchema) String() string {
return fmt.Sprintf("pipe <%s>:\n type: %s\n", p.name, sup.FormatType(p.typ))
}

type havingSchema struct {
*selectSchema
}
Expand Down
27 changes: 26 additions & 1 deletion compiler/semantic/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,20 @@ func unravel(n ast.Node, elems []sem.RecordElem, schema schema, prefix field.Pat
case *joinSchema:
elems = unravel(n, elems, schema.left, append(prefix, "left"))
return unravel(n, elems, schema.right, append(prefix, "right"))
case *pipeSchema:
if schema.record == nil {
return append(elems, &sem.SpreadElem{
Node: n,
Expr: sem.NewThis(n, prefix),
})
}
for _, f := range schema.record.Fields {
elems = append(elems, &sem.FieldElem{
Name: f.Name,
Value: sem.NewThis(n, slices.Clone(append(prefix, f.Name))),
})
}
return elems
default:
panic(schema)
}
Expand Down Expand Up @@ -398,7 +412,18 @@ func (t *translator) sqlPipe(op *ast.SQLPipe, seq sem.Seq, alias *ast.TableAlias
t.error(alias, errors.New("cannot apply column aliases to dynamically typed data"))
}
}
return t.seq(op.Ops), &dynamicSchema{name: name}
seq = t.seq(op.Ops)
// We pass in type null for initial type here since we know a pipe subquery inside
// of a sql table expression must always have a data source (i.e., it cannot inherit
// data from a parent node somewhere outside of this seq). XXX this assumption may
// change when we add support for correlated subqueries and an embedded pipe may
// need access to the incoming type when feeding that type into the unnest scope
// that implements the correlated subquery.
typ := t.checker.seq(super.TypeNull, seq)
if isUnknown(typ) {
return seq, &dynamicSchema{name: name}
}
return seq, newPipeSchema(name, typ)
}

func derefSchemaAs(n ast.Node, sch schema, table string, seq sem.Seq) (sem.Seq, schema) {
Expand Down
12 changes: 12 additions & 0 deletions compiler/semantic/ztests/checker-spread-unknown.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
script: |
super -s -c "select x from (from x.sup | {...this,z:'foo'})"

inputs:
- name: x.sup
data: |
{x:1}

outputs:
- name: stdout
data: |
{x:1}
27 changes: 27 additions & 0 deletions compiler/semantic/ztests/pipe-schema-parquet.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
script: |
super -o x.parquet -f parquet x.sup
super -o y.parquet -f parquet y.sup
super -s -c "select x from x.parquet join y.parquet on x=y"
! super -s -c "select x from x.parquet join y.sup on x=y"

inputs:
- name: x.sup
data: |
{x:1}
- name: y.sup
data: |
{y:1}
{y:2}

outputs:
- name: stdout
data: |
{x:1}
- name: stderr
data: |
"x": ambiguous column reference at line 1, column 39:
select x from x.parquet join y.sup on x=y
~
"x": ambiguous column reference at line 1, column 8:
select x from x.parquet join y.sup on x=y
~
16 changes: 16 additions & 0 deletions compiler/semantic/ztests/pipe-schema-sup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
script: |
super -s -c "select x from (from x.sup | values {x}) join (from y.sup | values {y}) on x=y"

inputs:
- name: x.sup
data: |
{x:1}
- name: y.sup
data: |
{y:1}
{y:2}

outputs:
- name: stdout
data: |
{x:1}
4 changes: 4 additions & 0 deletions compiler/semantic/ztests/pipe-schema-values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
spq: select x from (values {x:1}) join (values {y:1},{y:2}) on x=y

output: |
{x:1}
3 changes: 3 additions & 0 deletions compiler/ztests/sql/cte-recursive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ error: |
recursive WITH relations not currently supported at line 5, column 10:
FROM t
~
column "x": does not exist at line 8, column 14:
SELECT count(x) FROM t AS t1;
~