Skip to content

Commit 43c1328

Browse files
valyala authored and truepele committed
lib/logstorage: improve the performance of obtaining _stream column value
Substitute the global streamTagsCache with a per-blockSearch cache for ((stream.id) -> (_stream value)) entries. This improves the scalability of obtaining _stream values on a machine with many CPU cores, since every CPU has its own blockSearch instance. This should also reduce memory usage when querying logs over a large number of streams, since the per-blockSearch cache of ((stream.id) -> (_stream value)) entries is limited in size, and its lifetime is bounded by a single query.
1 parent 4c7a102 commit 43c1328

File tree

4 files changed

+57
-80
lines changed

4 files changed

+57
-80
lines changed

lib/logstorage/block_result.go

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -500,43 +500,11 @@ func (br *blockResult) addStreamIDColumn(bs *blockSearch) {
500500
}
501501

502502
func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
503-
if !bs.prevStreamID.equal(&bs.bsw.bh.streamID) {
504-
return br.addStreamColumnSlow(bs)
505-
}
506-
507-
if len(bs.prevStream) == 0 {
503+
streamStr := bs.getStreamStr()
504+
if streamStr == "" {
508505
return false
509506
}
510-
br.addConstColumn("_stream", bytesutil.ToUnsafeString(bs.prevStream))
511-
return true
512-
}
513-
514-
func (br *blockResult) addStreamColumnSlow(bs *blockSearch) bool {
515-
bb := bbPool.Get()
516-
defer bbPool.Put(bb)
517-
518-
streamID := &bs.bsw.bh.streamID
519-
bb.B = bs.bsw.p.pt.appendStreamTagsByStreamID(bb.B[:0], streamID)
520-
if len(bb.B) == 0 {
521-
// Couldn't find stream tags by streamID. This may be the case when the corresponding log stream
522-
// was recently registered and its tags aren't visible to search yet.
523-
// The stream tags must become visible in a few seconds.
524-
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
525-
bs.prevStreamID = *streamID
526-
bs.prevStream = bs.prevStream[:0]
527-
return false
528-
}
529-
530-
st := GetStreamTags()
531-
mustUnmarshalStreamTags(st, bb.B)
532-
bb.B = st.marshalString(bb.B[:0])
533-
PutStreamTags(st)
534-
535-
s := bytesutil.ToUnsafeString(bb.B)
536-
br.addConstColumn("_stream", s)
537-
538-
bs.prevStreamID = *streamID
539-
bs.prevStream = append(bs.prevStream[:0], s...)
507+
br.addConstColumn("_stream", streamStr)
540508
return true
541509
}
542510

