Skip to content

Commit 8d5f8b6

Browse files
committed
Add vectorized version of unblend()
1 parent 9237cb5 commit 8d5f8b6

File tree

4 files changed

+335
-2
lines changed

4 files changed

+335
-2
lines changed

runtime/vam/expr/function/function.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,9 @@ func New(sctx *super.Context, name string, narg int) (expr.Function, error) {
149149
case "typeof":
150150
f = &TypeOf{sctx}
151151
case "unblend":
152-
f = &samFunc{sctx, function.NewUnblend(sctx)}
152+
f = &anonFunc{sctx, func(sctx *super.Context, vecs ...vector.Any) vector.Any {
153+
return expr.Unblend(sctx, vecs[0])
154+
}}
153155
case "under":
154156
f = newUnder(sctx)
155157
case "unflatten":
@@ -200,3 +202,12 @@ func (f *samFunc) Call(args ...vector.Any) vector.Any {
200202
}
201203
return b.Build(f.sctx)
202204
}
205+
206+
type anonFunc struct {
207+
sctx *super.Context
208+
fn func(*super.Context, ...vector.Any) vector.Any
209+
}
210+
211+
func (a *anonFunc) Call(args ...vector.Any) vector.Any {
212+
return a.fn(a.sctx, args...)
213+
}

