Skip to content

Commit c40f238

Browse files
authored
zq: Remove dual input join (#4689)
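Remove the zq behavior in which a program that starts with a headless join and is run with two input files treats the first file as the left side of the join and the second file as the right side. The second input is now named explicitly in the query instead, e.g. `left join (file B.zson) on a=b` run with A.zson as the only command-line input.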
1 parent 8bf604c commit c40f238

File tree

12 files changed

+42
-75
lines changed

12 files changed

+42
-75
lines changed

compiler/file.go

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,10 @@ func (f *fsCompiler) NewQuery(octx *op.Context, seq ast.Seq, readers []zio.Reade
2626
if err != nil {
2727
return nil, err
2828
}
29-
if isJoin(seq) {
30-
if len(readers) != 2 {
31-
return nil, errors.New("join operator requires two inputs")
32-
}
33-
if len(job.readers) != 2 {
34-
return nil, errors.New("internal error: join expected by semantic analyzer")
35-
}
36-
job.readers[0].Readers = readers[0:1]
37-
job.readers[1].Readers = readers[1:2]
38-
} else if len(readers) == 0 {
29+
if len(readers) == 0 {
3930
// If there's no reader but the DAG wants an input, then
4031
// flag an error.
41-
if len(job.readers) != 0 {
32+
if job.reader != nil {
4233
return nil, errors.New("no input specified: use a command-line file or a Zed source operator")
4334
}
4435
} else {
@@ -47,13 +38,10 @@ func (f *fsCompiler) NewQuery(octx *op.Context, seq ast.Seq, readers []zio.Reade
4738
// TBD: we could have such a configuration if a composite
4839
// from command includes a "pass" operator, but we can add this later.
4940
// See issue #2640.
50-
if len(job.readers) == 0 {
41+
if job.reader == nil {
5142
return nil, errors.New("redundant inputs specified: use either command-line files or a Zed source operator")
5243
}
53-
if len(job.readers) != 1 {
54-
return nil, errors.New("Zed query requires a single input path")
55-
}
56-
job.readers[0].Readers = readers
44+
job.reader.Readers = readers
5745
}
5846
return optimizeAndBuild(job)
5947
}
@@ -66,19 +54,6 @@ func (*fsCompiler) NewLakeDeleteQuery(octx *op.Context, program ast.Seq, head *l
6654
panic("NewLakeDeleteQuery called on compiler.fsCompiler")
6755
}
6856

69-
func isJoin(seq ast.Seq) bool {
70-
if len(seq) == 0 {
71-
return false
72-
}
73-
switch op := seq[0].(type) {
74-
case *ast.Join:
75-
return true
76-
case *ast.Scope:
77-
return isJoin(op.Body)
78-
}
79-
return false
80-
}
81-
8257
func optimizeAndBuild(job *Job) (*runtime.Query, error) {
8358
// Call optimize to possibly push down a filter predicate into the
8459
// kernel.Reader so that the zng scanner can do boyer-moore.

compiler/job.go

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type Job struct {
2020
builder *kernel.Builder
2121
optimizer *optimizer.Optimizer
2222
outputs []zbuf.Puller
23-
readers []*kernel.Reader
23+
reader *kernel.Reader
2424
puller zbuf.Puller
2525
entry dag.Seq
2626
}
@@ -40,7 +40,7 @@ func NewJob(octx *op.Context, in ast.Seq, src *data.Source, head *lakeparse.Comm
4040
if len(seq) == 0 {
4141
return nil, errors.New("internal error: AST seq cannot be empty")
4242
}
43-
from, readers, err := buildFrom(seq[0], head)
43+
from, reader, err := buildFrom(seq[0], head)
4444
if err != nil {
4545
return nil, err
4646
}
@@ -55,38 +55,24 @@ func NewJob(octx *op.Context, in ast.Seq, src *data.Source, head *lakeparse.Comm
5555
octx: octx,
5656
builder: kernel.NewBuilder(octx, src),
5757
optimizer: optimizer.New(octx.Context, src),
58-
readers: readers,
58+
reader: reader,
5959
entry: entry,
6060
}, nil
6161
}
6262

63-
func buildFrom(op ast.Op, head *lakeparse.Commitish) (*ast.From, []*kernel.Reader, error) {
64-
var readers []*kernel.Reader
63+
func buildFrom(op ast.Op, head *lakeparse.Commitish) (*ast.From, *kernel.Reader, error) {
6564
var from *ast.From
6665
switch op := op.(type) {
6766
case *ast.From:
6867
// Already have an entry point with From. Do nothing.
6968
return nil, nil, nil
70-
case *ast.Join:
71-
readers = []*kernel.Reader{{}, {}}
72-
trunk0 := ast.Trunk{
73-
Kind: "Trunk",
74-
Source: readers[0],
75-
}
76-
trunk1 := ast.Trunk{
77-
Kind: "Trunk",
78-
Source: readers[1],
79-
}
80-
return &ast.From{
81-
Kind: "From",
82-
Trunks: []ast.Trunk{trunk0, trunk1},
83-
}, readers, nil
8469
case *ast.Scope:
8570
if len(op.Body) == 0 {
8671
return nil, nil, errors.New("internal error: scope op has empty body")
8772
}
8873
return buildFrom(op.Body[0], head)
8974
default:
75+
var readers *kernel.Reader
9076
trunk := ast.Trunk{Kind: "Trunk"}
9177
if head != nil {
9278
// For the lakes, if there is no from operator, then
@@ -102,8 +88,8 @@ func buildFrom(op ast.Op, head *lakeparse.Commitish) (*ast.From, []*kernel.Reade
10288
},
10389
}
10490
} else {
105-
readers = []*kernel.Reader{{}}
106-
trunk.Source = readers[0]
91+
readers = &kernel.Reader{}
92+
trunk.Source = readers
10793
}
10894
from = &ast.From{
10995
Kind: "From",

compiler/lake.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (l *lakeCompiler) NewLakeQuery(octx *op.Context, program ast.Seq, paralleli
3636
if err != nil {
3737
return nil, err
3838
}
39-
if len(job.readers) != 0 {
39+
if job.reader != nil {
4040
return nil, errors.New("query must include a 'from' operator")
4141
}
4242
if err := job.Optimize(); err != nil {

compiler/reader.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package compiler
22

33
import (
4+
"errors"
45
"fmt"
56

67
"github.com/brimdata/zed/compiler/ast"
@@ -29,12 +30,12 @@ func CompileWithSortKey(octx *op.Context, seq ast.Seq, r zio.Reader, sortKey ord
2930
if err != nil {
3031
return nil, err
3132
}
32-
readers := job.readers
33-
if len(readers) != 1 {
34-
return nil, fmt.Errorf("CompileWithSortKey: Zed program expected %d readers", len(readers))
33+
reader := job.reader
34+
if reader == nil {
35+
return nil, errors.New("CompileWithSortKey: Zed program expected a reader")
3536
}
36-
readers[0].Readers = []zio.Reader{r}
37-
readers[0].SortKey = sortKey
37+
reader.Readers = []zio.Reader{r}
38+
reader.SortKey = sortKey
3839
return optimizeAndBuild(job)
3940
}
4041

runtime/op/join/ztests/empty-inner.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
script: |
2-
zq -z 'left join on a=b hit:=sc | sort a' A.zson C.zson
2+
zq -z 'left join (file C.zson) on a=b hit:=sc' A.zson
33
44
inputs:
55
- name: A.zson

runtime/op/join/ztests/expr.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
script: |
2-
zq -z 'left join on s b' A.zson B.zson
2+
zq -z 'left join (file B.zson) on s b' A.zson
33
echo ===
4-
zq -z 'left join on s=(lower(s)) b' A.zson B.zson
4+
zq -z 'left join (file B.zson) on s=(lower(s)) b' A.zson
55
echo ===
6-
zq -z 'left join on (lower(s))=(lower(s)) b' A.zson B.zson
6+
zq -z 'left join (file B.zson) on (lower(s))=(lower(s)) b' A.zson
77
echo ===
8-
zq -z 'left join on s' A.zson B.zson
8+
zq -z 'left join (file B.zson) on s' A.zson
99
1010
inputs:
1111
- name: A.zson

runtime/op/join/ztests/join-union.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
script: |
2-
zq -z 'inner join on a=b' a.zson b.zson
2+
zq -z 'inner join (file b.zson) on a=b' a.zson
33
inputs:
44
- name: a.zson
55
data: |

runtime/op/join/ztests/kinds.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
script: |
22
echo === ANTI ===
3-
zq -z 'anti join on a=b | sort a' A.zson B.zson
3+
zq -z 'anti join (file B.zson) on a=b | sort a' A.zson
44
echo === LEFT ===
5-
zq -z 'left join on a=b hit:=sb | sort a' A.zson B.zson
5+
zq -z 'left join (file B.zson) on a=b hit:=sb | sort a' A.zson
66
echo === INNER ===
7-
zq -z 'inner join on a=b hit:=sb | sort a' A.zson B.zson
7+
zq -z 'inner join (file B.zson) on a=b hit:=sb | sort a' A.zson
88
echo === RIGHT ===
9-
zq -z 'right join on b=c hit:=sb | sort c' B.zson C.zson
9+
zq -z 'right join (file C.zson) on b=c hit:=sb | sort c' B.zson
1010
1111
inputs:
1212
- name: A.zson

runtime/ztests/parallel-err.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ inputs:
1111
outputs:
1212
- name: stderr
1313
data: |
14-
join operator requires two inputs
14+
join requires two upstream parallel query paths

runtime/ztests/parallel-stdin.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
script: |
2-
zq -z 'join on a=b b:=b' - B.zson
2+
zq -z 'join (file B.zson) on a=b b:=b' -
33
44
inputs:
55
- name: stdin

0 commit comments

Comments
 (0)