Skip to content

Commit 2e8acf6

Browse files
committed
Add vectorized version of unblend()
1 parent 9237cb5 commit 2e8acf6

File tree

4 files changed

+328
-1
lines changed

4 files changed

+328
-1
lines changed

runtime/vam/expr/function/function.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,9 @@ func New(sctx *super.Context, name string, narg int) (expr.Function, error) {
149149
case "typeof":
150150
f = &TypeOf{sctx}
151151
case "unblend":
152-
f = &samFunc{sctx, function.NewUnblend(sctx)}
152+
f = &anonFunc{sctx, func(sctx *super.Context, vecs ...vector.Any) vector.Any {
153+
return expr.Unblend(sctx, vecs[0])
154+
}}
153155
case "under":
154156
f = newUnder(sctx)
155157
case "unflatten":
@@ -200,3 +202,12 @@ func (f *samFunc) Call(args ...vector.Any) vector.Any {
200202
}
201203
return b.Build(f.sctx)
202204
}
205+
206+
type anonFunc struct {
207+
sctx *super.Context
208+
fn func(*super.Context, ...vector.Any) vector.Any
209+
}
210+
211+
func (a *anonFunc) Call(args ...vector.Any) vector.Any {
212+
return a.fn(a.sctx, args...)
213+
}

