Skip to content

Commit cc3ba10

Browse files
authored
Add Object/Array foreach (#49)
Some cleanups and better performance for small inputs.
1 parent 0c4d67f commit cc3ba10

14 files changed

+192
-42
lines changed

benchmarks_test.go

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ func benchmarkFromFile(b *testing.B, filename string) {
6060

6161
}
6262

63+
func BenchmarkSmall(b *testing.B) { benchmarkFromFile(b, "payload-small") }
64+
func BenchmarkMedium(b *testing.B) { benchmarkFromFile(b, "payload-medium") }
65+
func BenchmarkLarge(b *testing.B) { benchmarkFromFile(b, "payload-large") }
6366
func BenchmarkApache_builds(b *testing.B) { benchmarkFromFile(b, "apache_builds") }
6467
func BenchmarkCanada(b *testing.B) { benchmarkFromFile(b, "canada") }
6568
func BenchmarkCitm_catalog(b *testing.B) { benchmarkFromFile(b, "citm_catalog") }
@@ -72,7 +75,7 @@ func BenchmarkMesh_pretty(b *testing.B) { benchmarkFromFile(b, "mesh.pretty")
7275
func BenchmarkNumbers(b *testing.B) { benchmarkFromFile(b, "numbers") }
7376
func BenchmarkRandom(b *testing.B) { benchmarkFromFile(b, "random") }
7477
func BenchmarkTwitter(b *testing.B) { benchmarkFromFile(b, "twitter") }
75-
func BenchmarkTwitterescaped(b *testing.B) { benchmarkFromFile(b, "twitterescaped") }
78+
func BenchmarkTwitterEscaped(b *testing.B) { benchmarkFromFile(b, "twitterescaped") }
7679
func BenchmarkUpdate_center(b *testing.B) { benchmarkFromFile(b, "update-center") }
7780

7881
func benchmarkJsoniter(b *testing.B, filename string) {
@@ -138,3 +141,92 @@ func BenchmarkJsoniterRandom(b *testing.B) { benchmarkJsoniter(b, "rando
138141
func BenchmarkJsoniterTwitter(b *testing.B) { benchmarkJsoniter(b, "twitter") }
139142
func BenchmarkJsoniterTwitterescaped(b *testing.B) { benchmarkJsoniter(b, "twitterescaped") }
140143
func BenchmarkJsoniterUpdate_center(b *testing.B) { benchmarkJsoniter(b, "update-center") }
144+
145+
func BenchmarkJsonParserLarge(b *testing.B) {
146+
largeFixture := loadCompressed(b, "payload-large")
147+
148+
b.Run("nocopy", func(b *testing.B) {
149+
pj := &ParsedJson{}
150+
b.SetBytes(int64(len(largeFixture)))
151+
b.ReportAllocs()
152+
b.ResetTimer()
153+
var elem *Element
154+
var ar *Array
155+
var obj *Object
156+
var onlyKeys = map[string]struct{}{
157+
"id": {},
158+
"slug": {},
159+
}
160+
const checkErrs = false
161+
for i := 0; i < b.N; i++ {
162+
// Reset tape
163+
var err error
164+
pj, err = Parse(largeFixture, pj, WithCopyStrings(false))
165+
if checkErrs && err != nil {
166+
b.Fatal(err)
167+
}
168+
iter := pj.Iter()
169+
elem, err = iter.FindElement("users", elem)
170+
if checkErrs && err != nil {
171+
b.Fatal(err)
172+
}
173+
ar, err = elem.Iter.Array(ar)
174+
if checkErrs && err != nil {
175+
b.Fatal(err)
176+
}
177+
ar.ForEach(func(t Type, i Iter) {
178+
elem, err = i.FindElement("username", elem)
179+
if checkErrs && err != nil {
180+
b.Fatal(err)
181+
}
182+
_, _ = elem.Iter.StringBytes()
183+
})
184+
185+
elem, err = iter.FindElement("topics/topics", elem)
186+
if checkErrs && err != nil {
187+
b.Fatal(err)
188+
}
189+
ar, err = elem.Iter.Array(ar)
190+
if checkErrs && err != nil {
191+
b.Fatal(err)
192+
}
193+
ar.ForEach(func(t Type, i Iter) {
194+
if true {
195+
// Use foreach...
196+
obj, err = i.Object(obj)
197+
if checkErrs && err != nil {
198+
b.Fatal(err)
199+
}
200+
obj.ForEach(func(key []byte, i Iter) {
201+
if string(key) == "id" {
202+
_, err = i.Int()
203+
if checkErrs && err != nil {
204+
b.Fatal(err)
205+
}
206+
}
207+
if string(key) == "slug" {
208+
_, err = i.StringBytes()
209+
if checkErrs && err != nil {
210+
b.Fatal(err)
211+
}
212+
}
213+
214+
}, onlyKeys)
215+
} else {
216+
elem, err = i.FindElement("id", elem)
217+
if checkErrs && err != nil {
218+
b.Fatal(err)
219+
}
220+
_, _ = elem.Iter.Int()
221+
//b.Log(elem.Iter.Int())
222+
elem, err = i.FindElement("slug", elem)
223+
if checkErrs && err != nil {
224+
b.Fatal(err)
225+
}
226+
_, _ = elem.Iter.StringBytes()
227+
//b.Log(elem.Iter.String())
228+
}
229+
})
230+
}
231+
})
232+
}

parse_json_amd64.go

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,7 @@ func (pj *internalParsedJson) initialize(size int) {
4949
pj.containingScopeOffset = pj.containingScopeOffset[:0]
5050
}
5151

52-
func (pj *internalParsedJson) parseMessage(msg []byte) error {
53-
return pj.parseMessageInternal(msg, false)
54-
}
55-
56-
func (pj *internalParsedJson) parseMessageNdjson(msg []byte) error {
57-
return pj.parseMessageInternal(msg, true)
58-
}
59-
60-
func (pj *internalParsedJson) parseMessageInternal(msg []byte, ndjson bool) (err error) {
61-
52+
func (pj *internalParsedJson) parseMessage(msg []byte, ndjson bool) (err error) {
6253
// Cache message so we can point directly to strings
6354
// TODO: Find out why TestVerifyTape/instruments fails without bytes.TrimSpace
6455
pj.Message = bytes.TrimSpace(msg)
@@ -70,33 +61,46 @@ func (pj *internalParsedJson) parseMessageInternal(msg []byte, ndjson bool) (err
7061
pj.ndjson = 0
7162
}
7263

73-
var wg sync.WaitGroup
74-
wg.Add(2)
75-
7664
// Make the capacity of the channel smaller than the number of slots.
7765
// This way the sender will automatically block until the consumer
7866
// has finished the slot it is working on.
7967
pj.indexChans = make(chan indexChan, indexSlots-2)
8068
pj.buffersOffset = ^uint64(0)
8169

8270
var errStage1 error
83-
go func() {
84-
if !findStructuralIndices(pj.Message, pj) {
71+
72+
// Do long inputs async
73+
if len(pj.Message) > 8<<10 {
74+
var wg sync.WaitGroup
75+
wg.Add(1)
76+
go func() {
77+
defer wg.Done()
78+
if !pj.unifiedMachine(pj.Message) {
79+
err = errors.New("Bad parsing while executing stage 2")
80+
// drain the channel until empty
81+
for range pj.indexChans {
82+
}
83+
}
84+
}()
85+
if !pj.findStructuralIndices(pj.Message) {
8586
errStage1 = errors.New("Failed to find all structural indices for stage 1")
8687
}
87-
wg.Done()
88-
}()
89-
go func() {
90-
if !unifiedMachine(pj.Message, pj) {
91-
err = errors.New("Bad parsing while executing stage 2")
88+
wg.Wait()
89+
} else {
90+
if !pj.findStructuralIndices(pj.Message) {
9291
// drain the channel until empty
9392
for range pj.indexChans {
9493
}
94+
return errors.New("Failed to find all structural indices for stage 1")
9595
}
96-
wg.Done()
97-
}()
98-
99-
wg.Wait()
96+
if !pj.unifiedMachine(pj.Message) {
97+
// drain the channel until empty
98+
for range pj.indexChans {
99+
}
100+
return errors.New("Bad parsing while executing stage 2")
101+
}
102+
return nil
103+
}
100104

101105
if errStage1 != nil {
102106
return errStage1

parse_json_amd64_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestDemoNdjson(t *testing.T) {
3636

3737
pj := internalParsedJson{}
3838

39-
if err := pj.parseMessageNdjson([]byte(demo_ndjson)); err != nil {
39+
if err := pj.parseMessage([]byte(demo_ndjson), true); err != nil {
4040
t.Errorf("TestDemoNdjson: got: %v want: nil", err)
4141
}
4242

@@ -63,7 +63,7 @@ func TestNdjsonEmptyLines(t *testing.T) {
6363
pj := internalParsedJson{}
6464

6565
for _, json := range ndjson_emptylines {
66-
if err := pj.parseMessageNdjson([]byte(json)); err != nil {
66+
if err := pj.parseMessage([]byte(json), true); err != nil {
6767
t.Errorf("TestNdjsonEmptyLine: got: %v want: nil", err)
6868
}
6969
}
@@ -77,7 +77,7 @@ func BenchmarkNdjsonStage2(b *testing.B) {
7777
b.ReportAllocs()
7878
b.ResetTimer()
7979
for i := 0; i < b.N; i++ {
80-
err := pj.parseMessageNdjson(ndjson)
80+
err := pj.parseMessage(ndjson, true)
8181
if err != nil {
8282
panic(err)
8383
}
@@ -97,7 +97,7 @@ func BenchmarkNdjsonStage1(b *testing.B) {
9797
for i := 0; i < b.N; i++ {
9898
// Create new channel (large enough so we won't block)
9999
pj.indexChans = make(chan indexChan, 128*10240)
100-
findStructuralIndices([]byte(ndjson), &pj)
100+
pj.findStructuralIndices([]byte(ndjson))
101101
}
102102
}
103103

@@ -113,7 +113,7 @@ func BenchmarkNdjsonColdCountStar(b *testing.B) {
113113
b.ResetTimer()
114114

115115
for i := 0; i < b.N; i++ {
116-
pj.parseMessageNdjson(ndjson)
116+
pj.parseMessage(ndjson, true)
117117
count_raw_tape(pj.Tape)
118118
}
119119
}
@@ -129,7 +129,7 @@ func BenchmarkNdjsonColdCountStarWithWhere(b *testing.B) {
129129
b.ReportAllocs()
130130

131131
for i := 0; i < b.N; i++ {
132-
err := pj.parseMessageNdjson(ndjson)
132+
err := pj.parseMessage(ndjson, true)
133133
if err != nil {
134134
b.Fatal(err)
135135
}
@@ -654,7 +654,7 @@ func TestVerifyTape(t *testing.T) {
654654
ref := loadCompressed(t, tt.name)
655655

656656
pj := internalParsedJson{}
657-
if err := pj.parseMessage(ref); err != nil {
657+
if err := pj.parseMessage(ref, false); err != nil {
658658
t.Errorf("parseMessage failed: %v\n", err)
659659
return
660660
}

parsed_array.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,19 @@ func (a *Array) Iter() Iter {
4242
return i
4343
}
4444

45+
// ForEach calls the provided function for every element.
46+
func (a *Array) ForEach(fn func(t Type, i Iter)) {
47+
i := a.Iter()
48+
for {
49+
t := i.Advance()
50+
if t == TypeNone {
51+
break
52+
}
53+
fn(t, i)
54+
}
55+
return
56+
}
57+
4558
// FirstType will return the type of the first element.
4659
// If there are no elements, TypeNone is returned.
4760
func (a *Array) FirstType() Type {

parsed_json.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ func (i *Iter) Int() (int64, error) {
582582
}
583583
return int64(v), nil
584584
default:
585-
return 0, fmt.Errorf("unable to convert type %v to float", i.t)
585+
return 0, fmt.Errorf("unable to convert type %v to int", i.t)
586586
}
587587
}
588588

@@ -633,7 +633,7 @@ func (i *Iter) Uint() (uint64, error) {
633633
v := i.tape.Tape[i.off]
634634
return v, nil
635635
default:
636-
return 0, fmt.Errorf("unable to convert type %v to float", i.t)
636+
return 0, fmt.Errorf("unable to convert type %v to uint", i.t)
637637
}
638638
}
639639

parsed_object.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,47 @@ func (o *Object) FindKey(key string, dst *Element) *Element {
138138
}
139139
}
140140

141+
// ForEach will call back fn for each key.
142+
// A key filter can be provided for optional filtering.
143+
func (o *Object) ForEach(fn func(key []byte, i Iter), onlyKeys map[string]struct{}) error {
144+
tmp := o.tape.Iter()
145+
tmp.off = o.off
146+
n := 0
147+
for {
148+
typ := tmp.Advance()
149+
// We want name and at least one value.
150+
if typ != TypeString || tmp.off+1 >= len(tmp.tape.Tape) {
151+
return fmt.Errorf("object: unexpected name tag %v", tmp.t)
152+
}
153+
// Advance must be string or end of object
154+
offset := tmp.cur
155+
length := tmp.tape.Tape[tmp.off]
156+
// Read name
157+
name, err := tmp.tape.stringByteAt(offset, length)
158+
if err != nil {
159+
return fmt.Errorf("getting object name: %w", err)
160+
}
161+
162+
if len(onlyKeys) > 0 {
163+
if _, ok := onlyKeys[string(name)]; !ok {
164+
// Skip the value
165+
tmp.Advance()
166+
continue
167+
}
168+
}
169+
170+
t := tmp.Advance()
171+
if t == TypeNone {
172+
return nil
173+
}
174+
fn(name, tmp)
175+
n++
176+
if n == len(onlyKeys) {
177+
return nil
178+
}
179+
}
180+
}
181+
141182
// ErrPathNotFound is returned
142183
var ErrPathNotFound = errors.New("path not found")
143184

simdjson_amd64.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func Parse(b []byte, reuse *ParsedJson, opts ...ParserOption) (*ParsedJson, erro
6767
if err != nil {
6868
return nil, err
6969
}
70-
err = pj.parseMessage(b)
70+
err = pj.parseMessage(b, false)
7171
if err != nil {
7272
return nil, err
7373
}
@@ -83,7 +83,7 @@ func ParseND(b []byte, reuse *ParsedJson, opts ...ParserOption) (*ParsedJson, er
8383
if err != nil {
8484
return nil, err
8585
}
86-
err = pj.parseMessageNdjson(bytes.TrimSpace(b))
86+
err = pj.parseMessage(bytes.TrimSpace(b), true)
8787
if err != nil {
8888
return nil, err
8989
}
@@ -186,7 +186,7 @@ func ParseNDStream(r io.Reader, res chan<- Stream, reuse <-chan *ParsedJson) {
186186

187187
default:
188188
}
189-
parseErr := pj.parseMessageNdjson(tmp)
189+
parseErr := pj.parseMessage(tmp, true)
190190
if parseErr != nil {
191191
result <- Stream{
192192
Value: nil,

stage1_find_marks_amd64.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func jsonMarkup(b byte) bool {
3939
return jsonMarkupTable[b]
4040
}
4141

42-
func findStructuralIndices(buf []byte, pj *internalParsedJson) bool {
42+
func (pj *internalParsedJson) findStructuralIndices(buf []byte) bool {
4343

4444
f := find_structural_bits_in_slice
4545
if cpuid.CPU.Has(cpuid.AVX512F) {

stage1_find_marks_amd64_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func TestFindStructuralIndices(t *testing.T) {
141141
pj.indexChans = make(chan indexChan, 16)
142142

143143
// No need to spawn go-routine since the channel is large enough
144-
findStructuralIndices([]byte(demo_json), &pj)
144+
pj.findStructuralIndices([]byte(demo_json))
145145

146146
ipos, pos := 0, ^uint64(0)
147147
for ic := range pj.indexChans {
@@ -169,6 +169,6 @@ func BenchmarkStage1(b *testing.B) {
169169
for i := 0; i < b.N; i++ {
170170
// Create new channel (large enough so we won't block)
171171
pj.indexChans = make(chan indexChan, 128)
172-
findStructuralIndices([]byte(msg), &pj)
172+
pj.findStructuralIndices([]byte(msg))
173173
}
174174
}

0 commit comments

Comments
 (0)