lib/logstorage/block_search.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ func getBlockSearch() *blockSearch {
8585

8686
func putBlockSearch(bs *blockSearch) {
8787
bs.reset()
88+
89+
// reset seenStreams before returning bs to the pool in order to reduce memory usage.
90+
bs.seenStreams = nil
91+
8892
blockSearchPool.Put(bs)
8993
}
9094

@@ -115,10 +119,9 @@ type blockSearch struct {
115119
// a is used for storing unmarshaled data in csh
116120
a arena
117121

118-
// prevStreamID and prevStream are used for speeding up fetching _stream columns
119-
// across sequential blocks belonging to the same stream.
120-
prevStreamID streamID
121-
prevStream []byte
122+
// seenStreams contains seen streamIDs for the recent searches.
123+
// It is used for speeding up fetching _stream column.
124+
seenStreams map[u128]string
122125
}
123126

124127
func (bs *blockSearch) reset() {
@@ -145,6 +148,8 @@ func (bs *blockSearch) reset() {
145148
bs.sbu.reset()
146149
bs.csh.reset()
147150
bs.a.reset()
151+
152+
// Do not reset seenStreams, since its lifetime is managed by blockResult.addStreamColumn() code.
148153
}
149154

150155
func (bs *blockSearch) partPath() string {
@@ -326,3 +331,48 @@ func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []b
326331

327332
return dst
328333
}
334+
335+
// getStreamStr returns _stream value for the given block at bs.
336+
func (bs *blockSearch) getStreamStr() string {
337+
sid := bs.bsw.bh.streamID.id
338+
streamStr := bs.seenStreams[sid]
339+
if streamStr != "" {
340+
// Fast path - streamStr is found in the seenStreams.
341+
return streamStr
342+
}
343+
344+
// Slow path - load streamStr from the storage.
345+
streamStr = bs.getStreamStrSlow()
346+
if streamStr != "" {
347+
// Store the found streamStr in seenStreams.
348+
if len(bs.seenStreams) > 20_000 {
349+
bs.seenStreams = nil
350+
}
351+
if bs.seenStreams == nil {
352+
bs.seenStreams = make(map[u128]string)
353+
}
354+
bs.seenStreams[sid] = streamStr
355+
}
356+
return streamStr
357+
}
358+
359+
func (bs *blockSearch) getStreamStrSlow() string {
360+
bb := bbPool.Get()
361+
defer bbPool.Put(bb)
362+
363+
bb.B = bs.bsw.p.pt.idb.appendStreamTagsByStreamID(bb.B[:0], &bs.bsw.bh.streamID)
364+
if len(bb.B) == 0 {
365+
// Couldn't find stream tags by sid. This may be the case when the corresponding log stream
366+
// was recently registered and its tags aren't visible to search yet.
367+
// The stream tags must become visible in a few seconds.
368+
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
369+
return ""
370+
}
371+
372+
st := GetStreamTags()
373+
mustUnmarshalStreamTags(st, bb.B)
374+
bb.B = st.marshalString(bb.B[:0])
375+
PutStreamTags(st)
376+
377+
return string(bb.B)
378+
}

lib/logstorage/partition.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -159,32 +159,6 @@ func (pt *partition) logIngestedRows(lr *LogRows) {
159159
}
160160
}
161161

162-
// appendStreamTagsByStreamID appends canonical representation of stream tags for the given sid to dst
163-
// and returns the result.
164-
func (pt *partition) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
165-
// Search for the StreamTags in the cache.
166-
key := bbPool.Get()
167-
defer bbPool.Put(key)
168-
169-
// There is no need in putting partition name into key here,
170-
// since StreamTags is uniquely identified by streamID.
171-
key.B = sid.marshal(key.B)
172-
dstLen := len(dst)
173-
dst = pt.s.streamTagsCache.GetBig(dst, key.B)
174-
if len(dst) > dstLen {
175-
// Fast path - the StreamTags have been found in cache.
176-
return dst
177-
}
178-
179-
// Slow path - search for StreamTags in idb
180-
dst = pt.idb.appendStreamTagsByStreamID(dst, sid)
181-
if len(dst) > dstLen {
182-
// Store the found StreamTags to cache
183-
pt.s.streamTagsCache.SetBig(key.B, dst[dstLen:])
184-
}
185-
return dst
186-
}
187-
188162
func (pt *partition) hasStreamIDInCache(sid *streamID) bool {
189163
var result [1]byte
190164

lib/logstorage/storage.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,6 @@ type Storage struct {
135135
// the check whether the given stream is already registered in the persistent storage.
136136
streamIDCache *workingsetcache.Cache
137137

138-
// streamTagsCache caches StreamTags entries keyed by streamID.
139-
//
140-
// There is no need to put partition into the key for StreamTags,
141-
// since StreamTags are uniquely identified by streamID.
142-
//
143-
// It reduces the load on persistent storage during querying
144-
// when StreamTags must be found for the particular streamID
145-
streamTagsCache *workingsetcache.Cache
146-
147138
// filterStreamCache caches streamIDs keyed by (partition, []TenantID, StreamFilter).
148139
//
149140
// It reduces the load on persistent storage during querying by _stream:{...} filter.
@@ -253,8 +244,6 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
253244
streamIDCachePath := filepath.Join(path, cacheDirname, streamIDCacheFilename)
254245
streamIDCache := workingsetcache.Load(streamIDCachePath, mem/16)
255246

256-
streamTagsCache := workingsetcache.New(mem / 10)
257-
258247
filterStreamCache := workingsetcache.New(mem / 10)
259248

260249
s := &Storage{
@@ -270,7 +259,6 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
270259
stopCh: make(chan struct{}),
271260

272261
streamIDCache: streamIDCache,
273-
streamTagsCache: streamTagsCache,
274262
filterStreamCache: filterStreamCache,
275263
}
276264

@@ -474,9 +462,6 @@ func (s *Storage) MustClose() {
474462
s.streamIDCache.Stop()
475463
s.streamIDCache = nil
476464

477-
s.streamTagsCache.Stop()
478-
s.streamTagsCache = nil
479-
480465
s.filterStreamCache.Stop()
481466
s.filterStreamCache = nil
482467

0 commit comments

Comments
 (0)