runtime/vam/expr/unblend.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package expr
2+
3+
import (
4+
"slices"
5+
6+
"github.com/brimdata/super"
7+
"github.com/brimdata/super/vector"
8+
)
9+
10+
func Unblend(sctx *super.Context, vec vector.Any) vector.Any {
11+
switch vec.Kind() {
12+
case vector.KindRecord:
13+
return unblendRecord(sctx, vec)
14+
case vector.KindArray:
15+
array := vector.PushdownContainerView(vec).(*vector.Array)
16+
tags, inners, offsets := unblendArrayOrSet(sctx, array.Offsets, array.Values)
17+
var vals []vector.Any
18+
for i, inner := range inners {
19+
typ := sctx.LookupTypeArray(inner.Type())
20+
vals = append(vals, vector.NewArray(typ, offsets[i], inner))
21+
}
22+
if len(vals) > 1 {
23+
return vector.NewDynamic(tags, vals)
24+
}
25+
return vals[0]
26+
case vector.KindSet:
27+
set := vector.PushdownContainerView(vec).(*vector.Set)
28+
tags, inners, offsets := unblendArrayOrSet(sctx, set.Offsets, set.Values)
29+
var vals []vector.Any
30+
for i, inner := range inners {
31+
typ := sctx.LookupTypeSet(inner.Type())
32+
vals = append(vals, vector.NewSet(typ, offsets[i], inner))
33+
}
34+
if len(vals) > 1 {
35+
return vector.NewDynamic(tags, vals)
36+
}
37+
return vals[0]
38+
case vector.KindMap:
39+
return unblendMap(sctx, vector.PushdownContainerView(vec).(*vector.Map))
40+
case vector.KindUnion:
41+
return vector.Apply(true, func(vecs ...vector.Any) vector.Any {
42+
return Unblend(sctx, vecs[0])
43+
}, vec)
44+
}
45+
return vec
46+
}
47+
48+
func unblendRecord(sctx *super.Context, vec vector.Any) vector.Any {
49+
var index []uint32
50+
if view, ok := vec.(*vector.View); ok {
51+
index = view.Index
52+
vec = view.Any
53+
}
54+
rec := vec.(*vector.Record)
55+
fields := slices.Clone(rec.Fields)
56+
if index != nil {
57+
for i, field := range fields {
58+
fields[i] = vector.Pick(field, index)
59+
}
60+
}
61+
for i, field := range fields {
62+
fields[i] = Unblend(sctx, field)
63+
}
64+
return vector.Apply(false, func(vecs ...vector.Any) vector.Any {
65+
var fields []super.Field
66+
var fvecs []vector.Any
67+
for i, vec := range vecs {
68+
if _, ok := vec.(*vector.None); ok {
69+
continue
70+
}
71+
fvecs = append(fvecs, vec)
72+
fields = append(fields, super.NewField(rec.Typ.Fields[i].Name, vec.Type()))
73+
}
74+
rtyp := sctx.MustLookupTypeRecord(fields)
75+
return vector.NewRecord(rtyp, fvecs, vecs[0].Len())
76+
}, fields...)
77+
}
78+
79+
func unblendArrayOrSet(sctx *super.Context, offsets []uint32, elements vector.Any) ([]uint32, []vector.Any, [][]uint32) {
80+
elements = Unblend(sctx, elements)
81+
dynamic, ok := elements.(*vector.Dynamic)
82+
if !ok {
83+
return nil, []vector.Any{elements}, [][]uint32{offsets}
84+
}
85+
union := vector.NewUnionFromDynamic(sctx, dynamic)
86+
slotTypes := typesOfSlotsInList(sctx, union, offsets)
87+
// Accumulate unique array types.
88+
m := make(map[super.Type][]uint32)
89+
for i, typ := range slotTypes {
90+
m[typ] = append(m[typ], uint32(i))
91+
}
92+
dtags := make([]uint32, len(offsets)-1)
93+
var inners []vector.Any
94+
var offs [][]uint32
95+
for typ, index := range m {
96+
inner, off := subsetOfList(sctx, union, offsets, index, typ)
97+
for _, idx := range index {
98+
dtags[idx] = uint32(len(inners))
99+
}
100+
inners = append(inners, inner)
101+
offs = append(offs, off)
102+
}
103+
return dtags, inners, offs
104+
}
105+
106+
func unblendMap(sctx *super.Context, vmap *vector.Map) vector.Any {
107+
keys := Unblend(sctx, vmap.Keys)
108+
if dynamic, ok := keys.(*vector.Dynamic); ok {
109+
keys = vector.NewUnionFromDynamic(sctx, dynamic)
110+
}
111+
vals := Unblend(sctx, vmap.Values)
112+
if dynamic, ok := vals.(*vector.Dynamic); ok {
113+
vals = vector.NewUnionFromDynamic(sctx, dynamic)
114+
}
115+
if keys.Kind() != vector.KindUnion && vals.Kind() != vector.KindUnion {
116+
return vmap
117+
}
118+
keySlotTypes := typesOfSlotsInList(sctx, keys, vmap.Offsets)
119+
valSlotTypes := typesOfSlotsInList(sctx, vals, vmap.Offsets)
120+
type mapType struct {
121+
key super.Type
122+
val super.Type
123+
}
124+
// Accumulate unique map types.
125+
m := make(map[mapType][]uint32)
126+
for i := range vmap.Len() {
127+
mtyp := mapType{keySlotTypes[i], valSlotTypes[i]}
128+
m[mtyp] = append(m[mtyp], uint32(i))
129+
}
130+
dtags := make([]uint32, len(vmap.Offsets)-1)
131+
var vecs []vector.Any
132+
for mtyp, index := range m {
133+
keys, offsets := subsetOfList(sctx, keys, vmap.Offsets, index, mtyp.key)
134+
vals, _ := subsetOfList(sctx, vals, vmap.Offsets, index, mtyp.val)
135+
for _, idx := range index {
136+
dtags[idx] = uint32(len(vecs))
137+
}
138+
typ := sctx.LookupTypeMap(keys.Type(), vals.Type())
139+
vecs = append(vecs, vector.NewMap(typ, offsets, keys, vals))
140+
}
141+
if len(vecs) == 1 {
142+
return vecs[0]
143+
}
144+
return vector.NewDynamic(dtags, vecs)
145+
}
146+
147+
func subsetOfList(sctx *super.Context, elements vector.Any, parentOffsets, index []uint32, typ super.Type) (vector.Any, []uint32) {
148+
if typ == super.TypeNull {
149+
nulls := vector.NewNull(uint32(len(index)))
150+
offsets := make([]uint32, len(index)+1)
151+
return nulls, offsets
152+
}
153+
var allVals []vector.Any
154+
union, ok := elements.(*vector.Union)
155+
if ok {
156+
allVals = union.Values
157+
} else {
158+
allVals = []vector.Any{elements}
159+
}
160+
var subTypes []super.Type
161+
utyp, ok := typ.(*super.TypeUnion)
162+
if ok {
163+
subTypes = slices.Clone(utyp.Types)
164+
} else {
165+
subTypes = append(subTypes, typ)
166+
}
167+
subVals := make([]vector.Any, len(subTypes))
168+
// map parent union tags to subset union tags
169+
tagMap := make([]uint32, len(allVals))
170+
for i, typ := range subTypes {
171+
idx := slices.IndexFunc(allVals, func(vec vector.Any) bool {
172+
return vec.Type() == typ
173+
})
174+
tagMap[idx] = uint32(i)
175+
subVals[i] = allVals[idx]
176+
}
177+
// Generate:
178+
// - offsets for new array
179+
// - indexes to create view on values
180+
// - tags for union (if applicable)
181+
var forwardTags []uint32
182+
if union != nil {
183+
forwardTags = union.ForwardTagMap()
184+
}
185+
var tags []uint32
186+
indexes := make([][]uint32, len(subTypes))
187+
suboffsets := []uint32{0}
188+
for _, idx := range index {
189+
start, end := parentOffsets[idx], parentOffsets[idx+1]
190+
if union != nil {
191+
for i, origTag := range union.Tags[start:end] {
192+
tag := tagMap[origTag]
193+
tags = append(tags, tag)
194+
indexes[tag] = append(indexes[tag], forwardTags[start+uint32(i)])
195+
}
196+
} else {
197+
for i := start; i < end; i++ {
198+
indexes[0] = append(indexes[0], i)
199+
}
200+
}
201+
suboffsets = append(suboffsets, uint32(len(tags)))
202+
}
203+
for i, val := range subVals {
204+
subVals[i] = vector.Pick(val, indexes[i])
205+
}
206+
var inner vector.Any
207+
if len(subVals) > 1 {
208+
inner = vector.NewUnion(sctx.LookupTypeUnion(subTypes), tags, subVals)
209+
} else {
210+
inner = subVals[0]
211+
}
212+
return inner, suboffsets
213+
}
214+
215+
func typesOfSlotsInList(sctx *super.Context, inner vector.Any, offsets []uint32) []super.Type {
216+
union, _ := inner.(*vector.Union)
217+
var alltypes []super.Type
218+
if union != nil {
219+
for _, val := range union.Values {
220+
alltypes = append(alltypes, val.Type())
221+
}
222+
} else {
223+
alltypes = []super.Type{inner.Type()}
224+
}
225+
n := uint32(len(offsets) - 1)
226+
slotTypes := make([]super.Type, n)
227+
for i := range n {
228+
if union != nil {
229+
slotTypes[i] = typeOfRange(sctx, union, alltypes, offsets[i], offsets[i+1])
230+
} else {
231+
slotTypes[i] = alltypes[0]
232+
}
233+
}
234+
return slotTypes
235+
}
236+
237+
func typeOfRange(sctx *super.Context, union *vector.Union, alltypes []super.Type, start, end uint32) super.Type {
238+
tags := slices.Clone(union.Tags[start:end])
239+
slices.Sort(tags)
240+
uniq := slices.Compact(tags)
241+
if len(uniq) == 0 {
242+
return super.TypeNull
243+
}
244+
if len(uniq) == 1 {
245+
return alltypes[uniq[0]]
246+
}
247+
if len(uniq) == len(alltypes) {
248+
return union.Typ
249+
}
250+
var types []super.Type
251+
for _, tag := range uniq {
252+
types = append(types, alltypes[tag])
253+
}
254+
return sctx.LookupTypeUnion(types)
255+
}

