Skip to content

Commit ca10438

Browse files
authored
user ops: Allow from statements (#4698)
Closes #4645
1 parent 8d32e57 commit ca10438

File tree

4 files changed

+74
-73
lines changed

4 files changed

+74
-73
lines changed

compiler/job.go

Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,11 @@ func NewJob(octx *op.Context, in ast.Seq, src *data.Source, head *lakeparse.Comm
4040
if len(seq) == 0 {
4141
return nil, errors.New("internal error: AST seq cannot be empty")
4242
}
43-
from, reader, err := buildFrom(seq[0], head)
44-
if err != nil {
45-
return nil, err
46-
}
47-
if from != nil {
48-
seq.Prepend(from)
49-
}
5043
entry, err := semantic.Analyze(octx.Context, seq, src, head)
5144
if err != nil {
5245
return nil, err
5346
}
47+
reader, _ := entry[0].(*kernel.Reader)
5448
return &Job{
5549
octx: octx,
5650
builder: kernel.NewBuilder(octx, src),
@@ -60,71 +54,6 @@ func NewJob(octx *op.Context, in ast.Seq, src *data.Source, head *lakeparse.Comm
6054
}, nil
6155
}
6256

63-
func buildFrom(op ast.Op, head *lakeparse.Commitish) (*ast.From, *kernel.Reader, error) {
64-
var from *ast.From
65-
switch op := op.(type) {
66-
case *ast.From:
67-
// Already have an entry point with From. Do nothing.
68-
return nil, nil, nil
69-
case *ast.Scope:
70-
if len(op.Body) == 0 {
71-
return nil, nil, errors.New("internal error: scope op has empty body")
72-
}
73-
return buildFrom(op.Body[0], head)
74-
default:
75-
var readers *kernel.Reader
76-
trunk := ast.Trunk{Kind: "Trunk"}
77-
if head != nil {
78-
// For the lakes, if there is no from operator, then
79-
// we default to scanning HEAD (without any of the
80-
// from options).
81-
trunk.Source = &ast.Pool{
82-
Kind: "Pool",
83-
Spec: ast.PoolSpec{
84-
Pool: &ast.String{
85-
Kind: "String",
86-
Text: "HEAD",
87-
},
88-
},
89-
}
90-
} else {
91-
readers = &kernel.Reader{}
92-
trunk.Source = readers
93-
}
94-
from = &ast.From{
95-
Kind: "From",
96-
Trunks: []ast.Trunk{trunk},
97-
}
98-
// XXX why not move this above? or
99-
if isParallelWithLeadingFroms(op) {
100-
from = nil
101-
readers = nil
102-
}
103-
return from, readers, nil
104-
}
105-
}
106-
107-
func isParallelWithLeadingFroms(o ast.Op) bool {
108-
par, ok := o.(*ast.Parallel)
109-
if !ok {
110-
return false
111-
}
112-
for _, seq := range par.Paths {
113-
if !hasLeadingFrom(seq) {
114-
return false
115-
}
116-
}
117-
return true
118-
}
119-
120-
func hasLeadingFrom(seq ast.Seq) bool {
121-
if len(seq) == 0 {
122-
return false
123-
}
124-
_, ok := seq[0].(*ast.From)
125-
return ok
126-
}
127-
12857
func (j *Job) Entry() dag.Seq {
12958
//XXX need to prepend consts depending on context
13059
return j.entry

compiler/kernel/op.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,8 @@ func isEntry(seq dag.Seq) bool {
704704
return true
705705
case *dag.Scope:
706706
return isEntry(op.Body)
707+
case *dag.UserOpCall:
708+
return isEntry(op.Body)
707709
case *dag.Fork:
708710
if len(op.Paths) == 0 {
709711
return false

compiler/semantic/analyzer.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,27 @@ import (
77
"github.com/brimdata/zed/compiler/ast"
88
"github.com/brimdata/zed/compiler/ast/dag"
99
"github.com/brimdata/zed/compiler/data"
10+
"github.com/brimdata/zed/compiler/kernel"
1011
"github.com/brimdata/zed/lakeparse"
1112
)
1213

1314
// Analyze performs a semantic analysis of the AST, translating it from AST
1415
// to DAG form, resolving syntax ambiguities, and performing constant propagation.
1516
// After semantic analysis, the DAG is ready for either optimization or compilation.
1617
func Analyze(ctx context.Context, seq ast.Seq, source *data.Source, head *lakeparse.Commitish) (dag.Seq, error) {
17-
return newAnalyzer(ctx, source, head).semSeq(seq)
18+
a := newAnalyzer(ctx, source, head)
19+
s, err := a.semSeq(seq)
20+
if err != nil {
21+
return nil, err
22+
}
23+
op, err := a.buildFrom(s[0])
24+
if err != nil {
25+
return nil, err
26+
}
27+
if op != nil {
28+
s.Prepend(op)
29+
}
30+
return s, nil
1831
}
1932

2033
type analyzer struct {
@@ -49,6 +62,37 @@ func (a *analyzer) exitScope() {
4962
a.scope = a.scope.parent
5063
}
5164

65+
func (a *analyzer) buildFrom(op dag.Op) (dag.Op, error) {
66+
switch op := op.(type) {
67+
case *dag.FileScan, *dag.HTTPScan, *dag.PoolScan, *dag.LakeMetaScan, *dag.PoolMetaScan, *dag.CommitMetaScan, *dag.DeleteScan:
68+
return nil, nil
69+
case *dag.Fork:
70+
return a.buildFrom(op.Paths[0][0])
71+
case *dag.Scope:
72+
return a.buildFrom(op.Body[0])
73+
case *dag.UserOpCall:
74+
return a.buildFrom(op.Body[0])
75+
}
76+
// No from so add a source.
77+
if a.head == nil {
78+
return &kernel.Reader{}, nil
79+
}
80+
pool := &ast.Pool{
81+
Kind: "Pool",
82+
Spec: ast.PoolSpec{
83+
Pool: &ast.String{
84+
Kind: "String",
85+
Text: "HEAD",
86+
},
87+
},
88+
}
89+
ops, err := a.semPool(pool)
90+
if err != nil {
91+
return nil, err
92+
}
93+
return ops[0], nil
94+
}
95+
5296
type opDecl struct {
5397
ast *ast.OpDecl
5498
deps []*dag.UserOp

runtime/op/ztests/user-from.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
script: |
2+
zq -z -I test.zed
3+
4+
inputs:
5+
- name: test.zed
6+
data: |
7+
op fileA(): (
8+
file A.zson | sort a
9+
)
10+
fileA()
11+
- name: A.zson
12+
data: |
13+
{a:5}
14+
{a:1}
15+
{a:7}
16+
{a:3}
17+
{a:8}
18+
19+
outputs:
20+
- name: stdout
21+
data: |
22+
{a:1}
23+
{a:3}
24+
{a:5}
25+
{a:7}
26+
{a:8}

0 commit comments

Comments
 (0)