runtime/vam/expr/unblend.go

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
package expr
2+
3+
import (
4+
"slices"
5+
6+
"github.com/brimdata/super"
7+
"github.com/brimdata/super/vector"
8+
)
9+
10+
func Unblend(sctx *super.Context, vec vector.Any) vector.Any {
11+
switch vec.Kind() {
12+
case vector.KindRecord:
13+
return unblendRecord(sctx, vec)
14+
case vector.KindArray:
15+
array := vector.PushdownContainerView(vec).(*vector.Array)
16+
tags, inners, offsets := unblendArrayOrSet(sctx, array.Offsets, array.Values)
17+
var vals []vector.Any
18+
for i, inner := range inners {
19+
typ := sctx.LookupTypeArray(inner.Type())
20+
vals = append(vals, vector.NewArray(typ, offsets[i], inner))
21+
}
22+
if len(vals) > 1 {
23+
return vector.NewDynamic(tags, vals)
24+
}
25+
return vals[0]
26+
case vector.KindSet:
27+
set := vector.PushdownContainerView(vec).(*vector.Set)
28+
tags, inners, offsets := unblendArrayOrSet(sctx, set.Offsets, set.Values)
29+
var vals []vector.Any
30+
for i, inner := range inners {
31+
typ := sctx.LookupTypeSet(inner.Type())
32+
vals = append(vals, vector.NewSet(typ, offsets[i], inner))
33+
}
34+
if len(vals) > 1 {
35+
return vector.NewDynamic(tags, vals)
36+
}
37+
return vals[0]
38+
case vector.KindMap:
39+
return unblendMap(sctx, vector.PushdownContainerView(vec).(*vector.Map))
40+
case vector.KindUnion:
41+
return vector.Apply(true, func(vecs ...vector.Any) vector.Any {
42+
return Unblend(sctx, vecs[0])
43+
}, vec)
44+
}
45+
return vec
46+
}
47+
48+
func unblendRecord(sctx *super.Context, in vector.Any) vector.Any {
49+
vec := in
50+
var index []uint32
51+
if view, ok := vec.(*vector.View); ok {
52+
index = view.Index
53+
vec = view.Any
54+
}
55+
rec := vec.(*vector.Record)
56+
if len(rec.Fields) == 0 {
57+
return in
58+
}
59+
fields := slices.Clone(rec.Fields)
60+
if index != nil {
61+
for i, field := range fields {
62+
fields[i] = vector.Pick(field, index)
63+
}
64+
}
65+
for i, field := range fields {
66+
fields[i] = Unblend(sctx, field)
67+
}
68+
return vector.Apply(false, func(vecs ...vector.Any) vector.Any {
69+
var fields []super.Field
70+
var fvecs []vector.Any
71+
for i, vec := range vecs {
72+
if _, ok := vec.(*vector.None); ok {
73+
continue
74+
}
75+
fvecs = append(fvecs, vec)
76+
fields = append(fields, super.NewField(rec.Typ.Fields[i].Name, vec.Type()))
77+
}
78+
rtyp := sctx.MustLookupTypeRecord(fields)
79+
return vector.NewRecord(rtyp, fvecs, vecs[0].Len())
80+
}, fields...)
81+
}
82+
83+
func unblendArrayOrSet(sctx *super.Context, offsets []uint32, elements vector.Any) ([]uint32, []vector.Any, [][]uint32) {
84+
elements = Unblend(sctx, elements)
85+
dynamic, ok := elements.(*vector.Dynamic)
86+
if !ok {
87+
return nil, []vector.Any{elements}, [][]uint32{offsets}
88+
}
89+
union := vector.NewUnionFromDynamic(sctx, dynamic)
90+
slotTypes := typesOfSlotsInList(sctx, union, offsets)
91+
// Accumulate unique array types.
92+
m := make(map[super.Type][]uint32)
93+
for i, typ := range slotTypes {
94+
m[typ] = append(m[typ], uint32(i))
95+
}
96+
dtags := make([]uint32, len(offsets)-1)
97+
var inners []vector.Any
98+
var offs [][]uint32
99+
for typ, index := range m {
100+
inner, off := subsetOfList(sctx, union, offsets, index, typ)
101+
for _, idx := range index {
102+
dtags[idx] = uint32(len(inners))
103+
}
104+
inners = append(inners, inner)
105+
offs = append(offs, off)
106+
}
107+
return dtags, inners, offs
108+
}
109+
110+
func unblendMap(sctx *super.Context, vmap *vector.Map) vector.Any {
111+
keys := Unblend(sctx, vmap.Keys)
112+
if dynamic, ok := keys.(*vector.Dynamic); ok {
113+
keys = vector.NewUnionFromDynamic(sctx, dynamic)
114+
}
115+
vals := Unblend(sctx, vmap.Values)
116+
if dynamic, ok := vals.(*vector.Dynamic); ok {
117+
vals = vector.NewUnionFromDynamic(sctx, dynamic)
118+
}
119+
if keys.Kind() != vector.KindUnion && vals.Kind() != vector.KindUnion {
120+
return vmap
121+
}
122+
keySlotTypes := typesOfSlotsInList(sctx, keys, vmap.Offsets)
123+
valSlotTypes := typesOfSlotsInList(sctx, vals, vmap.Offsets)
124+
type mapType struct {
125+
key super.Type
126+
val super.Type
127+
}
128+
// Accumulate unique map types.
129+
m := make(map[mapType][]uint32)
130+
for i := range vmap.Len() {
131+
mtyp := mapType{keySlotTypes[i], valSlotTypes[i]}
132+
m[mtyp] = append(m[mtyp], uint32(i))
133+
}
134+
dtags := make([]uint32, len(vmap.Offsets)-1)
135+
var vecs []vector.Any
136+
for mtyp, index := range m {
137+
keys, offsets := subsetOfList(sctx, keys, vmap.Offsets, index, mtyp.key)
138+
vals, _ := subsetOfList(sctx, vals, vmap.Offsets, index, mtyp.val)
139+
for _, idx := range index {
140+
dtags[idx] = uint32(len(vecs))
141+
}
142+
typ := sctx.LookupTypeMap(keys.Type(), vals.Type())
143+
vecs = append(vecs, vector.NewMap(typ, offsets, keys, vals))
144+
}
145+
if len(vecs) == 1 {
146+
return vecs[0]
147+
}
148+
return vector.NewDynamic(dtags, vecs)
149+
}
150+
151+
func subsetOfList(sctx *super.Context, elements vector.Any, parentOffsets, index []uint32, typ super.Type) (vector.Any, []uint32) {
152+
if typ == super.TypeNull {
153+
nulls := vector.NewNull(uint32(len(index)))
154+
offsets := make([]uint32, len(index)+1)
155+
return nulls, offsets
156+
}
157+
var allVals []vector.Any
158+
union, ok := elements.(*vector.Union)
159+
if ok {
160+
allVals = union.Values
161+
} else {
162+
allVals = []vector.Any{elements}
163+
}
164+
var subTypes []super.Type
165+
utyp, ok := typ.(*super.TypeUnion)
166+
if ok {
167+
subTypes = slices.Clone(utyp.Types)
168+
} else {
169+
subTypes = append(subTypes, typ)
170+
}
171+
subVals := make([]vector.Any, len(subTypes))
172+
// map parent union tags to subset union tags
173+
tagMap := make([]uint32, len(allVals))
174+
for i, typ := range subTypes {
175+
idx := slices.IndexFunc(allVals, func(vec vector.Any) bool {
176+
return vec.Type() == typ
177+
})
178+
tagMap[idx] = uint32(i)
179+
subVals[i] = allVals[idx]
180+
}
181+
// Generate:
182+
// - offsets for new array
183+
// - indexes to create view on values
184+
// - tags for union (if applicable)
185+
var forwardTags []uint32
186+
if union != nil {
187+
forwardTags = union.ForwardTagMap()
188+
}
189+
var tags []uint32
190+
indexes := make([][]uint32, len(subTypes))
191+
suboffsets := []uint32{0}
192+
for _, idx := range index {
193+
start, end := parentOffsets[idx], parentOffsets[idx+1]
194+
if union != nil {
195+
for i, origTag := range union.Tags[start:end] {
196+
tag := tagMap[origTag]
197+
tags = append(tags, tag)
198+
indexes[tag] = append(indexes[tag], forwardTags[start+uint32(i)])
199+
}
200+
} else {
201+
for i := start; i < end; i++ {
202+
indexes[0] = append(indexes[0], i)
203+
}
204+
}
205+
suboffsets = append(suboffsets, uint32(len(tags)))
206+
}
207+
for i, val := range subVals {
208+
subVals[i] = vector.Pick(val, indexes[i])
209+
}
210+
var inner vector.Any
211+
if len(subVals) > 1 {
212+
inner = vector.NewUnion(sctx.LookupTypeUnion(subTypes), tags, subVals)
213+
} else {
214+
inner = subVals[0]
215+
}
216+
return inner, suboffsets
217+
}
218+
219+
func typesOfSlotsInList(sctx *super.Context, inner vector.Any, offsets []uint32) []super.Type {
220+
union, _ := inner.(*vector.Union)
221+
var alltypes []super.Type
222+
if union != nil {
223+
for _, val := range union.Values {
224+
alltypes = append(alltypes, val.Type())
225+
}
226+
} else {
227+
alltypes = []super.Type{inner.Type()}
228+
}
229+
n := uint32(len(offsets) - 1)
230+
slotTypes := make([]super.Type, n)
231+
for i := range n {
232+
if union != nil {
233+
slotTypes[i] = typeOfRange(sctx, union, alltypes, offsets[i], offsets[i+1])
234+
} else {
235+
slotTypes[i] = alltypes[0]
236+
}
237+
}
238+
return slotTypes
239+
}
240+
241+
func typeOfRange(sctx *super.Context, union *vector.Union, alltypes []super.Type, start, end uint32) super.Type {
242+
tags := slices.Clone(union.Tags[start:end])
243+
slices.Sort(tags)
244+
uniq := slices.Compact(tags)
245+
if len(uniq) == 0 {
246+
return super.TypeNull
247+
}
248+
if len(uniq) == 1 {
249+
return alltypes[uniq[0]]
250+
}
251+
if len(uniq) == len(alltypes) {
252+
return union.Typ
253+
}
254+
var types []super.Type
255+
for _, tag := range uniq {
256+
types = append(types, alltypes[tag])
257+
}
258+
return sctx.LookupTypeUnion(types)
259+
}

