@@ -54,6 +54,7 @@ type fastBatch struct {
5454 pending uint32 // Number of pending messages in the batch waiting to be persisted.
5555 ackMessages uint16 // Ack will be sent every N messages.
5656 maxAckMessages uint16 // Maximum ackMessages value the client allows.
57+ reply string // The last reply subject seen when persisting a message.
5758 gapOk bool // Whether a gap is okay, if not, the batch would be rejected.
5859 commit bool // If the batch is committed.
5960}
@@ -160,26 +161,78 @@ func (b *atomicBatch) readyForCommit() *BatchAbandonReason {
160161// newFastBatch creates a fast batch publish object.
161162// Lock should be held.
162163func (batches * batching ) newFastBatch (mset * stream , batchId string , gapOk bool , maxAckMessages uint16 ) * fastBatch {
164+ b := & fastBatch {gapOk : gapOk , maxAckMessages : maxAckMessages }
165+ batches .fastBatchInit (b )
166+ b .setupCleanupTimer (mset , batchId , batches )
167+ return b
168+ }
169+
170+ // fastBatchInit (re)initializes the ackMessages field for a fast batch.
171+ // Lock should be held.
172+ func (batches * batching ) fastBatchInit (b * fastBatch ) {
163173 // If it's the first batch, just allow what the client wants, otherwise we'll
164174 // need to coordinate and slowly ramp up this publisher.
165175 // TODO(mvv): fast ingest's initial flow value improvements?
166- ackMessages := min (500 , maxAckMessages )
176+ ackMessages := min (500 , b . maxAckMessages )
167177 if len (batches .fast ) > 1 {
168178 ackMessages = 1
169179 }
170- b := & fastBatch {gapOk : gapOk , ackMessages : ackMessages , maxAckMessages : maxAckMessages }
171- b .setupCleanupTimer (mset , batchId , batches )
172- return b
180+ b .ackMessages = ackMessages
181+ }
182+
183+ // fastBatchReset resets the fast batch to an empty state and sends a flow control message.
184+ // Lock should be held.
185+ func (batches * batching ) fastBatchReset (mset * stream , batchId string , b * fastBatch ) {
186+ // If the timer already stopped before we could commit, we clean it up.
187+ if b .timer == nil || (! b .commit && ! b .timer .Stop ()) {
188+ b .cleanupLocked (batchId , batches )
189+ return
190+ }
191+ // Otherwise, reset the state.
192+ batches .fastBatchInit (b )
193+ b .timer .Reset (getCleanupTimeout (mset ))
194+ b .commit = false
195+ b .pending = 0
196+ b .fseq , b .lseq = b .pseq , b .pseq
197+ b .sendFlowControl (b .fseq , mset , b .reply )
173198}
174199
175200// fastBatchRegisterSequences registers the highest stored batch and stream sequence and returns
176201// whether a PubAck should be sent if the batch has been committed.
202+ // If this is called on a follower, it only registers the highest stream and persisted batch sequences.
177203// Lock should be held.
178- func (batches * batching ) fastBatchRegisterSequences (mset * stream , reply string , batchId string , batchSeq , streamSeq uint64 ) bool {
179- b , ok := batches .fast [batchId ]
180- if ! ok {
204+ func (batches * batching ) fastBatchRegisterSequences (mset * stream , reply string , streamSeq uint64 , isLeader bool , batch * FastBatch ) bool {
205+ b , ok := batches .fast [batch .id ]
206+ if ! ok || ! isLeader {
207+ // If this batch has committed, we can clean it up.
208+ if batch .commit {
209+ if b != nil {
210+ b .cleanupLocked (batch .id , batches )
211+ }
212+ return false
213+ }
214+ // Otherwise, even as a follower, we record the latest state of this batch.
215+ if b == nil || ! b .resetCleanupTimer (mset ) {
216+ if b != nil {
217+ // The timer couldn't be reset, this means the timer already runs and is likely
218+ // waiting to acquire the lock. We reset the timer here so it doesn't clean up
219+ // this batch that we're about to overwrite.
220+ b .timer = nil
221+ }
222+ // We'll need a copy as we'll use it as a key and later for cleanup.
223+ batchId := copyString (batch .id )
224+ b = batches .newFastBatch (mset , batchId , batch .gapOk , batch .flow )
225+ if batches .fast == nil {
226+ batches .fast = make (map [string ]* fastBatch , 1 )
227+ }
228+ batches .fast [batchId ] = b
229+ }
230+ b .sseq = streamSeq
231+ b .pseq , b .lseq = batch .seq , batch .seq
232+ b .reply = reply
181233 return false
182234 }
235+ b .reply = reply
183236 if b .pending > 0 {
184237 b .pending --
185238 }
@@ -192,17 +245,17 @@ func (batches *batching) fastBatchRegisterSequences(mset *stream, reply string,
192245 skipped = true
193246 b .pseq = b .lseq
194247 } else {
195- b .pseq = batchSeq
248+ b .pseq = batch . seq
196249 }
197250 // If the PubAck needs to be sent now as a result of a commit.
198251 if b .lseq == b .pseq && b .commit {
199- b .cleanupLocked (batchId , batches )
252+ b .cleanupLocked (batch . id , batches )
200253 // If we skipped ahead due to duplicate messages, send the PubAck with the highest sequence.
201254 if skipped {
202255 var buf [256 ]byte
203256 pubAck := append (buf [:0 ], mset .pubAck ... )
204257 response := append (pubAck , strconv .FormatUint (b .sseq , 10 )... )
205- response = append (response , fmt .Sprintf (",\" batch\" :%q,\" count\" :%d}" , batchId , b .lseq )... )
258+ response = append (response , fmt .Sprintf (",\" batch\" :%q,\" count\" :%d}" , batch . id , b .lseq )... )
206259 if len (reply ) > 0 {
207260 mset .outq .sendMsg (reply , response )
208261 }
@@ -274,10 +327,14 @@ func (b *fastBatch) sendFlowControl(batchSeq uint64, mset *stream, reply string)
274327// after the last message has been persisted.
275328// Lock should be held.
276329func (batches * batching ) fastBatchCommit (b * fastBatch , batchId string , mset * stream , reply string ) bool {
330+ // Either we commit now, or we clean up later, so stop the timer.
331+ if b .timer == nil || (! b .commit && ! b .timer .Stop ()) {
332+ // Shouldn't be possible for the timer to already be stopped if we haven't committed yet,
333+ // since we pre-check being able to reset the timer. But guard against it anyhow.
334+ return true
335+ }
277336 // Mark that this batch commits.
278337 b .commit = true
279- // Either we commit now, or we clean up later, so stop the timer.
280- b .timer .Stop ()
281338 // If the whole batch has been persisted, we can respond with the PubAck now.
282339 if b .lseq == b .pseq {
283340 b .cleanupLocked (batchId , batches )
@@ -300,7 +357,10 @@ func (b *fastBatch) setupCleanupTimer(mset *stream, batchId string, batches *bat
300357 timeout := getCleanupTimeout (mset )
301358 b .timer = time .AfterFunc (timeout , func () {
302359 b .cleanup (batchId , batches )
303- mset .sendStreamBatchAbandonedAdvisory (batchId , BatchTimeout )
360+ // Only send the advisory if we're the leader. (Since we do the tracking on followers too)
361+ if mset .IsLeader () {
362+ mset .sendStreamBatchAbandonedAdvisory (batchId , BatchTimeout )
363+ }
304364 })
305365}
306366
@@ -310,6 +370,9 @@ func (b *fastBatch) resetCleanupTimer(mset *stream) bool {
310370 if b .commit {
311371 return true
312372 }
373+ if b .timer == nil {
374+ return false
375+ }
313376 timeout := getCleanupTimeout (mset )
314377 return b .timer .Reset (timeout )
315378}
@@ -323,6 +386,11 @@ func (b *fastBatch) cleanup(batchId string, batches *batching) {
323386
324387// Lock should be held.
325388func (b * fastBatch ) cleanupLocked (batchId string , batches * batching ) {
389+ // If the timer is nil, it means this batch has been replaced with a new one.
390+ // This can happen on a follower depending on timing.
391+ if b .timer == nil {
392+ return
393+ }
326394 b .timer .Stop ()
327395 delete (batches .fast , batchId )
328396}
0 commit comments