runtime/ztests/expr/function/unblend.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Test unblend on nested records.
22
spq: unblend(this)
33

4+
vector: true
5+
46
input: |
57
{x:{y:2::(int64|bool)},z?:"foo"}
68
{x:{y:true::(int64|bool)},z?:_::string}
@@ -14,6 +16,8 @@ output: |
1416
# Test unblend on root unions.
1517
spq: unblend(this)
1618

19+
vector: true
20+
1721
input: |
1822
1::(int64|bool|{x:int64})
1923
true::(int64|bool|{x:int64})
@@ -29,6 +33,8 @@ output: |
2933
# Test unblend on arrays.
3034
spq: unblend(this)
3135

36+
vector: true
37+
3238
input: |
3339
[1::(int64|bool|string),2::(int64|bool|string),"string"::(int64|bool|string)]
3440
[1::(int64|bool|string),2::(int64|bool|string),true::(int64|bool|string)]
@@ -45,9 +51,28 @@ output: |
4551
4652
---
4753

54+
# Test unblend on elements in an array.
55+
spq: unblend(this)
56+
57+
vector: true
58+
59+
input: |
60+
[{x:1::(bool|int64|string)},{x:true::(bool|int64|string)},{x:"foo"::(bool|int64|string)}]
61+
[{x:2::(bool|int64|string)},{x:3::(bool|int64|string)},{x:"bar"::(bool|int64|string)}]
62+
[{x:true::(bool|int64|string)},{x:false::(bool|int64|string)},{x:"bar"::(bool|int64|string)}]
63+
64+
output: |
65+
[{x:1},{x:true},{x:"foo"}]
66+
[{x:2},{x:3},{x:"bar"}]
67+
[{x:true},{x:false},{x:"bar"}]
68+
69+
---
70+
4871
# Test unblend on sets.
4972
spq: unblend(this)
5073