runtime/ztests/expr/function/unblend.yaml

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
1-
# Test unblend on nested records.
1+
# Test unblend on records.
22
spq: unblend(this)
33

4+
vector: true
5+
46
input: |
57
{x:{y:2::(int64|bool)},z?:"foo"}
68
{x:{y:true::(int64|bool)},z?:_::string}
9+
{}
710
811
output: |
912
{x:{y:2},z:"foo"}
1013
{x:{y:true}}
14+
{}
1115
1216
---
1317

1418
# Test unblend on root unions.
1519
spq: unblend(this)
1620

21+
vector: true
22+
1723
input: |
1824
1::(int64|bool|{x:int64})
1925
true::(int64|bool|{x:int64})
@@ -29,6 +35,8 @@ output: |
2935
# Test unblend on arrays.
3036
spq: unblend(this)
3137

38+
vector: true
39+
3240
input: |
3341
[1::(int64|bool|string),2::(int64|bool|string),"string"::(int64|bool|string)]
3442
[1::(int64|bool|string),2::(int64|bool|string),true::(int64|bool|string)]
@@ -45,9 +53,28 @@ output: |
4553
4654
---
4755

56+
# Test unblend on elements in an array.
57+
spq: unblend(this)
58+
59+
vector: true
60+
61+
input: |
62+
[{x:1::(bool|int64|string)},{x:true::(bool|int64|string)},{x:"foo"::(bool|int64|string)}]
63+
[{x:2::(bool|int64|string)},{x:3::(bool|int64|string)},{x:"bar"::(bool|int64|string)}]
64+
[{x:true::(bool|int64|string)},{x:false::(bool|int64|string)},{x:"bar"::(bool|int64|string)}]
65+
66+
output: |
67+
[{x:1},{x:true},{x:"foo"}]
68+
[{x:2},{x:3},{x:"bar"}]
69+
[{x:true},{x:false},{x:"bar"}]
70+
71+
---
72+
4873
# Test unblend on sets.
4974
spq: unblend(this)
5075

