Skip to content

Commit e7f9e27

Browse files
committed
fix compile and linter errors
1 parent db11c88 commit e7f9e27

File tree

3 files changed

+8
-1
lines changed

3 files changed

+8
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added Flush method for topic writer
2+
13
## v3.66.0
24
* Added experimental package `retry/budget` for limit second and subsequent retry attempts
35
* Refactored internals for enabling `containedctx` linter

internal/topic/topicwriterinternal/queue.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ func (q *messageQueue) WaitLastWritten(ctx context.Context) error {
326326
q.m.WithRLock(func() {
327327
lastIndex = q.lastWrittenIndex
328328
})
329+
329330
return q.Wait(ctx, MessageQueueAckWaiter{sequenseNumbers: []int{lastIndex}})
330331
}
331332

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,11 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) {
554554

555555
closeCompleted := make(empty.Chan)
556556
go func() {
557-
err := e.writer.Write(e.ctx, []Message{{SeqNo: seqNo, CreatedAt: messageTime, Data: bytes.NewReader(messageData)}})
557+
err := e.writer.Write(e.ctx, []PublicMessage{{
558+
SeqNo: seqNo,
559+
CreatedAt: messageTime,
560+
Data: bytes.NewReader(messageData),
561+
}})
558562
close(writeCompleted)
559563
require.NoError(t, err)
560564
}()

0 commit comments

Comments
 (0)