Skip to content

Commit db29e6c

Browse files
committed
Huge performance improvements
- Don't wait for messages to be consumed to continue parsing - Queue up (almost) all the things! - Configurable msgQueue buffer size - But: Cancel now won't stop the already parsed info from being used
1 parent 8b83f34 commit db29e6c

File tree

5 files changed

+109
-59
lines changed

5 files changed

+109
-59
lines changed

demoinfocs_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,6 @@ func TestCancelParseToEnd(t *testing.T) {
102102
tix++
103103
if tix == maxTicks {
104104
p.Cancel()
105-
} else if tix > maxTicks {
106-
t.Fatal("Parsing continued after cancellation")
107105
}
108106
})
109107

demopacket.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,6 @@ import (
1111
msg "github.com/markus-wa/demoinfocs-golang/msg"
1212
)
1313

14-
var packetEntitiesPool sync.Pool = sync.Pool{
15-
New: func() interface{} {
16-
return new(msg.CSVCMsg_PacketEntities)
17-
},
18-
}
19-
20-
var gameEventPool sync.Pool = sync.Pool{
21-
New: func() interface{} {
22-
return new(msg.CSVCMsg_GameEvent)
23-
},
24-
}
25-
2614
var byteSlicePool sync.Pool = sync.Pool{
2715
New: func() interface{} {
2816
s := make([]byte, 0, 256)
@@ -46,15 +34,16 @@ func (p *Parser) parsePacket() {
4634
var m proto.Message
4735
switch cmd {
4836
case int(msg.SVC_Messages_svc_PacketEntities):
49-
m = packetEntitiesPool.Get().(*msg.CSVCMsg_PacketEntities)
50-
defer packetEntitiesPool.Put(m)
37+
// TODO: Find a way to pool SVC_Messages_svc_PacketEntities
38+
// Need to make sure the message was consumed before pooling
39+
// and the message's contents will be overridden (either by protobuf or manually)
40+
m = new(msg.CSVCMsg_PacketEntities)
5141

5242
case int(msg.SVC_Messages_svc_GameEventList):
5343
m = new(msg.CSVCMsg_GameEventList)
5444

5545
case int(msg.SVC_Messages_svc_GameEvent):
56-
m = gameEventPool.Get().(*msg.CSVCMsg_GameEvent)
57-
defer gameEventPool.Put(m)
46+
m = new(msg.CSVCMsg_GameEvent)
5847

5948
case int(msg.SVC_Messages_svc_CreateStringTable):
6049
m = new(msg.CSVCMsg_CreateStringTable)
@@ -101,9 +90,6 @@ func (p *Parser) parsePacket() {
10190
p.bitReader.EndChunk()
10291
}
10392
p.bitReader.EndChunk()
104-
105-
// Make sure the created events are consumed so they can be pooled
106-
p.msgDispatcher.SyncQueues(p.msgQueue)
10793
}
10894

10995
// TODO: Find out what all this is good for and why we didn't use the removed functions on seVector, split & commandInfo

packet_handlers.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,3 +673,45 @@ func (p *Parser) handleUserMessage(um *msg.CSVCMsg_UserMessage) {
673673
// Maybe msg.ECstrike15UserMessages_CS_UM_RadioText
674674
}
675675
}
676+
677+
type frameParsedTokenType struct{}
678+
679+
var frameParsedToken = new(frameParsedTokenType)
680+
681+
func (p *Parser) handleFrameParsed(*frameParsedTokenType) {
682+
for k, rp := range p.rawPlayers {
683+
if rp == nil {
684+
continue
685+
}
686+
687+
if pl := p.players[k]; pl != nil {
688+
newPlayer := false
689+
if p.connectedPlayers[rp.UserID] == nil {
690+
p.connectedPlayers[rp.UserID] = pl
691+
newPlayer = true
692+
}
693+
694+
pl.Name = rp.Name
695+
pl.SteamID = rp.XUID
696+
pl.IsBot = rp.IsFakePlayer
697+
pl.AdditionalPlayerInformation = &p.additionalPlayerInfo[pl.EntityID]
698+
699+
if pl.IsAlive() {
700+
pl.LastAlivePosition = pl.Position
701+
}
702+
703+
if newPlayer && pl.SteamID != 0 {
704+
p.eventDispatcher.Dispatch(events.PlayerBindEvent{Player: pl})
705+
}
706+
}
707+
}
708+
709+
p.currentFrame++
710+
p.eventDispatcher.Dispatch(events.TickDoneEvent{})
711+
}
712+
713+
type ingameTickNumber int
714+
715+
func (p *Parser) handleIngameTickNumber(n ingameTickNumber) {
716+
p.ingameTick = int(n)
717+
}

parser.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ func (ts TeamState) Flag() string {
162162
return ts.flag
163163
}
164164

165+
// TODO: Maybe we should use a channel instead of that WarnHandler stuff
166+
165167
// WarnHandler is a function that handles warnings of a Parser.
166168
type WarnHandler func(string)
167169

@@ -170,15 +172,26 @@ func WarnToStdErr(warning string) {
170172
fmt.Fprintln(os.Stderr, warning)
171173
}
172174

175+
// TODO: Change the New* methods (names + parameters)
176+
173177
// NewParser creates a new Parser on the basis of an io.Reader
174178
// - like os.File or bytes.Reader - that reads demo data.
175179
// Any warnings that don't stop the Parser from doing it's job
176180
// will be passed to the warnHandler if it's not nil.
177181
func NewParser(demostream io.Reader, warnHandler WarnHandler) *Parser {
182+
return NewParserWithBufferSize(demostream, -1, warnHandler)
183+
}
184+
185+
// NewParserWithBufferSize returns a new Parser with a custom msgQueue buffer size.
186+
// For large demos, fast i/o and slow CPUs higher numbers are suggested and vice versa.
187+
// The buffer size can easily be in the hundred-thousands to low millions for the best performance.
188+
// A negative value will make the Parser automatically decide the buffer size during ParseHeader()
189+
// based on the number of ticks in the demo (nubmer of ticks = buffer size).
190+
// See also: NewParser()
191+
func NewParserWithBufferSize(demostream io.Reader, msgQueueBufferSize int, warnHandler WarnHandler) *Parser {
178192
var p Parser
179193
// Init parser
180194
p.bitReader = bit.NewLargeBitReader(demostream)
181-
p.msgQueue = make(chan interface{}, 8)
182195
p.instanceBaselines = make(map[int][]byte)
183196
p.preprocessedBaselines = make(map[int]map[int]st.PropValue)
184197
p.equipmentMapping = make(map[*st.ServerClass]common.EquipmentElement)
@@ -195,7 +208,16 @@ func NewParser(demostream io.Reader, warnHandler WarnHandler) *Parser {
195208
p.msgDispatcher.RegisterHandler(p.handleCreateStringTable)
196209
p.msgDispatcher.RegisterHandler(p.handleUpdateStringTable)
197210
p.msgDispatcher.RegisterHandler(p.handleUserMessage)
211+
p.msgDispatcher.RegisterHandler(p.handleFrameParsed)
212+
p.msgDispatcher.RegisterHandler(p.handleIngameTickNumber)
198213

199-
p.msgDispatcher.AddQueues(p.msgQueue)
214+
if msgQueueBufferSize >= 0 {
215+
p.initMsgQueue(msgQueueBufferSize)
216+
}
200217
return &p
201218
}
219+
220+
func (p *Parser) initMsgQueue(buf int) {
221+
p.msgQueue = make(chan interface{}, buf)
222+
p.msgDispatcher.AddQueues(p.msgQueue)
223+
}

parsing.go

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ const (
1515
playerWeaponPrePrefix = "bcc_nonlocaldata."
1616
)
1717

18+
const (
19+
msgHeaderNotParsed = "Tried to parse tick before parsing header"
20+
)
21+
1822
// ParseHeader attempts to parse the header of the demo.
1923
// Returns error if the filestamp (first 8 bytes) doesn't match HL2DEMO.
2024
func (p *Parser) ParseHeader() error {
@@ -35,6 +39,12 @@ func (p *Parser) ParseHeader() error {
3539
return errors.New("Invalid File-Type; expecting HL2DEMO in the first 8 bytes")
3640
}
3741

42+
// Initialize queue if the buffer size wasn't specified, the amount of ticks
43+
// seems to be a good indicator of how many events we'll get
44+
if p.msgQueue == nil {
45+
p.initMsgQueue(h.PlaybackTicks)
46+
}
47+
3848
p.header = &h
3949
p.eventDispatcher.Dispatch(events.HeaderParsedEvent{Header: h})
4050
return nil
@@ -44,20 +54,30 @@ func (p *Parser) ParseHeader() error {
4454
// Aborts and returns an error if Cancel() is called before the end.
4555
// May panic if the demo is corrupt in some way.
4656
func (p *Parser) ParseToEnd() error {
57+
if p.header == nil {
58+
panic(msgHeaderNotParsed)
59+
}
60+
4761
for {
4862
select {
4963
case <-p.cancelChan:
5064
return errors.New("Parsing was cancelled before it finished")
5165

5266
default:
53-
if !p.ParseNextFrame() {
67+
if !p.parseFrame() {
68+
// Make sure all the messages of the demo are handled
69+
p.msgDispatcher.SyncQueues(p.msgQueue)
70+
71+
// Close msgQueue
72+
close(p.msgQueue)
5473
return nil
5574
}
5675
}
5776
}
5877
}
5978

60-
// Cancel aborts ParseToEnd() on the upcoming tick.
79+
// Cancel aborts ParseToEnd(). All information that was already read
80+
// up to this point will still be used (and new events may still be sent).
6181
func (p *Parser) Cancel() {
6282
p.cancelChan <- struct{}{}
6383
}
@@ -67,56 +87,30 @@ func (p *Parser) Cancel() {
6787
// Panics if header hasn't been parsed yet - see Parser.ParseHeader().
6888
func (p *Parser) ParseNextFrame() bool {
6989
if p.header == nil {
70-
panic("Tried to parse tick before parsing header")
90+
panic(msgHeaderNotParsed)
7191
}
72-
b := p.parseFrame()
73-
74-
for k, rp := range p.rawPlayers {
75-
if rp == nil {
76-
continue
77-
}
78-
79-
if pl := p.players[k]; pl != nil {
80-
newPlayer := false
81-
if p.connectedPlayers[rp.UserID] == nil {
82-
p.connectedPlayers[rp.UserID] = pl
83-
newPlayer = true
84-
}
8592

86-
pl.Name = rp.Name
87-
pl.SteamID = rp.XUID
88-
pl.IsBot = rp.IsFakePlayer
89-
pl.AdditionalPlayerInformation = &p.additionalPlayerInfo[pl.EntityID]
90-
91-
if pl.IsAlive() {
92-
pl.LastAlivePosition = pl.Position
93-
}
94-
95-
if newPlayer && pl.SteamID != 0 {
96-
p.eventDispatcher.Dispatch(events.PlayerBindEvent{Player: pl})
97-
}
98-
}
99-
}
93+
b := p.parseFrame()
10094

101-
p.eventDispatcher.Dispatch(events.TickDoneEvent{})
95+
// Make sure all the messages of the frame are handled
96+
p.msgDispatcher.SyncQueues(p.msgQueue)
10297

98+
// Close msgQueue if we are done
10399
if !b {
104100
close(p.msgQueue)
105101
}
106-
107102
return b
108103
}
109104

110105
func (p *Parser) parseFrame() bool {
111106
cmd := demoCommand(p.bitReader.ReadSingleByte())
112107

113-
// Ingame tick number
114-
p.ingameTick = p.bitReader.ReadSignedInt(32)
108+
// Send ingame tick number update
109+
p.msgQueue <- ingameTickNumber(p.bitReader.ReadSignedInt(32))
110+
115111
// Skip 'player slot'
116112
p.bitReader.ReadSingleByte()
117113

118-
p.currentFrame++
119-
120114
switch cmd {
121115
case dcSynctick:
122116
// Ignore
@@ -130,6 +124,8 @@ func (p *Parser) parseFrame() bool {
130124
p.bitReader.EndChunk()
131125

132126
case dcDataTables:
127+
p.msgDispatcher.SyncQueues(p.msgQueue)
128+
133129
p.bitReader.BeginChunk(p.bitReader.ReadSignedInt(32) << 3)
134130
p.stParser.ParsePacket(p.bitReader)
135131
p.bitReader.EndChunk()
@@ -138,6 +134,8 @@ func (p *Parser) parseFrame() bool {
138134
p.bindEntities()
139135

140136
case dcStringTables:
137+
p.msgDispatcher.SyncQueues(p.msgQueue)
138+
141139
p.parseStringTables()
142140

143141
case dcUserCommand:
@@ -158,5 +156,9 @@ func (p *Parser) parseFrame() bool {
158156
default:
159157
panic(fmt.Sprintf("Canny handle it anymoe (command %v unknown)", cmd))
160158
}
159+
160+
// Queue up some post processing
161+
p.msgQueue <- frameParsedToken
162+
161163
return true
162164
}

0 commit comments

Comments
 (0)