76+
vector: true
77+
5178
input: |
5279
|[1::(int64|bool|string),2::(int64|bool|string),"string"::(int64|bool|string)]|
5380
|[1::(int64|bool|string),2::(int64|bool|string),true::(int64|bool|string)]|
@@ -71,10 +98,12 @@ input: |
7198
|{"k1"::(int64|string):"v1"::(int64|string),"k2"::(int64|string):2::(int64|string)}|
7299
|{"k1"::(int64|string):1::(int64|string),"k2"::(int64|string):2::(int64|string)}|
73100
|{"k1"::(int64|string):"v1"::(int64|string),2::(int64|string):"v2"::(int64|string)}|
101+
|{"k1":"v1"::(int64|string),"k2":"v2"::(int64|string)}|
74102
|{}|::|{(int64|string):(int64|string)}|
75103
76104
output: |
77105
|{"k1":"v1","k2":2}|
78106
|{"k1":1,"k2":2}|
79107
|{2:"v2","k1":"v1"}|
108+
|{"k1":"v1","k2":"v2"}|
80109
|{}|

vector/array.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,37 @@ func Inner(val Any) Any {
6666
}
6767
panic(val)
6868
}
69+
70+
func PushdownContainerView(val Any) Any {
71+
view, ok := val.(*View)
72+
if !ok {
73+
return val
74+
}
75+
switch val := view.Any.(type) {
76+
case *Array:
77+
inner, offsets := pickList(val.Values, view.Index, val.Offsets)
78+
return NewArray(val.Typ, offsets, inner)
79+
case *Set:
80+
inner, offsets := pickList(val.Values, view.Index, val.Offsets)
81+
return NewSet(val.Typ, offsets, inner)
82+
case *Map:
83+
keys, offsets := pickList(val.Keys, view.Index, val.Offsets)
84+
values, _ := pickList(val.Values, view.Index, val.Offsets)
85+
return NewMap(val.Typ, offsets, keys, values)
86+
default:
87+
panic(val)
88+
}
89+
}
90+
91+
func pickList(inner Any, index, offsets []uint32) (Any, []uint32) {
92+
newOffsets := []uint32{0}
93+
var innerIndex []uint32
94+
for _, idx := range index {
95+
start, end := offsets[idx], offsets[idx+1]
96+
for ; start < end; start++ {
97+
innerIndex = append(innerIndex, start)
98+
}
99+
newOffsets = append(newOffsets, uint32(len(innerIndex)))
100+
}
101+
return Pick(inner, innerIndex), newOffsets
102+
}

0 commit comments

Comments
 (0)