74+
vector: true
75+
5176
input: |
5277
|[1::(int64|bool|string),2::(int64|bool|string),"string"::(int64|bool|string)]|
5378
|[1::(int64|bool|string),2::(int64|bool|string),true::(int64|bool|string)]|
@@ -71,10 +96,12 @@ input: |
7196
|{"k1"::(int64|string):"v1"::(int64|string),"k2"::(int64|string):2::(int64|string)}|
7297
|{"k1"::(int64|string):1::(int64|string),"k2"::(int64|string):2::(int64|string)}|
7398
|{"k1"::(int64|string):"v1"::(int64|string),2::(int64|string):"v2"::(int64|string)}|
99+
|{"k1":"v1"::(int64|string),"k2":"v2"::(int64|string)}|
74100
|{}|::|{(int64|string):(int64|string)}|
75101
76102
output: |
77103
|{"k1":"v1","k2":2}|
78104
|{"k1":1,"k2":2}|
79105
|{2:"v2","k1":"v1"}|
106+
|{"k1":"v1","k2":"v2"}|
80107
|{}|

vector/array.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,37 @@ func Inner(val Any) Any {
6666
}
6767
panic(val)
6868
}
69+
70+
func PushdownContainerView(val Any) Any {
71+
view, ok := val.(*View)
72+
if !ok {
73+
return val
74+
}
75+
switch val := val.(type) {
76+
case *Array:
77+
inner, offsets := pickList(val.Values, view.Index, val.Offsets)
78+
return NewArray(val.Typ, offsets, inner)
79+
case *Set:
80+
inner, offsets := pickList(val.Values, view.Index, val.Offsets)
81+
return NewSet(val.Typ, offsets, inner)
82+
case *Map:
83+
keys, offsets := pickList(val.Keys, view.Index, val.Offsets)
84+
values, _ := pickList(val.Values, view.Index, val.Offsets)
85+
return NewMap(val.Typ, offsets, keys, values)
86+
default:
87+
panic(val)
88+
}
89+
}
90+
91+
func pickList(inner Any, index, offsets []uint32) (Any, []uint32) {
92+
newOffsets := []uint32{0}
93+
var innerIndex []uint32
94+
for _, idx := range index {
95+
start, end := offsets[idx], offsets[idx+1]
96+
for ; start < end; start++ {
97+
innerIndex = append(innerIndex, start)
98+
}
99+
newOffsets = append(newOffsets, uint32(len(innerIndex)))
100+
}
101+
return Pick(inner, innerIndex), newOffsets
102+
}

0 commit comments

Comments
 (0)