Skip to content

Commit 0fe9c33

Browse files
committed
Wait for last writtent message instead
1 parent d8687b7 commit 0fe9c33

File tree

2 files changed

+8
-28
lines changed

2 files changed

+8
-28
lines changed

internal/topic/topicwriterinternal/queue.go

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -309,33 +309,13 @@ func (q *messageQueue) Wait(ctx context.Context, waiter MessageQueueAckWaiter) e
309309
}
310310
}
311311

312-
func (q *messageQueue) WaitEmpty(ctx context.Context) error {
313-
if err := ctx.Err(); err != nil {
314-
return err
315-
}
316-
317-
ctxDone := ctx.Done()
318-
for {
319-
ackReceived := q.acksReceivedEvent.Waiter()
320-
321-
keepWaiting := false
322-
q.m.WithRLock(func() {
323-
keepWaiting = len(q.messagesByOrder) > 0
324-
})
325-
326-
if !keepWaiting {
327-
return nil
328-
}
329-
330-
select {
331-
case <-ctxDone:
332-
return ctx.Err()
333-
case <-q.closedChan:
334-
return q.closedErr
335-
case <-ackReceived.Done():
336-
// pass next iteration
337-
}
338-
}
312+
// WaitLastWritten waits for last written message gets ack.
313+
func (q *messageQueue) WaitLastWritten(ctx context.Context) error {
314+
var lastIndex int
315+
q.m.WithRLock(func() {
316+
lastIndex = q.lastWrittenIndex
317+
})
318+
return q.Wait(ctx, MessageQueueAckWaiter{sequenseNumbers: []int{lastIndex}})
339319
}
340320

341321
type MessageQueueAckWaiter struct {

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func (w *WriterReconnector) createMessagesWithContent(messages []Message) ([]mes
325325
}
326326

327327
func (w *WriterReconnector) Flush(ctx context.Context) error {
328-
return w.queue.WaitEmpty(ctx)
328+
return w.queue.WaitLastWritten(ctx)
329329
}
330330

331331
func (w *WriterReconnector) Close(ctx context.Context) error {

0 commit comments

Comments
 (0)